/** * wetter.js – Hauptprogramm * * Ablauf beim Start: * 1. SQLite-Datenbank öffnen (wetter.db) * 2. Letzten archivierten Zeitstempel lesen → Archiv nachladen * 3. Archivdaten in DB schreiben * 4. LOOP-Schleife starten: alle 30 s Echtzeit-Daten holen & in DB schreiben */ import "dotenv/config"; import path from "path"; import { fileURLToPath } from "url"; import { openDb, getLatestTs, insertRecords, insertRecord } from "./db.js"; import { readArchiveSince, connectStation, fetchLoopData } from "./davis.js"; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const DB_PATH = process.env.DB_PATH ?? path.join(__dirname, "wetter.db"); const LOOP_INTERVAL_MS = Number(process.env.LOOP_INTERVAL_MS ?? 30_000); const DB_INTERVAL_MS = 5 * 60 * 1000; // alle 5 min in DB schreiben const POST_URL = process.env.POST_URL ?? null; const COLLECTOR_API_KEY = process.env.COLLECTOR_API_KEY; // ── Hilfsfunktionen ──────────────────────────────────────────────────────── const fmt24h = (d) => d.toLocaleTimeString("de-DE", { hour12: false }); const fmtDateTime = (d) => d.toLocaleString("de-DE", { hour12: false }); function log(msg) { console.log (`[${fmt24h(new Date())}] ${msg}`); } function warn(msg) { console.warn(`[${fmt24h(new Date())}] WARN ${msg}`); } function err(msg) { console.error(`[${fmt24h(new Date())}] ERROR ${msg}`); } function postData(data) { if (!POST_URL) return; const headers = { "Content-Type": "application/json" }; if (COLLECTOR_API_KEY) headers["X-API-Key"] = COLLECTOR_API_KEY; fetch(POST_URL, { method: "POST", headers, body: JSON.stringify(data) }) .then(async res => { if (!res.ok) { const text = await res.text().catch(() => ""); warn(`POST ${res.status} ${res.statusText}: ${text.slice(0, 200)}`); } }) .catch(e => warn("POST fehlgeschlagen: " + e.message)); } // ── 5-Minuten-Aggregation ────────────────────────────────────────────────── function aggregateBuffer(buf) { const avg = (key) => { const vals = buf.map(r => r[key]).filter(v => v !== null && v !== undefined); if (!vals.length) return null; return +( vals.reduce((a, b) => a + b, 0) / vals.length ).toFixed(1); }; const peak = (key) => { const vals = buf.map(r => r[key]).filter(v => v !== null && v !== undefined); return vals.length ? +Math.max(...vals).toFixed(1) : null; }; const last = (key) => { for (let i = buf.length - 1; i >= 0; i--) if (buf[i][key] !== null && buf[i][key] !== undefined) return buf[i][key]; return null; }; return { time: buf[buf.length - 1].time, tempOut: avg('tempOut'), tempIn: avg('tempIn'), humOut: avg('humOut'), humIn: avg('humIn'), windAvg: avg('windAvg'), windGust: peak('windGust'), // Spitzenwert der Periode windDir: avg('windDir'), pressure: avg('pressure'), barTrend: last('barTrend'), forecast: last('forecast'), rain: last('rain'), rainRate: avg('rainRate'), }; } // ── Archiv nachladen ─────────────────────────────────────────────────────── async function catchUpArchive(db) { const latestTs = getLatestTs(db); const since = latestTs ? new Date(latestTs * 1000) // ab letztem DB-Eintrag : new Date(Date.now() - 24 * 60 * 60 * 1000); // Fallback: letzte 24 h log(`Lade Archiv ab ${fmtDateTime(since)} ...`); let lastPct = -1; const records = await readArchiveSince(since, (cur, total) => { const pct = Math.floor(cur / total * 100); if (pct !== lastPct) { process.stdout.write(`\r Archiv: ${pct}% (Seite ${cur}/${total})`); lastPct = pct; } }); process.stdout.write("\r\x1b[K"); // Fortschrittszeile löschen if (records.length === 0) { log("Archiv: keine neuen Datensätze."); return; } const inserted = insertRecords(db, records, "archive"); log(`Archiv: ${inserted} neue Datensätze gespeichert (${records.length} empfangen).`); for (const r of records) postData(r); } // ── LOOP-Schleife ────────────────────────────────────────────────────────── async function runLoop(db) { let station = null; const buffer = []; let lastDbWrite = Date.now(); async function connect() { station = await connectStation(); log("Verbunden mit Davis-Console."); } async function tick() { try { const data = await fetchLoopData(station); buffer.push(data); postData(data); const now = Date.now(); if (now - lastDbWrite >= DB_INTERVAL_MS) { const n = buffer.length; const agg = aggregateBuffer(buffer); buffer.length = 0; lastDbWrite = now; insertRecord(db, agg, "loop"); log(`DB: 5-min-Mittel gespeichert (${n} Messwerte)`); } log( `Außen: ${data.tempOut?.toFixed(1)}°C ` + `InnenT: ${data.tempIn?.toFixed(1)}°C ` + `Feuchte: ${data.humOut}% ` + `InnenH: ${data.humIn}% ` + `Wind: ${data.windAvg} km/h ` + `WindRichtung: ${data.windDir}° ` + `Druck: ${data.pressure} hPa ` + `Trend: ${data.barTrend ?? "n/a"} ` + `Forecast: ${data.forecast}` ); } catch (e) { warn("LOOP-Fehler: " + e.message + " – Verbindung wird neu aufgebaut."); try { await station?.disconnect(); } catch {} station = null; try { await connect(); } catch (ce) { err("Reconnect fehlgeschlagen: " + ce.message); } } setTimeout(tick, LOOP_INTERVAL_MS); } await connect(); tick(); } // ── Hauptprogramm ────────────────────────────────────────────────────────── const db = openDb(DB_PATH); log(`Datenbank: ${DB_PATH}`); await catchUpArchive(db); await runLoop(db);