Initial commit - copied workspace after database cleanup
scheduler/Dockerfile (new file)
@@ -0,0 +1,8 @@
FROM python:3.13-slim
WORKDIR /app
COPY scheduler/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY scheduler/ ./scheduler
COPY models/ ./models
ENV PYTHONPATH=/app
CMD ["python", "-m", "scheduler.scheduler"]
scheduler/db_utils.py (new file)
@@ -0,0 +1,113 @@
# scheduler/db_utils.py
import os
import logging
from datetime import datetime

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, joinedload, scoped_session

from models.models import Event, EventMedia, Conversion, ConversionStatus

load_dotenv('/workspace/.env')

# Database URL from environment variable, with a local fallback
DB_CONN = os.environ.get("DB_CONN", "mysql+pymysql://user:password@db/dbname")
engine = create_engine(DB_CONN)
Session = sessionmaker(bind=engine)

# Base URL from .env, used to build file URLs
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")


def get_active_events(start: datetime, end: datetime, group_id: int | None = None):
    session = Session()
    try:
        # Eager-load the event_media relationship to avoid a lazy load per event
        query = session.query(Event).options(
            joinedload(Event.event_media)
        ).filter(Event.is_active.is_(True))

        if start and end:
            # Keep events whose interval overlaps [start, end)
            query = query.filter(Event.start < end, Event.end > start)
        if group_id is not None:
            query = query.filter(Event.group_id == group_id)

        return [format_event_with_media(event) for event in query.all()]
    finally:
        session.close()


def format_event_with_media(event):
    """Transform Event + EventMedia into the client-expected format."""
    event_dict = {
        "id": event.id,
        "title": event.title,
        "start": str(event.start),
        "end": str(event.end),
        "group_id": event.group_id,
    }

    if event.event_media:
        media = event.event_media

        if event.event_type.value == "presentation":
            event_dict["presentation"] = {
                "type": "slideshow",
                "files": [],
                "slide_interval": event.slideshow_interval or 5000,
                "auto_advance": True
            }

            media_type = getattr(media.media_type, 'value', str(media.media_type))
            logging.debug(
                f"[Scheduler] EventMedia id={media.id} media_type={media_type}")

            # Prefer a finished PDF conversion for ppt/pptx/odp sources
            session = scoped_session(Session)
            pdf_url = None
            if media_type in ("ppt", "pptx", "odp"):
                conversion = session.query(Conversion).filter_by(
                    source_event_media_id=media.id,
                    target_format="pdf",
                    status=ConversionStatus.ready
                ).order_by(Conversion.completed_at.desc()).first()
                logging.debug(
                    f"[Scheduler] Conversion lookup for media_id={media.id}: "
                    f"found={bool(conversion)}, "
                    f"path={getattr(conversion, 'target_path', None) if conversion else None}")
                if conversion and conversion.target_path:
                    # Served via /api/files/converted/<path>
                    pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
            session.remove()

            if pdf_url:
                filename = os.path.basename(pdf_url)
                event_dict["presentation"]["files"].append({
                    "name": filename,
                    "url": pdf_url,
                    "checksum": None,
                    "size": None
                })
                logging.info(
                    f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
            elif media.file_path:
                filename = os.path.basename(media.file_path)
                event_dict["presentation"]["files"].append({
                    "name": filename,
                    "url": f"{API_BASE_URL}/api/files/{media.id}/{filename}",
                    "checksum": None,
                    "size": None
                })
                logging.info(
                    f"[Scheduler] Using original file for event_media_id={media.id}: {filename}")

    # Add other event types...

    return event_dict
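
db_utils.py imports Event, EventMedia, Conversion, and ConversionStatus from models/models.py, which is not part of this commit. The following is a minimal sketch of the schema the queries above appear to assume; every column name and type here is inferred from usage, not confirmed by the diff:

# Hypothetical sketch of models/models.py, inferred from how db_utils.py
# uses it; the real file is not in this commit.
import enum
from sqlalchemy import (Boolean, Column, DateTime, Enum, ForeignKey,
                        Integer, String)
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class EventType(enum.Enum):
    presentation = "presentation"   # the only value db_utils.py handles


class MediaType(enum.Enum):
    ppt = "ppt"
    pptx = "pptx"
    odp = "odp"


class ConversionStatus(enum.Enum):
    ready = "ready"                 # the only status db_utils.py queries


class Event(Base):
    __tablename__ = "event"
    id = Column(Integer, primary_key=True)
    title = Column(String(255))
    start = Column(DateTime)
    end = Column(DateTime)
    group_id = Column(Integer)
    is_active = Column(Boolean, default=True)
    event_type = Column(Enum(EventType))
    slideshow_interval = Column(Integer, nullable=True)
    # one-to-one; joinedload(Event.event_media) relies on this relationship
    event_media = relationship("EventMedia", uselist=False, back_populates="event")


class EventMedia(Base):
    __tablename__ = "event_media"
    id = Column(Integer, primary_key=True)
    event_id = Column(Integer, ForeignKey("event.id"))
    media_type = Column(Enum(MediaType))
    file_path = Column(String(512))
    event = relationship("Event", back_populates="event_media")


class Conversion(Base):
    __tablename__ = "conversion"
    id = Column(Integer, primary_key=True)
    source_event_media_id = Column(Integer, ForeignKey("event_media.id"))
    target_format = Column(String(16))
    target_path = Column(String(512))
    status = Column(Enum(ConversionStatus))
    completed_at = Column(DateTime)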
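For reference, a hypothetical caller; the output fields match what format_event_with_media() builds above, and group_id=1 is illustrative:

# Hypothetical usage: fetch the events active right now for group 1 and
# inspect the formatted dictionaries the scheduler will later publish.
from datetime import datetime, timezone

from scheduler.db_utils import get_active_events

now = datetime.now(timezone.utc)
for event in get_active_events(now, now, group_id=1):
    files = event.get("presentation", {}).get("files", [])
    print(event["id"], event["title"], [f["url"] for f in files])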
scheduler/requirements.txt (new file)
@@ -0,0 +1,4 @@
paho-mqtt
sqlalchemy
pymysql
python-dotenv
scheduler/scheduler.py (new file)
@@ -0,0 +1,112 @@
# scheduler/scheduler.py

import os
import logging
import json
import datetime
import time

import paho.mqtt.client as mqtt

from .db_utils import get_active_events

# Logging configuration
ENV = os.getenv("ENV", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
LOG_PATH = os.path.join(os.path.dirname(__file__), "scheduler.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
log_handlers = []
if ENV == "production":
    from logging.handlers import RotatingFileHandler
    log_handlers.append(RotatingFileHandler(
        LOG_PATH, maxBytes=2*1024*1024, backupCount=5, encoding="utf-8"))
else:
    log_handlers.append(logging.FileHandler(LOG_PATH, encoding="utf-8"))
if os.getenv("DEBUG_MODE", "1" if ENV == "development" else "0") in ("1", "true", "True"):
    log_handlers.append(logging.StreamHandler())
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=log_handlers
)


def main():
    # paho-mqtt 2.x: set callback_api_version explicitly (the implicit
    # version-1 callback API is deprecated)
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    client.reconnect_delay_set(min_delay=1, max_delay=30)

    POLL_INTERVAL = 30  # seconds; a sensible default for infrequent changes
    # 0 = off; e.g. 600 to force a republish every 10 minutes
    REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "0"))
    last_payloads = {}  # group_id -> payload
    last_published_at = {}  # group_id -> epoch seconds

    # On (re)connect, resend all known retained payloads
    def on_connect(client, userdata, flags, reason_code, properties=None):
        logging.info(
            f"MQTT connected (reason_code={reason_code}) - republishing {len(last_payloads)} groups")
        for gid, payload in last_payloads.items():
            topic = f"infoscreen/events/{gid}"
            client.publish(topic, payload, retain=True)

    client.on_connect = on_connect

    client.connect("mqtt", 1883)
    client.loop_start()

    while True:
        now = datetime.datetime.now(datetime.timezone.utc)
        # Fetch all active events (already formatted as dictionaries)
        events = get_active_events(now, now)

        # Group events by group_id
        groups = {}
        for event in events:
            gid = event.get("group_id")
            groups.setdefault(gid, []).append(event)

        # Publish each group's event list as a retained message, only on change
        for gid, event_list in groups.items():
            # Stable ordering to avoid publishes that differ only in order
            event_list.sort(key=lambda e: (e.get("start"), e.get("id")))
            payload = json.dumps(
                event_list, sort_keys=True, separators=(",", ":"))
            topic = f"infoscreen/events/{gid}"

            should_send = (last_payloads.get(gid) != payload)
            if not should_send and REFRESH_SECONDS:
                last_ts = last_published_at.get(gid, 0)
                if time.time() - last_ts >= REFRESH_SECONDS:
                    should_send = True

            if should_send:
                result = client.publish(topic, payload, retain=True)
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    logging.error(
                        f"Publish failed for group {gid}: {mqtt.error_string(result.rc)}")
                else:
                    logging.info(f"Published events for group {gid}")
                # Recorded even on failure: on_connect republishes all known
                # retained payloads once the connection is back
                last_payloads[gid] = payload
                last_published_at[gid] = time.time()

        # Remove groups that no longer exist (publish an empty retained message)
        for gid in list(last_payloads.keys()):
            if gid not in groups:
                topic = f"infoscreen/events/{gid}"
                result = client.publish(topic, payload="[]", retain=True)
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    logging.error(
                        f"Failed to clear group {gid}: {mqtt.error_string(result.rc)}")
                else:
                    logging.info(
                        f"Cleared events for group {gid} (empty retained message)")
                del last_payloads[gid]
                last_published_at.pop(gid, None)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()
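Because the scheduler publishes retained messages, a display client receives the current event list for its group immediately on subscribe. A minimal consumer sketch; the broker host "mqtt" matches the connect call above, and group 1 is illustrative:

# Minimal subscriber sketch: prints the retained event list for group 1 on
# subscribe, then every update the scheduler publishes afterwards.
import json

import paho.mqtt.client as mqtt


def on_message(client, userdata, msg):
    events = json.loads(msg.payload)
    print(f"{msg.topic}: {len(events)} active event(s)")


client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_message = on_message
client.connect("mqtt", 1883)
client.subscribe("infoscreen/events/1")
client.loop_forever()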