From f32e472ea33159491fb97a572366ef91d2e22b72 Mon Sep 17 00:00:00 2001 From: rxf Date: Tue, 10 Feb 2026 14:06:42 +0100 Subject: [PATCH] HTTP-Empfang geht --- collector/main.py | 369 ++++++++++++++++++++++++------------- collector/requirements.txt | 3 +- docker-compose.yml | 2 + 3 files changed, 241 insertions(+), 133 deletions(-) diff --git a/collector/main.py b/collector/main.py index a14cb4d..be2bf17 100644 --- a/collector/main.py +++ b/collector/main.py @@ -1,15 +1,16 @@ -# MQTT subscriber that reads weather data and stores in PostgreSQL +# HTTP API that receives weather data via POST and stores in PostgreSQL import os import json import logging -import ssl from datetime import datetime from pathlib import Path from dotenv import load_dotenv -import paho.mqtt.client as mqtt +from fastapi import FastAPI, HTTPException, Request +from pydantic import BaseModel import psycopg2 from psycopg2.extras import RealDictCursor +import uvicorn # Logging konfigurieren logging.basicConfig( @@ -23,11 +24,7 @@ env_path = Path(__file__).parent.parent / '.env' load_dotenv(dotenv_path=env_path) # Konfiguration -MQTT_BROKER = os.getenv('MQTT_BROKER', 'rexfue.de') -MQTT_PORT = int(os.getenv('MQTT_PORT', 1883)) -MQTT_USERNAME = os.getenv('MQTT_USERNAME') -MQTT_PASSWORD = os.getenv('MQTT_PASSWORD') -MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'vantage/live') +COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001)) DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) @@ -35,99 +32,223 @@ DB_NAME = os.getenv('DB_NAME', 'wetterstation') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') +# FastAPI App +app = FastAPI(title="Weather Data Collector API") -class WeatherDataCollector: - """Klasse zum Sammeln und Speichern von Wetterdaten aus MQTT in PostgreSQL""" + +# Pydantic Models +class WeatherDataInput(BaseModel): + # Unterstütze beide Formate: datetime (String) oder dateTime (Unix-Timestamp) + datetime: str | None = None + dateTime: int | None = None - def __init__(self): - self.db_conn = None - self.mqtt_client = None - self.setup_database() - self.setup_mqtt() + # Unterstütze beide Feldnamen + temperature: float | None = None + outTemp: float | None = None # Fahrenheit - def setup_database(self): - """Datenbankverbindung herstellen und Tabelle erstellen""" - try: - self.db_conn = psycopg2.connect( - host=DB_HOST, - port=DB_PORT, - database=DB_NAME, - user=DB_USER, - password=DB_PASSWORD - ) - logger.info("Datenbankverbindung hergestellt") - - # Tabelle erstellen falls nicht vorhanden - with self.db_conn.cursor() as cursor: - cursor.execute(""" - CREATE TABLE IF NOT EXISTS weather_data ( - id SERIAL PRIMARY KEY, - datetime TIMESTAMP NOT NULL, - temperature FLOAT, - humidity INTEGER, - pressure FLOAT, - wind_speed FLOAT, - wind_gust FLOAT, - wind_dir FLOAT, - rain FLOAT, - rain_rate FLOAT, - received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - UNIQUE(datetime) - ) - """) - self.db_conn.commit() - logger.info("Tabelle weather_data bereit") - except Exception as e: - logger.error(f"Fehler bei Datenbankverbindung: {e}") - raise + humidity: int | None = None + outHumidity: float | None = None - def setup_mqtt(self): - """MQTT Client konfigurieren""" - self.mqtt_client = mqtt.Client() - self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + pressure: float | None = None + barometer: float | None = None # inHg + + windSpeed: float | None = None # mph + wind_speed: float | None = None + + windGust: float | None = None # mph + wind_gust: float | None = None + + windDir: float | None = None + wind_dir: float | None = None + + rain: float | None = None + rainRate: float | None = None + rain_rate: float | None = None + + model_config = {"extra": "allow"} + + def get_datetime_string(self) -> str: + """Konvertiere dateTime (Unix-Timestamp) zu datetime (String)""" + if self.datetime: + return self.datetime + elif self.dateTime: + from datetime import datetime as dt + return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S') + raise ValueError("Weder datetime noch dateTime vorhanden") + + def get_temperature_celsius(self) -> float | None: + """Konvertiere Temperatur von Fahrenheit zu Celsius falls nötig""" + if self.temperature is not None: + return self.temperature + elif self.outTemp is not None: + # Fahrenheit zu Celsius: (F - 32) * 5/9 + return (self.outTemp - 32) * 5 / 9 + return None + + def get_humidity_int(self) -> int | None: + """Hole Humidity-Wert""" + if self.humidity is not None: + return int(self.humidity) + elif self.outHumidity is not None: + return int(self.outHumidity) + return None + + def get_pressure_hpa(self) -> float | None: + """Konvertiere Druck von inHg zu hPa falls nötig""" + if self.pressure is not None: + return self.pressure + elif self.barometer is not None: + # inHg zu hPa: inHg * 33.8639 + return self.barometer * 33.8639 + return None + + def get_wind_speed(self) -> float | None: + """Hole Windgeschwindigkeit""" + return self.windSpeed if self.windSpeed is not None else self.wind_speed + + def get_wind_gust(self) -> float | None: + """Hole Windböen""" + return self.windGust if self.windGust is not None else self.wind_gust + + def get_wind_dir(self) -> float | None: + """Hole Windrichtung""" + return self.windDir if self.windDir is not None else self.wind_dir + + def get_rain_rate(self) -> float | None: + """Hole Regenrate""" + return self.rainRate if self.rainRate is not None else self.rain_rate + + +# Datenbankverbindung +def get_db_connection(): + """Datenbankverbindung herstellen""" + try: + conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + return conn + except Exception as e: + logger.error(f"Datenbankverbindungsfehler: {e}") + raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen") + + +def setup_database(): + """Tabelle erstellen falls nicht vorhanden""" + try: + conn = get_db_connection() + with conn.cursor() as cursor: + cursor.execute(""" + CREATE TABLE IF NOT EXISTS weather_data ( + id SERIAL PRIMARY KEY, + datetime TIMESTAMPTZ NOT NULL, + temperature FLOAT, + humidity INTEGER, + pressure FLOAT, + wind_speed FLOAT, + wind_gust FLOAT, + wind_dir FLOAT, + rain FLOAT, + rain_rate FLOAT, + received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(datetime) + ) + """) + conn.commit() + logger.info("Tabelle weather_data bereit") + conn.close() + except Exception as e: + logger.error(f"Fehler bei Datenbanksetup: {e}") + raise + + +# API Endpoints +@app.on_event("startup") +async def startup_event(): + """Bei Start die Datenbank initialisieren""" + logger.info("Collector API startet...") + setup_database() + logger.info(f"API läuft auf Port {COLLECTOR_PORT}") + + +@app.get("/") +async def root(): + """Root Endpoint - GET zeigt Info""" + return { + "message": "Weather Data Collector API", + "version": "1.0.0", + "endpoint": "POST /weather or POST /" + } + + +@app.post("/") +async def root_post(request: Request): + """Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)""" + try: + # Rohen Body lesen + body = await request.body() + body_str = body.decode('utf-8') + logger.info(f"POST auf Root - Raw Body: {body_str}") - # Callbacks setzen - self.mqtt_client.on_connect = self.on_connect - self.mqtt_client.on_message = self.on_message - self.mqtt_client.on_disconnect = self.on_disconnect + # Als JSON parsen + data_dict = json.loads(body_str) + logger.info(f"POST auf Root - Parsed JSON: {data_dict}") - logger.info(f"MQTT Client konfiguriert für {MQTT_BROKER}:{MQTT_PORT}") - - def on_connect(self, client, userdata, flags, rc): - """Callback wenn MQTT Verbindung hergestellt wird""" - if rc == 0: - logger.info("Mit MQTT Broker verbunden") - client.subscribe(MQTT_TOPIC) - logger.info(f"Topic abonniert: {MQTT_TOPIC}") - else: - logger.error(f"Verbindung fehlgeschlagen mit Code {rc}") - - def on_disconnect(self, client, userdata, rc): - """Callback wenn MQTT Verbindung getrennt wird""" - if rc != 0: - logger.warning(f"Unerwartete Trennung vom Broker. Code: {rc}") - - def on_message(self, client, userdata, msg): - """Callback wenn MQTT Nachricht empfangen wird""" + # Zu Pydantic Model konvertieren + data = WeatherDataInput(**data_dict) + return await receive_weather_data(data) + except json.JSONDecodeError as e: + logger.error(f"JSON Parse Error: {e}") + raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") + except Exception as e: + logger.error(f"Fehler bei Root POST: {e}") + raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}") + + +@app.post("/debug") +async def debug_post(request: dict): + """Debug Endpoint - akzeptiert beliebige JSON und loggt sie""" + logger.info(f"Debug: Empfangene Rohdaten: {request}") + return {"status": "logged", "data": request} + + +@app.get("/health") +async def health_check(): + """Health Check""" + try: + conn = get_db_connection() + with conn.cursor() as cursor: + cursor.execute("SELECT 1") + conn.close() + return {"status": "healthy", "database": "connected"} + except Exception as e: + raise HTTPException(status_code=503, detail=f"Database error: {str(e)}") + + +@app.post("/weather") +async def receive_weather_data(data: WeatherDataInput): + """Wetterdaten empfangen und speichern""" + logger.info(f"Empfangene Daten: {data.model_dump()}") + try: + conn = get_db_connection() try: - payload = msg.payload.decode('utf-8') - logger.info(f"Nachricht empfangen auf {msg.topic}: {payload}") + # Konvertiere zu den richtigen Werten + dt_string = data.get_datetime_string() + temp_c = data.get_temperature_celsius() + humidity = data.get_humidity_int() + pressure = data.get_pressure_hpa() + wind_speed = data.get_wind_speed() + wind_gust = data.get_wind_gust() + wind_dir = data.get_wind_dir() + rain = data.rain + rain_rate = data.get_rain_rate() - # JSON parsen - data = json.loads(payload) + logger.info(f"Konvertierte Daten - datetime: {dt_string}, temp: {temp_c}°C, humidity: {humidity}%, pressure: {pressure} hPa") - # In Datenbank speichern - self.save_to_database(data) - - except json.JSONDecodeError as e: - logger.error(f"Fehler beim JSON-Parsen: {e}") - except Exception as e: - logger.error(f"Fehler bei Nachrichtenverarbeitung: {e}") - - def save_to_database(self, data): - """Wetterdaten in PostgreSQL speichern""" - try: - with self.db_conn.cursor() as cursor: + with conn.cursor() as cursor: cursor.execute(""" INSERT INTO weather_data (datetime, temperature, humidity, pressure, wind_speed, @@ -143,51 +264,35 @@ class WeatherDataCollector: rain = EXCLUDED.rain, rain_rate = EXCLUDED.rain_rate """, ( - data.get('datetime'), - data.get('temperature'), - data.get('humidity'), - data.get('pressure'), - data.get('wind_speed'), - data.get('wind_gust'), - data.get('wind_dir'), - data.get('rain'), - data.get('rain_rate') + dt_string, + temp_c, + humidity, + pressure, + wind_speed, + wind_gust, + wind_dir, + rain, + rain_rate )) - self.db_conn.commit() - logger.info(f"Daten gespeichert für {data.get('datetime')}") - except Exception as e: - logger.error(f"Fehler beim Speichern in Datenbank: {e}") - self.db_conn.rollback() - - def start(self): - """MQTT Client starten und auf Nachrichten warten""" - try: - self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) - logger.info("Starte MQTT Loop...") - self.mqtt_client.loop_forever() - except KeyboardInterrupt: - logger.info("Programm wird beendet...") - except Exception as e: - logger.error(f"Fehler beim Start: {e}") + conn.commit() + logger.info(f"Daten gespeichert für {dt_string} (UTC)") + + return { + "status": "success", + "message": f"Weather data for {dt_string} saved successfully" + } finally: - self.cleanup() - - def cleanup(self): - """Ressourcen aufräumen""" - if self.mqtt_client: - self.mqtt_client.disconnect() - logger.info("MQTT Verbindung getrennt") - if self.db_conn: - self.db_conn.close() - logger.info("Datenbankverbindung geschlossen") + conn.close() + + except Exception as e: + logger.error(f"Fehler beim Speichern: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") def main(): """Hauptfunktion""" - logger.info("Wetterstation Collector startet...") - # Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind - required_vars = ['MQTT_USERNAME', 'MQTT_PASSWORD', 'DB_USER', 'DB_PASSWORD'] + required_vars = ['DB_USER', 'DB_PASSWORD'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: @@ -195,8 +300,8 @@ def main(): logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen") return - collector = WeatherDataCollector() - collector.start() + uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT) + if __name__ == "__main__": diff --git a/collector/requirements.txt b/collector/requirements.txt index cbe4b3d..de6238b 100644 --- a/collector/requirements.txt +++ b/collector/requirements.txt @@ -1,3 +1,4 @@ -paho-mqtt==1.6.1 +fastapi==0.115.5 +uvicorn==0.34.0 psycopg2-binary==2.9.10 python-dotenv==1.0.0 diff --git a/docker-compose.yml b/docker-compose.yml index 9d73199..2b2c39e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,6 +41,8 @@ services: dockerfile: Dockerfile container_name: wetterstation_collector restart: unless-stopped + ports: + - "8001:8001" env_file: - ./.env environment: