"""Read-only HTTP API exposing weather-station measurements from PostgreSQL.

All endpoints read from the ``weather_data`` table through short-lived
psycopg connections (one connection per request).

NOTE(review): the handlers are ``async def`` but use the blocking psycopg
API, so every query blocks the event loop while it runs. Switching to plain
``def`` handlers (run by FastAPI in a thread pool) or to
``psycopg.AsyncConnection`` would fix that, but changes the callable
interface and is therefore left for a separate change.
"""

import logging
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence

import psycopg
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from psycopg.rows import dict_row
from pydantic import BaseModel, Field, ConfigDict

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables from the .env file one directory above this file
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)

# Database configuration
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
DB_NAME = os.getenv('DB_NAME', 'wetterstation')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')

# Create the FastAPI app
app = FastAPI(
    title="Wetterstation API",
    description="API zum Auslesen von Wetterdaten",
    version="1.0.0"
)

# CORS middleware
# NOTE(review): the FastAPI CORS documentation states that wildcard
# origins/methods/headers must not be combined with allow_credentials=True.
# The API has no authentication, so credentials are presumably not needed;
# behaviour is left unchanged here - confirm with the frontend and then
# either list explicit origins or set allow_credentials=False.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------

class WeatherData(BaseModel):
    """One measurement row (or one aggregated bucket shaped like a row)."""

    model_config = ConfigDict(from_attributes=True)

    id: int
    datetime: datetime
    temperature: Optional[float] = None
    humidity: Optional[int] = None
    pressure: Optional[float] = None
    wind_speed: Optional[float] = None
    wind_gust: Optional[float] = None
    wind_dir: Optional[float] = None
    rain: Optional[float] = None
    rain_rate: Optional[float] = None
    received_at: datetime


class WeatherStats(BaseModel):
    """Aggregated statistics over a time window."""

    avg_temperature: Optional[float] = None
    min_temperature: Optional[float] = None
    max_temperature: Optional[float] = None
    avg_humidity: Optional[float] = None
    avg_pressure: Optional[float] = None
    avg_wind_speed: Optional[float] = None
    max_wind_gust: Optional[float] = None
    total_rain: Optional[float] = None
    data_points: int


class DailyWeatherStats(WeatherStats):
    """``WeatherStats`` for one calendar day, including the day itself.

    Without the ``date`` field the response model would strip the day from
    every row of ``/weather/daily`` and callers could not tell which day a
    row belongs to.
    """

    date: date


class HealthResponse(BaseModel):
    """Payload of the ``/health`` endpoint."""

    status: str
    database: str
    timestamp: datetime


# ---------------------------------------------------------------------------
# SQL fragments shared by several endpoints
#
# These are module constants only; request data is always passed separately
# as bound parameters (%s) and never interpolated into the SQL text.
# ---------------------------------------------------------------------------

# Row-level columns matching the WeatherData model. Wind values are scaled by
# 1.60934 (the miles -> kilometres factor; presumably the table stores mph
# and the API serves km/h - confirm against the ingest side).
_RAW_COLUMNS = """
    id,
    datetime,
    temperature,
    humidity,
    pressure,
    wind_speed * 1.60934 AS wind_speed,
    wind_gust * 1.60934 AS wind_gust,
    wind_dir,
    rain,
    rain_rate,
    received_at
"""

# Aggregates matching the WeatherStats model.
_STATS_COLUMNS = """
    AVG(temperature) AS avg_temperature,
    MIN(temperature) AS min_temperature,
    MAX(temperature) AS max_temperature,
    AVG(humidity) AS avg_humidity,
    AVG(pressure) AS avg_pressure,
    AVG(wind_speed * 1.60934) AS avg_wind_speed,
    MAX(wind_gust * 1.60934) AS max_wind_gust,
    SUM(rain) AS total_rain,
    COUNT(*) AS data_points
"""

