Files
wetter_station/migrate_sqlite_to_postgres.py

220 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Migration Tool: SQLite (wview) → PostgreSQL (wetterstation)
Migriert Wetterdaten vom 1.1.2025 bis heute
"""
import sqlite3
import psycopg
from datetime import datetime, timezone
import os
from pathlib import Path
from dotenv import load_dotenv
import sys
# Umgebungsvariablen laden
env_path = Path(__file__).parent / '.env'
load_dotenv(dotenv_path=env_path)
# Konfiguration
SQLITE_DB = "data/wview-archive.sdb"
START_DATE = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
END_DATE = datetime(2026, 2, 8, 0, 0, 0, tzinfo=timezone.utc)
# PostgreSQL-Konfiguration
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
DB_NAME = os.getenv('DB_NAME', 'wetterstation')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Soll die Tabelle vorher geleert werden?
TRUNCATE_TABLE = False # Auf False setzen, um vorhandene Daten zu behalten
# Konvertierungsfunktionen
def fahrenheit_to_celsius(f):
"""Fahrenheit → Celsius"""
if f is None:
return None
return (f - 32) * 5 / 9
def inches_hg_to_hpa(inhg):
"""inches Hg → hPa"""
if inhg is None:
return None
return inhg * 33.8639
def mph_to_kmh(mph):
"""mph → km/h"""
if mph is None:
return None
return mph * 1.60934
def inches_to_mm(inches):
"""inches → mm"""
if inches is None:
return None
return inches * 25.4
def unix_to_datetime(timestamp):
"""Unix timestamp → datetime"""
return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def main():
print("=" * 60)
print("SQLite → PostgreSQL Migration")
print("=" * 60)
print(f"Quelle: {SQLITE_DB}")
print(f"Zeitraum: {START_DATE.date()} bis {END_DATE.date()}")
print(f"Ziel: PostgreSQL ({DB_HOST}:{DB_PORT}/{DB_NAME})")
print("=" * 60)
print()
# SQLite öffnen
try:
sqlite_conn = sqlite3.connect(SQLITE_DB)
sqlite_cursor = sqlite_conn.cursor()
print("✓ SQLite-Verbindung hergestellt")
except Exception as e:
print(f"✗ Fehler beim Öffnen der SQLite-Datenbank: {e}")
sys.exit(1)
# PostgreSQL öffnen
try:
pg_conn = psycopg.connect(
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
pg_cursor = pg_conn.cursor()
print("✓ PostgreSQL-Verbindung hergestellt")
except Exception as e:
print(f"✗ Fehler beim Verbinden mit PostgreSQL: {e}")
sqlite_conn.close()
sys.exit(1)
# Tabelle leeren falls gewünscht
if TRUNCATE_TABLE:
print("\nLeere PostgreSQL-Tabelle weather_data...")
try:
pg_cursor.execute("TRUNCATE TABLE weather_data RESTART IDENTITY CASCADE")
pg_conn.commit()
print("✓ Tabelle geleert")
except Exception as e:
print(f"✗ Fehler beim Leeren der Tabelle: {e}")
sqlite_conn.close()
pg_conn.close()
sys.exit(1)
# Zeitraum in Unix timestamps umrechnen
start_ts = int(START_DATE.timestamp())
end_ts = int(END_DATE.timestamp())
# Daten aus SQLite laden
print(f"\nLade Daten aus SQLite (Zeitraum: {start_ts} - {end_ts})...")
sqlite_cursor.execute("""
SELECT
dateTime,
outTemp,
outHumidity,
barometer,
windSpeed,
windGust,
windDir,
rain,
rainRate
FROM archive
WHERE dateTime >= ? AND dateTime <= ?
ORDER BY dateTime ASC
""", (start_ts, end_ts))
rows = sqlite_cursor.fetchall()
print(f"{len(rows)} Datensätze gefunden")
if len(rows) == 0:
print("Keine Daten im angegebenen Zeitraum gefunden.")
sqlite_conn.close()
pg_conn.close()
return
# Migration durchführen
print("\nMigriere Daten...")
inserted = 0
skipped = 0
errors = 0
for row in rows:
try:
(dateTime, outTemp, outHumidity, barometer,
windSpeed, windGust, windDir, rain, rainRate) = row
# Konvertierungen
dt = unix_to_datetime(dateTime)
temp_c = fahrenheit_to_celsius(outTemp)
humidity = int(outHumidity) if outHumidity is not None else None
pressure_hpa = inches_hg_to_hpa(barometer)
wind_speed_kmh = mph_to_kmh(windSpeed)
wind_gust_kmh = mph_to_kmh(windGust)
rain_mm = inches_to_mm(rain)
rain_rate_mm = inches_to_mm(rainRate)
# In PostgreSQL einfügen
pg_cursor.execute("""
INSERT INTO weather_data
(datetime, temperature, humidity, pressure,
wind_speed, wind_gust, wind_dir, rain, rain_rate)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (datetime) DO NOTHING
""", (dt, temp_c, humidity, pressure_hpa,
wind_speed_kmh, wind_gust_kmh, windDir, rain_mm, rain_rate_mm))
if pg_cursor.rowcount > 0:
inserted += 1
if inserted % 1000 == 0:
pg_conn.commit()
print(f" {inserted} Datensätze eingefügt...")
else:
skipped += 1
except Exception as e:
errors += 1
if errors <= 5: # Zeige nur die ersten 5 Fehler
print(f" Fehler bei Datensatz {dateTime}: {e}")
# Commit verbleibende Daten
pg_conn.commit()
# Zusammenfassung
print("\n" + "=" * 60)
print("Migration abgeschlossen!")
print("=" * 60)
print(f"Eingefügt: {inserted} Datensätze")
print(f"Übersprungen: {skipped} Datensätze (bereits vorhanden)")
print(f"Fehler: {errors} Datensätze")
print("=" * 60)
# Zeitraum der migrierten Daten anzeigen
if inserted > 0:
pg_cursor.execute("""
SELECT MIN(datetime), MAX(datetime), COUNT(*)
FROM weather_data
WHERE datetime >= %s AND datetime <= %s
""", (START_DATE, END_DATE))
min_dt, max_dt, count = pg_cursor.fetchone()
print(f"\nDaten in PostgreSQL:")
print(f" Von: {min_dt}")
print(f" Bis: {max_dt}")
print(f" Gesamt: {count} Datensätze")
# Verbindungen schließen
sqlite_conn.close()
pg_conn.close()
print("\n✓ Fertig!")
if __name__ == "__main__":
main()