# HTTP API that receives weather data via POST and stores in PostgreSQL

import os
import json
import logging
import secrets
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Header, Depends, status
from fastapi.concurrency import run_in_threadpool
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, ValidationError, field_validator
import psycopg
from psycopg.conninfo import make_conninfo
from psycopg_pool import ConnectionPool
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
import uvicorn

# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #
env_path = Path(__file__).parent.parent / ".env"
load_dotenv(dotenv_path=env_path)

COLLECTOR_PORT = int(os.getenv("COLLECTOR_PORT", 8001))
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 5432))
DB_NAME = os.getenv("DB_NAME", "wetterstation")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Security
COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY")
ENVIRONMENT = os.getenv("ENVIRONMENT", "production").lower()
IS_DEV = ENVIRONMENT in ("dev", "development", "local")

# Limits
MAX_BODY_BYTES = int(os.getenv("COLLECTOR_MAX_BODY_BYTES", 16 * 1024))  # 16 KiB
RATE_LIMIT = os.getenv("COLLECTOR_RATE_LIMIT", "30/minute")

# Logging - raw payloads are no longer logged at INFO
logging.basicConfig(
    level=logging.DEBUG if IS_DEV else logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------- #
# Connection Pool
# --------------------------------------------------------------------------- #
def _build_conninfo() -> str:
    """Return the libpq connection string built from the DB_* settings.

    The string is assembled with ``psycopg.conninfo.make_conninfo`` so that
    values containing spaces, quotes or backslashes (typically passwords)
    are escaped instead of corrupting the connection string.
    """
    return make_conninfo(
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )


# Created in ``lifespan``; stays None until the application has started.
pool: Optional[ConnectionPool] = None


def _require_pool() -> ConnectionPool:
    """Return the active connection pool.

    Raises:
        RuntimeError: if the pool has not been created yet. An explicit
            check is used because ``assert`` is stripped under ``python -O``.
    """
    if pool is None:
        raise RuntimeError("Connection pool is not initialised")
    return pool


# --------------------------------------------------------------------------- #
# Rate limiter
# --------------------------------------------------------------------------- #
def _limit_key(request: Request) -> str:
    """Return the rate-limit bucket key for *request*.

    An ``X-API-Key`` header takes precedence; otherwise the first entry of
    ``X-Forwarded-For`` is used, and finally the address reported by
    slowapi's ``get_remote_address``.
    """
    api_key = request.headers.get("x-api-key")
    if api_key:
        # Use only a prefix so the full key never ends up in logs.
        return f"key:{api_key[:8]}"
    fwd = request.headers.get("x-forwarded-for")
    if fwd:
        return f"ip:{fwd.split(',')[0].strip()}"
    return f"ip:{get_remote_address(request)}"


limiter = Limiter(key_func=_limit_key, default_limits=[])


# --------------------------------------------------------------------------- #
# Auth dependency
# --------------------------------------------------------------------------- #
async def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None:
    """Check the ``X-API-Key`` header against ``COLLECTOR_API_KEY``.

    Raises:
        HTTPException: 503 when no key is configured (fail closed), 401 when
            the header is missing or does not match.
    """
    if not COLLECTOR_API_KEY:
        # Fail closed: without a configured key the API stays locked.
        logger.error(
            "COLLECTOR_API_KEY ist nicht gesetzt - alle Schreibzugriffe blockiert."
        )
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service not configured",
        )
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, which would surface as a 500. Comparing bytes
    # keeps the check constant-time and turns such headers into a plain 401.
    if not x_api_key or not secrets.compare_digest(
        x_api_key.encode("utf-8"), COLLECTOR_API_KEY.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )


