194 lines
6.2 KiB
Python
194 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SQLite Query Tool mit formatierter DateTime-Ausgabe
|
|
Zeigt dateTime als 'YYYY-MM-DD HH:MM:SS' statt Unix-Timestamp
|
|
"""
|
|
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
def format_datetime(timestamp):
|
|
"""Konvertiert Unix-Timestamp in lesbares Format"""
|
|
try:
|
|
if timestamp is None:
|
|
return 'NULL'
|
|
return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
|
|
except (ValueError, OSError):
|
|
return str(timestamp)
|
|
|
|
|
|
def format_value(value):
|
|
"""Formatiert Werte für die Ausgabe"""
|
|
if value is None:
|
|
return 'NULL'
|
|
if isinstance(value, float):
|
|
return f'{value:.2f}'
|
|
return str(value)
|
|
|
|
|
|
def execute_query(db_path, query=None, limit=None):
|
|
"""Führt eine SELECT-Abfrage aus und zeigt Ergebnisse formatiert an"""
|
|
|
|
if not Path(db_path).exists():
|
|
print(f"Fehler: Datenbank '{db_path}' nicht gefunden!")
|
|
return False
|
|
|
|
conn = sqlite3.connect(db_path)
|
|
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Wenn keine Query angegeben, zeige verfügbare Tabellen
|
|
if not query:
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
|
|
tables = cursor.fetchall()
|
|
|
|
if not tables:
|
|
print("Keine Tabellen gefunden!")
|
|
return False
|
|
|
|
print("\nVerfügbare Tabellen:")
|
|
for table, in tables:
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
count = cursor.fetchone()[0]
|
|
print(f" - {table} ({count} Datensätze)")
|
|
|
|
# Erste Tabelle als Standard verwenden
|
|
default_table = tables[0][0]
|
|
query = f"SELECT * FROM {default_table}"
|
|
if limit:
|
|
query += f" LIMIT {limit}"
|
|
else:
|
|
query += " LIMIT 10"
|
|
|
|
print(f"\nStandard-Abfrage: {query}\n")
|
|
|
|
# Query ausführen
|
|
cursor.execute(query)
|
|
|
|
# Spaltennamen holen
|
|
columns = [description[0] for description in cursor.description]
|
|
|
|
# Prüfen, ob dateTime-Spalte vorhanden ist
|
|
datetime_index = None
|
|
if 'dateTime' in columns:
|
|
datetime_index = columns.index('dateTime')
|
|
|
|
# Daten abrufen
|
|
rows = cursor.fetchall()
|
|
|
|
if not rows:
|
|
print("Keine Ergebnisse gefunden.")
|
|
return True
|
|
|
|
print(f"Anzahl Ergebnisse: {len(rows)}\n")
|
|
|
|
# Spaltenbreiten berechnen
|
|
col_widths = []
|
|
for i, col in enumerate(columns):
|
|
if i == datetime_index:
|
|
max_width = max(len(col), 19) # 'YYYY-MM-DD HH:MM:SS' = 19 Zeichen
|
|
else:
|
|
max_width = len(col)
|
|
for row in rows[:100]: # Nur erste 100 Zeilen für Breitenberechnung
|
|
max_width = max(max_width, len(format_value(row[i])))
|
|
col_widths.append(min(max_width, 30)) # Max 30 Zeichen pro Spalte
|
|
|
|
# Header ausgeben
|
|
header = ' | '.join(col.ljust(col_widths[i]) for i, col in enumerate(columns))
|
|
print(header)
|
|
print('-' * len(header))
|
|
|
|
# Daten ausgeben
|
|
for row in rows:
|
|
formatted_row = []
|
|
for i, value in enumerate(row):
|
|
if i == datetime_index:
|
|
formatted_row.append(format_datetime(value).ljust(col_widths[i]))
|
|
else:
|
|
formatted_row.append(format_value(value).ljust(col_widths[i]))
|
|
print(' | '.join(formatted_row))
|
|
|
|
return True
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"Datenbankfehler: {e}")
|
|
return False
|
|
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def interactive_mode(db_path):
|
|
"""Interaktiver Modus für mehrere Abfragen"""
|
|
print("=" * 70)
|
|
print("SQLite Query Tool - Interaktiver Modus")
|
|
print("=" * 70)
|
|
print("\nTipps:")
|
|
print(" - Geben Sie SQL-Abfragen direkt ein")
|
|
print(" - Drücken Sie Enter ohne Eingabe für Standard-Abfrage (erste 10 Zeilen)")
|
|
print(" - Geben Sie 'exit' oder 'quit' zum Beenden ein")
|
|
print(" - dateTime wird automatisch als 'YYYY-MM-DD HH:MM:SS' angezeigt\n")
|
|
|
|
execute_query(db_path) # Zeige verfügbare Tabellen und Standard-Abfrage
|
|
|
|
while True:
|
|
try:
|
|
query = input("\nSQL> ").strip()
|
|
|
|
if query.lower() in ('exit', 'quit', 'q'):
|
|
print("Auf Wiedersehen!")
|
|
break
|
|
|
|
if not query:
|
|
# Standard-Abfrage
|
|
execute_query(db_path)
|
|
else:
|
|
execute_query(db_path, query)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\nUnterbrochen. Auf Wiedersehen!")
|
|
break
|
|
except EOFError:
|
|
print("\nAuf Wiedersehen!")
|
|
break
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print("Verwendung:")
|
|
print(" python sqlite_query.py <datenbank.db> # Interaktiver Modus")
|
|
print(" python sqlite_query.py <datenbank.db> '<SQL-Query>' # Einzelne Abfrage")
|
|
print(" python sqlite_query.py <datenbank.db> <tabelle> [limit] # Tabelle anzeigen")
|
|
print("\nBeispiele:")
|
|
print(" python sqlite_query.py wetter.db")
|
|
print(" python sqlite_query.py wetter.db 'SELECT * FROM weather_data WHERE outTemp > 20'")
|
|
print(" python sqlite_query.py wetter.db weather_data 50")
|
|
sys.exit(1)
|
|
|
|
db_path = sys.argv[1]
|
|
|
|
if len(sys.argv) == 2:
|
|
# Interaktiver Modus
|
|
interactive_mode(db_path)
|
|
elif len(sys.argv) == 3:
|
|
# Einzelne Query oder Tabellenname
|
|
arg = sys.argv[2]
|
|
if arg.upper().startswith('SELECT') or arg.upper().startswith('WITH'):
|
|
execute_query(db_path, arg)
|
|
else:
|
|
# Als Tabellenname interpretieren
|
|
execute_query(db_path, f"SELECT * FROM {arg} LIMIT 10")
|
|
elif len(sys.argv) == 4:
|
|
# Tabelle mit Limit
|
|
table = sys.argv[2]
|
|
limit = sys.argv[3]
|
|
execute_query(db_path, f"SELECT * FROM {table} LIMIT {limit}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|