# scheduler/scheduler.py
"""Event scheduler: periodically fetches active events from the database and
publishes them per client group as retained MQTT messages.

Environment variables:
    ENV                 "development" (default) or "production"
    LOG_LEVEL           logging level name; defaults by ENV
    DEBUG_MODE          "1"/"true" also logs to stderr
    REFRESH_SECONDS     0 = off; e.g. 600 to force a republish every 10 min
    EVENTS_WINDOW_DAYS  query window in days (default: 7)
    MQTT_HOST/MQTT_PORT broker location (defaults: "mqtt", 1883)
"""
import os
import logging
from .db_utils import get_active_events
import paho.mqtt.client as mqtt
import json
import datetime
import time

# Logging configuration
ENV = os.getenv("ENV", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
LOG_PATH = os.path.join(os.path.dirname(__file__), "scheduler.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)

log_handlers = []
if ENV == "production":
    # Rotate the log in production so it cannot grow without bound.
    from logging.handlers import RotatingFileHandler
    log_handlers.append(RotatingFileHandler(
        LOG_PATH, maxBytes=2 * 1024 * 1024, backupCount=5, encoding="utf-8"))
else:
    log_handlers.append(logging.FileHandler(LOG_PATH, encoding="utf-8"))
if os.getenv("DEBUG_MODE", "1" if ENV == "development" else "0") in ("1", "true", "True"):
    log_handlers.append(logging.StreamHandler())

logging.basicConfig(
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=log_handlers
)


def _parse_event_interval(event):
    """Return the event's (start, end) as tz-aware UTC datetimes, or None if unparsable."""
    try:
        start_dt = datetime.datetime.fromisoformat(event.get("start"))
        end_dt = datetime.datetime.fromisoformat(event.get("end"))
    except (TypeError, ValueError):
        # Missing or malformed timestamps -> event cannot be scheduled.
        return None
    # Treat naive timestamps as UTC so comparisons against aware 'now' work.
    if start_dt.tzinfo is None:
        start_dt = start_dt.replace(tzinfo=datetime.timezone.utc)
    if end_dt.tzinfo is None:
        end_dt = end_dt.replace(tzinfo=datetime.timezone.utc)
    return start_dt, end_dt


def _filter_active(events, now):
    """Keep only events whose [start, end) interval contains *now*."""
    active = []
    for event in events:
        interval = _parse_event_interval(event)
        if interval is None:
            continue
        start_dt, end_dt = interval
        if start_dt <= now < end_dt:
            active.append(event)
    return active


def _group_by_gid(events):
    """Group events by their "group_id" key (insertion order preserved per group)."""
    groups = {}
    for event in events:
        groups.setdefault(event.get("group_id"), []).append(event)
    return groups


def main():
    """Run the publish loop forever: poll DB, diff per group, publish retained."""
    # Fix for the deprecated API - explicitly set callback_api_version.
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    client.reconnect_delay_set(min_delay=1, max_delay=30)

    POLL_INTERVAL = 30  # seconds, recommendation for infrequent changes
    # 0 = off; e.g. 600 for every 10 min
    REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "0"))
    # Configurable time window in days (default: 7)
    WINDOW_DAYS = int(os.getenv("EVENTS_WINDOW_DAYS", "7"))
    # Broker location; defaults match the previously hard-coded values.
    MQTT_HOST = os.getenv("MQTT_HOST", "mqtt")
    MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))

    last_payloads = {}      # group_id -> payload
    last_published_at = {}  # group_id -> epoch seconds

    # On every (re-)connect, re-send all known retained payloads.
    def on_connect(client, userdata, flags, reasonCode, properties=None):
        logging.info(
            f"MQTT connected (reasonCode={reasonCode}) - republishing {len(last_payloads)} groups")
        for gid, payload in last_payloads.items():
            topic = f"infoscreen/events/{gid}"
            client.publish(topic, payload, retain=True)

    client.on_connect = on_connect
    # connect_async + loop_start lets paho establish/retry the connection in the
    # background (honoring reconnect_delay_set) instead of raising and killing
    # the scheduler when the broker is not yet reachable at startup.
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()

    while True:
        now = datetime.datetime.now(datetime.timezone.utc)
        # Query window: next N days to capture upcoming events and recurring instances
        # Clients need to know what's coming, not just what's active right now
        end_window = now + datetime.timedelta(days=WINDOW_DAYS)
        logging.debug(f"Fetching events window start={now.isoformat()} end={end_window.isoformat()} (days={WINDOW_DAYS})")

        # Fetch all active events (already formatted dictionaries).
        try:
            events = get_active_events(now, end_window)
            logging.debug(f"Fetched {len(events)} events for publishing window")
        except Exception as e:
            # DB hiccups must not kill the loop; publish nothing this round.
            logging.exception(f"Error while fetching events: {e}")
            events = []

        # Filter to events active at 'now', then group by group_id.
        groups = _group_by_gid(_filter_active(events, now))
        if not groups:
            logging.debug("No events grouped for any client group in current window")

        # Per group, publish the event list as a retained message - only on change.
        for gid, event_list in groups.items():
            # Stable ordering to avoid unnecessary publishes.
            event_list.sort(key=lambda e: (e.get("start"), e.get("id")))
            payload = json.dumps(
                event_list, sort_keys=True, separators=(",", ":"))
            topic = f"infoscreen/events/{gid}"
            should_send = (last_payloads.get(gid) != payload)
            # Optional periodic refresh even without changes.
            if not should_send and REFRESH_SECONDS:
                last_ts = last_published_at.get(gid, 0)
                if time.time() - last_ts >= REFRESH_SECONDS:
                    should_send = True
            if should_send:
                result = client.publish(topic, payload, retain=True)
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    logging.error(
                        f"Fehler beim Publish für Gruppe {gid}: {mqtt.error_string(result.rc)}")
                else:
                    logging.info(f"Events für Gruppe {gid} gesendet (count={len(event_list)})")
                    # Record state only on success so failures retry next cycle.
                    last_payloads[gid] = payload
                    last_published_at[gid] = time.time()

        # Remove groups that no longer exist (send an empty retained message).
        inactive_gids = set(last_payloads.keys()) - set(groups.keys())
        for gid in inactive_gids:
            topic = f"infoscreen/events/{gid}"
            result = client.publish(topic, payload="[]", retain=True)
            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                logging.error(
                    f"Fehler beim Entfernen für Gruppe {gid}: {mqtt.error_string(result.rc)}")
            else:
                logging.info(
                    f"Events für Gruppe {gid} entfernt (leere retained Message gesendet)")
                # Forget the group only once the clearing publish succeeded.
                del last_payloads[gid]
                last_published_at.pop(gid, None)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()