Files

635 lines
21 KiB
Python

# HTTP API that receives weather data via POST and stores in PostgreSQL
import hashlib
import json
import logging
import os
import secrets
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional

import psycopg
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Header, Depends, status
from fastapi.concurrency import run_in_threadpool
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from psycopg.conninfo import make_conninfo
from psycopg_pool import ConnectionPool
from pydantic import BaseModel, ConfigDict, ValidationError, field_validator
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
# --------------------------------------------------------------------------- #
# Konfiguration
# --------------------------------------------------------------------------- #
# The .env file is looked up one directory above this module.
env_path = Path(__file__).parent.parent.joinpath(".env")
load_dotenv(dotenv_path=env_path)


def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to *default*."""
    return int(os.getenv(name, default))


# Service port and PostgreSQL connection settings.
COLLECTOR_PORT = _env_int("COLLECTOR_PORT", 8001)
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = _env_int("DB_PORT", 5432)
DB_NAME = os.getenv("DB_NAME", "wetterstation")
# Credentials have no defaults; they must come from the environment.
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Security
COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY")
ENVIRONMENT = os.getenv("ENVIRONMENT", "production").lower()
IS_DEV = ENVIRONMENT in {"dev", "development", "local"}

# Limits
MAX_BODY_BYTES = _env_int("COLLECTOR_MAX_BODY_BYTES", 16 * 1024)  # 16 KiB
RATE_LIMIT = os.getenv("COLLECTOR_RATE_LIMIT", "30/minute")

# Logging: DEBUG only in development; raw payloads are no longer logged at INFO.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG if IS_DEV else logging.INFO,
)
logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------- #
# Connection Pool
# --------------------------------------------------------------------------- #
def _build_conninfo() -> str:
    """Build the libpq connection string for the configured database.

    Values are escaped by psycopg's ``make_conninfo``. As a result, passwords
    or user names containing spaces, quotes or backslashes can neither break
    the string nor smuggle in additional connection parameters. Settings that
    are ``None`` (unset environment variables) are omitted rather than being
    rendered as the literal text ``None``.
    """
    return make_conninfo(
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )


# Global connection pool; assigned during application start-up, None until then.
pool: Optional[ConnectionPool] = None
# --------------------------------------------------------------------------- #
# Rate-Limiter
# --------------------------------------------------------------------------- #
def _limit_key(request: Request) -> str:
    """Return the rate-limit bucket key for *request*.

    Requests that carry an ``X-API-Key`` header are bucketed by a truncated
    SHA-256 fingerprint of that key. No key material, not even a prefix,
    ends up in limiter state or log lines. All other requests are bucketed
    by the client address from ``get_remote_address``.

    ``X-Forwarded-For`` is deliberately not parsed here. Its left-most entry
    is chosen by the client and is trivially spoofable, which would let a
    caller pick a fresh bucket per request. Proxy handling is left to uvicorn
    (``proxy_headers`` / ``forwarded_allow_ips``), which rewrites the client
    address only for trusted proxies.
    """
    api_key = request.headers.get("x-api-key")
    if api_key:
        fingerprint = hashlib.sha256(api_key.encode("utf-8")).hexdigest()
        return f"key:{fingerprint[:16]}"
    return f"ip:{get_remote_address(request)}"


limiter = Limiter(key_func=_limit_key, default_limits=[])
# --------------------------------------------------------------------------- #
# Auth-Dependency
# --------------------------------------------------------------------------- #
async def require_api_key(x_api_key: Optional[str] = Header(default=None)) -> None:
    """Reject the request unless ``X-API-Key`` matches ``COLLECTOR_API_KEY``.

    The comparison is constant-time (``secrets.compare_digest``) and operates
    on UTF-8 encoded bytes. Comparing ``str`` values would raise ``TypeError``
    for any non-ASCII character in the header, turning a malformed header
    into a 500 instead of a 401.

    Raises:
        HTTPException: 503 if no key is configured (fail closed),
            401 if the header is missing or does not match.
    """
    if not COLLECTOR_API_KEY:
        # Fail closed: with no key configured, the write API stays locked.
        logger.error(
            "COLLECTOR_API_KEY ist nicht gesetzt - alle Schreibzugriffe blockiert."
        )
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service not configured",
        )
    supplied = (x_api_key or "").encode("utf-8")
    expected = COLLECTOR_API_KEY.encode("utf-8")
    if not x_api_key or not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API key",
        )
# --------------------------------------------------------------------------- #
# Pydantic Models mit Plausibilitaetspruefung
# --------------------------------------------------------------------------- #
class WeatherDataInput(BaseModel):
    """One weather reading as posted by the station uploader.

    Several alternative field names are accepted per quantity (camelCase and
    snake_case, Celsius vs. Fahrenheit, hPa vs. inHg). Field validators reject
    physically implausible values. The ``get_*`` helpers pick the first
    non-None alternative in a fixed priority order and normalise units.
    """

    # Drop unknown fields instead of accepting them -> no model pollution.
    model_config = ConfigDict(extra="ignore")

    # Timestamp
    time: Optional[str] = None
    datetime: Optional[str] = None
    dateTime: Optional[int] = None  # Unix timestamp (seconds)
    # Outdoor temperature
    tempOut: Optional[float] = None  # Celsius (new)
    temperature: Optional[float] = None  # Celsius
    outTemp: Optional[float] = None  # Fahrenheit (legacy)
    # Indoor temperature
    tempIn: Optional[float] = None  # Celsius
    # Outdoor humidity
    humOut: Optional[int] = None
    humidity: Optional[int] = None
    outHumidity: Optional[float] = None
    # Indoor humidity
    humIn: Optional[int] = None
    # Barometric pressure
    pressure: Optional[float] = None  # hPa
    barometer: Optional[float] = None  # inHg
    barTrend: Optional[int] = None
    # Wind
    windAvg: Optional[float] = None
    windSpeed: Optional[float] = None
    wind_speed: Optional[float] = None
    windGust: Optional[float] = None
    wind_gust: Optional[float] = None
    windDir: Optional[float] = None
    wind_dir: Optional[float] = None
    # Precipitation
    rain: Optional[float] = None
    rainRate: Optional[float] = None
    rain_rate: Optional[float] = None
    # Forecast
    forecast: Optional[int] = None

    # ---- Validators -------------------------------------------------------
    @field_validator("tempOut", "temperature", "tempIn")
    @classmethod
    def _temp_celsius_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (-90.0 <= v <= 70.0):
            raise ValueError("temperature out of plausible range (Celsius)")
        return v

    @field_validator("outTemp")
    @classmethod
    def _temp_fahrenheit_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (-130.0 <= v <= 160.0):
            raise ValueError("outTemp out of plausible range (Fahrenheit)")
        return v

    @field_validator("humOut", "humidity", "humIn")
    @classmethod
    def _humidity_int_range(cls, v: Optional[int]) -> Optional[int]:
        if v is not None and not (0 <= v <= 100):
            raise ValueError("humidity out of range")
        return v

    @field_validator("outHumidity")
    @classmethod
    def _humidity_float_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 100.0):
            raise ValueError("outHumidity out of range")
        return v

    @field_validator("pressure")
    @classmethod
    def _pressure_hpa_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (800.0 <= v <= 1100.0):
            raise ValueError("pressure (hPa) out of plausible range")
        return v

    @field_validator("barometer")
    @classmethod
    def _pressure_inhg_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (23.0 <= v <= 32.5):
            raise ValueError("barometer (inHg) out of plausible range")
        return v

    @field_validator("windAvg", "windSpeed", "wind_speed", "windGust", "wind_gust")
    @classmethod
    def _wind_speed_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 120.0):
            raise ValueError("wind speed out of plausible range")
        return v

    @field_validator("windDir", "wind_dir")
    @classmethod
    def _wind_dir_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 360.0):
            raise ValueError("wind_dir out of range")
        return v

    @field_validator("rain", "rainRate", "rain_rate")
    @classmethod
    def _rain_range(cls, v: Optional[float]) -> Optional[float]:
        if v is not None and not (0.0 <= v <= 1000.0):
            raise ValueError("rain value out of plausible range")
        return v

    @field_validator("barTrend", "forecast")
    @classmethod
    def _pg_integer_range(cls, v: Optional[int]) -> Optional[int]:
        # Both values are stored in INTEGER (signed 32-bit) columns. Anything
        # outside that range would otherwise only fail later as a database
        # error instead of being rejected as invalid input.
        if v is not None and not (-2_147_483_648 <= v <= 2_147_483_647):
            raise ValueError("value out of INTEGER range")
        return v

    # ---- Conversions ------------------------------------------------------
    def get_datetime_string(self) -> str:
        """Return the measurement timestamp as a string PostgreSQL can parse.

        Priority: ``time``, then ``datetime`` (both passed through verbatim),
        then the Unix timestamp ``dateTime``. The Unix timestamp is rendered
        in the server's local time zone *including* its UTC offset
        (``YYYY-MM-DD HH:MM:SS+HH:MM``). The stored TIMESTAMPTZ therefore
        denotes the correct instant regardless of the database session time
        zone. It also ensures that the two distinct instants of a DST
        fall-back hour do not collapse onto the same value and overwrite each
        other through the UNIQUE(datetime) upsert.

        Raises:
            ValueError: if ``dateTime`` lies outside 2000-01-01..2100-01-01
                or no timestamp field is present.
        """
        if self.time:
            return self.time
        if self.datetime:
            return self.datetime
        if self.dateTime is not None:
            # Plausibility window: 2000-01-01 .. 2100-01-01 (UTC seconds).
            if not (946684800 <= self.dateTime <= 4102444800):
                raise ValueError("dateTime timestamp out of plausible range")
            # fromtimestamp() sets ``fold`` for repeated local wall-clock
            # times; astimezone() honours it when attaching the local offset.
            local = datetime.fromtimestamp(self.dateTime).astimezone()
            return local.isoformat(sep=" ", timespec="seconds")
        raise ValueError("Kein Zeitstempel vorhanden (time, datetime oder dateTime)")

    def get_temperature_celsius(self) -> Optional[float]:
        """Outdoor temperature in Celsius; ``outTemp`` is converted from Fahrenheit."""
        if self.tempOut is not None:
            return self.tempOut
        if self.temperature is not None:
            return self.temperature
        if self.outTemp is not None:
            return (self.outTemp - 32) * 5 / 9
        return None

    def get_temp_in(self) -> Optional[float]:
        """Indoor temperature in Celsius, if supplied."""
        return self.tempIn

    def get_humidity_int(self) -> Optional[int]:
        """Outdoor relative humidity as int (a float ``outHumidity`` is truncated)."""
        if self.humOut is not None:
            return int(self.humOut)
        if self.humidity is not None:
            return int(self.humidity)
        if self.outHumidity is not None:
            return int(self.outHumidity)
        return None

    def get_humidity_in(self) -> Optional[int]:
        """Indoor relative humidity, if supplied."""
        return int(self.humIn) if self.humIn is not None else None

    def get_pressure_hpa(self) -> Optional[float]:
        """Pressure in hPa; ``barometer`` (inHg) is converted with factor 33.8639."""
        if self.pressure is not None:
            return self.pressure
        if self.barometer is not None:
            return self.barometer * 33.8639
        return None

    def get_wind_speed(self) -> Optional[float]:
        """Average wind speed from ``windAvg``, ``windSpeed`` or ``wind_speed``."""
        if self.windAvg is not None:
            return self.windAvg
        if self.windSpeed is not None:
            return self.windSpeed
        return self.wind_speed

    def get_wind_gust(self) -> Optional[float]:
        """Wind gust from ``windGust`` or ``wind_gust``."""
        return self.windGust if self.windGust is not None else self.wind_gust

    def get_wind_dir(self) -> Optional[float]:
        """Wind direction in degrees (0-360) from ``windDir`` or ``wind_dir``."""
        return self.windDir if self.windDir is not None else self.wind_dir

    def get_rain_rate(self) -> Optional[float]:
        """Rain rate from ``rainRate`` or ``rain_rate``."""
        return self.rainRate if self.rainRate is not None else self.rain_rate
# --------------------------------------------------------------------------- #
# Datenbank-Setup
# --------------------------------------------------------------------------- #
def setup_database() -> None:
    """Create or migrate the ``weather_data`` schema.

    Creates the table, adds the columns introduced later, and builds the
    descending timestamp index. Every statement uses IF NOT EXISTS, so
    running this on each start-up is harmless. Requires the global pool to
    be open.
    """
    assert pool is not None
    ddl_statements = (
        """
