HTTP-Empfang geht

This commit is contained in:
rxf
2026-02-10 14:06:42 +01:00
parent db1e2fd737
commit f32e472ea3
3 changed files with 241 additions and 133 deletions

View File

@@ -1,15 +1,16 @@
# MQTT subscriber that reads weather data and stores in PostgreSQL
# HTTP API that receives weather data via POST and stores in PostgreSQL
import os
import json
import logging
import ssl
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
import paho.mqtt.client as mqtt
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import psycopg2
from psycopg2.extras import RealDictCursor
import uvicorn
# Logging konfigurieren
logging.basicConfig(
@@ -23,11 +24,7 @@ env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)
# Konfiguration
MQTT_BROKER = os.getenv('MQTT_BROKER', 'rexfue.de')
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
MQTT_USERNAME = os.getenv('MQTT_USERNAME')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'vantage/live')
COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001))
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
@@ -35,99 +32,223 @@ DB_NAME = os.getenv('DB_NAME', 'wetterstation')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# FastAPI App
app = FastAPI(title="Weather Data Collector API")
class WeatherDataCollector:
"""Klasse zum Sammeln und Speichern von Wetterdaten aus MQTT in PostgreSQL"""
def __init__(self):
self.db_conn = None
self.mqtt_client = None
self.setup_database()
self.setup_mqtt()
# Pydantic Models
class WeatherDataInput(BaseModel):
# Unterstütze beide Formate: datetime (String) oder dateTime (Unix-Timestamp)
datetime: str | None = None
dateTime: int | None = None
def setup_database(self):
"""Datenbankverbindung herstellen und Tabelle erstellen"""
# Unterstütze beide Feldnamen
temperature: float | None = None
outTemp: float | None = None # Fahrenheit
humidity: int | None = None
outHumidity: float | None = None
pressure: float | None = None
barometer: float | None = None # inHg
windSpeed: float | None = None # mph
wind_speed: float | None = None
windGust: float | None = None # mph
wind_gust: float | None = None
windDir: float | None = None
wind_dir: float | None = None
rain: float | None = None
rainRate: float | None = None
rain_rate: float | None = None
model_config = {"extra": "allow"}
def get_datetime_string(self) -> str:
"""Konvertiere dateTime (Unix-Timestamp) zu datetime (String)"""
if self.datetime:
return self.datetime
elif self.dateTime:
from datetime import datetime as dt
return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S')
raise ValueError("Weder datetime noch dateTime vorhanden")
def get_temperature_celsius(self) -> float | None:
"""Konvertiere Temperatur von Fahrenheit zu Celsius falls nötig"""
if self.temperature is not None:
return self.temperature
elif self.outTemp is not None:
# Fahrenheit zu Celsius: (F - 32) * 5/9
return (self.outTemp - 32) * 5 / 9
return None
def get_humidity_int(self) -> int | None:
"""Hole Humidity-Wert"""
if self.humidity is not None:
return int(self.humidity)
elif self.outHumidity is not None:
return int(self.outHumidity)
return None
def get_pressure_hpa(self) -> float | None:
"""Konvertiere Druck von inHg zu hPa falls nötig"""
if self.pressure is not None:
return self.pressure
elif self.barometer is not None:
# inHg zu hPa: inHg * 33.8639
return self.barometer * 33.8639
return None
def get_wind_speed(self) -> float | None:
"""Hole Windgeschwindigkeit"""
return self.windSpeed if self.windSpeed is not None else self.wind_speed
def get_wind_gust(self) -> float | None:
"""Hole Windböen"""
return self.windGust if self.windGust is not None else self.wind_gust
def get_wind_dir(self) -> float | None:
"""Hole Windrichtung"""
return self.windDir if self.windDir is not None else self.wind_dir
def get_rain_rate(self) -> float | None:
"""Hole Regenrate"""
return self.rainRate if self.rainRate is not None else self.rain_rate
# Datenbankverbindung
def get_db_connection():
"""Datenbankverbindung herstellen"""
try:
conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
return conn
except Exception as e:
logger.error(f"Datenbankverbindungsfehler: {e}")
raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen")
def setup_database():
"""Tabelle erstellen falls nicht vorhanden"""
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute("""
CREATE TABLE IF NOT EXISTS weather_data (
id SERIAL PRIMARY KEY,
datetime TIMESTAMPTZ NOT NULL,
temperature FLOAT,
humidity INTEGER,
pressure FLOAT,
wind_speed FLOAT,
wind_gust FLOAT,
wind_dir FLOAT,
rain FLOAT,
rain_rate FLOAT,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(datetime)
)
""")
conn.commit()
logger.info("Tabelle weather_data bereit")
conn.close()
except Exception as e:
logger.error(f"Fehler bei Datenbanksetup: {e}")
raise
# API Endpoints
@app.on_event("startup")
async def startup_event():
"""Bei Start die Datenbank initialisieren"""
logger.info("Collector API startet...")
setup_database()
logger.info(f"API läuft auf Port {COLLECTOR_PORT}")
@app.get("/")
async def root():
"""Root Endpoint - GET zeigt Info"""
return {
"message": "Weather Data Collector API",
"version": "1.0.0",
"endpoint": "POST /weather or POST /"
}
@app.post("/")
async def root_post(request: Request):
"""Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)"""
try:
# Rohen Body lesen
body = await request.body()
body_str = body.decode('utf-8')
logger.info(f"POST auf Root - Raw Body: {body_str}")
# Als JSON parsen
data_dict = json.loads(body_str)
logger.info(f"POST auf Root - Parsed JSON: {data_dict}")
# Zu Pydantic Model konvertieren
data = WeatherDataInput(**data_dict)
return await receive_weather_data(data)
except json.JSONDecodeError as e:
logger.error(f"JSON Parse Error: {e}")
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")
except Exception as e:
logger.error(f"Fehler bei Root POST: {e}")
raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}")
@app.post("/debug")
async def debug_post(request: dict):
"""Debug Endpoint - akzeptiert beliebige JSON und loggt sie"""
logger.info(f"Debug: Empfangene Rohdaten: {request}")
return {"status": "logged", "data": request}
@app.get("/health")
async def health_check():
"""Health Check"""
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
conn.close()
return {"status": "healthy", "database": "connected"}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Database error: {str(e)}")
@app.post("/weather")
async def receive_weather_data(data: WeatherDataInput):
"""Wetterdaten empfangen und speichern"""
logger.info(f"Empfangene Daten: {data.model_dump()}")
try:
conn = get_db_connection()
try:
self.db_conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
logger.info("Datenbankverbindung hergestellt")
# Konvertiere zu den richtigen Werten
dt_string = data.get_datetime_string()
temp_c = data.get_temperature_celsius()
humidity = data.get_humidity_int()
pressure = data.get_pressure_hpa()
wind_speed = data.get_wind_speed()
wind_gust = data.get_wind_gust()
wind_dir = data.get_wind_dir()
rain = data.rain
rain_rate = data.get_rain_rate()
# Tabelle erstellen falls nicht vorhanden
with self.db_conn.cursor() as cursor:
cursor.execute("""
CREATE TABLE IF NOT EXISTS weather_data (
id SERIAL PRIMARY KEY,
datetime TIMESTAMP NOT NULL,
temperature FLOAT,
humidity INTEGER,
pressure FLOAT,
wind_speed FLOAT,
wind_gust FLOAT,
wind_dir FLOAT,
rain FLOAT,
rain_rate FLOAT,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(datetime)
)
""")
self.db_conn.commit()
logger.info("Tabelle weather_data bereit")
except Exception as e:
logger.error(f"Fehler bei Datenbankverbindung: {e}")
raise
logger.info(f"Konvertierte Daten - datetime: {dt_string}, temp: {temp_c}°C, humidity: {humidity}%, pressure: {pressure} hPa")
def setup_mqtt(self):
"""MQTT Client konfigurieren"""
self.mqtt_client = mqtt.Client()
self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
# Callbacks setzen
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_message = self.on_message
self.mqtt_client.on_disconnect = self.on_disconnect
logger.info(f"MQTT Client konfiguriert für {MQTT_BROKER}:{MQTT_PORT}")
def on_connect(self, client, userdata, flags, rc):
"""Callback wenn MQTT Verbindung hergestellt wird"""
if rc == 0:
logger.info("Mit MQTT Broker verbunden")
client.subscribe(MQTT_TOPIC)
logger.info(f"Topic abonniert: {MQTT_TOPIC}")
else:
logger.error(f"Verbindung fehlgeschlagen mit Code {rc}")
def on_disconnect(self, client, userdata, rc):
"""Callback wenn MQTT Verbindung getrennt wird"""
if rc != 0:
logger.warning(f"Unerwartete Trennung vom Broker. Code: {rc}")
def on_message(self, client, userdata, msg):
"""Callback wenn MQTT Nachricht empfangen wird"""
try:
payload = msg.payload.decode('utf-8')
logger.info(f"Nachricht empfangen auf {msg.topic}: {payload}")
# JSON parsen
data = json.loads(payload)
# In Datenbank speichern
self.save_to_database(data)
except json.JSONDecodeError as e:
logger.error(f"Fehler beim JSON-Parsen: {e}")
except Exception as e:
logger.error(f"Fehler bei Nachrichtenverarbeitung: {e}")
def save_to_database(self, data):
"""Wetterdaten in PostgreSQL speichern"""
try:
with self.db_conn.cursor() as cursor:
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO weather_data
(datetime, temperature, humidity, pressure, wind_speed,
@@ -143,51 +264,35 @@ class WeatherDataCollector:
rain = EXCLUDED.rain,
rain_rate = EXCLUDED.rain_rate
""", (
data.get('datetime'),
data.get('temperature'),
data.get('humidity'),
data.get('pressure'),
data.get('wind_speed'),
data.get('wind_gust'),
data.get('wind_dir'),
data.get('rain'),
data.get('rain_rate')
dt_string,
temp_c,
humidity,
pressure,
wind_speed,
wind_gust,
wind_dir,
rain,
rain_rate
))
self.db_conn.commit()
logger.info(f"Daten gespeichert für {data.get('datetime')}")
except Exception as e:
logger.error(f"Fehler beim Speichern in Datenbank: {e}")
self.db_conn.rollback()
conn.commit()
logger.info(f"Daten gespeichert für {dt_string} (UTC)")
def start(self):
"""MQTT Client starten und auf Nachrichten warten"""
try:
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
logger.info("Starte MQTT Loop...")
self.mqtt_client.loop_forever()
except KeyboardInterrupt:
logger.info("Programm wird beendet...")
except Exception as e:
logger.error(f"Fehler beim Start: {e}")
return {
"status": "success",
"message": f"Weather data for {dt_string} saved successfully"
}
finally:
self.cleanup()
conn.close()
def cleanup(self):
"""Ressourcen aufräumen"""
if self.mqtt_client:
self.mqtt_client.disconnect()
logger.info("MQTT Verbindung getrennt")
if self.db_conn:
self.db_conn.close()
logger.info("Datenbankverbindung geschlossen")
except Exception as e:
logger.error(f"Fehler beim Speichern: {e}")
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
def main():
"""Hauptfunktion"""
logger.info("Wetterstation Collector startet...")
# Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind
required_vars = ['MQTT_USERNAME', 'MQTT_PASSWORD', 'DB_USER', 'DB_PASSWORD']
required_vars = ['DB_USER', 'DB_PASSWORD']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
@@ -195,8 +300,8 @@ def main():
logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen")
return
collector = WeatherDataCollector()
collector.start()
uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT)
if __name__ == "__main__":

View File

@@ -1,3 +1,4 @@
paho-mqtt==1.6.1
fastapi==0.115.5
uvicorn==0.34.0
psycopg2-binary==2.9.10
python-dotenv==1.0.0

View File

@@ -41,6 +41,8 @@ services:
dockerfile: Dockerfile
container_name: wetterstation_collector
restart: unless-stopped
ports:
- "8001:8001"
env_file:
- ./.env
environment: