# MQTT subscriber that reads weather data and stores in PostgreSQL import os import json import logging import ssl from datetime import datetime from pathlib import Path from dotenv import load_dotenv import paho.mqtt.client as mqtt import psycopg2 from psycopg2.extras import RealDictCursor # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Umgebungsvariablen laden - eine Ebene höher env_path = Path(__file__).parent.parent / '.env' load_dotenv(dotenv_path=env_path) # Konfiguration MQTT_BROKER = os.getenv('MQTT_BROKER', 'rexfue.de') MQTT_PORT = int(os.getenv('MQTT_PORT', 1883)) MQTT_USERNAME = os.getenv('MQTT_USERNAME') MQTT_PASSWORD = os.getenv('MQTT_PASSWORD') MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'vantage/live') DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) DB_NAME = os.getenv('DB_NAME', 'wetterstation') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') class WeatherDataCollector: """Klasse zum Sammeln und Speichern von Wetterdaten aus MQTT in PostgreSQL""" def __init__(self): self.db_conn = None self.mqtt_client = None self.setup_database() self.setup_mqtt() def setup_database(self): """Datenbankverbindung herstellen und Tabelle erstellen""" try: self.db_conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) logger.info("Datenbankverbindung hergestellt") # Tabelle erstellen falls nicht vorhanden with self.db_conn.cursor() as cursor: cursor.execute(""" CREATE TABLE IF NOT EXISTS weather_data ( id SERIAL PRIMARY KEY, datetime TIMESTAMP NOT NULL, temperature FLOAT, humidity INTEGER, pressure FLOAT, wind_speed FLOAT, wind_gust FLOAT, wind_dir FLOAT, rain FLOAT, rain_rate FLOAT, received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(datetime) ) """) self.db_conn.commit() logger.info("Tabelle weather_data bereit") except Exception as e: logger.error(f"Fehler bei Datenbankverbindung: {e}") raise def setup_mqtt(self): """MQTT Client konfigurieren""" self.mqtt_client = mqtt.Client() self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) # Callbacks setzen self.mqtt_client.on_connect = self.on_connect self.mqtt_client.on_message = self.on_message self.mqtt_client.on_disconnect = self.on_disconnect logger.info(f"MQTT Client konfiguriert für {MQTT_BROKER}:{MQTT_PORT}") def on_connect(self, client, userdata, flags, rc): """Callback wenn MQTT Verbindung hergestellt wird""" if rc == 0: logger.info("Mit MQTT Broker verbunden") client.subscribe(MQTT_TOPIC) logger.info(f"Topic abonniert: {MQTT_TOPIC}") else: logger.error(f"Verbindung fehlgeschlagen mit Code {rc}") def on_disconnect(self, client, userdata, rc): """Callback wenn MQTT Verbindung getrennt wird""" if rc != 0: logger.warning(f"Unerwartete Trennung vom Broker. Code: {rc}") def on_message(self, client, userdata, msg): """Callback wenn MQTT Nachricht empfangen wird""" try: payload = msg.payload.decode('utf-8') logger.info(f"Nachricht empfangen auf {msg.topic}: {payload}") # JSON parsen data = json.loads(payload) # In Datenbank speichern self.save_to_database(data) except json.JSONDecodeError as e: logger.error(f"Fehler beim JSON-Parsen: {e}") except Exception as e: logger.error(f"Fehler bei Nachrichtenverarbeitung: {e}") def save_to_database(self, data): """Wetterdaten in PostgreSQL speichern""" try: with self.db_conn.cursor() as cursor: cursor.execute(""" INSERT INTO weather_data (datetime, temperature, humidity, pressure, wind_speed, wind_gust, wind_dir, rain, rain_rate) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (datetime) DO UPDATE SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity, pressure = EXCLUDED.pressure, wind_speed = EXCLUDED.wind_speed, wind_gust = EXCLUDED.wind_gust, wind_dir = EXCLUDED.wind_dir, rain = EXCLUDED.rain, rain_rate = EXCLUDED.rain_rate """, ( data.get('datetime'), data.get('temperature'), data.get('humidity'), data.get('pressure'), data.get('wind_speed'), data.get('wind_gust'), data.get('wind_dir'), data.get('rain'), data.get('rain_rate') )) self.db_conn.commit() logger.info(f"Daten gespeichert für {data.get('datetime')}") except Exception as e: logger.error(f"Fehler beim Speichern in Datenbank: {e}") self.db_conn.rollback() def start(self): """MQTT Client starten und auf Nachrichten warten""" try: self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) logger.info("Starte MQTT Loop...") self.mqtt_client.loop_forever() except KeyboardInterrupt: logger.info("Programm wird beendet...") except Exception as e: logger.error(f"Fehler beim Start: {e}") finally: self.cleanup() def cleanup(self): """Ressourcen aufräumen""" if self.mqtt_client: self.mqtt_client.disconnect() logger.info("MQTT Verbindung getrennt") if self.db_conn: self.db_conn.close() logger.info("Datenbankverbindung geschlossen") def main(): """Hauptfunktion""" logger.info("Wetterstation Collector startet...") # Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind required_vars = ['MQTT_USERNAME', 'MQTT_PASSWORD', 'DB_USER', 'DB_PASSWORD'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}") logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen") return collector = WeatherDataCollector() collector.start() if __name__ == "__main__": main()