CREATE TABLE IF NOT EXISTS weather_data (
id SERIAL PRIMARY KEY,
datetime TIMESTAMPTZ NOT NULL,
temperature FLOAT,
humidity INTEGER,
pressure FLOAT,
wind_speed FLOAT,
wind_gust FLOAT,
wind_dir FLOAT,
rain FLOAT,
rain_rate FLOAT,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(datetime)
)
""",
        "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS temp_in FLOAT",
        "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS humidity_in INTEGER",
        "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS forecast INTEGER",
        "ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS bar_trend INTEGER",
        "CREATE INDEX IF NOT EXISTS idx_weather_datetime_desc "
        "ON weather_data (datetime DESC)",
    )
    with pool.connection() as conn:
        with conn.cursor() as cursor:
            for ddl in ddl_statements:
                cursor.execute(ddl)
        conn.commit()
    logger.info("Tabelle weather_data und Index bereit")
# --------------------------------------------------------------------------- #
# FastAPI Lifespan
# --------------------------------------------------------------------------- #
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Validate configuration, open the DB pool and prepare the schema.

    Start-up fails fast with ``RuntimeError`` when the DB credentials or a
    sufficiently long API key are missing. The pool is closed on shutdown.
    It is also closed when a later start-up step fails (waiting for the
    first connection, schema setup), so a failed start does not leave an
    open pool behind.
    """
    global pool
    # Mandatory variables - fail fast.
    missing = [v for v in ("DB_USER", "DB_PASSWORD") if not os.getenv(v)]
    if missing:
        raise RuntimeError(
            f"Fehlende Umgebungsvariablen: {', '.join(missing)}"
        )
    if not COLLECTOR_API_KEY:
        raise RuntimeError(
            "COLLECTOR_API_KEY ist nicht gesetzt. "
            "Mindestens 32 Zeichen empfohlen (z.B. via 'openssl rand -hex 32')."
        )
    if len(COLLECTOR_API_KEY) < 16:
        raise RuntimeError(
            "COLLECTOR_API_KEY ist zu kurz (Minimum 16 Zeichen)."
        )
    pool = ConnectionPool(
        conninfo=_build_conninfo(),
        min_size=1,
        max_size=5,
        timeout=10,
        kwargs={"autocommit": False},
    )
    try:
        # Everything after pool creation sits inside try/finally so that a
        # failing wait() or schema setup still closes the pool.
        pool.wait()
        logger.info("Connection Pool initialisiert (min=1, max=5)")
        setup_database()
        logger.info("Collector laeuft auf Port %d (env=%s)", COLLECTOR_PORT, ENVIRONMENT)
        yield
    finally:
        pool.close()
        # Reset so later callers see "no pool" instead of a closed one.
        pool = None
        logger.info("Connection Pool geschlossen")
