Files
wetter2oag/weather_report.py
T
admin 0fcb20ae1c Initial commit: Wetterdiagramme der Sternwarte Welzheim
Erstellt monatliche Wetterdiagramme (Temperatur Min/Max, Stundenmittel,
Niederschlag) aus der Wetterstation-API und verschickt sie per Mail.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 15:14:14 +02:00

257 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Erstellt Wetterdiagramme für den letzten Kalendermonat und verschickt sie per Mail."""
import os
import smtplib
import tempfile
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import requests
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment before
# any of the settings below are read.
load_dotenv()

# Base URL of the weather API; endpoint paths are appended to it in fetch().
API_BASE_URL = os.environ.get("API_BASE_URL", "https://stwwetter.fuerst-stuttgart.de/api")

# SMTP settings. send_email() skips sending when server, user, password,
# sender or recipient list is missing or empty.
MAIL_SERVER = os.environ.get("MAIL_SERVER")
MAIL_PORT = int(os.environ.get("MAIL_PORT", "587"))
MAIL_USER = os.environ.get("MAIL_USER")
MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_FROM = os.environ.get("MAIL_FROM")
# Comma-separated recipient list; surrounding whitespace and empty entries
# are discarded.
MAIL_TO = [
    address
    for address in (part.strip() for part in os.environ.get("MAIL_TO", "").split(","))
    if address
]
MAIL_SUBJECT = os.environ.get("MAIL_SUBJECT", "Wetterbericht letzter Monat")

# German month names, indexed by month number minus one.
MONTHS_DE = [
    "Januar",
    "Februar",
    "März",
    "April",
    "Mai",
    "Juni",
    "Juli",
    "August",
    "September",
    "Oktober",
    "November",
    "Dezember",
]

# Shared look for every chart produced by this script.
plt.rcParams.update(
    {
        "figure.facecolor": "#f8f9fa",
        "axes.facecolor": "#ffffff",
        "axes.grid": True,
        "grid.alpha": 0.4,
        "axes.spines.top": False,
        "axes.spines.right": False,
        "font.size": 11,
    }
)
def last_month_range() -> tuple[datetime, datetime, str]:
    """Return ``(start, end, label)`` for the previous calendar month (UTC).

    ``start`` is midnight on the first day of that month, ``end`` is one
    second before midnight on the first day of the current month, and
    ``label`` is the German month name followed by the year.
    """
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    now = datetime.now(timezone.utc)
    current_month_start = now.replace(day=1, **midnight)
    # Stepping back one second lands on the last second of the previous month.
    end = current_month_start - timedelta(seconds=1)
    start = end.replace(day=1, **midnight)
    month_name = MONTHS_DE[start.month - 1]
    return start, end, f"{month_name} {start.year}"
def fetch(path: str, params: dict) -> list:
    """GET ``path`` below ``API_BASE_URL`` and return the decoded JSON body.

    ``params`` is sent as the query string. The request uses a 30 second
    timeout, and ``raise_for_status`` turns HTTP error responses into
    exceptions.
    """
    response = requests.get(f"{API_BASE_URL}{path}", params=params, timeout=30)
    response.raise_for_status()
    payload = response.json()
    return payload
def parse_dt(s: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting ``Z`` as the UTC designator.

    Every ``Z`` in the input is rewritten to ``+00:00`` before the text is
    handed to ``datetime.fromisoformat``.
    """
    iso_text = "+00:00".join(s.split("Z"))
    return datetime.fromisoformat(iso_text)
def chart_temp_minmax(start: datetime, end: datetime, label: str) -> bytes:
    """Plot daily minimum and maximum temperature and return the chart image.

    Rows are requested from ``/weather/daily-aggregated-range`` for the span
    ``start`` to ``end``; each row supplies ``datetime``, ``min_temperature``
    and ``max_temperature``. ``label`` is appended to the chart title. The
    finished figure is handed to ``_fig_to_bytes``.
    """
    query = {"start": start.isoformat(), "end": end.isoformat()}
    rows = fetch("/weather/daily-aggregated-range", query)

    days, lows, highs = [], [], []
    for row in rows:
        days.append(parse_dt(row["datetime"]))
        lows.append(row["min_temperature"])
        highs.append(row["max_temperature"])

    figure, axes = plt.subplots(figsize=(12, 5))
    line_style = {"linewidth": 2, "marker": "o", "markersize": 4}
    axes.plot(days, highs, color="#e05c2a", label="Tages-Maximum", **line_style)
    axes.plot(days, lows, color="#2a7be0", label="Tages-Minimum", **line_style)
    # Light band between the two curves.
    axes.fill_between(days, lows, highs, alpha=0.12, color="#888888")

    title = f"Temperaturverlauf (Tages-Min / Tages-Max) {label}"
    axes.set_title(title, fontweight="bold", pad=14)
    axes.set_ylabel("Temperatur (°C)")
    axes.set_xlim(start, end)

    # One tick every third day, formatted as day.month.
    x_axis = axes.xaxis
    x_axis.set_major_formatter(mdates.DateFormatter("%d.%m."))
    x_axis.set_major_locator(mdates.DayLocator(interval=3))
    figure.autofmt_xdate(rotation=0, ha="center")

    axes.legend(framealpha=0.8)
    figure.tight_layout()
    return _fig_to_bytes(figure)