# --------------------------------------------------------------------------- #
# Pydantic models with plausibility checks
# --------------------------------------------------------------------------- #
class WeatherDataInput(BaseModel):
    """One weather reading; several alternative field names are accepted.

    All fields are optional. The ``get_*`` helpers pick the first populated
    alias and convert it to the unit stored in the database.
    """

    # Unknown fields are dropped instead of being accepted.
    model_config = ConfigDict(extra="ignore")

    # Timestamp
    time: Optional[str] = None
    datetime: Optional[str] = None
    dateTime: Optional[int] = None

    # Outdoor temperature
    tempOut: Optional[float] = None  # Celsius (new)
    temperature: Optional[float] = None  # Celsius
    outTemp: Optional[float] = None  # Fahrenheit (legacy)

    # Indoor temperature
    tempIn: Optional[float] = None  # Celsius

    # Outdoor humidity
    humOut: Optional[int] = None
    humidity: Optional[int] = None
    outHumidity: Optional[float] = None

    # Indoor humidity
    humIn: Optional[int] = None

    # Air pressure
    pressure: Optional[float] = None  # hPa
    barometer: Optional[float] = None  # inHg
    barTrend: Optional[int] = None

    # Wind
    windAvg: Optional[float] = None
    windSpeed: Optional[float] = None
    wind_speed: Optional[float] = None
    windGust: Optional[float] = None
    wind_gust: Optional[float] = None
    windDir: Optional[float] = None
    wind_dir: Optional[float] = None

    # Precipitation
    rain: Optional[float] = None
    rainRate: Optional[float] = None
    rain_rate: Optional[float] = None

    # Forecast
    forecast: Optional[int] = None

    # ---- Validators ------------------------------------------------------
    @field_validator("tempOut", "temperature", "tempIn")
    @classmethod
    def _temp_celsius_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (-90.0 <= v <= 70.0):
            raise ValueError("temperature out of plausible range (Celsius)")
        return v

    @field_validator("outTemp")
    @classmethod
    def _temp_fahrenheit_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (-130.0 <= v <= 160.0):
            raise ValueError("outTemp out of plausible range (Fahrenheit)")
        return v

    @field_validator("humOut", "humidity", "humIn")
    @classmethod
    def _humidity_int_range(cls, v: Optional[int]) -> Optional[int]:
        if v is not None and not (0 <= v <= 100):
            raise ValueError("humidity out of range")
        return v

    @field_validator("outHumidity")
    @classmethod
    def _humidity_float_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 100.0):
            raise ValueError("outHumidity out of range")
        return v

    @field_validator("pressure")
    @classmethod
    def _pressure_hpa_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (800.0 <= v <= 1100.0):
            raise ValueError("pressure (hPa) out of plausible range")
        return v

    @field_validator("barometer")
    @classmethod
    def _pressure_inhg_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (23.0 <= v <= 32.5):
            raise ValueError("barometer (inHg) out of plausible range")
        return v

    @field_validator("windAvg", "windSpeed", "wind_speed", "windGust", "wind_gust")
    @classmethod
    def _wind_speed_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 120.0):
            raise ValueError("wind speed out of plausible range")
        return v

    @field_validator("windDir", "wind_dir")
    @classmethod
    def _wind_dir_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 360.0):
            raise ValueError("wind_dir out of range")
        return v

    @field_validator("rain", "rainRate", "rain_rate")
    @classmethod
    def _rain_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 1000.0):
            raise ValueError("rain value out of plausible range")
        return v

    # ---- Conversions -----------------------------------------------------
    def get_datetime_string(self) -> str:
        """Return the reading's timestamp as a string for the INSERT.

        ``time`` and ``datetime`` are passed through unchanged. ``dateTime``
        is treated as epoch seconds and rendered with an explicit UTC
        offset, so the value written to the TIMESTAMPTZ column does not
        depend on the server's local timezone or the DB session timezone.

        Raises:
            ValueError: if no timestamp field is set, or ``dateTime`` lies
                outside 2000-01-01 .. 2100-01-01.
        """
        if self.time:
            return self.time
        if self.datetime:
            return self.datetime
        if self.dateTime is not None:
            # Plausibility: 2000-01-01 .. 2100-01-01
            if not (946684800 <= self.dateTime <= 4102444800):
                raise ValueError("dateTime timestamp out of plausible range")
            return datetime.fromtimestamp(
                self.dateTime, tz=timezone.utc
            ).isoformat(sep=" ")
        raise ValueError("Kein Zeitstempel vorhanden (time, datetime oder dateTime)")

    def get_temperature_celsius(self) -> Optional[float]:
        """Return the outdoor temperature in Celsius, converting ``outTemp``."""
        if self.tempOut is not None:
            return self.tempOut
        if self.temperature is not None:
            return self.temperature
        if self.outTemp is not None:
            return (self.outTemp - 32) * 5 / 9
        return None

    def get_temp_in(self) -> Optional[float]:
        """Return the indoor temperature."""
        return self.tempIn

    def get_humidity_int(self) -> Optional[int]:
        """Return the outdoor humidity as int (``outHumidity`` is truncated)."""
        if self.humOut is not None:
            return int(self.humOut)
        if self.humidity is not None:
            return int(self.humidity)
        if self.outHumidity is not None:
            return int(self.outHumidity)
        return None

    def get_humidity_in(self) -> Optional[int]:
        """Return the indoor humidity as int."""
        return int(self.humIn) if self.humIn is not None else None

    def get_pressure_hpa(self) -> Optional[float]:
        """Return the pressure in hPa, converting ``barometer`` from inHg."""
        if self.pressure is not None:
            return self.pressure
        if self.barometer is not None:
            return self.barometer * 33.8639
        return None

    def get_wind_speed(self) -> Optional[float]:
        """Return the first populated of windAvg, windSpeed, wind_speed."""
        if self.windAvg is not None:
            return self.windAvg
        if self.windSpeed is not None:
            return self.windSpeed
        return self.wind_speed

    def get_wind_gust(self) -> Optional[float]:
        """Return ``windGust`` if set, else ``wind_gust``."""
        return self.windGust if self.windGust is not None else self.wind_gust

    def get_wind_dir(self) -> Optional[float]:
        """Return ``windDir`` if set, else ``wind_dir``."""
        return self.windDir if self.windDir is not None else self.wind_dir

    def get_rain_rate(self) -> Optional[float]:
        """Return ``rainRate`` if set, else ``rain_rate``."""
        return self.rainRate if self.rainRate is not None else self.rain_rate