# --------------------------------------------------------------------------- #
# FastAPI App
# --------------------------------------------------------------------------- #
app = FastAPI(
    title="Weather Data Collector API",
    lifespan=lifespan,
    # Interactive docs and the OpenAPI schema are only served in development.
    docs_url="/docs" if IS_DEV else None,
    openapi_url="/openapi.json" if IS_DEV else None,
    redoc_url=None,
)

# Bind the rate limiter to the app (slowapi expects it on app.state).
app.state.limiter = limiter
@app.exception_handler(RateLimitExceeded)
async def _rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Translate slowapi's RateLimitExceeded into a generic 429 response."""
    body = {"detail": "Too many requests"}
    return JSONResponse(content=body, status_code=status.HTTP_429_TOO_MANY_REQUESTS)
@app.exception_handler(RequestValidationError)
async def _validation_handler(request: Request, exc: RequestValidationError):
    """Log where request validation failed and return a generic 422.

    Only the location and message of each error are logged. The raw error
    dicts also carry the offending ``input`` value, i.e. client payload data,
    which is kept out of the logs. This matches POST /, which logs no
    validation details at all.
    """
    summary = [
        {"loc": err.get("loc"), "msg": err.get("msg")} for err in exc.errors()
    ]
    logger.warning("Validation error on %s: %s", request.url.path, summary)
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"detail": "Validation error"},
    )
