# Commit: test communication scheduler<->simclient
# scheduler/scheduler.py
"""Scheduler service: polls the event DB and publishes active events via MQTT."""

import datetime
import json
import logging
import os
import time

import paho.mqtt.client as mqtt

from scheduler.db_utils import get_active_events
# --- Logging configuration --------------------------------------------------
# Environment-driven setup: production gets a size-rotated log file, every
# other environment a plain file; an extra console handler is attached when
# DEBUG_MODE is enabled (default on in development).
ENV = os.getenv("ENV", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
LOG_PATH = os.path.join(os.path.dirname(__file__), "scheduler.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)

if ENV == "production":
    # Rotate at 2 MiB, keep 5 backups, so the log cannot grow unbounded.
    from logging.handlers import RotatingFileHandler
    _file_handler = RotatingFileHandler(
        LOG_PATH, maxBytes=2 * 1024 * 1024, backupCount=5, encoding="utf-8")
else:
    _file_handler = logging.FileHandler(LOG_PATH, encoding="utf-8")

log_handlers = [_file_handler]

_debug_default = "1" if ENV == "development" else "0"
if os.getenv("DEBUG_MODE", _debug_default) in ("1", "true", "True"):
    # Mirror log output to the console for interactive debugging.
    log_handlers.append(logging.StreamHandler())

logging.basicConfig(
    # Fall back to INFO if LOG_LEVEL names an unknown level.
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=log_handlers,
)
def main():
    """Poll the event database and publish active events to MQTT.

    Runs forever: every ``POLL_INTERVAL`` seconds the currently active
    events (start < now < end) are fetched, grouped by ``group_id``, and
    each group's event list is published as a retained JSON message on
    ``infoscreen/events/<group_id>``. A group is re-published only when
    its payload actually changed; groups with no active events left get
    an empty retained list so subscribers can clear their display.
    """
    # Explicit callback_api_version avoids the deprecated paho-mqtt v1 API.
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    # Hostname "mqtt" — presumably the broker's service name in the
    # compose/k8s network; TODO confirm against deployment config.
    client.connect("mqtt", 1883)

    POLL_INTERVAL = 30  # seconds; events change rarely, so a slow poll suffices
    last_payloads = {}  # group_id -> last published JSON payload (change detection)

    while True:
        # Compare in UTC so "now" agrees with the DB timestamps regardless
        # of the host timezone.
        now = datetime.datetime.now(datetime.timezone.utc)
        events = get_active_events(now, now)

        # Group the active events by group_id.
        groups = {}
        for event in events:
            gid = getattr(event, "group_id", None)
            if gid not in groups:
                groups[gid] = []
            groups[gid].append({
                "id": event.id,
                "title": getattr(event, "title", ""),
                "start": str(getattr(event, "start", "")),
                "end": str(getattr(event, "end", "")),
                "group_id": gid,
            })

        # Publish each group's event list as a retained message, but only
        # when the serialized payload differs from what was last sent.
        for gid, event_list in groups.items():
            topic = f"infoscreen/events/{gid}"
            payload = json.dumps(event_list)
            if last_payloads.get(gid) != payload:
                client.publish(topic, payload, retain=True)
                logging.info(f"Events für Gruppe {gid} gesendet: {payload}")
                last_payloads[gid] = payload

        # Groups that vanished get an empty retained message so late
        # subscribers do not see stale events.
        for gid in list(last_payloads.keys()):
            if gid not in groups:
                topic = f"infoscreen/events/{gid}"
                client.publish(topic, payload="[]", retain=True)
                logging.info(
                    f"Events für Gruppe {gid} entfernt (leere retained Message gesendet)")
                del last_payloads[gid]

        time.sleep(POLL_INTERVAL)