Scheduler: Refactor database utilities and
scheduler logic
This commit is contained in:
Binary file not shown.
25
scheduler/db_utils.py
Normal file
25
scheduler/db_utils.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
# scheduler/db_utils.py
|
||||||
|
from models import Event
|
||||||
|
from sqlalchemy import create_engine
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
from datetime import datetime
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
sys.path.append('/workspace/server')
|
||||||
|
|
||||||
|
# DB-URL aus Umgebungsvariable oder Fallback
|
||||||
|
DB_CONN = os.environ.get("DB_CONN", "mysql+pymysql://user:password@db/dbname")
|
||||||
|
engine = create_engine(DB_CONN)
|
||||||
|
Session = sessionmaker(bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
def get_active_events(start: datetime, end: datetime, group_id: int = None):
|
||||||
|
session = Session()
|
||||||
|
query = session.query(Event).filter(Event.is_active == True)
|
||||||
|
if start and end:
|
||||||
|
query = query.filter(Event.start < end, Event.end > start)
|
||||||
|
if group_id:
|
||||||
|
query = query.filter(Event.group_id == group_id)
|
||||||
|
events = query.all()
|
||||||
|
session.close()
|
||||||
|
return events
|
||||||
@@ -1,19 +1,41 @@
|
|||||||
# scheduler/scheduler.py
|
# scheduler/scheduler.py
|
||||||
import time
|
from db_utils import get_active_events
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# Fix für die veraltete API - explizit callback_api_version setzen
|
# Fix für die veraltete API - explizit callback_api_version setzen
|
||||||
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
|
client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
|
||||||
|
|
||||||
# Im Docker-Netzwerk: Hostname des MQTT-Brokers ist "mqtt", nicht "localhost"
|
|
||||||
client.connect("mqtt", 1883)
|
client.connect("mqtt", 1883)
|
||||||
|
|
||||||
|
POLL_INTERVAL = 10 # Sekunden
|
||||||
|
last_sent_event_ids = set()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Hier später: Events aus DB lesen und MQTT-Nachricht senden
|
now = datetime.datetime.now()
|
||||||
print("Scheduler läuft...")
|
# Hole alle Events, die jetzt aktiv sind (start < now < end)
|
||||||
time.sleep(10)
|
events = get_active_events(now, now)
|
||||||
|
current_event_ids = set(event.id for event in events)
|
||||||
|
|
||||||
|
# Sende nur neue Events (die noch nicht gesendet wurden)
|
||||||
|
new_event_ids = current_event_ids - last_sent_event_ids
|
||||||
|
for event in events:
|
||||||
|
if event.id in new_event_ids:
|
||||||
|
# Beispiel: Sende Event-Daten als JSON auf Topic "infoscreen/events"
|
||||||
|
payload = json.dumps({
|
||||||
|
"id": event.id,
|
||||||
|
"title": getattr(event, "title", ""),
|
||||||
|
"start": str(getattr(event, "start", "")),
|
||||||
|
"end": str(getattr(event, "end", "")),
|
||||||
|
"group_id": getattr(event, "group_id", None),
|
||||||
|
})
|
||||||
|
client.publish("infoscreen/events", payload)
|
||||||
|
print(f"Event {event.id} gesendet: {payload}")
|
||||||
|
|
||||||
|
last_sent_event_ids = current_event_ids
|
||||||
|
time.sleep(POLL_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user