def chart_temp_hourly(start: datetime, end: datetime, label: str) -> bytes:
    """Plot the hourly temperature series and return the chart image.

    Rows are requested from ``/weather/hourly-aggregated-range`` for the span
    ``start`` to ``end``; each row supplies ``datetime`` and ``temperature``.
    ``label`` is appended to the chart title. The finished figure is handed
    to ``_fig_to_bytes``.
    """
    query = {"start": start.isoformat(), "end": end.isoformat()}
    rows = fetch("/weather/hourly-aggregated-range", query)

    hours, readings = [], []
    for row in rows:
        hours.append(parse_dt(row["datetime"]))
        readings.append(row["temperature"])

    figure, axes = plt.subplots(figsize=(12, 5))
    axes.plot(hours, readings, color="#2a7be0", linewidth=1.5, label="Stundenmittel")

    title = f"Temperaturverlauf (Stundenmittel) {label}"
    axes.set_title(title, fontweight="bold", pad=14)
    axes.set_ylabel("Temperatur (°C)")
    axes.set_xlim(start, end)

    # One tick every third day, formatted as day.month.
    x_axis = axes.xaxis
    x_axis.set_major_formatter(mdates.DateFormatter("%d.%m."))
    x_axis.set_major_locator(mdates.DayLocator(interval=3))
    figure.autofmt_xdate(rotation=0, ha="center")

    axes.legend(framealpha=0.8)
    figure.tight_layout()
    return _fig_to_bytes(figure)
def _daily_rain_totals(readings: list) -> dict[str, float]:
    """Turn cumulative rain-counter readings into one rainfall total per day.

    Each reading is a mapping with a ``datetime`` ISO string and a ``rain``
    value; readings whose ``rain`` is missing or ``None`` are ignored. The
    result maps ``YYYY-MM-DD`` keys to the rainfall accumulated within that
    day.

    Within a day the readings are ordered by timestamp and the increases
    between consecutive readings are summed. A drop in the counter is treated
    as a reset: the value after the drop is counted as rain fallen since the
    reset (this assumes the counter restarts at zero). For a day without a
    reset the result equals ``last - first``.
    """
    samples_by_day: dict[str, list[tuple[datetime, float]]] = defaultdict(list)
    for entry in readings:
        value = entry.get("rain")
        if value is None:
            continue
        stamp = parse_dt(entry["datetime"])
        samples_by_day[stamp.strftime("%Y-%m-%d")].append((stamp, value))

    totals: dict[str, float] = {}
    for day_key, samples in samples_by_day.items():
        # The API order is not relied upon; deltas only make sense in time order.
        samples.sort(key=lambda sample: sample[0])
        total = 0.0
        previous = samples[0][1]
        for _, current in samples[1:]:
            if current >= previous:
                total += current - previous
            else:
                # Counter reset: everything on the counter fell after the reset.
                total += current
            previous = current
        totals[day_key] = total
    return totals


def chart_rain_daily(start: datetime, end: datetime, label: str) -> bytes:
    """Plot rainfall per day as a bar chart and return the chart image.

    Raw readings are requested from ``/weather/range`` for the span ``start``
    to ``end`` (at most 50000 rows). Their ``rain`` field is a cumulative
    counter, so the per-day amount is derived from the change of that counter
    within each day. ``label`` is appended to the chart title together with
    the sum of the daily values. The finished figure is handed to
    ``_fig_to_bytes``.
    """
    data = fetch("/weather/range", {
        "start": start.isoformat(),
        "end": end.isoformat(),
        "limit": 50000,
    })

    totals = _daily_rain_totals(data)
    day_keys = sorted(totals)
    # Bars are placed at midnight UTC of the day they belong to.
    dates = [
        datetime.fromisoformat(day_key).replace(tzinfo=timezone.utc)
        for day_key in day_keys
    ]
    rain = [round(totals[day_key], 1) for day_key in day_keys]

    fig, ax = plt.subplots(figsize=(12, 5))
    # Dry days get a pale bar colour so they stay visually distinct.
    colors = ["#2a7be0" if r > 0 else "#ccddee" for r in rain]
    ax.bar(dates, rain, color=colors, width=0.7, label="Niederschlag")
    total = sum(rain)
    ax.set_title(
        f"Niederschlag pro Tag {label} (Summe: {total:.1f} mm)",
        fontweight="bold", pad=14,
    )
    ax.set_ylabel("Niederschlag (mm)")
    # Half a day of padding keeps the first and last bar fully visible.
    ax.set_xlim(start - timedelta(hours=12), end + timedelta(hours=12))
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%d.%m."))
    ax.xaxis.set_major_locator(mdates.DayLocator(interval=3))
    fig.autofmt_xdate(rotation=0, ha="center")
    ax.legend(framealpha=0.8)
    fig.tight_layout()
    return _fig_to_bytes(fig)