@app.exception_handler(Exception)
async def _unhandled_handler(request: Request, exc: Exception):
    """Last-resort handler: log the traceback and answer with a bare 500.

    Stack traces and ``str(exc)`` are never echoed back to the client.
    """
    logger.exception("Unhandled error on %s", request.url.path)
    payload = {"detail": "Internal server error"}
    return JSONResponse(content=payload, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
# --------------------------------------------------------------------------- #
# Body-Size-Middleware
# --------------------------------------------------------------------------- #
@app.middleware("http")
async def _limit_body_size(request: Request, call_next):
    """Reject oversized request bodies early, based on ``Content-Length``.

    Only POST, PUT and PATCH are checked. A non-integer header yields 400;
    a declared size above ``MAX_BODY_BYTES`` yields 413.

    NOTE(review): requests without a Content-Length header (e.g. chunked
    transfer encoding) are not limited by this middleware.
    """
    declared = request.headers.get("content-length")
    if request.method not in ("POST", "PUT", "PATCH") or declared is None:
        return await call_next(request)
    try:
        too_large = int(declared) > MAX_BODY_BYTES
    except ValueError:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={"detail": "Invalid Content-Length"},
        )
    if too_large:
        return JSONResponse(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            content={"detail": "Payload too large"},
        )
    return await call_next(request)
