/**
 * wetter.js – Hauptprogramm
 *
 * Ablauf beim Start:
 * 1. SQLite-Datenbank öffnen (wetter.db)
 * 2. Letzten archivierten Zeitstempel lesen → Archiv nachladen
 * 3. Archivdaten in DB schreiben
 * 4. LOOP-Schleife starten: alle 30 s Echtzeit-Daten holen, per POST
 *    weiterleiten und als 5-Minuten-Mittel in DB schreiben
 */

import "dotenv/config";
import path from "path";
import { fileURLToPath } from "url";

import { openDb, getLatestTs, insertRecords, insertRecord } from "./db.js";
import { readArchiveSince, connectStation, fetchLoopData } from "./davis.js";

/**
 * Parses a millisecond interval read from the environment.
 *
 * `setTimeout` silently coerces NaN, values below 1 and values above
 * 2147483647 to a 1 ms delay, so a malformed setting would make the poll
 * loop spin. Such values are rejected here and replaced by the fallback.
 *
 * @param {string|undefined|null} raw - Raw environment value.
 * @param {number} fallback - Interval used when `raw` is missing or invalid.
 * @returns {number} A finite interval in the range 1..2147483647 ms, or `fallback`.
 */
function parseIntervalMs(raw, fallback) {
  if (raw == null) return fallback;
  const ms = Number(raw);
  if (Number.isFinite(ms) && ms >= 1 && ms <= 2_147_483_647) return ms;
  console.warn(`WARN Ungültiges Intervall "${raw}" – verwende ${fallback} ms`);
  return fallback;
}

// Directory containing this module (ES modules have no built-in __dirname).
const __dirname = path.dirname(fileURLToPath(import.meta.url));
// SQLite database file; defaults to wetter.db next to this script.
const DB_PATH = process.env.DB_PATH ?? path.join(__dirname, "wetter.db");
// Delay between two LOOP polls of the station (default 30 s).
const LOOP_INTERVAL_MS = parseIntervalMs(process.env.LOOP_INTERVAL_MS, 30_000);
// Buffered samples are aggregated and written to the DB every 5 minutes.
const DB_INTERVAL_MS = 5 * 60 * 1000;
// Optional HTTP endpoint that receives each record; POSTing is skipped when unset.
const POST_URL = process.env.POST_URL ?? null;
// Optional key sent as the X-API-Key header with every POST.
const COLLECTOR_API_KEY = process.env.COLLECTOR_API_KEY;

// ── Hilfsfunktionen ────────────────────────────────────────────────────────

/**
 * Formats the time-of-day part of a Date for log prefixes (German locale).
 *
 * The hour cycle is pinned to h23 (00–23) instead of using `hour12: false`,
 * which can render midnight as "24:…" when the resolved locale defaults to a
 * 12-hour clock.
 *
 * @param {Date} d - Point in time to format (local time zone).
 * @returns {string} Locale-formatted time string.
 */
const fmt24h = (d) => d.toLocaleTimeString("de-DE", { hourCycle: "h23" });
/**
 * Formats a Date as a combined date and time string (German locale).
 *
 * The hour cycle is pinned to h23 (00–23) instead of using `hour12: false`,
 * which can render midnight as "24:…" when the resolved locale defaults to a
 * 12-hour clock.
 *
 * @param {Date} d - Point in time to format (local time zone).
 * @returns {string} Locale-formatted date and time string.
 */
const fmtDateTime = (d) => d.toLocaleString("de-DE", { hourCycle: "h23" });
/**
 * Prints an informational line to stdout, prefixed with the current time.
 *
 * @param {string} msg - Text to print.
 */
function log(msg) {
  const stamp = fmt24h(new Date());
  console.log(`[${stamp}] ${msg}`);
}
/**
 * Prints a warning line via console.warn, prefixed with the current time
 * and the WARN tag.
 *
 * @param {string} msg - Text to print.
 */
function warn(msg) {
  const stamp = fmt24h(new Date());
  console.warn(`[${stamp}] WARN ${msg}`);
}
/**
 * Prints an error line via console.error, prefixed with the current time
 * and the ERROR tag.
 *
 * @param {string} msg - Text to print.
 */
function err(msg) {
  const stamp = fmt24h(new Date());
  console.error(`[${stamp}] ERROR ${msg}`);
}
/**
 * Sends one record as JSON to the collector endpoint configured in POST_URL.
 *
 * Does nothing when POST_URL is unset. Every failure — serialisation error,
 * network error, timeout, non-2xx response — is reported through `warn` and
 * never thrown, so a broken collector cannot disturb the caller.
 *
 * @param {object} data - Record to send; serialised with JSON.stringify.
 * @returns {Promise<void>} Settles once the attempt has finished; never
 *   rejects. Callers may ignore it (fire-and-forget) or await it.
 */
