#!/usr/bin/env python3
"""
Weather station - MQTT data ingestion and web visualisation.

The script subscribes to an MQTT topic, stores every received sample in a
local SQLite database and serves the stored data through a small Flask app
(an HTML page at ``/`` and a JSON API at ``/api/data/<period>``).
"""

import json
import os
import sqlite3
import threading
from contextlib import closing
from datetime import datetime, timedelta

from flask import Flask, render_template, jsonify
import paho.mqtt.client as mqtt

# Configuration.
# Each value can be overridden through an environment variable of the same
# name so that credentials do not have to live in the source file; the
# literals below are only the fallback defaults. Adjust as needed.
MQTT_HOST = os.environ.get("MQTT_HOST", "rexfue.de")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_TOPIC = os.environ.get("MQTT_TOPIC", "vantage/live")
MQTT_USER = os.environ.get("MQTT_USER", "stzuhr")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "74chQCYb")
DB_FILE = os.environ.get("DB_FILE", "wetterdaten.db")

# Canonical timestamp layout used for storage and for the query thresholds.
# Rows are filtered with a plain string comparison, so every stored value
# must use exactly this layout.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

app = Flask(__name__)


class WetterDB:
    """Thin wrapper around the SQLite file that stores the weather samples.

    Every method opens its own short-lived connection, so an instance can be
    shared between the MQTT network thread and the Flask request handlers.
    """

    def __init__(self, db_file):
        """Remember the database path and make sure the schema exists.

        Args:
            db_file: Path of the SQLite database file.
        """
        self.db_file = db_file
        self.init_db()

    def init_db(self):
        """Create the ``wetterdaten`` table and its datetime index if missing."""
        # closing() guarantees the connection is released even if a statement
        # fails; the inner ``conn`` context manager commits or rolls back.
        with closing(sqlite3.connect(self.db_file)) as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS wetterdaten (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    datetime TEXT NOT NULL,
                    pressure REAL,
                    wind_gust REAL,
                    wind_speed REAL,
                    wind_dir REAL,
                    rain_rate REAL,
                    rain REAL,
                    humidity INTEGER,
                    temperature REAL
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_datetime
                ON wetterdaten(datetime)
            ''')

    def save_data(self, data):
        """Insert one weather sample.

        Args:
            data: Mapping with a mandatory ``'datetime'`` key; the measurement
                keys (``pressure``, ``wind_gust``, ``wind_speed``,
                ``wind_dir``, ``rain_rate``, ``rain``, ``humidity``,
                ``temperature``) are optional and stored as NULL when absent.

        Raises:
            KeyError: If ``data`` has no ``'datetime'`` entry.
            sqlite3.Error: If the insert fails.
        """
        row = (
            data['datetime'],
            data.get('pressure'),
            data.get('wind_gust'),
            data.get('wind_speed'),
            data.get('wind_dir'),
            data.get('rain_rate'),
            data.get('rain'),
            data.get('humidity'),
            data.get('temperature'),
        )
        with closing(sqlite3.connect(self.db_file)) as conn, conn:
            conn.execute('''
                INSERT INTO wetterdaten
                (datetime, pressure, wind_gust, wind_speed, wind_dir,
                 rain_rate, rain, humidity, temperature)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', row)
        print(f"Daten gespeichert: {data['datetime']}")

    def get_data(self, hours=24):
        """Return all samples of the last ``hours`` hours, oldest first.

        Args:
            hours: Size of the look-back window in hours.

        Returns:
            A list of dicts, one per row, keyed by column name.
        """
        time_threshold = (
            datetime.now() - timedelta(hours=hours)
        ).strftime(_TIMESTAMP_FORMAT)
        with closing(sqlite3.connect(self.db_file)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute('''
                SELECT * FROM wetterdaten
                WHERE datetime >= ?
                ORDER BY datetime ASC
            ''', (time_threshold,)).fetchall()
        return [dict(row) for row in rows]

    def get_hourly_rain(self, hours=24):
        """Aggregate the ``rain_rate`` column per clock hour.

        NOTE(review): this sums ``rain_rate`` over all samples of an hour.
        If ``rain_rate`` is a rate (e.g. mm/h) the sum is not a rainfall
        amount - confirm the units delivered by the station.

        Args:
            hours: Size of the look-back window in hours.

        Returns:
            A list of ``{'hour': 'YYYY-MM-DD HH:00:00', 'rain': total}``
            dicts ordered by hour; a NULL sum is reported as 0.
        """
        time_threshold = (
            datetime.now() - timedelta(hours=hours)
        ).strftime(_TIMESTAMP_FORMAT)
        with closing(sqlite3.connect(self.db_file)) as conn:
            rows = conn.execute('''
                SELECT
                    strftime('%Y-%m-%d %H:00:00', datetime) as hour,
                    SUM(rain_rate) as total_rain
                FROM wetterdaten
                WHERE datetime >= ?
                GROUP BY hour
                ORDER BY hour ASC
            ''', (time_threshold,)).fetchall()
        return [{'hour': row[0], 'rain': row[1] or 0} for row in rows]


# Global database instance
db = WetterDB(DB_FILE)


class MQTTClient:
    """MQTT client that receives samples and writes them to the database."""

    def __init__(self):
        """Create and configure the underlying paho client (no connection yet)."""
        # paho-mqtt 2.x requires an explicit callback API version. VERSION1
        # keeps the (client, userdata, flags, rc) on_connect signature used
        # below; paho-mqtt 1.x has no such enum and takes no argument.
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            self.client = mqtt.Client(api_versions.VERSION1)
        else:
            self.client = mqtt.Client()
        self.client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

        # More stable reconnects after connection drops. Best effort: the
        # method is simply skipped on paho versions that do not provide it.
        try:
            self.client.reconnect_delay_set(min_delay=1, max_delay=120)
        except AttributeError:
            pass

        # Optional logging (helpful when debugging); also best effort.
        try:
            self.client.enable_logger()
        except AttributeError:
            pass

        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    @staticmethod
    def _normalize_timestamp(value):
        """Convert a payload timestamp to ``'YYYY-MM-DD HH:MM:SS'``.

        Accepts epoch seconds (int/float) or an ISO-like string. A trailing
        ``Z``, fractional seconds and any UTC offset are dropped without
        converting the wall-clock time. Anything missing, too short or
        unparsable falls back to the current local time.
        """
        now = datetime.now().strftime(_TIMESTAMP_FORMAT)

        # bool is a subclass of int but is never a meaningful epoch value.
        if isinstance(value, bool):
            return now

        if isinstance(value, (int, float)):
            try:
                return datetime.fromtimestamp(value).strftime(_TIMESTAMP_FORMAT)
            except (OverflowError, OSError, ValueError):
                # Out-of-range or NaN epoch: keep the sample, stamp it now.
                return now

        if isinstance(value, str):
            # Bring ISO variants into 'YYYY-MM-DD HH:MM:SS': drop 'T'/'Z'
            # and cut off fractional seconds.
            cleaned = value.replace('T', ' ').replace('Z', '').strip()
            cleaned = cleaned.split('.')[0]
            if len(cleaned) < 16:
                return now
            try:
                # Re-formatting adds missing seconds and strips a UTC offset
                # so that stored values compare correctly as strings.
                return datetime.fromisoformat(cleaned).strftime(_TIMESTAMP_FORMAT)
            except ValueError:
                return now

        return now

    @staticmethod
    def _sanitize_data(payload: dict) -> dict:
        """Convert a raw payload into the format expected by the database.

        - A missing or invalid ``datetime`` is replaced by the current time.
        - Measurement fields are converted to float (``humidity`` to int);
          values that are missing or cannot be converted become ``None``.

        Args:
            payload: Decoded JSON object of one MQTT message.

        Returns:
            A dict with exactly the keys ``datetime``, ``pressure``,
            ``wind_gust``, ``wind_speed``, ``wind_dir``, ``rain_rate``,
            ``rain``, ``humidity`` and ``temperature``.
        """
        def to_float(x):
            try:
                return float(x) if x is not None else None
            except (TypeError, ValueError, OverflowError):
                return None

        def to_int(x):
            try:
                return int(x) if x is not None else None
            except (TypeError, ValueError, OverflowError):
                return None

        return {
            'datetime': MQTTClient._normalize_timestamp(payload.get('datetime')),
            'pressure': to_float(payload.get('pressure')),
            'wind_gust': to_float(payload.get('wind_gust')),
            'wind_speed': to_float(payload.get('wind_speed')),
            'wind_dir': to_float(payload.get('wind_dir')),
            'rain_rate': to_float(payload.get('rain_rate')),
            'rain': to_float(payload.get('rain')),
            'humidity': to_int(payload.get('humidity')),
            'temperature': to_float(payload.get('temperature')),
        }

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: subscribe to the topic on success.

        Subscribing here (rather than once after ``connect``) means the
        subscription is issued again each time this callback fires.
        """
        if rc == 0:
            print(f"Mit MQTT Broker verbunden: {MQTT_HOST}")
            client.subscribe(MQTT_TOPIC)
            print(f"Topic abonniert: {MQTT_TOPIC}")
        else:
            print(f"Verbindungsfehler: {rc}")

    def on_message(self, client, userdata, msg):
        """Message callback: decode, sanitize and store one sample.

        Any failure is reported and swallowed so that a single bad message
        cannot take down the MQTT network loop.
        """
        try:
            raw = json.loads(msg.payload.decode())
            data = self._sanitize_data(raw)
            db.save_data(data)
            # Report success only after the row has actually been written.
            print(f"Empfangen und gespeichert: {data}")
        except Exception as e:
            print(f"Fehler beim Verarbeiten der Nachricht: {e}")

    def start(self):
        """Connect to the broker and start the background network loop."""
        try:
            self.client.connect(MQTT_HOST, MQTT_PORT, 60)
            self.client.loop_start()
            print("MQTT Client gestartet")
        except Exception as e:
            print(f"MQTT Verbindungsfehler: {e}")


# Flask routes
@app.route('/')
def index():
    """Main page."""
    return render_template('index.html')


# The view needs the ``period`` URL variable; without the ``<period>``
# placeholder Flask would call it with no argument and fail.
@app.route('/api/data/<period>')
def get_data(period):
    """API endpoint for weather data.

    Args:
        period: ``'day'`` selects the last 24 hours; any other value selects
            the last 168 hours (one week).

    Returns:
        JSON with the raw samples (``data``) and the hourly aggregation
        (``rain_hourly``).
    """
    hours = 24 if period == 'day' else 168  # 168h = 1 week

    data = db.get_data(hours)
    rain_data = db.get_hourly_rain(hours)

    return jsonify({
        'data': data,
        'rain_hourly': rain_data
    })


def main():
    """Entry point: start the MQTT client, then run the Flask server."""
    print("Wetterstation wird gestartet...")

    # Start MQTT client
    mqtt_client = MQTTClient()
    mqtt_client.start()

    # Start Flask server
    print("\nWeb-Interface verfügbar unter: http://localhost:5003")
    print("Drücke CTRL+C zum Beenden\n")
    app.run(host='0.0.0.0', port=5003, debug=False)


if __name__ == '__main__':
    main()