V 1.4.0 Komplette Absicherung mit Hilfe von Claude
This commit is contained in:
364
collector/main.py_old
Normal file
364
collector/main.py_old
Normal file
@@ -0,0 +1,364 @@
|
||||
# HTTP API that receives weather data via POST and stores in PostgreSQL
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from pydantic import BaseModel
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
import uvicorn
|
||||
|
||||
# Logging konfigurieren
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Umgebungsvariablen laden - eine Ebene höher
|
||||
env_path = Path(__file__).parent.parent / '.env'
|
||||
load_dotenv(dotenv_path=env_path)
|
||||
|
||||
# Konfiguration
|
||||
COLLECTOR_PORT = int(os.getenv('COLLECTOR_PORT', 8001))
|
||||
|
||||
DB_HOST = os.getenv('DB_HOST', 'localhost')
|
||||
DB_PORT = int(os.getenv('DB_PORT', 5432))
|
||||
DB_NAME = os.getenv('DB_NAME', 'wetterstation')
|
||||
DB_USER = os.getenv('DB_USER')
|
||||
DB_PASSWORD = os.getenv('DB_PASSWORD')
|
||||
|
||||
# FastAPI App
|
||||
app = FastAPI(title="Weather Data Collector API")
|
||||
|
||||
|
||||
# Pydantic Models
|
||||
class WeatherDataInput(BaseModel):
|
||||
# Zeitstempel: ISO-String (time), datetime-String oder Unix-Timestamp
|
||||
time: str | None = None
|
||||
datetime: str | None = None
|
||||
dateTime: int | None = None
|
||||
|
||||
# Außentemperatur (Celsius): tempOut, temperature oder outTemp (Fahrenheit)
|
||||
tempOut: float | None = None # Celsius (neues Format)
|
||||
temperature: float | None = None
|
||||
outTemp: float | None = None # Fahrenheit (altes Format)
|
||||
|
||||
# Innentemperatur
|
||||
tempIn: float | None = None # Celsius
|
||||
|
||||
# Außenfeuchte
|
||||
humOut: int | None = None
|
||||
humidity: int | None = None
|
||||
outHumidity: float | None = None
|
||||
|
||||
# Innenfeuchte
|
||||
humIn: int | None = None
|
||||
|
||||
# Luftdruck
|
||||
pressure: float | None = None
|
||||
barometer: float | None = None # inHg
|
||||
barTrend: int | None = None # hPa/Stunde
|
||||
|
||||
# Wind
|
||||
windAvg: float | None = None # m/s Durchschnitt (neues Format)
|
||||
windSpeed: float | None = None
|
||||
wind_speed: float | None = None
|
||||
windGust: float | None = None
|
||||
wind_gust: float | None = None
|
||||
windDir: float | None = None
|
||||
wind_dir: float | None = None
|
||||
|
||||
# Niederschlag
|
||||
rain: float | None = None
|
||||
rainRate: float | None = None
|
||||
rain_rate: float | None = None
|
||||
|
||||
# Vorhersage
|
||||
forecast: int | None = None
|
||||
|
||||
model_config = {"extra": "allow"}
|
||||
|
||||
def get_datetime_string(self) -> str:
|
||||
"""Zeitstempel als String zurückgeben"""
|
||||
if self.time:
|
||||
return self.time
|
||||
elif self.datetime:
|
||||
return self.datetime
|
||||
elif self.dateTime:
|
||||
from datetime import datetime as dt
|
||||
return dt.fromtimestamp(self.dateTime).strftime('%Y-%m-%d %H:%M:%S')
|
||||
raise ValueError("Kein Zeitstempel vorhanden (time, datetime oder dateTime)")
|
||||
|
||||
def get_temperature_celsius(self) -> float | None:
|
||||
"""Außentemperatur in Celsius"""
|
||||
if self.tempOut is not None:
|
||||
return self.tempOut
|
||||
elif self.temperature is not None:
|
||||
return self.temperature
|
||||
elif self.outTemp is not None:
|
||||
return (self.outTemp - 32) * 5 / 9
|
||||
return None
|
||||
|
||||
def get_temp_in(self) -> float | None:
|
||||
"""Innentemperatur in Celsius"""
|
||||
return self.tempIn
|
||||
|
||||
def get_humidity_int(self) -> int | None:
|
||||
"""Außenfeuchte"""
|
||||
if self.humOut is not None:
|
||||
return int(self.humOut)
|
||||
elif self.humidity is not None:
|
||||
return int(self.humidity)
|
||||
elif self.outHumidity is not None:
|
||||
return int(self.outHumidity)
|
||||
return None
|
||||
|
||||
def get_humidity_in(self) -> int | None:
|
||||
"""Innenfeuchte"""
|
||||
return int(self.humIn) if self.humIn is not None else None
|
||||
|
||||
def get_pressure_hpa(self) -> float | None:
|
||||
"""Luftdruck in hPa"""
|
||||
if self.pressure is not None:
|
||||
return self.pressure
|
||||
elif self.barometer is not None:
|
||||
return self.barometer * 33.8639
|
||||
return None
|
||||
|
||||
def get_wind_speed(self) -> float | None:
|
||||
"""Durchschnittliche Windgeschwindigkeit"""
|
||||
if self.windAvg is not None:
|
||||
return self.windAvg
|
||||
elif self.windSpeed is not None:
|
||||
return self.windSpeed
|
||||
return self.wind_speed
|
||||
|
||||
def get_wind_gust(self) -> float | None:
|
||||
"""Windböe"""
|
||||
return self.windGust if self.windGust is not None else self.wind_gust
|
||||
|
||||
def get_wind_dir(self) -> float | None:
|
||||
"""Windrichtung"""
|
||||
return self.windDir if self.windDir is not None else self.wind_dir
|
||||
|
||||
def get_rain_rate(self) -> float | None:
|
||||
"""Regenrate"""
|
||||
return self.rainRate if self.rainRate is not None else self.rain_rate
|
||||
|
||||
|
||||
# Datenbankverbindung
|
||||
def get_db_connection():
|
||||
"""Datenbankverbindung herstellen"""
|
||||
try:
|
||||
conn = psycopg2.connect(
|
||||
host=DB_HOST,
|
||||
port=DB_PORT,
|
||||
database=DB_NAME,
|
||||
user=DB_USER,
|
||||
password=DB_PASSWORD
|
||||
)
|
||||
return conn
|
||||
except Exception as e:
|
||||
logger.error(f"Datenbankverbindungsfehler: {e}")
|
||||
raise HTTPException(status_code=500, detail="Datenbankverbindung fehlgeschlagen")
|
||||
|
||||
|
||||
def setup_database():
|
||||
"""Tabelle erstellen und fehlende Spalten ergänzen"""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS weather_data (
|
||||
id SERIAL PRIMARY KEY,
|
||||
datetime TIMESTAMPTZ NOT NULL,
|
||||
temperature FLOAT,
|
||||
humidity INTEGER,
|
||||
pressure FLOAT,
|
||||
wind_speed FLOAT,
|
||||
wind_gust FLOAT,
|
||||
wind_dir FLOAT,
|
||||
rain FLOAT,
|
||||
rain_rate FLOAT,
|
||||
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(datetime)
|
||||
)
|
||||
""")
|
||||
# Neue Spalten ergänzen (idempotent)
|
||||
cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS temp_in FLOAT")
|
||||
cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS humidity_in INTEGER")
|
||||
cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS forecast INTEGER")
|
||||
cursor.execute("ALTER TABLE weather_data ADD COLUMN IF NOT EXISTS bar_trend INTEGER")
|
||||
conn.commit()
|
||||
logger.info("Tabelle weather_data bereit (inkl. neuer Spalten)")
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Datenbanksetup: {e}")
|
||||
raise
|
||||
|
||||
|
||||
# API Endpoints
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""Bei Start die Datenbank initialisieren"""
|
||||
logger.info("Collector API startet...")
|
||||
setup_database()
|
||||
logger.info(f"API läuft auf Port {COLLECTOR_PORT}")
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
"""Root Endpoint - GET zeigt Info"""
|
||||
return {
|
||||
"message": "Weather Data Collector API",
|
||||
"version": "1.0.0",
|
||||
"endpoint": "POST /weather or POST /"
|
||||
}
|
||||
|
||||
|
||||
@app.post("/")
|
||||
async def root_post(request: Request):
|
||||
"""Root Endpoint - POST akzeptiert Wetterdaten (Alias für /weather)"""
|
||||
try:
|
||||
# Rohen Body lesen
|
||||
body = await request.body()
|
||||
body_str = body.decode('utf-8')
|
||||
logger.info(f"POST auf Root - Raw Body: {body_str}")
|
||||
|
||||
# Als JSON parsen
|
||||
data_dict = json.loads(body_str)
|
||||
logger.info(f"POST auf Root - Parsed JSON: {data_dict}")
|
||||
|
||||
# Zu Pydantic Model konvertieren
|
||||
data = WeatherDataInput(**data_dict)
|
||||
return await receive_weather_data(data)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"JSON Parse Error: {e}")
|
||||
raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Root POST: {e}")
|
||||
raise HTTPException(status_code=422, detail=f"Validation error: {str(e)}")
|
||||
|
||||
|
||||
@app.post("/debug")
|
||||
async def debug_post(request: dict):
|
||||
"""Debug Endpoint - akzeptiert beliebige JSON und loggt sie"""
|
||||
logger.info(f"Debug: Empfangene Rohdaten: {request}")
|
||||
return {"status": "logged", "data": request}
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health_check():
|
||||
"""Health Check"""
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("SELECT 1")
|
||||
conn.close()
|
||||
return {"status": "healthy", "database": "connected"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=503, detail=f"Database error: {str(e)}")
|
||||
|
||||
|
||||
@app.post("/weather")
|
||||
async def receive_weather_data(data: WeatherDataInput):
|
||||
"""Wetterdaten empfangen und speichern"""
|
||||
logger.info(f"Empfangene Daten: {data.model_dump()}")
|
||||
try:
|
||||
conn = get_db_connection()
|
||||
try:
|
||||
# Konvertiere zu den richtigen Werten
|
||||
dt_string = data.get_datetime_string()
|
||||
temp_c = data.get_temperature_celsius()
|
||||
temp_in = data.get_temp_in()
|
||||
humidity = data.get_humidity_int()
|
||||
humidity_in = data.get_humidity_in()
|
||||
pressure = data.get_pressure_hpa()
|
||||
bar_trend = data.barTrend
|
||||
wind_speed = data.get_wind_speed()
|
||||
wind_gust = data.get_wind_gust()
|
||||
wind_dir = data.get_wind_dir()
|
||||
rain = data.rain
|
||||
rain_rate = data.get_rain_rate()
|
||||
forecast = data.forecast
|
||||
|
||||
logger.info(
|
||||
f"Konvertierte Daten - datetime: {dt_string}, "
|
||||
f"tempOut: {temp_c}°C, tempIn: {temp_in}°C, "
|
||||
f"humOut: {humidity}%, humIn: {humidity_in}%, "
|
||||
f"pressure: {pressure} hPa, barTrend: {bar_trend}"
|
||||
)
|
||||
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
INSERT INTO weather_data
|
||||
(datetime, temperature, temp_in, humidity, humidity_in,
|
||||
pressure, bar_trend, wind_speed, wind_gust, wind_dir,
|
||||
rain, rain_rate, forecast)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (datetime) DO UPDATE SET
|
||||
temperature = EXCLUDED.temperature,
|
||||
temp_in = EXCLUDED.temp_in,
|
||||
humidity = EXCLUDED.humidity,
|
||||
humidity_in = EXCLUDED.humidity_in,
|
||||
pressure = EXCLUDED.pressure,
|
||||
bar_trend = EXCLUDED.bar_trend,
|
||||
wind_speed = EXCLUDED.wind_speed,
|
||||
wind_gust = EXCLUDED.wind_gust,
|
||||
wind_dir = EXCLUDED.wind_dir,
|
||||
rain = EXCLUDED.rain,
|
||||
rain_rate = EXCLUDED.rain_rate,
|
||||
forecast = EXCLUDED.forecast
|
||||
""", (
|
||||
dt_string,
|
||||
temp_c,
|
||||
temp_in,
|
||||
humidity,
|
||||
humidity_in,
|
||||
pressure,
|
||||
bar_trend,
|
||||
wind_speed,
|
||||
wind_gust,
|
||||
wind_dir,
|
||||
rain,
|
||||
rain_rate,
|
||||
forecast
|
||||
))
|
||||
conn.commit()
|
||||
logger.info(f"Daten gespeichert für {dt_string} (UTC)")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": f"Weather data for {dt_string} saved successfully"
|
||||
}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Speichern: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
|
||||
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion"""
|
||||
# Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind
|
||||
required_vars = ['DB_USER', 'DB_PASSWORD']
|
||||
missing_vars = [var for var in required_vars if not os.getenv(var)]
|
||||
|
||||
if missing_vars:
|
||||
logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}")
|
||||
logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen")
|
||||
return
|
||||
|
||||
uvicorn.run(app, host="0.0.0.0", port=COLLECTOR_PORT)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user