async function postData(data) {
  if (!POST_URL) return;

  // Upper bound for one request, so a hanging collector cannot pile up
  // pending requests across poll intervals.
  const POST_TIMEOUT_MS = 10_000;

  const headers = { "Content-Type": "application/json" };
  if (COLLECTOR_API_KEY) headers["X-API-Key"] = COLLECTOR_API_KEY;

  try {
    const res = await fetch(POST_URL, {
      method: "POST",
      headers,
      body: JSON.stringify(data),
      signal: AbortSignal.timeout(POST_TIMEOUT_MS),
    });

    if (res.ok) {
      // Release the unread response body so the connection can be reused.
      await res.body?.cancel();
      return;
    }

    // Include at most 200 characters of the error body in the warning.
    const text = await res.text().catch(() => "");
    warn(`POST ${res.status} ${res.statusText}: ${text.slice(0, 200)}`);
  } catch (e) {
    warn("POST fehlgeschlagen: " + e.message);
  }
}

// ── 5-Minuten-Aggregation ──────────────────────────────────────────────────

/**
 * Condenses the buffered LOOP samples of one period into a single record.
 *
 * Per field:
 *  - tempOut, tempIn, humOut, humIn, windAvg, pressure, rainRate: arithmetic
 *    mean, rounded to one decimal.
 *  - windGust: peak value of the period, rounded to one decimal.
 *  - windDir: circular mean in degrees [0, 360), rounded to one decimal. A
 *    plain average is wrong for angles (350° and 10° would give 180°).
 *  - barTrend, forecast, rain: most recent non-null value.
 *  - time: taken from the last sample in the buffer.
 *
 * Numeric fields ignore null, undefined and non-finite values; a field with
 * no usable value becomes null.
 *
 * @param {object[]} buf - Samples in chronological order; must not be empty.
 * @returns {object} Aggregated record with the same field names as a sample.
 * @throws {TypeError} If `buf` is not a non-empty array.
 */
function aggregateBuffer(buf) {
  if (!Array.isArray(buf) || buf.length === 0) {
    throw new TypeError("aggregateBuffer: buffer must contain at least one record");
  }

  const round1 = (x) => +x.toFixed(1);

  // All finite numeric readings of one field.
  const numbers = (key) =>
    buf.map((r) => r[key]).filter((v) => typeof v === "number" && Number.isFinite(v));

  const avg = (key) => {
    const vals = numbers(key);
    if (vals.length === 0) return null;
    return round1(vals.reduce((a, b) => a + b, 0) / vals.length);
  };

  const peak = (key) => {
    const vals = numbers(key);
    if (vals.length === 0) return null;
    return round1(vals.reduce((max, v) => Math.max(max, v), -Infinity));
  };

  const last = (key) => {
    for (let i = buf.length - 1; i >= 0; i--) {
      const value = buf[i][key];
      if (value !== null && value !== undefined) return value;
    }
    return null;
  };

  // Mean of angles: average the unit vectors, then convert back to degrees.
  const meanDirection = (key) => {
    const vals = numbers(key);
    if (vals.length === 0) return null;
    let sumSin = 0;
    let sumCos = 0;
    for (const deg of vals) {
      const rad = (deg * Math.PI) / 180;
      sumSin += Math.sin(rad);
      sumCos += Math.cos(rad);
    }
    // Opposing directions cancel out; there is no meaningful mean then.
    if (Math.hypot(sumSin, sumCos) / vals.length < 1e-9) return null;
    const deg = (Math.atan2(sumSin, sumCos) * 180) / Math.PI;
    // Normalise to [0, 360); the outer modulo folds a rounded 360.0 back to 0.
    return round1((deg + 360) % 360) % 360;
  };

  return {
    time: buf[buf.length - 1].time,
    tempOut: avg("tempOut"),
    tempIn: avg("tempIn"),
    humOut: avg("humOut"),
    humIn: avg("humIn"),
    windAvg: avg("windAvg"),
    windGust: peak("windGust"), // peak value of the period
    windDir: meanDirection("windDir"),
    pressure: avg("pressure"),
    barTrend: last("barTrend"),
    forecast: last("forecast"),
    rain: last("rain"),
    rainRate: avg("rainRate"),
  };
}

// ── Archiv nachladen ───────────────────────────────────────────────────────

/**
 * Backfills the database with archive records from the station.
 *
 * Reading starts at the newest timestamp already in the database or, when
 * none is stored, 24 hours before now. A progress line is rewritten on stdout
 * while pages arrive and is always erased afterwards, even when the read
 * fails. Every received record is stored via `insertRecords` and then handed
 * to `postData`.
 *
 * @param {object} db - Database handle passed through to getLatestTs/insertRecords.
 * @returns {Promise<void>}
 * @throws Whatever `readArchiveSince` or `insertRecords` throw.
 */
