From eb980928339ca84b52516120586b35354802b8cd Mon Sep 17 00:00:00 2001 From: rxf Date: Fri, 6 Feb 2026 19:04:26 +0100 Subject: [PATCH] First commit --- .gitignore | 54 ++++++++++ collector/.dockerignore | 7 ++ collector/Dockerfile | 13 +++ collector/README.md | 56 ++++++++++ collector/main.py | 204 +++++++++++++++++++++++++++++++++++++ collector/requirements.txt | 3 + docker-compose.yml | 55 ++++++++++ 7 files changed, 392 insertions(+) create mode 100644 .gitignore create mode 100644 collector/.dockerignore create mode 100644 collector/Dockerfile create mode 100644 collector/README.md create mode 100644 collector/main.py create mode 100644 collector/requirements.txt create mode 100644 docker-compose.yml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f988da3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +.venv/ +venv/ +ENV/ +env/ + +# Environment Variables +.env +.env.local + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Logs +*.log +logs/ + +# Docker +docker-compose.override.yml + +# Database +*.db +*.sqlite +*.sqlite3 + +# pgAdmin +pgadmin_data/ diff --git a/collector/.dockerignore b/collector/.dockerignore new file mode 100644 index 0000000..4b4eebc --- /dev/null +++ b/collector/.dockerignore @@ -0,0 +1,7 @@ +.venv/ +__pycache__/ +*.pyc +*.pyo +*.pyd +.env +*.log diff --git a/collector/Dockerfile b/collector/Dockerfile new file mode 100644 index 0000000..2fa77d8 --- /dev/null +++ b/collector/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.13-slim + +WORKDIR /app + +# Installiere Requirements +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Kopiere Collector Code +COPY main.py . + +# Starte Collector +CMD ["python", "main.py"] diff --git a/collector/README.md b/collector/README.md new file mode 100644 index 0000000..69405c9 --- /dev/null +++ b/collector/README.md @@ -0,0 +1,56 @@ +# Wetterstation Data Collector + +Dieses Programm liest Wetterdaten von einem MQTT-Broker und speichert sie in einer PostgreSQL-Datenbank. + +## Features + +- Verbindung zu MQTT-Broker mit TLS/SSL und Authentifizierung +- Automatisches Speichern von Wetterdaten alle 5 Minuten +- PostgreSQL-Datenbank mit automatischer Tabellenerstellung +- Duplikatschutz über UPSERT +- Logging für Monitoring und Fehlersuche + +## Setup + +1. **Python-Pakete installieren:** + ```bash + pip install -r requirements.txt + ``` + +2. **.env Datei erstellen:** + ```bash + cp .env.example .env + ``` + Dann `.env` mit deinen Zugangsdaten ausfüllen: + - MQTT Broker Zugangsdaten (Username, Password) + - PostgreSQL Datenbank Zugangsdaten (User, Password, DB Name) + +3. **PostgreSQL Datenbank erstellen:** + ```sql + CREATE DATABASE wetterstation; + ``` + +## Verwendung + +```bash +python main.py +``` + +Das Programm läuft kontinuierlich und: +- Verbindet sich mit dem MQTT-Broker rexfue.de:18883 +- Abonniert das Topic "vantage/live" +- Speichert eingehende Wetterdaten in der Datenbank +- Kann mit Ctrl+C beendet werden + +## Datenstruktur + +Die empfangenen Daten enthalten: +- `datetime`: Zeitstempel der Messung +- `temperature`: Temperatur in °C +- `humidity`: Luftfeuchtigkeit in % +- `pressure`: Luftdruck in hPa +- `wind_speed`: Windgeschwindigkeit +- `wind_gust`: Windböen +- `wind_dir`: Windrichtung in Grad +- `rain`: Niederschlagsmenge +- `rain_rate`: Niederschlagsrate diff --git a/collector/main.py b/collector/main.py new file mode 100644 index 0000000..a14cb4d --- /dev/null +++ b/collector/main.py @@ -0,0 +1,204 @@ +# MQTT subscriber that reads weather data and stores in PostgreSQL + +import os +import json +import logging +import ssl +from datetime import datetime +from pathlib import Path +from dotenv import load_dotenv +import paho.mqtt.client as mqtt +import psycopg2 +from psycopg2.extras import RealDictCursor + +# Logging konfigurieren +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Umgebungsvariablen laden - eine Ebene höher +env_path = Path(__file__).parent.parent / '.env' +load_dotenv(dotenv_path=env_path) + +# Konfiguration +MQTT_BROKER = os.getenv('MQTT_BROKER', 'rexfue.de') +MQTT_PORT = int(os.getenv('MQTT_PORT', 1883)) +MQTT_USERNAME = os.getenv('MQTT_USERNAME') +MQTT_PASSWORD = os.getenv('MQTT_PASSWORD') +MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'vantage/live') + +DB_HOST = os.getenv('DB_HOST', 'localhost') +DB_PORT = int(os.getenv('DB_PORT', 5432)) +DB_NAME = os.getenv('DB_NAME', 'wetterstation') +DB_USER = os.getenv('DB_USER') +DB_PASSWORD = os.getenv('DB_PASSWORD') + + +class WeatherDataCollector: + """Klasse zum Sammeln und Speichern von Wetterdaten aus MQTT in PostgreSQL""" + + def __init__(self): + self.db_conn = None + self.mqtt_client = None + self.setup_database() + self.setup_mqtt() + + def setup_database(self): + """Datenbankverbindung herstellen und Tabelle erstellen""" + try: + self.db_conn = psycopg2.connect( + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + user=DB_USER, + password=DB_PASSWORD + ) + logger.info("Datenbankverbindung hergestellt") + + # Tabelle erstellen falls nicht vorhanden + with self.db_conn.cursor() as cursor: + cursor.execute(""" + CREATE TABLE IF NOT EXISTS weather_data ( + id SERIAL PRIMARY KEY, + datetime TIMESTAMP NOT NULL, + temperature FLOAT, + humidity INTEGER, + pressure FLOAT, + wind_speed FLOAT, + wind_gust FLOAT, + wind_dir FLOAT, + rain FLOAT, + rain_rate FLOAT, + received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE(datetime) + ) + """) + self.db_conn.commit() + logger.info("Tabelle weather_data bereit") + except Exception as e: + logger.error(f"Fehler bei Datenbankverbindung: {e}") + raise + + def setup_mqtt(self): + """MQTT Client konfigurieren""" + self.mqtt_client = mqtt.Client() + self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) + + # Callbacks setzen + self.mqtt_client.on_connect = self.on_connect + self.mqtt_client.on_message = self.on_message + self.mqtt_client.on_disconnect = self.on_disconnect + + logger.info(f"MQTT Client konfiguriert für {MQTT_BROKER}:{MQTT_PORT}") + + def on_connect(self, client, userdata, flags, rc): + """Callback wenn MQTT Verbindung hergestellt wird""" + if rc == 0: + logger.info("Mit MQTT Broker verbunden") + client.subscribe(MQTT_TOPIC) + logger.info(f"Topic abonniert: {MQTT_TOPIC}") + else: + logger.error(f"Verbindung fehlgeschlagen mit Code {rc}") + + def on_disconnect(self, client, userdata, rc): + """Callback wenn MQTT Verbindung getrennt wird""" + if rc != 0: + logger.warning(f"Unerwartete Trennung vom Broker. Code: {rc}") + + def on_message(self, client, userdata, msg): + """Callback wenn MQTT Nachricht empfangen wird""" + try: + payload = msg.payload.decode('utf-8') + logger.info(f"Nachricht empfangen auf {msg.topic}: {payload}") + + # JSON parsen + data = json.loads(payload) + + # In Datenbank speichern + self.save_to_database(data) + + except json.JSONDecodeError as e: + logger.error(f"Fehler beim JSON-Parsen: {e}") + except Exception as e: + logger.error(f"Fehler bei Nachrichtenverarbeitung: {e}") + + def save_to_database(self, data): + """Wetterdaten in PostgreSQL speichern""" + try: + with self.db_conn.cursor() as cursor: + cursor.execute(""" + INSERT INTO weather_data + (datetime, temperature, humidity, pressure, wind_speed, + wind_gust, wind_dir, rain, rain_rate) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (datetime) DO UPDATE SET + temperature = EXCLUDED.temperature, + humidity = EXCLUDED.humidity, + pressure = EXCLUDED.pressure, + wind_speed = EXCLUDED.wind_speed, + wind_gust = EXCLUDED.wind_gust, + wind_dir = EXCLUDED.wind_dir, + rain = EXCLUDED.rain, + rain_rate = EXCLUDED.rain_rate + """, ( + data.get('datetime'), + data.get('temperature'), + data.get('humidity'), + data.get('pressure'), + data.get('wind_speed'), + data.get('wind_gust'), + data.get('wind_dir'), + data.get('rain'), + data.get('rain_rate') + )) + self.db_conn.commit() + logger.info(f"Daten gespeichert für {data.get('datetime')}") + except Exception as e: + logger.error(f"Fehler beim Speichern in Datenbank: {e}") + self.db_conn.rollback() + + def start(self): + """MQTT Client starten und auf Nachrichten warten""" + try: + self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) + logger.info("Starte MQTT Loop...") + self.mqtt_client.loop_forever() + except KeyboardInterrupt: + logger.info("Programm wird beendet...") + except Exception as e: + logger.error(f"Fehler beim Start: {e}") + finally: + self.cleanup() + + def cleanup(self): + """Ressourcen aufräumen""" + if self.mqtt_client: + self.mqtt_client.disconnect() + logger.info("MQTT Verbindung getrennt") + if self.db_conn: + self.db_conn.close() + logger.info("Datenbankverbindung geschlossen") + + +def main(): + """Hauptfunktion""" + logger.info("Wetterstation Collector startet...") + + # Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind + required_vars = ['MQTT_USERNAME', 'MQTT_PASSWORD', 'DB_USER', 'DB_PASSWORD'] + missing_vars = [var for var in required_vars if not os.getenv(var)] + + if missing_vars: + logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}") + logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen") + return + + collector = WeatherDataCollector() + collector.start() + + +if __name__ == "__main__": + main() + diff --git a/collector/requirements.txt b/collector/requirements.txt new file mode 100644 index 0000000..cbe4b3d --- /dev/null +++ b/collector/requirements.txt @@ -0,0 +1,3 @@ +paho-mqtt==1.6.1 +psycopg2-binary==2.9.10 +python-dotenv==1.0.0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a1fc26b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,55 @@ +version: '3.8' + +services: + postgres: + image: postgres:16-alpine + container_name: wetterstation_db + restart: unless-stopped + environment: + POSTGRES_DB: ${DB_NAME} + POSTGRES_USER: ${DB_USER} + POSTGRES_PASSWORD: ${DB_PASSWORD} + ports: + - "${DB_PORT}:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${DB_USER} -d ${DB_NAME}"] + interval: 10s + timeout: 5s + retries: 5 + + pgadmin: + image: dpage/pgadmin4:latest + container_name: wetterstation_pgadmin + restart: unless-stopped + environment: + PGADMIN_DEFAULT_EMAIL: ${PGADMIN_EMAIL:-admin@admin.com} + PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD:-admin} + PGADMIN_CONFIG_SERVER_MODE: 'False' + ports: + - "5050:80" + volumes: + - pgadmin_data:/var/lib/pgadmin + depends_on: + - postgres + + collector: + build: + context: ./collector + dockerfile: Dockerfile + container_name: wetterstation_collector + restart: unless-stopped + env_file: + - ./.env + environment: + DB_HOST: postgres + depends_on: + postgres: + condition: service_healthy + +volumes: + postgres_data: + driver: local + pgadmin_data: + driver: local