Files
2026-02-10 14:06:42 +01:00

310 lines
10 KiB
Python

# HTTP API that receives weather data via POST and stores in PostgreSQL
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import psycopg2
from psycopg2.extras import RealDictCursor
import uvicorn
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Umgebungsvariablen laden - eine Ebene höher
env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)
# Konfiguration
COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001))
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
DB_NAME = os.getenv('DB_NAME', 'wetterstation')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# FastAPI App
app = FastAPI(title="Weather Data Collector API")
# Pydantic Models
class WeatherDataInput(BaseModel):
# Unterstütze beide Formate: datetime (String) oder dateTime (Unix-Timestamp)
datetime: str | None = None
dateTime: int | None = None
# Unterstütze beide Feldnamen
temperature: float | None = None
outTemp: float | None = None # Fahrenheit
humidity: int | None = None
outHumidity: float | None = None
pressure: float | None = None
barometer: float | None = None # inHg
windSpeed: float | None = None # mph
wind_speed: float | None = None
windGust: float | None = None # mph
wind_gust: float | None = None
windDir: float | None = None
wind_dir: float | None = None
rain: float | None = None
rainRate: float | None = None
rain_rate: float | None = None
model_config = {"extra": "allow"}
def get_datetime_string(self) -> str:
"""Konvertiere dateTime (Unix-Timestamp) zu datetime (String)"""
if self.datetime:
return self.datetime
elif self.dateTime:
from datetime import datetime as dt
return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S')
raise ValueError("Weder datetime noch dateTime vorhanden")
def get_temperature_celsius(self) -> float | None:
"""Konvertiere Temperatur von Fahrenheit zu Celsius falls nötig"""
if self.temperature is not None:
return self.temperature
elif self.outTemp is not None:
# Fahrenheit zu Celsius: (F - 32) * 5/9
return (self.outTemp - 32) * 5 / 9
return None
def get_humidity_int(self) -> int | None:
"""Hole Humidity-Wert"""
if self.humidity is not None:
return int(self.humidity)
elif self.outHumidity is not None:
return int(self.outHumidity)
return None
def get_pressure_hpa(self) -> float | None:
"""Konvertiere Druck von inHg zu hPa falls nötig"""
if self.pressure is not None:
return self.pressure
elif self.barometer is not None:
# inHg zu hPa: inHg * 33.8639
return self.barometer * 33.8639
return None
def get_wind_speed(self) -> float | None:
"""Hole Windgeschwindigkeit"""
return self.windSpeed if self.windSpeed is not None else self.wind_speed
def get_wind_gust(self) -> float | None:
"""Hole Windböen"""
return self.windGust if self.windGust is not None else self.wind_gust
def get_wind_dir(self) -> float | None:
"""Hole Windrichtung"""
return self.windDir if self.windDir is not None else self.wind_dir
def get_rain_rate(self) -> float | None:
"""Hole Regenrate"""
return self.rainRate if self.rainRate is not None else self.rain_rate
# Datenbankverbindung
def get_db_connection():
"""Datenbankverbindung herstellen"""
try:
conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
return conn
except Exception as e:
logger.error(f"Datenbankverbindungsfehler: {e}")
raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen")
def setup_database():
"""Tabelle erstellen falls nicht vorhanden"""
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute("""
CREATE TABLE IF NOT EXISTS weather_data (
id SERIAL PRIMARY KEY,
datetime TIMESTAMPTZ NOT NULL,
temperature FLOAT,
humidity INTEGER,
pressure FLOAT,
wind_speed FLOAT,
wind_gust FLOAT,
wind_dir FLOAT,
rain FLOAT,
rain_rate FLOAT,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(datetime)
)
""")
conn.commit()
logger.info("Tabelle weather_data bereit")
conn.close()
except Exception as e:
logger.error(f"Fehler bei Datenbanksetup: {e}")
raise
# API Endpoints
@app.on_event("startup")
async def startup_event():
"""Bei Start die Datenbank initialisieren"""
logger.info("Collector API startet...")
setup_database()
logger.info(f"API läuft auf Port {COLLECTOR_PORT}")
@app.get("/")
async def root():
"""Root Endpoint - GET zeigt Info"""
return {
"message": "Weather Data Collector API",
"version": "1.0.0",
"endpoint": "POST /weather or POST /"
}
@app.post("/")
async def root_post(request: Request):
"""Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)"""
try:
# Rohen Body lesen
body = await request.body()
body_str = body.decode('utf-8')
logger.info(f"POST auf Root - Raw Body: {body_str}")
# Als JSON parsen
data_dict = json.loads(body_str)
logger.info(f"POST auf Root - Parsed JSON: {data_dict}")
# Zu Pydantic Model konvertieren
data = WeatherDataInput(**data_dict)
return await receive_weather_data(data)
except json.JSONDecodeError as e:
logger.error(f"JSON Parse Error: {e}")
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")
except Exception as e:
logger.error(f"Fehler bei Root POST: {e}")
raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}")
@app.post("/debug")
async def debug_post(request: dict):
"""Debug Endpoint - akzeptiert beliebige JSON und loggt sie"""
logger.info(f"Debug: Empfangene Rohdaten: {request}")
return {"status": "logged", "data": request}
@app.get("/health")
async def health_check():
"""Health Check"""
try:
conn = get_db_connection()
with conn.cursor() as cursor:
cursor.execute("SELECT 1")
conn.close()
return {"status": "healthy", "database": "connected"}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Database error: {str(e)}")
@app.post("/weather")
async def receive_weather_data(data: WeatherDataInput):
"""Wetterdaten empfangen und speichern"""
logger.info(f"Empfangene Daten: {data.model_dump()}")
try:
conn = get_db_connection()
try:
# Konvertiere zu den richtigen Werten
dt_string = data.get_datetime_string()
temp_c = data.get_temperature_celsius()
humidity = data.get_humidity_int()
pressure = data.get_pressure_hpa()
wind_speed = data.get_wind_speed()
wind_gust = data.get_wind_gust()
wind_dir = data.get_wind_dir()
rain = data.rain
rain_rate = data.get_rain_rate()
logger.info(f"Konvertierte Daten - datetime: {dt_string}, temp: {temp_c}°C, humidity: {humidity}%, pressure: {pressure} hPa")
with conn.cursor() as cursor:
cursor.execute("""
INSERT INTO weather_data
(datetime, temperature, humidity, pressure, wind_speed,
wind_gust, wind_dir, rain, rain_rate)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (datetime) DO UPDATE SET
temperature = EXCLUDED.temperature,
humidity = EXCLUDED.humidity,
pressure = EXCLUDED.pressure,
wind_speed = EXCLUDED.wind_speed,
wind_gust = EXCLUDED.wind_gust,
wind_dir = EXCLUDED.wind_dir,
rain = EXCLUDED.rain,
rain_rate = EXCLUDED.rain_rate
""", (
dt_string,
temp_c,
humidity,
pressure,
wind_speed,
wind_gust,
wind_dir,
rain,
rain_rate
))
conn.commit()
logger.info(f"Daten gespeichert für {dt_string} (UTC)")
return {
"status": "success",
"message": f"Weather data for {dt_string} saved successfully"
}
finally:
conn.close()
except Exception as e:
logger.error(f"Fehler beim Speichern: {e}")
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
def main():
"""Hauptfunktion"""
# Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind
required_vars = ['DB_USER', 'DB_PASSWORD']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}")
logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen")
return
uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT)
if __name__ == "__main__":
main()