Files
wetterstation_python/tools/sqlite_query.py

194 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
SQLite Query Tool mit formatierter DateTime-Ausgabe
Zeigt dateTime als 'YYYY-MM-DD HH:MM:SS' statt Unix-Timestamp
"""
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
def format_datetime(timestamp):
"""Konvertiert Unix-Timestamp in lesbares Format"""
try:
if timestamp is None:
return 'NULL'
return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, OSError):
return str(timestamp)
def format_value(value):
"""Formatiert Werte für die Ausgabe"""
if value is None:
return 'NULL'
if isinstance(value, float):
return f'{value:.2f}'
return str(value)
def execute_query(db_path, query=None, limit=None):
"""Führt eine SELECT-Abfrage aus und zeigt Ergebnisse formatiert an"""
if not Path(db_path).exists():
print(f"Fehler: Datenbank '{db_path}' nicht gefunden!")
return False
conn = sqlite3.connect(db_path)
try:
cursor = conn.cursor()
# Wenn keine Query angegeben, zeige verfügbare Tabellen
if not query:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
tables = cursor.fetchall()
if not tables:
print("Keine Tabellen gefunden!")
return False
print("\nVerfügbare Tabellen:")
for table, in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" - {table} ({count} Datensätze)")
# Erste Tabelle als Standard verwenden
default_table = tables[0][0]
query = f"SELECT * FROM {default_table}"
if limit:
query += f" LIMIT {limit}"
else:
query += " LIMIT 10"
print(f"\nStandard-Abfrage: {query}\n")
# Query ausführen
cursor.execute(query)
# Spaltennamen holen
columns = [description[0] for description in cursor.description]
# Prüfen, ob dateTime-Spalte vorhanden ist
datetime_index = None
if 'dateTime' in columns:
datetime_index = columns.index('dateTime')
# Daten abrufen
rows = cursor.fetchall()
if not rows:
print("Keine Ergebnisse gefunden.")
return True
print(f"Anzahl Ergebnisse: {len(rows)}\n")
# Spaltenbreiten berechnen
col_widths = []
for i, col in enumerate(columns):
if i == datetime_index:
max_width = max(len(col), 19) # 'YYYY-MM-DD HH:MM:SS' = 19 Zeichen
else:
max_width = len(col)
for row in rows[:100]: # Nur erste 100 Zeilen für Breitenberechnung
max_width = max(max_width, len(format_value(row[i])))
col_widths.append(min(max_width, 30)) # Max 30 Zeichen pro Spalte
# Header ausgeben
header = ' | '.join(col.ljust(col_widths[i]) for i, col in enumerate(columns))
print(header)
print('-' * len(header))
# Daten ausgeben
for row in rows:
formatted_row = []
for i, value in enumerate(row):
if i == datetime_index:
formatted_row.append(format_datetime(value).ljust(col_widths[i]))
else:
formatted_row.append(format_value(value).ljust(col_widths[i]))
print(' | '.join(formatted_row))
return True
except sqlite3.Error as e:
print(f"Datenbankfehler: {e}")
return False
finally:
conn.close()
def interactive_mode(db_path):
"""Interaktiver Modus für mehrere Abfragen"""
print("=" * 70)
print("SQLite Query Tool - Interaktiver Modus")
print("=" * 70)
print("\nTipps:")
print(" - Geben Sie SQL-Abfragen direkt ein")
print(" - Drücken Sie Enter ohne Eingabe für Standard-Abfrage (erste 10 Zeilen)")
print(" - Geben Sie 'exit' oder 'quit' zum Beenden ein")
print(" - dateTime wird automatisch als 'YYYY-MM-DD HH:MM:SS' angezeigt\n")
execute_query(db_path) # Zeige verfügbare Tabellen und Standard-Abfrage
while True:
try:
query = input("\nSQL> ").strip()
if query.lower() in ('exit', 'quit', 'q'):
print("Auf Wiedersehen!")
break
if not query:
# Standard-Abfrage
execute_query(db_path)
else:
execute_query(db_path, query)
except KeyboardInterrupt:
print("\n\nUnterbrochen. Auf Wiedersehen!")
break
except EOFError:
print("\nAuf Wiedersehen!")
break
def main():
if len(sys.argv) < 2:
print("Verwendung:")
print(" python sqlite_query.py <datenbank.db> # Interaktiver Modus")
print(" python sqlite_query.py <datenbank.db> '<SQL-Query>' # Einzelne Abfrage")
print(" python sqlite_query.py <datenbank.db> <tabelle> [limit] # Tabelle anzeigen")
print("\nBeispiele:")
print(" python sqlite_query.py wetter.db")
print(" python sqlite_query.py wetter.db 'SELECT * FROM weather_data WHERE outTemp > 20'")
print(" python sqlite_query.py wetter.db weather_data 50")
sys.exit(1)
db_path = sys.argv[1]
if len(sys.argv) == 2:
# Interaktiver Modus
interactive_mode(db_path)
elif len(sys.argv) == 3:
# Einzelne Query oder Tabellenname
arg = sys.argv[2]
if arg.upper().startswith('SELECT') or arg.upper().startswith('WITH'):
execute_query(db_path, arg)
else:
# Als Tabellenname interpretieren
execute_query(db_path, f"SELECT * FROM {arg} LIMIT 10")
elif len(sys.argv) == 4:
# Tabelle mit Limit
table = sys.argv[2]
limit = sys.argv[3]
execute_query(db_path, f"SELECT * FROM {table} LIMIT {limit}")
if __name__ == "__main__":
main()