Files
wetterstation/api/main.py

338 lines
11 KiB
Python

import logging
import os
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import List, Optional

import psycopg
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from psycopg.rows import dict_row
from pydantic import BaseModel, Field
# Configure logging: INFO level, "<time> - <level> - <message>" records.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Load environment variables from the .env file located one directory above
# the directory that contains this module.
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)

# Database configuration, taken from the environment. Host, port and database
# name fall back to defaults; user and password are None when unset.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = int(os.environ.get('DB_PORT', 5432))
DB_NAME = os.environ.get('DB_NAME', 'wetterstation')
DB_USER = os.environ.get('DB_USER')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
# Create the FastAPI application.
app = FastAPI(
    title="Wetterstation API",
    description="API zum Auslesen von Wetterdaten",
    version="1.0.0",
)

# CORS middleware: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# combination the FastAPI CORS documentation advises against; confirm whether
# credentialed cross-origin requests are actually required.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Pydantic models
class WeatherData(BaseModel):
    # One row of the weather_data table as returned by the /weather/* record
    # endpoints. id, datetime and received_at are required; every measurement
    # may be missing (None).

    # Pydantic v2 configuration. Replaces the nested `class Config`, which is
    # deprecated in v2 and emits a deprecation warning at import time.
    model_config = {"from_attributes": True}

    id: int
    datetime: datetime
    temperature: Optional[float] = None
    humidity: Optional[int] = None
    pressure: Optional[float] = None
    wind_speed: Optional[float] = None
    wind_gust: Optional[float] = None
    wind_dir: Optional[float] = None
    rain: Optional[float] = None
    rain_rate: Optional[float] = None
    received_at: datetime
class WeatherStats(BaseModel):
    # Aggregated values over a time window, filled from the AVG/MIN/MAX/SUM
    # columns of the statistics queries in this module. Each aggregate may be
    # None; data_points (the COUNT(*) of the window) is always required.

    # Temperature aggregates
    avg_temperature: Optional[float] = None
    min_temperature: Optional[float] = None
    max_temperature: Optional[float] = None

    # Humidity and pressure averages
    avg_humidity: Optional[float] = None
    avg_pressure: Optional[float] = None

    # Wind aggregates
    avg_wind_speed: Optional[float] = None
    max_wind_gust: Optional[float] = None

    # Sum of the rain column over the window
    total_rain: Optional[float] = None

    # Number of rows that contributed to the aggregates
    data_points: int
class HealthResponse(BaseModel):
    # Payload of the /health endpoint.

    # "ok" when the database probe succeeded, otherwise "error"
    status: str
    # "connected" or "disconnected"
    database: str
    # Time at which the health check was answered
    timestamp: datetime
# Database connection
def get_db_connection():
    """Open a new database connection using the module-level DB_* settings.

    Rows fetched through the returned connection are dictionaries keyed by
    column name (``dict_row``). The caller is responsible for closing the
    connection.

    Returns:
        An open psycopg connection.

    Raises:
        HTTPException: status 500 when the connection cannot be established.
    """
    # NOTE(review): psycopg.connect is a blocking call, yet the endpoints that
    # use this helper are declared `async def`; confirm this is acceptable for
    # the expected load.
    try:
        return psycopg.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            row_factory=dict_row,
        )
    except Exception as e:
        # logger.exception keeps the traceback in the log; the original cause
        # stays attached to the HTTP error through exception chaining.
        logger.exception("Datenbankverbindungsfehler: %s", e)
        raise HTTPException(
            status_code=500,
            detail="Datenbankverbindung fehlgeschlagen",
        ) from e
# API endpoints
@app.get("/", tags=["General"])
async def root():
    """Return the API name, its version and the path of the docs."""
    api_info = dict(
        message="Wetterstation API",
        version="1.0.0",
        docs="/docs",
    )
    return api_info
