#!/usr/bin/env python3 """ SQLite Query Tool mit formatierter DateTime-Ausgabe Zeigt dateTime als 'YYYY-MM-DD HH:MM:SS' statt Unix-Timestamp """ import sqlite3 import sys from datetime import datetime from pathlib import Path def format_datetime(timestamp): """Konvertiert Unix-Timestamp in lesbares Format""" try: if timestamp is None: return 'NULL' return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S') except (ValueError, OSError): return str(timestamp) def format_value(value): """Formatiert Werte für die Ausgabe""" if value is None: return 'NULL' if isinstance(value, float): return f'{value:.2f}' return str(value) def execute_query(db_path, query=None, limit=None): """Führt eine SELECT-Abfrage aus und zeigt Ergebnisse formatiert an""" if not Path(db_path).exists(): print(f"Fehler: Datenbank '{db_path}' nicht gefunden!") return False conn = sqlite3.connect(db_path) try: cursor = conn.cursor() # Wenn keine Query angegeben, zeige verfügbare Tabellen if not query: cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'") tables = cursor.fetchall() if not tables: print("Keine Tabellen gefunden!") return False print("\nVerfügbare Tabellen:") for table, in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] print(f" - {table} ({count} Datensätze)") # Erste Tabelle als Standard verwenden default_table = tables[0][0] query = f"SELECT * FROM {default_table}" if limit: query += f" LIMIT {limit}" else: query += " LIMIT 10" print(f"\nStandard-Abfrage: {query}\n") # Query ausführen cursor.execute(query) # Spaltennamen holen columns = [description[0] for description in cursor.description] # Prüfen, ob dateTime-Spalte vorhanden ist datetime_index = None if 'dateTime' in columns: datetime_index = columns.index('dateTime') # Daten abrufen rows = cursor.fetchall() if not rows: print("Keine Ergebnisse gefunden.") return True print(f"Anzahl Ergebnisse: {len(rows)}\n") # Spaltenbreiten berechnen col_widths = [] for i, col in enumerate(columns): if i == datetime_index: max_width = max(len(col), 19) # 'YYYY-MM-DD HH:MM:SS' = 19 Zeichen else: max_width = len(col) for row in rows[:100]: # Nur erste 100 Zeilen für Breitenberechnung max_width = max(max_width, len(format_value(row[i]))) col_widths.append(min(max_width, 30)) # Max 30 Zeichen pro Spalte # Header ausgeben header = ' | '.join(col.ljust(col_widths[i]) for i, col in enumerate(columns)) print(header) print('-' * len(header)) # Daten ausgeben for row in rows: formatted_row = [] for i, value in enumerate(row): if i == datetime_index: formatted_row.append(format_datetime(value).ljust(col_widths[i])) else: formatted_row.append(format_value(value).ljust(col_widths[i])) print(' | '.join(formatted_row)) return True except sqlite3.Error as e: print(f"Datenbankfehler: {e}") return False finally: conn.close() def interactive_mode(db_path): """Interaktiver Modus für mehrere Abfragen""" print("=" * 70) print("SQLite Query Tool - Interaktiver Modus") print("=" * 70) print("\nTipps:") print(" - Geben Sie SQL-Abfragen direkt ein") print(" - Drücken Sie Enter ohne Eingabe für Standard-Abfrage (erste 10 Zeilen)") print(" - Geben Sie 'exit' oder 'quit' zum Beenden ein") print(" - dateTime wird automatisch als 'YYYY-MM-DD HH:MM:SS' angezeigt\n") execute_query(db_path) # Zeige verfügbare Tabellen und Standard-Abfrage while True: try: query = input("\nSQL> ").strip() if query.lower() in ('exit', 'quit', 'q'): print("Auf Wiedersehen!") break if not query: # Standard-Abfrage execute_query(db_path) else: execute_query(db_path, query) except KeyboardInterrupt: print("\n\nUnterbrochen. Auf Wiedersehen!") break except EOFError: print("\nAuf Wiedersehen!") break def main(): if len(sys.argv) < 2: print("Verwendung:") print(" python sqlite_query.py # Interaktiver Modus") print(" python sqlite_query.py '' # Einzelne Abfrage") print(" python sqlite_query.py [limit] # Tabelle anzeigen") print("\nBeispiele:") print(" python sqlite_query.py wetter.db") print(" python sqlite_query.py wetter.db 'SELECT * FROM weather_data WHERE outTemp > 20'") print(" python sqlite_query.py wetter.db weather_data 50") sys.exit(1) db_path = sys.argv[1] if len(sys.argv) == 2: # Interaktiver Modus interactive_mode(db_path) elif len(sys.argv) == 3: # Einzelne Query oder Tabellenname arg = sys.argv[2] if arg.upper().startswith('SELECT') or arg.upper().startswith('WITH'): execute_query(db_path, arg) else: # Als Tabellenname interpretieren execute_query(db_path, f"SELECT * FROM {arg} LIMIT 10") elif len(sys.argv) == 4: # Tabelle mit Limit table = sys.argv[2] limit = sys.argv[3] execute_query(db_path, f"SELECT * FROM {table} LIMIT {limit}") if __name__ == "__main__": main()