# --------------------------------------------------------------------------- #
# Endpoints
# --------------------------------------------------------------------------- #
@app.get("/")
async def root():
    """Describe the service; no authentication required."""
    info = dict(
        service="Weather Data Collector",
        version="2.0.0",
        endpoint="POST /weather (X-API-Key required)",
    )
    return info
@app.get("/health")
async def health_check():
    """Report whether the database is reachable (no auth, no details).

    The probe runs in the thread pool. psycopg's pool and cursor calls are
    blocking, and ``pool.connection()`` may wait up to the pool timeout for
    a free connection. Run directly here, that wait would stall the event
    loop and with it every other request.

    Raises:
        HTTPException: 503 when the probe fails for any reason.
    """
    try:
        await run_in_threadpool(_ping_database)
    except Exception:
        logger.exception("Health-Check fehlgeschlagen")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service unavailable",
        ) from None
    return {"status": "healthy"}


def _ping_database() -> None:
    """Run ``SELECT 1`` on a pooled connection; raise on any failure (blocking)."""
    if pool is None:
        raise RuntimeError("connection pool not initialised")
    with pool.connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
def _store_weather(data: WeatherDataInput) -> dict:
    """Upsert one reading into ``weather_data`` (blocking).

    Expects an already validated model. Rows are keyed by timestamp: a
    second reading with the same timestamp overwrites every measurement
    column of the existing row.

    Returns:
        ``{"status": "success", "datetime": <timestamp string>}``

    Raises:
        ValueError: if the model yields no usable timestamp.
        psycopg.Error: on any database failure.
    """
    assert pool is not None
    timestamp = data.get_datetime_string()
    params = (
        timestamp,
        data.get_temperature_celsius(),
        data.get_temp_in(),
        data.get_humidity_int(),
        data.get_humidity_in(),
        data.get_pressure_hpa(),
        data.barTrend,
        data.get_wind_speed(),
        data.get_wind_gust(),
        data.get_wind_dir(),
        data.rain,
        data.get_rain_rate(),
        data.forecast,
    )
    upsert_sql = """
INSERT INTO weather_data
(datetime, temperature, temp_in, humidity, humidity_in,
pressure, bar_trend, wind_speed, wind_gust, wind_dir,
rain, rain_rate, forecast)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (datetime) DO UPDATE SET
temperature = EXCLUDED.temperature,
temp_in = EXCLUDED.temp_in,
humidity = EXCLUDED.humidity,
humidity_in = EXCLUDED.humidity_in,
pressure = EXCLUDED.pressure,
bar_trend = EXCLUDED.bar_trend,
wind_speed = EXCLUDED.wind_speed,
wind_gust = EXCLUDED.wind_gust,
wind_dir = EXCLUDED.wind_dir,
rain = EXCLUDED.rain,
rain_rate = EXCLUDED.rain_rate,
forecast = EXCLUDED.forecast
"""
    with pool.connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(upsert_sql, params)
        conn.commit()
    logger.info("Datenpunkt gespeichert fuer %s", timestamp)
    return dict(status="success", datetime=timestamp)