# --------------------------------------------------------------------------- #
# Database setup
# --------------------------------------------------------------------------- #
def setup_database() -> None:
    """Create the table, any missing columns and the index (idempotent)."""
    with _require_pool().connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS weather_data (
                    id SERIAL PRIMARY KEY,
                    datetime TIMESTAMPTZ NOT NULL,
                    temperature FLOAT,
                    humidity INTEGER,
                    pressure FLOAT,
                    wind_speed FLOAT,
                    wind_gust FLOAT,
                    wind_dir FLOAT,
                    rain FLOAT,
                    rain_rate FLOAT,
                    received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(datetime)
                )
                """
            )
            cursor.execute(
                "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS temp_in FLOAT"
            )
            cursor.execute(
                "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS humidity_in INTEGER"
            )
            cursor.execute(
                "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS forecast INTEGER"
            )
            cursor.execute(
                "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS bar_trend INTEGER"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_weather_datetime_desc "
                "ON weather_data (datetime DESC)"
            )
        conn.commit()
    logger.info("Tabelle weather_data und Index bereit")


# --------------------------------------------------------------------------- #
# FastAPI lifespan
# --------------------------------------------------------------------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Validate configuration, open the pool, prepare the schema; close on exit.

    Raises:
        RuntimeError: if DB_USER, DB_PASSWORD or COLLECTOR_API_KEY is missing,
            or the API key is shorter than 16 characters.
    """
    global pool

    # Check mandatory variables - fail fast
    missing = [v for v in ("DB_USER", "DB_PASSWORD") if not os.getenv(v)]
    if missing:
        raise RuntimeError(
            f"Fehlende Umgebungsvariablen: {', '.join(missing)}"
        )
    if not COLLECTOR_API_KEY:
        raise RuntimeError(
            "COLLECTOR_API_KEY ist nicht gesetzt. "
            "Mindestens 32 Zeichen empfohlen (z.B. via 'openssl rand -hex 32')."
        )
    if len(COLLECTOR_API_KEY) < 16:
        raise RuntimeError(
            "COLLECTOR_API_KEY ist zu kurz (Minimum 16 Zeichen)."
        )

    pool = ConnectionPool(
        conninfo=_build_conninfo(),
        min_size=1,
        max_size=5,
        timeout=10,
        kwargs={"autocommit": False},
    )
    # The try block also covers startup, so the pool is closed when
    # pool.wait() or setup_database() fails, not only on regular shutdown.
    try:
        pool.wait()
        logger.info("Connection Pool initialisiert (min=1, max=5)")

        setup_database()
        logger.info("Collector laeuft auf Port %d (env=%s)", COLLECTOR_PORT, ENVIRONMENT)

        yield
    finally:
        if pool is not None:
            pool.close()
            logger.info("Connection Pool geschlossen")