def _fig_to_bytes(fig: plt.Figure) -> bytes:
    """Stamp the station watermark on ``fig`` and return it rendered as PNG.

    The watermark text is placed in the lower-right corner of the figure's
    first axes (skipped when the figure has no axes). The image is rendered
    at 150 dpi with a tight bounding box into an in-memory buffer, so no
    temporary file is created. The figure is closed afterwards, also when
    rendering fails.
    """
    import io  # local import: only this function needs an in-memory buffer

    try:
        if fig.axes:
            ax = fig.axes[0]
            ax.text(0.99, 0.03, "Wetterstation der Sternwarte Welzheim",
                    ha="right", va="bottom", fontsize=9, color="#bbbbbb",
                    transform=ax.transAxes)
        buffer = io.BytesIO()
        # The format must be given explicitly; there is no file suffix to infer it from.
        fig.savefig(buffer, format="png", dpi=150, bbox_inches="tight")
        return buffer.getvalue()
    finally:
        plt.close(fig)
def send_email(charts: dict[str, bytes], label: str) -> None:
    """Send the charts as inline images of an HTML mail.

    ``charts`` maps a display title to PNG data; ``label`` is appended to the
    subject and used in the mail body. Every chart is attached as an image
    with its own Content-ID and referenced from the HTML body through a
    ``cid:`` URL, in the iteration order of ``charts``.

    Nothing is sent (a notice is printed instead) when ``MAIL_SERVER``,
    ``MAIL_USER``, ``MAIL_PASSWORD``, ``MAIL_FROM`` or ``MAIL_TO`` is empty.
    Errors raised by ``smtplib`` are not caught here.
    """
    if not all([MAIL_SERVER, MAIL_USER, MAIL_PASSWORD, MAIL_FROM, MAIL_TO]):
        print("E-Mail-Konfiguration unvollständig Mail wird nicht gesendet.")
        return

    # File names for the first three charts; any further chart gets a
    # generated name instead of being dropped.
    known_filenames = [
        "temperatur_minmax.png",
        "temperatur_stundenmittel.png",
        "niederschlag_taglich.png",
    ]
    # One (title, data, content-id, file name) record per chart, so the HTML
    # body and the attachments cannot get out of step.
    attachments = []
    for index, (name, png) in enumerate(charts.items(), start=1):
        if index <= len(known_filenames):
            fname = known_filenames[index - 1]
        else:
            fname = f"chart{index}.png"
        attachments.append((name, png, f"chart{index}@weather", fname))

    # NOTE(review): inline cid images are conventionally wrapped in
    # multipart/related; "mixed" is kept to preserve the current message layout.
    msg = MIMEMultipart("mixed")
    msg["Subject"] = f"{MAIL_SUBJECT} {label}"
    msg["From"] = MAIL_FROM
    msg["To"] = ", ".join(MAIL_TO)

    sections = "".join(
        f'<p><strong>{name}</strong><br>'
        f'<img src="cid:{cid}" style="max-width:100%;"/></p>'
        for name, _, cid, _ in attachments
    )
    html_body = (
        "<html><body>"
        f"<h2>Wetterbericht {label}</h2>"
        f"<p>Anbei die Wetterdiagramme für {label}.</p>"
        f"{sections}"
        "</body></html>"
    )
    msg.attach(MIMEText(html_body, "html"))

    for _, png, cid, fname in attachments:
        img = MIMEImage(png, name=fname)
        img.add_header("Content-ID", f"<{cid}>")
        img.add_header("Content-Disposition", "inline", filename=fname)
        msg.attach(img)

    with smtplib.SMTP(MAIL_SERVER, MAIL_PORT) as smtp:
        smtp.ehlo()
        smtp.starttls()
        smtp.login(MAIL_USER, MAIL_PASSWORD)
        smtp.send_message(msg)
    print(f"Mail gesendet an: {', '.join(MAIL_TO)}")
def main() -> None:
    """Build the three charts for last month, save them and mail them.

    The charts are written as PNG files into the ``images`` directory
    (created if missing) and then passed to ``send_email``.
    """
    start, end, label = last_month_range()
    print(f"Hole Wetterdaten für {label} ({start.date()} {end.date()}) …")

    # Insertion order matters: it pairs each chart with its file name below.
    charts = {}
    charts["Tages-Min / Tages-Max Temperatur"] = chart_temp_minmax(start, end, label)
    charts["Temperatur Stundenmittel"] = chart_temp_hourly(start, end, label)
    charts["Niederschlag pro Tag"] = chart_rain_daily(start, end, label)
    print("Diagramme erstellt.")

    os.makedirs("images", exist_ok=True)
    filenames = (
        "temperatur_minmax.png",
        "temperatur_stundenmittel.png",
        "niederschlag_taglich.png",
    )
    for fname, png in zip(filenames, charts.values()):
        path = os.path.join("images", fname)
        with open(path, "wb") as out:
            out.write(png)
        print(f"{path}")

    send_email(charts, label)


# Run the report only when the file is executed as a script.
if __name__ == "__main__":
    main()