async function catchUpArchive(db) {
  const latestTs = getLatestTs(db);
  // latestTs is multiplied by 1000 below — presumably Unix seconds; confirm in db.js.
  const since = latestTs
    ? new Date(latestTs * 1000) // resume at the last DB entry
    : new Date(Date.now() - 24 * 60 * 60 * 1000); // fallback: last 24 h

  log(`Lade Archiv ab ${fmtDateTime(since)} ...`);

  let lastPct = -1;
  const onProgress = (cur, total) => {
    // Guard against total === 0, which would otherwise print "NaN%".
    const pct = total > 0 ? Math.floor((cur / total) * 100) : 0;
    if (pct === lastPct) return; // redraw only when the percentage changes
    process.stdout.write(`\r Archiv: ${pct}% (Seite ${cur}/${total})`);
    lastPct = pct;
  };

  let records;
  try {
    records = await readArchiveSince(since, onProgress);
  } finally {
    // Erase the progress line, also on failure, so later output starts clean.
    process.stdout.write("\r\x1b[K");
  }

  if (records.length === 0) {
    log("Archiv: keine neuen Datensätze.");
    return;
  }

  const inserted = insertRecords(db, records, "archive");
  log(`Archiv: ${inserted} neue Datensätze gespeichert (${records.length} empfangen).`);

  // Hand every record to postData without awaiting the individual requests.
  for (const r of records) postData(r);
}

// ── LOOP-Schleife ──────────────────────────────────────────────────────────

/**
 * Connects to the station and starts the endless real-time polling loop.
 *
 * Every LOOP_INTERVAL_MS one sample is fetched, forwarded with `postData`,
 * buffered and logged. Once DB_INTERVAL_MS has passed since the last write,
 * the buffer is aggregated and stored with source "loop".
 *
 * Only a failed station read triggers a disconnect/reconnect. Failures while
 * processing a sample (POST, aggregation, DB insert, log formatting) are
 * logged and leave the station connection untouched.
 *
 * @param {object} db - Database handle passed through to insertRecord.
 * @returns {Promise<void>} Resolves after the initial connect, once the first
 *   poll has been started; the loop then reschedules itself via setTimeout.
 * @throws Whatever `connectStation` throws on the initial connect.
 */
async function runLoop(db) {
  let station = null;
  const buffer = [];
  let lastDbWrite = Date.now();

  async function connect() {
    station = await connectStation();
    log("Verbunden mit Davis-Console.");
  }

  // Drops the current connection (best effort) and tries once to reconnect.
  async function reconnect() {
    try {
      await station?.disconnect();
    } catch {
      // The connection is being discarded anyway; a failed disconnect is harmless.
    }
    station = null;
    try {
      await connect();
    } catch (ce) {
      err("Reconnect fehlgeschlagen: " + ce.message);
    }
  }

  // Aggregates the buffered samples into one DB row and starts a new period.
  function flushBuffer(now) {
    const n = buffer.length;
    try {
      const agg = aggregateBuffer(buffer);
      insertRecord(db, agg, "loop");
      log(`DB: 5-min-Mittel gespeichert (${n} Messwerte)`);
    } catch (e) {
      err("DB-Schreiben fehlgeschlagen: " + e.message);
    } finally {
      // Start a fresh period even after a failure so the buffer cannot grow unbounded.
      buffer.length = 0;
      lastDbWrite = now;
    }
  }

  // Forwards, buffers and logs one sample; flushes to the DB when due.
  function handleSample(data) {
    buffer.push(data);

    try {
      postData(data);
    } catch (e) {
      warn("POST fehlgeschlagen: " + e.message);
    }

    const now = Date.now();
    if (now - lastDbWrite >= DB_INTERVAL_MS) flushBuffer(now);

    log(
      `Außen: ${data.tempOut?.toFixed(1)}°C ` +
        `InnenT: ${data.tempIn?.toFixed(1)}°C ` +
        `Feuchte: ${data.humOut}% ` +
        `InnenH: ${data.humIn}% ` +
        `Wind: ${data.windAvg} km/h ` +
        `WindRichtung: ${data.windDir}° ` +
        `Druck: ${data.pressure} hPa ` +
        `Trend: ${data.barTrend ?? "n/a"} ` +
        `Forecast: ${data.forecast}`
    );
  }

  async function tick() {
    let data = null;
    try {
      data = await fetchLoopData(station);
      // A missing sample counts as a read failure; buffering it would break every later flush.
      if (data == null) throw new Error("keine Daten empfangen");
    } catch (e) {
      data = null;
      warn("LOOP-Fehler: " + e.message + " – Verbindung wird neu aufgebaut.");
      await reconnect();
    }

    if (data !== null) {
      try {
        handleSample(data);
      } catch (e) {
        err("Verarbeitung fehlgeschlagen: " + e.message);
      }
    }

    // The next poll is scheduled only after this one has fully finished.
    setTimeout(tick, LOOP_INTERVAL_MS);
  }

  await connect();
  // Not awaited on purpose: tick handles all of its own errors and reschedules itself.
  tick();
}

// ── Hauptprogramm ──────────────────────────────────────────────────────────

// Open the database and report which file is in use.
const db = openDb(DB_PATH);
log(`Datenbank: ${DB_PATH}`);

// Startup phases run strictly one after another: first backfill the archive,
// then enter the real-time loop.
for (const phase of [catchUpArchive, runLoop]) {
  await phase(db);
}