@app.post("/weather", dependencies=[Depends(require_api_key)])
@limiter.limit(RATE_LIMIT)
async def receive_weather_data(request: Request, data: WeatherDataInput):
    """Receive a reading as a JSON body and store it (auth + rate limit).

    ``request`` is not used directly but is required by slowapi's decorator.
    """
    return await _persist_reading(data, "/weather")


@app.post("/", dependencies=[Depends(require_api_key)])
@limiter.limit(RATE_LIMIT)
async def root_post(request: Request):
    """Alias for POST /weather that parses the body by hand (auth + rate limit).

    Errors are reported generically:
    - 413 for an oversized body.
    - 400 for undecodable or malformed JSON.
    - 422 for failed validation.
    - 400/503 for storage failures.
    """
    body = await request.body()
    # Re-check after reading: the middleware only sees a declared Content-Length.
    if len(body) > MAX_BODY_BYTES:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="Payload too large",
        )
    try:
        data_dict = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError. RecursionError is raised for
        # very deeply nested arrays/objects and would otherwise become a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON",
        ) from None
    try:
        # model_validate also rejects non-object JSON (lists, scalars) with a
        # ValidationError instead of a TypeError from ``**`` unpacking.
        data = WeatherDataInput.model_validate(data_dict)
    except ValidationError:
        # Pydantic errors may contain values from the body - do not pass them on.
        logger.warning("Validation failed on POST /")
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Validation error",
        ) from None
    return await _persist_reading(data, "/")


async def _persist_reading(data: WeatherDataInput, route: str) -> dict:
    """Store *data* off the event loop and map failures to generic HTTP errors.

    ``_store_weather`` makes blocking psycopg calls and may wait for a free
    pooled connection, so it runs in the thread pool instead of stalling the
    event loop.

    Raises:
        HTTPException: 400 for invalid input, i.e. a failed timestamp
            conversion or a value the database rejects as malformed (such as
            an unparsable ``time`` string); 503 for any other database error.
    """
    try:
        return await run_in_threadpool(_store_weather, data)
    except ValueError as e:
        # Conversion error, e.g. missing or implausible timestamp.
        logger.warning("Bad request on %s: %s", route, e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid input",
        ) from None
    except psycopg.DataError:
        # Client-supplied value rejected by PostgreSQL - a client error,
        # not a storage outage.
        logger.warning("Database rejected input on %s", route)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid input",
        ) from None
    except psycopg.Error:
        logger.exception("DB-Fehler beim Speichern")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Storage unavailable",
        ) from None
# Debug endpoint: only registered in development mode, and only with an API key.
if IS_DEV:

    @app.post("/debug", dependencies=[Depends(require_api_key)])
    async def debug_post(request: Request):
        """Log an arbitrary JSON payload at DEBUG level (development only).

        Raises:
            HTTPException: 413 for an oversized body, 400 for invalid JSON.
        """
        body = await request.body()
        if len(body) > MAX_BODY_BYTES:
            raise HTTPException(
                status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                detail="Payload too large",
            )
        try:
            payload = json.loads(body.decode("utf-8"))
        except (UnicodeDecodeError, ValueError, RecursionError):
            # Same decode/parse failures as POST / (ValueError covers
            # json.JSONDecodeError; RecursionError covers deep nesting).
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid JSON",
            ) from None
        logger.debug("Debug payload: %s", payload)
        return {"status": "logged"}
# --------------------------------------------------------------------------- #
# Entry-Point
# --------------------------------------------------------------------------- #
def main() -> None:
    """Run the collector with uvicorn on all interfaces.

    In production, Traefik sits in front and terminates TLS.
    X-Forwarded-* headers are honoured only from the peers listed in the
    ``FORWARDED_ALLOW_IPS`` environment variable (uvicorn's
    ``--forwarded-allow-ips`` syntax). The default ``"*"`` trusts every peer.
    Restrict it to the proxy's address whenever ``COLLECTOR_PORT`` is
    reachable without going through the proxy; otherwise clients can forge
    their source address.
    """
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=COLLECTOR_PORT,
        proxy_headers=True,
        forwarded_allow_ips=os.getenv("FORWARDED_ALLOW_IPS", "*"),
    )


if __name__ == "__main__":
    main()