Files
infoscreen/scheduler/db_utils.py
2025-10-10 15:20:14 +00:00

114 lines
4.2 KiB
Python

# scheduler/db_utils.py
from dotenv import load_dotenv
import os
from datetime import datetime
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import create_engine
from models.models import Event, EventMedia
# Load the workspace .env file first so the os.environ lookups below can see its values.
load_dotenv('/workspace/.env')
# Database URL taken from the DB_CONN environment variable, with a fallback default
DB_CONN = os.environ.get("DB_CONN", "mysql+pymysql://user:password@db/dbname")
# Module-wide engine and session factory; call Session() to obtain a new session.
engine = create_engine(DB_CONN)
Session = sessionmaker(bind=engine)
# Base URL used when building file URLs; read from the API_BASE_URL environment
# variable, with a fallback default
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")
def get_active_events(start: datetime, end: datetime, group_id: int = None):
    """Return the active events overlapping a time window, formatted for clients.

    Args:
        start: Beginning of the window.
        end: End of the window. The overlap filter is applied only when both
            ``start`` and ``end`` are truthy.
        group_id: When truthy, restrict the result to events of this group.
            ``None`` (and ``0``) leave the result unfiltered by group.

    Returns:
        A list with one ``format_event_with_media`` result per matching event.
    """
    db = Session()
    try:
        # Eager-load the related media so formatting does not trigger a
        # separate lazy load per event.
        candidates = (
            db.query(Event)
            .options(joinedload(Event.event_media))
            .filter(Event.is_active == True)
        )
        if start and end:
            # Keep events that start before the window ends and end after
            # the window starts, i.e. events overlapping the window.
            candidates = candidates.filter(Event.start < end, Event.end > start)
        if group_id:
            candidates = candidates.filter(Event.group_id == group_id)
        return [format_event_with_media(row) for row in candidates.all()]
    finally:
        # Always release the session, even when the query or formatting fails.
        db.close()
def _find_converted_pdf_url(media_id):
    """Return the URL of the newest ready PDF conversion for a media row.

    Looks up ``Conversion`` rows whose ``source_event_media_id`` equals
    ``media_id``, with ``target_format="pdf"`` and status
    ``ConversionStatus.ready``, newest ``completed_at`` first.

    Args:
        media_id: Primary key of the source EventMedia row.

    Returns:
        ``{API_BASE_URL}/api/files/converted/<target_path>`` for the newest
        matching conversion, or ``None`` when there is no such conversion or
        it has no target path.
    """
    import logging
    # Imported at call time, as the original code did for this lookup.
    from models.models import Conversion, ConversionStatus

    session = Session()
    try:
        conversion = (
            session.query(Conversion)
            .filter_by(
                source_event_media_id=media_id,
                target_format="pdf",
                status=ConversionStatus.ready,
            )
            .order_by(Conversion.completed_at.desc())
            .first()
        )
        target_path = conversion.target_path if conversion else None
        logging.debug(
            "[Scheduler] Conversion lookup for media_id=%s: found=%s, path=%s",
            media_id, bool(conversion), target_path)
        if not target_path:
            return None
        # Serve via /api/files/converted/<path>
        return f"{API_BASE_URL}/api/files/converted/{target_path}"
    finally:
        # Release the session even if the query raises; previously a failing
        # lookup skipped the cleanup call and leaked the session.
        session.close()


def format_event_with_media(event):
    """Transform an Event and its EventMedia into the client-expected dict.

    Args:
        event: Object exposing ``id``, ``title``, ``start``, ``end``,
            ``group_id`` and ``event_media``; for events that have media,
            also ``event_type`` and (for presentations) ``slideshow_interval``.

    Returns:
        A dict with ``id``, ``title``, ``start``, ``end`` (both stringified)
        and ``group_id``. For a "presentation" event that has media, a
        ``presentation`` entry is added describing a slideshow whose ``files``
        list holds either the converted PDF (for ppt/pptx/odp media with a
        ready conversion) or the original file, when ``file_path`` is set.
    """
    import logging

    event_dict = {
        "id": event.id,
        "title": event.title,
        "start": str(event.start),
        "end": str(event.end),
        "group_id": event.group_id,
    }

    media = event.event_media
    if not media:
        return event_dict

    # Accept an enum member, a plain string, or None; a missing event_type
    # used to raise AttributeError on ``.value``.
    event_type = getattr(event.event_type, "value", event.event_type)
    if event_type != "presentation":
        # Other event types currently add nothing to the payload.
        return event_dict

    files = []
    event_dict["presentation"] = {
        "type": "slideshow",
        "files": files,
        "slide_interval": event.slideshow_interval or 5000,
        "auto_advance": True,
    }

    media_type = getattr(media.media_type, "value", str(media.media_type))
    logging.debug(
        "[Scheduler] EventMedia id=%s media_type=%s", media.id, media_type)

    # Only presentation formats can have a PDF conversion, so the database is
    # queried only for those.
    pdf_url = None
    if media_type in ("ppt", "pptx", "odp"):
        pdf_url = _find_converted_pdf_url(media.id)

    if pdf_url:
        files.append({
            "name": os.path.basename(pdf_url),
            "url": pdf_url,
            "checksum": None,
            "size": None,
        })
        logging.info(
            "[Scheduler] Using converted PDF for event_media_id=%s: %s",
            media.id, pdf_url)
    elif media.file_path:
        filename = os.path.basename(media.file_path)
        files.append({
            "name": filename,
            "url": f"{API_BASE_URL}/api/files/{media.id}/{filename}",
            "checksum": None,
            "size": None,
        })
        logging.info(
            "[Scheduler] Using original file for event_media_id=%s: %s",
            media.id, filename)

    return event_dict