First commit
This commit is contained in:
7
collector/.dockerignore
Normal file
7
collector/.dockerignore
Normal file
@@ -0,0 +1,7 @@
|
||||
.venv/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
*.pyd
|
||||
.env
|
||||
*.log
|
||||
13
collector/Dockerfile
Normal file
13
collector/Dockerfile
Normal file
@@ -0,0 +1,13 @@
|
||||
FROM python:3.13-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Installiere Requirements
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Kopiere Collector Code
|
||||
COPY main.py .
|
||||
|
||||
# Starte Collector
|
||||
CMD ["python", "main.py"]
|
||||
56
collector/README.md
Normal file
56
collector/README.md
Normal file
@@ -0,0 +1,56 @@
|
||||
# Wetterstation Data Collector
|
||||
|
||||
Dieses Programm liest Wetterdaten von einem MQTT-Broker und speichert sie in einer PostgreSQL-Datenbank.
|
||||
|
||||
## Features
|
||||
|
||||
- Verbindung zu MQTT-Broker mit TLS/SSL und Authentifizierung
|
||||
- Automatisches Speichern von Wetterdaten alle 5 Minuten
|
||||
- PostgreSQL-Datenbank mit automatischer Tabellenerstellung
|
||||
- Duplikatschutz über UPSERT
|
||||
- Logging für Monitoring und Fehlersuche
|
||||
|
||||
## Setup
|
||||
|
||||
1. **Python-Pakete installieren:**
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. **.env Datei erstellen:**
|
||||
```bash
|
||||
cp .env.example .env
|
||||
```
|
||||
Dann `.env` mit deinen Zugangsdaten ausfüllen:
|
||||
- MQTT Broker Zugangsdaten (Username, Password)
|
||||
- PostgreSQL Datenbank Zugangsdaten (User, Password, DB Name)
|
||||
|
||||
3. **PostgreSQL Datenbank erstellen:**
|
||||
```sql
|
||||
CREATE DATABASE wetterstation;
|
||||
```
|
||||
|
||||
## Verwendung
|
||||
|
||||
```bash
|
||||
python main.py
|
||||
```
|
||||
|
||||
Das Programm läuft kontinuierlich und:
|
||||
- Verbindet sich mit dem MQTT-Broker rexfue.de:18883
|
||||
- Abonniert das Topic "vantage/live"
|
||||
- Speichert eingehende Wetterdaten in der Datenbank
|
||||
- Kann mit Ctrl+C beendet werden
|
||||
|
||||
## Datenstruktur
|
||||
|
||||
Die empfangenen Daten enthalten:
|
||||
- `datetime`: Zeitstempel der Messung
|
||||
- `temperature`: Temperatur in °C
|
||||
- `humidity`: Luftfeuchtigkeit in %
|
||||
- `pressure`: Luftdruck in hPa
|
||||
- `wind_speed`: Windgeschwindigkeit
|
||||
- `wind_gust`: Windböen
|
||||
- `wind_dir`: Windrichtung in Grad
|
||||
- `rain`: Niederschlagsmenge
|
||||
- `rain_rate`: Niederschlagsrate
|
||||
204
collector/main.py
Normal file
204
collector/main.py
Normal file
@@ -0,0 +1,204 @@
|
||||
# MQTT subscriber that reads weather data and stores in PostgreSQL
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
import ssl
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
import paho.mqtt.client as mqtt
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
|
||||
# Logging konfigurieren
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Umgebungsvariablen laden - eine Ebene höher
|
||||
env_path = Path(__file__).parent.parent / '.env'
|
||||
load_dotenv(dotenv_path=env_path)
|
||||
|
||||
# Konfiguration
|
||||
MQTT_BROKER = os.getenv('MQTT_BROKER', 'rexfue.de')
|
||||
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
|
||||
MQTT_USERNAME = os.getenv('MQTT_USERNAME')
|
||||
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD')
|
||||
MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'vantage/live')
|
||||
|
||||
DB_HOST = os.getenv('DB_HOST', 'localhost')
|
||||
DB_PORT = int(os.getenv('DB_PORT', 5432))
|
||||
DB_NAME = os.getenv('DB_NAME', 'wetterstation')
|
||||
DB_USER = os.getenv('DB_USER')
|
||||
DB_PASSWORD = os.getenv('DB_PASSWORD')
|
||||
|
||||
|
||||
class WeatherDataCollector:
|
||||
"""Klasse zum Sammeln und Speichern von Wetterdaten aus MQTT in PostgreSQL"""
|
||||
|
||||
def __init__(self):
|
||||
self.db_conn = None
|
||||
self.mqtt_client = None
|
||||
self.setup_database()
|
||||
self.setup_mqtt()
|
||||
|
||||
def setup_database(self):
|
||||
"""Datenbankverbindung herstellen und Tabelle erstellen"""
|
||||
try:
|
||||
self.db_conn = psycopg2.connect(
|
||||
host=DB_HOST,
|
||||
port=DB_PORT,
|
||||
database=DB_NAME,
|
||||
user=DB_USER,
|
||||
password=DB_PASSWORD
|
||||
)
|
||||
logger.info("Datenbankverbindung hergestellt")
|
||||
|
||||
# Tabelle erstellen falls nicht vorhanden
|
||||
with self.db_conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS weather_data (
|
||||
id SERIAL PRIMARY KEY,
|
||||
datetime TIMESTAMP NOT NULL,
|
||||
temperature FLOAT,
|
||||
humidity INTEGER,
|
||||
pressure FLOAT,
|
||||
wind_speed FLOAT,
|
||||
wind_gust FLOAT,
|
||||
wind_dir FLOAT,
|
||||
rain FLOAT,
|
||||
rain_rate FLOAT,
|
||||
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE(datetime)
|
||||
)
|
||||
""")
|
||||
self.db_conn.commit()
|
||||
logger.info("Tabelle weather_data bereit")
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Datenbankverbindung: {e}")
|
||||
raise
|
||||
|
||||
def setup_mqtt(self):
|
||||
"""MQTT Client konfigurieren"""
|
||||
self.mqtt_client = mqtt.Client()
|
||||
self.mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
|
||||
|
||||
# Callbacks setzen
|
||||
self.mqtt_client.on_connect = self.on_connect
|
||||
self.mqtt_client.on_message = self.on_message
|
||||
self.mqtt_client.on_disconnect = self.on_disconnect
|
||||
|
||||
logger.info(f"MQTT Client konfiguriert für {MQTT_BROKER}:{MQTT_PORT}")
|
||||
|
||||
def on_connect(self, client, userdata, flags, rc):
|
||||
"""Callback wenn MQTT Verbindung hergestellt wird"""
|
||||
if rc == 0:
|
||||
logger.info("Mit MQTT Broker verbunden")
|
||||
client.subscribe(MQTT_TOPIC)
|
||||
logger.info(f"Topic abonniert: {MQTT_TOPIC}")
|
||||
else:
|
||||
logger.error(f"Verbindung fehlgeschlagen mit Code {rc}")
|
||||
|
||||
def on_disconnect(self, client, userdata, rc):
|
||||
"""Callback wenn MQTT Verbindung getrennt wird"""
|
||||
if rc != 0:
|
||||
logger.warning(f"Unerwartete Trennung vom Broker. Code: {rc}")
|
||||
|
||||
def on_message(self, client, userdata, msg):
|
||||
"""Callback wenn MQTT Nachricht empfangen wird"""
|
||||
try:
|
||||
payload = msg.payload.decode('utf-8')
|
||||
logger.info(f"Nachricht empfangen auf {msg.topic}: {payload}")
|
||||
|
||||
# JSON parsen
|
||||
data = json.loads(payload)
|
||||
|
||||
# In Datenbank speichern
|
||||
self.save_to_database(data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Fehler beim JSON-Parsen: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler bei Nachrichtenverarbeitung: {e}")
|
||||
|
||||
def save_to_database(self, data):
|
||||
"""Wetterdaten in PostgreSQL speichern"""
|
||||
try:
|
||||
with self.db_conn.cursor() as cursor:
|
||||
cursor.execute("""
|
||||
INSERT INTO weather_data
|
||||
(datetime, temperature, humidity, pressure, wind_speed,
|
||||
wind_gust, wind_dir, rain, rain_rate)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (datetime) DO UPDATE SET
|
||||
temperature = EXCLUDED.temperature,
|
||||
humidity = EXCLUDED.humidity,
|
||||
pressure = EXCLUDED.pressure,
|
||||
wind_speed = EXCLUDED.wind_speed,
|
||||
wind_gust = EXCLUDED.wind_gust,
|
||||
wind_dir = EXCLUDED.wind_dir,
|
||||
rain = EXCLUDED.rain,
|
||||
rain_rate = EXCLUDED.rain_rate
|
||||
""", (
|
||||
data.get('datetime'),
|
||||
data.get('temperature'),
|
||||
data.get('humidity'),
|
||||
data.get('pressure'),
|
||||
data.get('wind_speed'),
|
||||
data.get('wind_gust'),
|
||||
data.get('wind_dir'),
|
||||
data.get('rain'),
|
||||
data.get('rain_rate')
|
||||
))
|
||||
self.db_conn.commit()
|
||||
logger.info(f"Daten gespeichert für {data.get('datetime')}")
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Speichern in Datenbank: {e}")
|
||||
self.db_conn.rollback()
|
||||
|
||||
def start(self):
|
||||
"""MQTT Client starten und auf Nachrichten warten"""
|
||||
try:
|
||||
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
|
||||
logger.info("Starte MQTT Loop...")
|
||||
self.mqtt_client.loop_forever()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Programm wird beendet...")
|
||||
except Exception as e:
|
||||
logger.error(f"Fehler beim Start: {e}")
|
||||
finally:
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
"""Ressourcen aufräumen"""
|
||||
if self.mqtt_client:
|
||||
self.mqtt_client.disconnect()
|
||||
logger.info("MQTT Verbindung getrennt")
|
||||
if self.db_conn:
|
||||
self.db_conn.close()
|
||||
logger.info("Datenbankverbindung geschlossen")
|
||||
|
||||
|
||||
def main():
|
||||
"""Hauptfunktion"""
|
||||
logger.info("Wetterstation Collector startet...")
|
||||
|
||||
# Prüfen ob alle nötigen Umgebungsvariablen gesetzt sind
|
||||
required_vars = ['MQTT_USERNAME', 'MQTT_PASSWORD', 'DB_USER', 'DB_PASSWORD']
|
||||
missing_vars = [var for var in required_vars if not os.getenv(var)]
|
||||
|
||||
if missing_vars:
|
||||
logger.error(f"Fehlende Umgebungsvariablen: {', '.join(missing_vars)}")
|
||||
logger.error("Bitte .env Datei mit den erforderlichen Werten erstellen")
|
||||
return
|
||||
|
||||
collector = WeatherDataCollector()
|
||||
collector.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
3
collector/requirements.txt
Normal file
3
collector/requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
paho-mqtt==1.6.1
|
||||
psycopg2-binary==2.9.10
|
||||
python-dotenv==1.0.0
|
||||
Reference in New Issue
Block a user