@app.get("/health", response_model=HealthResponse, tags=["General"])
async def health_check():
    """Report whether the API can reach its database.

    Always answers with HTTP 200; the outcome is carried in the payload.
    """
    try:
        conn = get_db_connection()
        try:
            # A trivial query proves the connection is actually usable.
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
        finally:
            # Close the connection even when the probe query fails, so a
            # failing health check does not leak connections.
            conn.close()
        db_status = "connected"
    except Exception as exc:
        # Deliberately broad: any failure (including the HTTPException raised
        # by get_db_connection) only means "disconnected", but it is logged
        # instead of being silently dropped.
        logger.warning("Health-Check fehlgeschlagen: %s", exc)
        db_status = "disconnected"
    return {
        "status": "ok" if db_status == "connected" else "error",
        "database": db_status,
        "timestamp": datetime.now(),
    }
@app.get("/weather/latest", response_model=WeatherData, tags=["Weather Data"])
async def get_latest_weather():
    """Return the most recent weather record.

    Answers with HTTP 404 when no record exists.
    """
    latest_query = """
        SELECT * FROM weather_data
        ORDER BY datetime DESC
        LIMIT 1
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(latest_query)
            row = cur.fetchone()
    finally:
        connection.close()
    # fetchone() yields None when the table is empty.
    if row:
        return dict(row)
    raise HTTPException(status_code=404, detail="Keine Daten verfügbar")
@app.get("/weather/current", response_model=WeatherData, tags=["Weather Data"])
async def get_current_weather():
    """Alias for /weather/latest: return the current weather record."""
    # Delegate to the /weather/latest handler so both routes stay in sync.
    latest_record = await get_latest_weather()
    return latest_record
@app.get("/weather/history", response_model=List[WeatherData], tags=["Weather Data"])
async def get_weather_history(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück (max 168 = 7 Tage)"),
    limit: int = Query(1000, ge=1, le=10000, description="Maximale Anzahl Datensätze"),
):
    """Return the weather records of the last `hours` hours, newest first.

    At most `limit` records are returned.
    """
    history_query = """
        SELECT * FROM weather_data
        WHERE datetime >= NOW() - make_interval(hours => %s)
        ORDER BY datetime DESC
        LIMIT %s
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(history_query, (hours, limit))
            rows = cur.fetchall()
    finally:
        connection.close()
    return list(map(dict, rows))
@app.get("/weather/range", response_model=List[WeatherData], tags=["Weather Data"])
async def get_weather_by_date_range(
    start: datetime = Query(..., description="Startdatum (ISO 8601)"),
    end: datetime = Query(..., description="Enddatum (ISO 8601)"),
    limit: int = Query(10000, ge=1, le=50000, description="Maximale Anzahl Datensätze"),
):
    """Return the weather records between `start` and `end`, oldest first.

    Both bounds are inclusive and at most `limit` records are returned.
    Answers with HTTP 400 when the bounds mix timezone-aware and naive
    values, or when `start` is not before `end`.
    """
    # Comparing an aware datetime with a naive one raises TypeError, which
    # would surface as HTTP 500; reject that combination as a client error.
    if (start.utcoffset() is None) != (end.utcoffset() is None):
        raise HTTPException(
            status_code=400,
            detail="Start- und Enddatum müssen beide mit oder beide ohne Zeitzone angegeben werden",
        )
    if start >= end:
        raise HTTPException(status_code=400, detail="Startdatum muss vor Enddatum liegen")
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT * FROM weather_data
                WHERE datetime BETWEEN %s AND %s
                ORDER BY datetime ASC
                LIMIT %s
                """,
                (start, end, limit),
            )
            results = cursor.fetchall()
        return [dict(row) for row in results]
    finally:
        conn.close()
@app.get("/weather/stats", response_model=WeatherStats, tags=["Statistics"])
async def get_weather_statistics(
    hours: int = Query(24, ge=1, le=168, description="Zeitraum in Stunden für Statistiken"),
):
    """Return aggregated statistics over the last `hours` hours.

    Answers with HTTP 404 when the period contains no records.
    """
    stats_query = """
        SELECT
            AVG(temperature) as avg_temperature,
            MIN(temperature) as min_temperature,
            MAX(temperature) as max_temperature,
            AVG(humidity) as avg_humidity,
            AVG(pressure) as avg_pressure,
            AVG(wind_speed) as avg_wind_speed,
            MAX(wind_gust) as max_wind_gust,
            SUM(rain) as total_rain,
            COUNT(*) as data_points
        FROM weather_data
        WHERE datetime >= NOW() - make_interval(hours => %s)
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(stats_query, (hours,))
            summary = cur.fetchone()
    finally:
        connection.close()
    # The aggregate query yields one row even for an empty window, so an
    # empty period is detected through the row count.
    if summary and summary['data_points'] != 0:
        return dict(summary)
    raise HTTPException(status_code=404, detail="Keine Daten für den Zeitraum verfügbar")