# --------------------------------------------------------------------------- #
# FastAPI app
# --------------------------------------------------------------------------- #
app = FastAPI(
    title="Weather Data Collector API",
    docs_url="/docs" if IS_DEV else None,
    redoc_url=None,
    openapi_url="/openapi.json" if IS_DEV else None,
    lifespan=lifespan,
)

# Bind the rate limiter to the app
app.state.limiter = limiter


@app.exception_handler(RateLimitExceeded)
async def _rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Answer rate-limit violations with a generic 429."""
    return JSONResponse(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        content={"detail": "Too many requests"},
    )


@app.exception_handler(RequestValidationError)
async def _validation_handler(request: Request, exc: RequestValidationError):
    """Log validation details and answer with a generic 422."""
    # Details go to the log, the client gets a generic answer.
    logger.warning("Validation error on %s: %s", request.url.path, exc.errors())
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"detail": "Validation error"},
    )


@app.exception_handler(Exception)
async def _unhandled_handler(request: Request, exc: Exception):
    """Log any unhandled exception and answer with a generic 500."""
    # NEVER return stack traces or str(exc) to the client.
    logger.exception("Unhandled error on %s", request.url.path)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "Internal server error"},
    )


# --------------------------------------------------------------------------- #
# Body-size middleware
# --------------------------------------------------------------------------- #
@app.middleware("http")
async def _limit_body_size(request: Request, call_next):
    """Reject POST/PUT/PATCH requests whose Content-Length exceeds the limit.

    Only the declared Content-Length is inspected; requests without that
    header are passed on unchecked.
    """
    if request.method in ("POST", "PUT", "PATCH"):
        cl = request.headers.get("content-length")
        if cl is not None:
            try:
                if int(cl) > MAX_BODY_BYTES:
                    return JSONResponse(
                        status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                        content={"detail": "Payload too large"},
                    )
            except ValueError:
                return JSONResponse(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    content={"detail": "Invalid Content-Length"},
                )
    return await call_next(request)


# --------------------------------------------------------------------------- #
# Endpoints
# --------------------------------------------------------------------------- #
@app.get("/")
async def root():
    """Info endpoint (no auth required)."""
    return {
        "service": "Weather Data Collector",
        "version": "2.0.0",
        "endpoint": "POST /weather (X-API-Key required)",
    }


def _ping_database() -> None:
    """Run ``SELECT 1`` on a pooled connection; raises on any failure."""
    with _require_pool().connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")


@app.get("/health")
async def health_check():
    """Health check without auth and without sensitive details."""
    try:
        # psycopg's sync API blocks; run it off the event loop.
        await run_in_threadpool(_ping_database)
    except Exception:
        logger.exception("Health-Check fehlgeschlagen")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service unavailable",
        )
    return {"status": "healthy"}


def _store_weather(data: WeatherDataInput) -> dict:
    """Upsert one reading keyed by its timestamp; *data* must be validated.

    Returns:
        ``{"status": "success", "datetime": <timestamp string>}``.

    Raises:
        ValueError: from ``get_datetime_string`` when no usable timestamp
            is present.
        psycopg.Error: on any database failure.
    """
    db_pool = _require_pool()

    dt_string = data.get_datetime_string()
    values = (
        dt_string,
        data.get_temperature_celsius(),
        data.get_temp_in(),
        data.get_humidity_int(),
        data.get_humidity_in(),
        data.get_pressure_hpa(),
        data.barTrend,
        data.get_wind_speed(),
        data.get_wind_gust(),
        data.get_wind_dir(),
        data.rain,
        data.get_rain_rate(),
        data.forecast,
    )

    with db_pool.connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO weather_data
                    (datetime, temperature, temp_in, humidity, humidity_in,
                     pressure, bar_trend, wind_speed, wind_gust, wind_dir,
                     rain, rain_rate, forecast)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (datetime) DO UPDATE SET
                    temperature = EXCLUDED.temperature,
                    temp_in = EXCLUDED.temp_in,
                    humidity = EXCLUDED.humidity,
                    humidity_in = EXCLUDED.humidity_in,
                    pressure = EXCLUDED.pressure,
                    bar_trend = EXCLUDED.bar_trend,
                    wind_speed = EXCLUDED.wind_speed,
                    wind_gust = EXCLUDED.wind_gust,
                    wind_dir = EXCLUDED.wind_dir,
                    rain = EXCLUDED.rain,
                    rain_rate = EXCLUDED.rain_rate,
                    forecast = EXCLUDED.forecast
                """,
                values,
            )
        conn.commit()

    logger.info("Datenpunkt gespeichert fuer %s", dt_string)
    return {"status": "success", "datetime": dt_string}