# Per-day aggregates (mean plus min/max) used by the three daily endpoints.
_DAILY_AGGREGATE_COLUMNS = """
    date_trunc('day', datetime) AS datetime,
    AVG(temperature)::float AS temperature,
    MIN(temperature)::float AS min_temperature,
    MAX(temperature)::float AS max_temperature,
    ROUND(AVG(humidity))::int AS humidity,
    MIN(humidity)::int AS min_humidity,
    MAX(humidity)::int AS max_humidity,
    AVG(pressure)::float AS pressure,
    MIN(pressure)::float AS min_pressure,
    MAX(pressure)::float AS max_pressure,
    AVG(wind_speed * 1.60934)::float AS wind_speed,
    MAX(wind_gust * 1.60934)::float AS wind_gust,
    AVG(wind_dir)::float AS wind_dir,
    SUM(rain)::float AS total_rain
"""

# WHERE-clause fragments; each %s is bound by the caller.
_SINCE_HOURS = "datetime >= NOW() - make_interval(hours => %s)"
_SINCE_DAYS = "datetime >= NOW() - make_interval(days => %s)"
_BETWEEN = "datetime BETWEEN %s AND %s"


# ---------------------------------------------------------------------------
# Database access
# ---------------------------------------------------------------------------

def get_db_connection():
    """Open a new database connection that returns rows as dicts.

    Returns:
        A psycopg connection configured with ``dict_row``. The caller is
        responsible for closing it.

    Raises:
        HTTPException: 500 if the connection cannot be established. The
            underlying error is logged and chained as the cause.
    """
    try:
        return psycopg.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            row_factory=dict_row
        )
    except Exception as exc:
        logger.error("Datenbankverbindungsfehler: %s", exc)
        raise HTTPException(
            status_code=500,
            detail="Datenbankverbindung fehlgeschlagen"
        ) from exc


def _fetch_all(query: str, params: Optional[Sequence[Any]] = None) -> List[Dict[str, Any]]:
    """Run *query* on a fresh connection and return all rows as plain dicts.

    The connection is closed even if the query raises.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]
    finally:
        conn.close()


def _fetch_one(query: str, params: Optional[Sequence[Any]] = None) -> Optional[Dict[str, Any]]:
    """Run *query* on a fresh connection and return the first row, or None.

    The connection is closed even if the query raises.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            row = cursor.fetchone()
    finally:
        conn.close()
    return dict(row) if row else None


def _daily_aggregates(where_clause: str, params: Sequence[Any]) -> List[Dict[str, Any]]:
    """Return per-day aggregates for the rows selected by *where_clause*.

    *where_clause* must be one of the module-level SQL fragments; request
    values go into *params* only.
    """
    return _fetch_all(
        f"""
        SELECT {_DAILY_AGGREGATE_COLUMNS}
        FROM weather_data
        WHERE {where_clause}
        GROUP BY date_trunc('day', datetime)
        ORDER BY datetime ASC
        """,
        params,
    )


def _validate_time_range(start: datetime, end: datetime) -> None:
    """Reject an empty, inverted or incomparable ``[start, end]`` range.

    Raises:
        HTTPException: 400 if ``start`` is not strictly before ``end``, or if
            exactly one of the two carries a timezone (Python cannot compare
            naive and aware datetimes, which would otherwise surface as a
            500).
    """
    try:
        is_invalid = start >= end
    except TypeError:
        raise HTTPException(
            status_code=400,
            detail="Start- und Enddatum müssen entweder beide mit oder beide ohne Zeitzone angegeben werden"
        ) from None
    if is_invalid:
        raise HTTPException(status_code=400, detail="Startdatum muss vor Enddatum liegen")


# ---------------------------------------------------------------------------
# API endpoints
#
# The German ``description=`` strings are the texts published in the OpenAPI
# schema (they were previously taken from the handler docstrings).
# ---------------------------------------------------------------------------

@app.get("/", tags=["General"], description="Root Endpoint")
async def root():
    """Return the API name, version and the location of the docs."""
    return {
        "message": "Wetterstation API",
        "version": "1.0.0",
        "docs": "/docs"
    }