class DailyWeatherStats(WeatherStats):
    # WeatherStats for one calendar day. `date` is the DATE(datetime) grouping
    # key of the daily query; plain WeatherStats has no such field, so the day
    # was previously dropped from every entry of the response.
    date: date


@app.get("/weather/daily", response_model=List[DailyWeatherStats], tags=["Statistics"])
async def get_daily_statistics(
    days: int = Query(7, ge=1, le=90, description="Anzahl Tage zurück (max 90)"),
):
    """Return per-day statistics for the last `days` days, newest day first.

    Each entry carries the day it belongs to in `date`. Days without any
    record produce no entry; the list is empty when the period has no data.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    DATE(datetime) as date,
                    AVG(temperature) as avg_temperature,
                    MIN(temperature) as min_temperature,
                    MAX(temperature) as max_temperature,
                    AVG(humidity) as avg_humidity,
                    AVG(pressure) as avg_pressure,
                    AVG(wind_speed) as avg_wind_speed,
                    MAX(wind_gust) as max_wind_gust,
                    SUM(rain) as total_rain,
                    COUNT(*) as data_points
                FROM weather_data
                WHERE datetime >= NOW() - make_interval(days => %s)
                GROUP BY DATE(datetime)
                ORDER BY date DESC
                """,
                (days,),
            )
            results = cursor.fetchall()
        return [dict(row) for row in results]
    finally:
        conn.close()
@app.get("/weather/temperature", response_model=List[dict], tags=["Weather Data"])
async def get_temperature_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück"),
):
    """Return the temperature time series of the last `hours` hours.

    Records without a temperature are skipped; the result is ordered oldest
    first.
    """
    temperature_query = """
        SELECT datetime, temperature
        FROM weather_data
        WHERE datetime >= NOW() - make_interval(hours => %s)
        AND temperature IS NOT NULL
        ORDER BY datetime ASC
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(temperature_query, (hours,))
            rows = cur.fetchall()
    finally:
        connection.close()
    return list(map(dict, rows))
@app.get("/weather/wind", response_model=List[dict], tags=["Weather Data"])
async def get_wind_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück"),
):
    """Return wind speed, gust and direction of the last `hours` hours.

    The result is ordered oldest first.
    """
    wind_query = """
        SELECT datetime, wind_speed, wind_gust, wind_dir
        FROM weather_data
        WHERE datetime >= NOW() - make_interval(hours => %s)
        ORDER BY datetime ASC
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(wind_query, (hours,))
            rows = cur.fetchall()
    finally:
        connection.close()
    return list(map(dict, rows))
@app.get("/weather/rain", response_model=List[dict], tags=["Weather Data"])
async def get_rain_data(
    hours: int = Query(24, ge=1, le=168, description="Anzahl Stunden zurück"),
):
    """Return rain and rain-rate values of the last `hours` hours.

    The result is ordered oldest first.
    """
    rain_query = """
        SELECT datetime, rain, rain_rate
        FROM weather_data
        WHERE datetime >= NOW() - make_interval(hours => %s)
        ORDER BY datetime ASC
    """
    connection = get_db_connection()
    try:
        with connection.cursor() as cur:
            cur.execute(rain_query, (hours,))
            rows = cur.fetchall()
    finally:
        connection.close()
    return list(map(dict, rows))
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)