async def _persist(request: Request, data: WeatherDataInput) -> dict:
    """Store *data* off the event loop and map failures to HTTP errors.

    Raises:
        HTTPException: 400 for conversion errors and for values the database
            rejects as malformed (``psycopg.DataError``), 503 for every other
            database error.
    """
    try:
        return await run_in_threadpool(_store_weather, data)
    except ValueError as e:
        # Conversion error (e.g. missing timestamp)
        logger.warning("Bad request on %s: %s", request.url.path, e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid input",
        ) from None
    except psycopg.DataError as e:
        # The client sent a value PostgreSQL cannot parse or store (e.g. an
        # unparseable timestamp string): a client error, not an outage.
        # Only the SQLSTATE is logged because the message echoes the input.
        logger.warning(
            "Data rejected by database on %s (sqlstate=%s)",
            request.url.path,
            e.sqlstate,
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid input",
        ) from None
    except psycopg.Error:
        logger.exception("DB-Fehler beim Speichern")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Storage unavailable",
        ) from None


@app.post("/weather", dependencies=[Depends(require_api_key)])
@limiter.limit(RATE_LIMIT)
async def receive_weather_data(request: Request, data: WeatherDataInput):
    """Receive and store weather data (auth + rate limit)."""
    return await _persist(request, data)


@app.post("/", dependencies=[Depends(require_api_key)])
@limiter.limit(RATE_LIMIT)
async def root_post(request: Request):
    """Alias for POST /weather (auth + rate limit)."""
    body = await request.body()
    if len(body) > MAX_BODY_BYTES:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="Payload too large",
        )
    try:
        data_dict = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError, RecursionError):
        # RecursionError: pathologically nested JSON within the size limit.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON",
        ) from None

    try:
        # model_validate also rejects non-object JSON (lists, numbers, ...).
        data = WeatherDataInput.model_validate(data_dict)
    except ValidationError:
        # Pydantic errors may contain values from the body - do not pass on.
        logger.warning("Validation failed on POST /")
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Validation error",
        ) from None

    return await _persist(request, data)


# Debug endpoint only in DEV mode + only with API key
if IS_DEV:

    @app.post("/debug", dependencies=[Depends(require_api_key)])
    async def debug_post(request: Request):
        """Log the JSON payload at DEBUG level and acknowledge it."""
        body = await request.body()
        if len(body) > MAX_BODY_BYTES:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail="Payload too large",
            )
        try:
            payload = json.loads(body.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError, RecursionError):
            raise HTTPException(status_code=400, detail="Invalid JSON") from None
        logger.debug("Debug payload: %s", payload)
        return {"status": "logged"}


# --------------------------------------------------------------------------- #
# Entry point
# --------------------------------------------------------------------------- #
def main() -> None:
    """Run the collector with uvicorn on all interfaces."""
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=COLLECTOR_PORT,
        # In production Traefik sits in front and terminates TLS.
        # Only evaluate X-Forwarded-* when the proxy is trusted.
        # NOTE(review): "*" trusts these headers from ANY peer; set
        # FORWARDED_ALLOW_IPS to the proxy address(es) to restrict it.
        proxy_headers=True,
        forwarded_allow_ips=os.getenv("FORWARDED_ALLOW_IPS", "*"),
    )


if __name__ == "__main__":
    main()