@app.get(
    "/health",
    response_model=HealthResponse,
    tags=["General"],
    description="Health Check Endpoint",
)
async def health_check():
    """Report whether the database answers a trivial query.

    Never raises: any failure is logged and reported as
    ``database: "disconnected"`` / ``status: "error"``.
    """
    try:
        # _fetch_one closes the connection even when the query fails.
        _fetch_one("SELECT 1")
        db_status = "connected"
    except Exception as exc:
        logger.warning("Health-Check fehlgeschlagen: %s", exc)
        db_status = "disconnected"

    return {
        "status": "ok" if db_status == "connected" else "error",
        "database": db_status,
        "timestamp": datetime.now()
    }


@app.get(
    "/weather/latest",
    response_model=WeatherData,
    tags=["Weather Data"],
    description="Gibt die neuesten Wetterdaten zurück",
)
async def get_latest_weather():
    """Return the most recent measurement.

    Raises:
        HTTPException: 404 if the table is empty.
    """
    result = _fetch_one(
        f"""
        SELECT {_RAW_COLUMNS}
        FROM weather_data
        ORDER BY datetime DESC
        LIMIT 1
        """
    )
    if not result:
        raise HTTPException(status_code=404, detail="Keine Daten verfügbar")
    return result


@app.get(
    "/weather/current",
    response_model=WeatherData,
    tags=["Weather Data"],
    description="Alias für /weather/latest - gibt aktuelle Wetterdaten zurück",
)
async def get_current_weather():
    """Alias for ``/weather/latest``."""
    return await get_latest_weather()


@app.get(
    "/weather/history",
    response_model=List[WeatherData],
    tags=["Weather Data"],
    description="Gibt historische Wetterdaten der letzten X Stunden zurück",
)
async def get_weather_history(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück (max 168 = 7 Tage)"),
    limit: int = Query(1000, ge=1, le=10000, description="Maximale Anzahl Datensätze")
):
    """Return measurements of the last *hours* hours, newest first."""
    return _fetch_all(
        f"""
        SELECT {_RAW_COLUMNS}
        FROM weather_data
        WHERE {_SINCE_HOURS}
        ORDER BY datetime DESC
        LIMIT %s
        """,
        (hours, limit),
    )


@app.get(
    "/weather/range",
    response_model=List[WeatherData],
    tags=["Weather Data"],
    description="Gibt Wetterdaten für einen bestimmten Zeitraum zurück",
)
async def get_weather_by_date_range(
    start: datetime = Query(..., description="Startdatum (ISO 8601)"),
    end: datetime = Query(..., description="Enddatum (ISO 8601)"),
    limit: int = Query(10000, ge=1, le=50000, description="Maximale Anzahl Datensätze")
):
    """Return measurements between *start* and *end* (inclusive), oldest first.

    Uses the same column list as the other ``WeatherData`` endpoints so that
    wind values carry the same scaling everywhere.

    Raises:
        HTTPException: 400 if the range is empty, inverted or mixes naive and
            timezone-aware timestamps.
    """
    _validate_time_range(start, end)
    return _fetch_all(
        f"""
        SELECT {_RAW_COLUMNS}
        FROM weather_data
        WHERE {_BETWEEN}
        ORDER BY datetime ASC
        LIMIT %s
        """,
        (start, end, limit),
    )


@app.get(
    "/weather/stats",
    response_model=WeatherStats,
    tags=["Statistics"],
    description="Gibt aggregierte Statistiken für den angegebenen Zeitraum zurück",
)
async def get_weather_statistics(
    hours: int = Query(24, ge=1, le=168, description="Zeitraum in Stunden für Statistiken")
):
    """Return aggregated statistics over the last *hours* hours.

    Raises:
        HTTPException: 404 if the window contains no rows.
    """
    result = _fetch_one(
        f"""
        SELECT {_STATS_COLUMNS}
        FROM weather_data
        WHERE {_SINCE_HOURS}
        """,
        (hours,),
    )
    if not result or result['data_points'] == 0:
        raise HTTPException(status_code=404, detail="Keine Daten für den Zeitraum verfügbar")
    return result


