# HTTP API that receives weather data via POST and stores in PostgreSQL import os import json import logging from datetime import datetime from pathlib import Path from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel import psycopg2 from psycopg2.extras import RealDictCursor import uvicorn # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Umgebungsvariablen laden - eine Ebene höher env_path = Path(__file__).parent.parent / '.env' load_dotenv(dotenv_path=env_path) # Konfiguration COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001)) DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) DB_NAME = os.getenv('DB_NAME', 'wetterstation') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') # FastAPI App app = FastAPI(title="Weather Data Collector API") # Pydantic Models class WeatherDataInput(BaseModel): # Zeitstempel: ISO-String (time), datetime-String oder Unix-Timestamp time: str | None = None datetime: str | None = None dateTime: int | None = None # Außentemperatur (Celsius): tempOut, temperature oder outTemp (Fahrenheit) tempOut: float | None = None # Celsius (neues Format) temperature: float | None = None outTemp: float | None = None # Fahrenheit (altes Format) # Innentemperatur tempIn: float | None = None # Celsius # Außenfeuchte humOut: int | None = None humidity: int | None = None outHumidity: float | None = None # Innenfeuchte humIn: int | None = None # Luftdruck pressure: float | None = None barometer: float | None = None # inHg barTrend: int | None = None # hPa/Stunde # Wind windAvg: float | None = None # m/s Durchschnitt (neues Format) windSpeed: float | None = None wind_speed: float | None = None windGust: float | None = None wind_gust: float | None = None windDir: float | None = None wind_dir: float | None = None # Niederschlag rain: float | None = None rainRate: float | None = None rain_rate: float | None = None # Vorhersage forecast: int | None = None model_config = {"extra": "allow"} def get_datetime_string(self) -> str: """Zeitstempel als String zurückgeben""" if self.time: return self.time elif self.datetime: return self.datetime elif self.dateTime: from datetime import datetime as dt return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S') raise ValueError("Kein Zeitstempel vorhanden (time, datetime oder dateTime)") def get_temperature_celsius(self) -> float | None: """Außentemperatur in Celsius""" if self.tempOut is not None: return self.tempOut elif self.temperature is not None: return self.temperature elif self.outTemp is not None: return (self.outTemp - 32) * 5 / 9 return None def get_temp_in(self) -> float | None: """Innentemperatur in Celsius""" return self.tempIn def get_humidity_int(self) -> int | None: """Außenfeuchte""" if self.humOut is not None: return int(self.humOut) elif self.humidity is not None: return int(self.humidity) elif self.outHumidity is not None: return int(self.outHumidity) return None def get_humidity_in(self) -> int | None: """Innenfeuchte""" return int(self.humIn) if self.humIn is not None else None def get_pressure_hpa(self) -> float | None: """Luftdruck in hPa""" if self.pressure is not None: return self.pressure elif self.barometer is not None: return self.barometer * 33.8639 return None def get_wind_speed(self) -> float | None: """Durchschnittliche Windgeschwindigkeit""" if self.windAvg is not None: return self.windAvg elif self.windSpeed is not None: return self.windSpeed return self.wind_speed def get_wind_gust(self) -> float | None: """Windböe""" return self.windGust if self.windGust is not None else self.wind_gust def get_wind_dir(self) -> float | None: """Windrichtung""" return self.windDir if self.windDir is not None else self.wind_dir def get_rain_rate(self) -> float | None: """Regenrate""" return self.rainRate if self.rainRate is not None else self.rain_rate # Datenbankverbindung def get_db_connection(): """Datenbankverbindung herstellen""" try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) return conn except Exception as e: logger.error(f"Datenbankverbindungsfehler: {e}") raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen") def setup_database(): """Tabelle erstellen und fehlende Spalten ergänzen""" try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute(""" CREATE TABLE IF NOT EXISTS weather_data ( id SERIAL PRIMARY KEY, datetime TIMESTAMPTZ NOT NULL, temperature FLOAT, humidity INTEGER, pressure FLOAT, wind_speed FLOAT, wind_gust FLOAT, wind_dir FLOAT, rain FLOAT, rain_rate FLOAT, received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(datetime) ) """) # Neue Spalten ergänzen (idempotent) cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS temp_in FLOAT") cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS humidity_in INTEGER") cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS forecast INTEGER") cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS bar_trend INTEGER") conn.commit() logger.info("Tabelle weather_data bereit (inkl. neuer Spalten)") conn.close() except Exception as e: logger.error(f"Fehler bei Datenbanksetup: {e}") raise # API Endpoints @app.on_event("startup") async def startup_event(): """Bei Start die Datenbank initialisieren""" logger.info("Collector API startet...") setup_database() logger.info(f"API läuft auf Port {COLLECTOR_PORT}") @app.get("/") async def root(): """Root Endpoint - GET zeigt Info""" return { "message": "Weather Data Collector API", "version": "1.0.0", "endpoint": "POST /weather or POST /" } @app.post("/") async def root_post(request: Request): """Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)""" try: # Rohen Body lesen body = await request.body() body_str = body.decode('utf-8') logger.info(f"POST auf Root - Raw Body: {body_str}") # Als JSON parsen data_dict = json.loads(body_str) logger.info(f"POST auf Root - Parsed JSON: {data_dict}") # Zu Pydantic Model konvertieren data = WeatherDataInput(**data_dict) return await receive_weather_data(data) except json.JSONDecodeError as e: logger.error(f"JSON Parse Error: {e}") raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") except Exception as e: logger.error(f"Fehler bei Root POST: {e}") raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}") @app.post("/debug") async def debug_post(request: dict): """Debug Endpoint - akzeptiert beliebige JSON und loggt sie""" logger.info(f"Debug: Empfangene Rohdaten: {request}") return {"status": "logged", "data": request} @app.get("/health") async def health_check(): """Health Check""" try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute("SELECT 1") conn.close() return {"status": "healthy", "database": "connected"} except Exception as e: raise HTTPException(status_code=503, detail=f"Database error: {str(e)}") @app.post("/weather") async def receive_weather_data(data: WeatherDataInput): """Wetterdaten empfangen und speichern""" logger.info(f"Empfangene Daten: {data.model_dump()}") try: conn = get_db_connection() try: # Konvertiere zu den richtigen Werten dt_string = data.get_datetime_string() temp_c = data.get_temperature_celsius() temp_in = data.get_temp_in() humidity = data.get_humidity_int() humidity_in = data.get_humidity_in() pressure = data.get_pressure_hpa() bar_trend = data.barTrend wind_speed = data.get_wind_speed() wind_gust = data.get_wind_gust() wind_dir = data.get_wind_dir() rain = data.rain rain_rate = data.get_rain_rate() forecast = data.forecast logger.info( f"Konvertierte Daten - datetime: {dt_string}, " f"tempOut: {temp_c}°C, tempIn: {temp_in}°C, " f"humOut: {humidity}%, humIn: {humidity_in}%, " f"pressure: {pressure} hPa, barTrend: {bar_trend}" ) with conn.cursor() as cursor: cursor.execute(""" INSERT INTO weather_data (datetime, temperature, temp_in, humidity, humidity_in, pressure, bar_trend, wind_speed, wind_gust, wind_dir, rain, rain_rate, forecast) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (datetime) DO UPDATE SET temperature = EXCLUDED.temperature, temp_in = EXCLUDED.temp_in, humidity = EXCLUDED.humidity, humidity_in = EXCLUDED.humidity_in, pressure = EXCLUDED.pressure, bar_trend = EXCLUDED.bar_trend, wind_speed = EXCLUDED.wind_speed, wind_gust = EXCLUDED.wind_gust, wind_dir = EXCLUDED.wind_dir, rain = EXCLUDED.rain, rain_rate = EXCLUDED.rain_rate, forecast = EXCLUDED.forecast """, ( dt_string, temp_c, temp_in, humidity, humidity_in, pressure, bar_trend, wind_speed, wind_gust, wind_dir, rain, rain_rate, forecast )) conn.commit() logger.info(f"Daten gespeichert für {dt_string} (UTC)") return { "status": "success", "message": f"Weather data for {dt_string} saved successfully" } finally: conn.close() except Exception as e: logger.error(f"Fehler beim Speichern: {e}") raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") def main(): """Hauptfunktion""" # Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind required_vars = ['DB_USER', 'DB_PASSWORD'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}") logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen") return uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT) if __name__ == "__main__": main()