# HTTP API that receives weather data via POST and stores in PostgreSQL import os import json import logging from datetime import datetime from pathlib import Path from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Request from pydantic import BaseModel import psycopg2 from psycopg2.extras import RealDictCursor import uvicorn # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Umgebungsvariablen laden - eine Ebene höher env_path = Path(__file__).parent.parent / '.env' load_dotenv(dotenv_path=env_path) # Konfiguration COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001)) DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) DB_NAME = os.getenv('DB_NAME', 'wetterstation') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') # FastAPI App app = FastAPI(title="Weather Data Collector API") # Pydantic Models class WeatherDataInput(BaseModel): # Unterstütze beide Formate: datetime (String) oder dateTime (Unix-Timestamp) datetime: str | None = None dateTime: int | None = None # Unterstütze beide Feldnamen temperature: float | None = None outTemp: float | None = None # Fahrenheit humidity: int | None = None outHumidity: float | None = None pressure: float | None = None barometer: float | None = None # inHg windSpeed: float | None = None # mph wind_speed: float | None = None windGust: float | None = None # mph wind_gust: float | None = None windDir: float | None = None wind_dir: float | None = None rain: float | None = None rainRate: float | None = None rain_rate: float | None = None model_config = {"extra": "allow"} def get_datetime_string(self) -> str: """Konvertiere dateTime (Unix-Timestamp) zu datetime (String)""" if self.datetime: return self.datetime elif self.dateTime: from datetime import datetime as dt return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S') raise ValueError("Weder datetime noch dateTime vorhanden") def get_temperature_celsius(self) -> float | None: """Konvertiere Temperatur von Fahrenheit zu Celsius falls nötig""" if self.temperature is not None: return self.temperature elif self.outTemp is not None: # Fahrenheit zu Celsius: (F - 32) * 5/9 return (self.outTemp - 32) * 5 / 9 return None def get_humidity_int(self) -> int | None: """Hole Humidity-Wert""" if self.humidity is not None: return int(self.humidity) elif self.outHumidity is not None: return int(self.outHumidity) return None def get_pressure_hpa(self) -> float | None: """Konvertiere Druck von inHg zu hPa falls nötig""" if self.pressure is not None: return self.pressure elif self.barometer is not None: # inHg zu hPa: inHg * 33.8639 return self.barometer * 33.8639 return None def get_wind_speed(self) -> float | None: """Hole Windgeschwindigkeit""" return self.windSpeed if self.windSpeed is not None else self.wind_speed def get_wind_gust(self) -> float | None: """Hole Windböen""" return self.windGust if self.windGust is not None else self.wind_gust def get_wind_dir(self) -> float | None: """Hole Windrichtung""" return self.windDir if self.windDir is not None else self.wind_dir def get_rain_rate(self) -> float | None: """Hole Regenrate""" return self.rainRate if self.rainRate is not None else self.rain_rate # Datenbankverbindung def get_db_connection(): """Datenbankverbindung herstellen""" try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) return conn except Exception as e: logger.error(f"Datenbankverbindungsfehler: {e}") raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen") def setup_database(): """Tabelle erstellen falls nicht vorhanden""" try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute(""" CREATE TABLE IF NOT EXISTS weather_data ( id SERIAL PRIMARY KEY, datetime TIMESTAMPTZ NOT NULL, temperature FLOAT, humidity INTEGER, pressure FLOAT, wind_speed FLOAT, wind_gust FLOAT, wind_dir FLOAT, rain FLOAT, rain_rate FLOAT, received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(datetime) ) """) conn.commit() logger.info("Tabelle weather_data bereit") conn.close() except Exception as e: logger.error(f"Fehler bei Datenbanksetup: {e}") raise # API Endpoints @app.on_event("startup") async def startup_event(): """Bei Start die Datenbank initialisieren""" logger.info("Collector API startet...") setup_database() logger.info(f"API läuft auf Port {COLLECTOR_PORT}") @app.get("/") async def root(): """Root Endpoint - GET zeigt Info""" return { "message": "Weather Data Collector API", "version": "1.0.0", "endpoint": "POST /weather or POST /" } @app.post("/") async def root_post(request: Request): """Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)""" try: # Rohen Body lesen body = await request.body() body_str = body.decode('utf-8') logger.info(f"POST auf Root - Raw Body: {body_str}") # Als JSON parsen data_dict = json.loads(body_str) logger.info(f"POST auf Root - Parsed JSON: {data_dict}") # Zu Pydantic Model konvertieren data = WeatherDataInput(**data_dict) return await receive_weather_data(data) except json.JSONDecodeError as e: logger.error(f"JSON Parse Error: {e}") raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") except Exception as e: logger.error(f"Fehler bei Root POST: {e}") raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}") @app.post("/debug") async def debug_post(request: dict): """Debug Endpoint - akzeptiert beliebige JSON und loggt sie""" logger.info(f"Debug: Empfangene Rohdaten: {request}") return {"status": "logged", "data": request} @app.get("/health") async def health_check(): """Health Check""" try: conn = get_db_connection() with conn.cursor() as cursor: cursor.execute("SELECT 1") conn.close() return {"status": "healthy", "database": "connected"} except Exception as e: raise HTTPException(status_code=503, detail=f"Database error: {str(e)}") @app.post("/weather") async def receive_weather_data(data: WeatherDataInput): """Wetterdaten empfangen und speichern""" logger.info(f"Empfangene Daten: {data.model_dump()}") try: conn = get_db_connection() try: # Konvertiere zu den richtigen Werten dt_string = data.get_datetime_string() temp_c = data.get_temperature_celsius() humidity = data.get_humidity_int() pressure = data.get_pressure_hpa() wind_speed = data.get_wind_speed() wind_gust = data.get_wind_gust() wind_dir = data.get_wind_dir() rain = data.rain rain_rate = data.get_rain_rate() logger.info(f"Konvertierte Daten - datetime: {dt_string}, temp: {temp_c}°C, humidity: {humidity}%, pressure: {pressure} hPa") with conn.cursor() as cursor: cursor.execute(""" INSERT INTO weather_data (datetime, temperature, humidity, pressure, wind_speed, wind_gust, wind_dir, rain, rain_rate) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (datetime) DO UPDATE SET temperature = EXCLUDED.temperature, humidity = EXCLUDED.humidity, pressure = EXCLUDED.pressure, wind_speed = EXCLUDED.wind_speed, wind_gust = EXCLUDED.wind_gust, wind_dir = EXCLUDED.wind_dir, rain = EXCLUDED.rain, rain_rate = EXCLUDED.rain_rate """, ( dt_string, temp_c, humidity, pressure, wind_speed, wind_gust, wind_dir, rain, rain_rate )) conn.commit() logger.info(f"Daten gespeichert für {dt_string} (UTC)") return { "status": "success", "message": f"Weather data for {dt_string} saved successfully" } finally: conn.close() except Exception as e: logger.error(f"Fehler beim Speichern: {e}") raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") def main(): """Hauptfunktion""" # Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind required_vars = ['DB_USER', 'DB_PASSWORD'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}") logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen") return uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT) if __name__ == "__main__": main()