@app.get(
    "/weather/daily",
    response_model=List[DailyWeatherStats],
    tags=["Statistics"],
    description="Gibt tägliche Statistiken für die letzten X Tage zurück",
)
async def get_daily_statistics(
    days: int = Query(7, ge=1, le=90, description="Anzahl Tage zurück (max 90)")
):
    """Return one statistics row per calendar day, newest day first."""
    return _fetch_all(
        f"""
        SELECT
            DATE(datetime) AS date,
            {_STATS_COLUMNS}
        FROM weather_data
        WHERE {_SINCE_DAYS}
        GROUP BY DATE(datetime)
        ORDER BY date DESC
        """,
        (days,),
    )


@app.get(
    "/weather/temperature",
    response_model=List[dict],
    tags=["Weather Data"],
    description="Gibt nur Temperatur-Zeitreihen zurück (optimiert für Diagramme)",
)
async def get_temperature_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück")
):
    """Return the (datetime, temperature) series, skipping NULL temperatures."""
    return _fetch_all(
        f"""
        SELECT datetime, temperature
        FROM weather_data
        WHERE {_SINCE_HOURS}
          AND temperature IS NOT NULL
        ORDER BY datetime ASC
        """,
        (hours,),
    )


@app.get(
    "/weather/wind",
    response_model=List[dict],
    tags=["Weather Data"],
    description="Gibt nur Wind-Daten zurück (Geschwindigkeit, Richtung, Böen)",
)
async def get_wind_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück")
):
    """Return wind speed, gust and direction for the last *hours* hours."""
    return _fetch_all(
        f"""
        SELECT
            datetime,
            wind_speed * 1.60934 AS wind_speed,
            wind_gust * 1.60934 AS wind_gust,
            wind_dir
        FROM weather_data
        WHERE {_SINCE_HOURS}
        ORDER BY datetime ASC
        """,
        (hours,),
    )


@app.get(
    "/weather/rain",
    response_model=List[dict],
    tags=["Weather Data"],
    description="Gibt nur Regen-Daten zurück",
)
async def get_rain_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück")
):
    """Return rain and rain rate for the last *hours* hours."""
    return _fetch_all(
        f"""
        SELECT datetime, rain, rain_rate
        FROM weather_data
        WHERE {_SINCE_HOURS}
        ORDER BY datetime ASC
        """,
        (hours,),
    )


@app.get(
    "/weather/hourly-aggregated",
    response_model=List[WeatherData],
    tags=["Aggregated Data"],
    description="Gibt stündlich aggregierte Wetterdaten zurück (Stundenmittel)",
)
async def get_hourly_aggregated_data(
    days: int = Query(7, ge=1, le=60, description="Anzahl Tage zurück (max 60)")
):
    """Return hourly means shaped like ``WeatherData`` rows.

    ``id`` is always 0 because an aggregated bucket has no row id;
    ``received_at`` is the latest value within the hour.
    """
    return _fetch_all(
        f"""
        SELECT
            0 AS id,
            date_trunc('hour', datetime) AS datetime,
            AVG(temperature) AS temperature,
            ROUND(AVG(humidity)) AS humidity,
            AVG(pressure) AS pressure,
            AVG(wind_speed * 1.60934) AS wind_speed,
            MAX(wind_gust * 1.60934) AS wind_gust,
            AVG(wind_dir) AS wind_dir,
            AVG(rain) AS rain,
            AVG(rain_rate) AS rain_rate,
            MAX(received_at) AS received_at
        FROM weather_data
        WHERE {_SINCE_DAYS}
        GROUP BY date_trunc('hour', datetime)
        ORDER BY datetime ASC
        """,
        (days,),
    )


@app.get(
    "/weather/daily-aggregated",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt täglich aggregierte Wetterdaten zurück (Tagesmittel mit Min/Max-Temperaturen)",
)
async def get_daily_aggregated_data(
    days: int = Query(365, ge=1, le=730, description="Anzahl Tage zurück (max 730)")
):
    """Return per-day means with min/max values for the last *days* days."""
    return _daily_aggregates(_SINCE_DAYS, (days,))


@app.get(
    "/weather/daily-with-minmax",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt täglich aggregierte Wetterdaten mit Min/Max-Temperaturen zurück",
)
async def get_daily_with_minmax_data(
    days: int = Query(30, ge=1, le=90, description="Anzahl Tage zurück (max 90)")
):
    """Return per-day means with min/max values for the last *days* days.

    Same query as ``/weather/daily-aggregated``; only the default and the
    upper bound of *days* differ.
    """
    return _daily_aggregates(_SINCE_DAYS, (days,))


@app.get(
    "/weather/rain-daily",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt tägliche Regensummen zurück",
)
async def get_daily_rain_data(
    days: int = Query(30, ge=1, le=365, description="Anzahl Tage zurück")
):
    """Return the rain sum per calendar day for the last *days* days."""
    return _fetch_all(
        f"""
        SELECT
            date_trunc('day', datetime) AS date,
            SUM(rain) AS total_rain
        FROM weather_data
        WHERE {_SINCE_DAYS}
        GROUP BY date_trunc('day', datetime)
        ORDER BY date ASC
        """,
        (days,),
    )


@app.get(
    "/weather/rain-weekly",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt wöchentliche Regensummen zurück (Woche = Mo-So)",
)
async def get_weekly_rain_data(
    days: int = Query(365, ge=1, le=730, description="Anzahl Tage zurück")
):
    """Return the rain sum per week.

    For ``days >= 365`` (including the default) the time filter is dropped
    and all available data is returned; this mirrors the original, explicitly
    commented behaviour.
    """
    if days >= 365:
        where_sql = ""
        params: Optional[Sequence[Any]] = None
    else:
        where_sql = f"WHERE {_SINCE_DAYS}"
        params = (days,)

    return _fetch_all(
        f"""
        SELECT
            date_trunc('week', datetime) AS week_start,
            SUM(rain) AS total_rain
        FROM weather_data
        {where_sql}
        GROUP BY date_trunc('week', datetime)
        ORDER BY week_start ASC
        """,
        params,
    )


@app.get(
    "/weather/hourly-aggregated-range",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt stündlich aggregierte Wetterdaten für einen bestimmten Zeitraum zurück",
)
async def get_hourly_aggregated_range(
    start: datetime = Query(..., description="Startdatum (ISO 8601)"),
    end: datetime = Query(..., description="Enddatum (ISO 8601)")
):
    """Return hourly means between *start* and *end* (inclusive).

    Raises:
        HTTPException: 400 if the range is empty, inverted or mixes naive and
            timezone-aware timestamps.
    """
    _validate_time_range(start, end)
    return _fetch_all(
        f"""
        SELECT
            date_trunc('hour', datetime) AS datetime,
            AVG(temperature)::float AS temperature,
            ROUND(AVG(humidity))::int AS humidity,
            AVG(pressure)::float AS pressure,
            AVG(wind_speed * 1.60934)::float AS wind_speed,
            MAX(wind_gust * 1.60934)::float AS wind_gust,
            AVG(wind_dir)::float AS wind_dir
        FROM weather_data
        WHERE {_BETWEEN}
        GROUP BY date_trunc('hour', datetime)
        ORDER BY datetime ASC
        """,
        (start, end),
    )


@app.get(
    "/weather/daily-aggregated-range",
    response_model=List[dict],
    tags=["Aggregated Data"],
    description="Gibt täglich aggregierte Wetterdaten mit Min/Max-Temperaturen für einen bestimmten Zeitraum zurück",
)
async def get_daily_aggregated_range(
    start: datetime = Query(..., description="Startdatum (ISO 8601)"),
    end: datetime = Query(..., description="Enddatum (ISO 8601)")
):
    """Return per-day means with min/max values between *start* and *end*.

    Raises:
        HTTPException: 400 if the range is empty, inverted or mixes naive and
            timezone-aware timestamps.
    """
    _validate_time_range(start, end)
    return _daily_aggregates(_BETWEEN, (start, end))


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)