# scheduler/db_utils.py
"""Scheduler database helpers.

Fetches active events from the DB (expanding RRULE recurrences and applying
per-date exceptions) and formats them into the client-expected payload.
"""
from dotenv import load_dotenv
import os
from datetime import datetime
from datetime import timezone
import logging
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import create_engine, or_, and_, text
from models.models import Event, EventMedia, EventException, SystemSetting
from dateutil.rrule import rrulestr
from urllib.request import Request, urlopen

# Load .env only in development to mirror server/database.py behavior
if os.getenv("ENV", "development") == "development":
    # Expect .env at workspace root
    load_dotenv('/workspace/.env')

# DB URL from environment, or component-wise fallback as in the server.
# SECURITY NOTE(review): hard-coded default credentials live in source;
# they should be supplied via DB_CONN / DB_PASSWORD in deployment.
DB_URL = os.environ.get("DB_CONN")
if not DB_URL:
    DB_USER = os.environ.get("DB_USER", "infoscreen_admin")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "KqtpM7wmNd&mFKs")
    DB_HOST = os.environ.get("DB_HOST", "db")
    DB_NAME = os.environ.get("DB_NAME", "infoscreen_by_taa")
    DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"

# Mask the password when logging the connection string (the original printed
# credentials in cleartext to stdout).
_safe_url = DB_URL
if "@" in DB_URL and "://" in DB_URL:
    _scheme, _rest = DB_URL.split("://", 1)
    _creds, _host = _rest.rsplit("@", 1)
    _user = _creds.split(":", 1)[0]
    _safe_url = f"{_scheme}://{_user}:***@{_host}"
print(f"[Scheduler] Using DB_URL: {_safe_url}")

engine = create_engine(DB_URL)

# Proactive connectivity check to surface errors early (best-effort: a
# failure here is reported but does not stop module import).
try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("[Scheduler] DB connectivity OK")
except Exception as db_exc:
    print(f"[Scheduler] DB connectivity FAILED: {db_exc}")

Session = sessionmaker(bind=engine)

# Base URL from .env for file URLs
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")

# Cache conversion decisions per media to avoid repeated lookups/logs within
# the scheduler lifetime. A sentinel distinguishes "never looked up" from a
# cached negative result (None); the original used None for both, so missing
# conversions were re-queried on every call.
_CACHE_MISS = object()
_media_conversion_cache = {}  # media_id -> pdf_url or None (negative cached)
_media_decision_logged = set()  # media_id(s) already logged


def _as_utc(dt):
    """Interpret a naive datetime as UTC; pass aware datetimes through."""
    if dt is None:
        return None
    return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt


def get_active_events(start: datetime, end: datetime, group_id: int = None):
    """Return active events overlapping [start, end], expanded for recurrence.

    Non-recurring events are included when they overlap the window.
    Recurring events (RRULE) are expanded into concrete occurrences within
    the window, honoring per-date EventException rows (skip / overrides).

    Args:
        start, end: query window bounds; naive datetimes are treated as UTC.
        group_id: optional filter on Event.group_id.

    Returns:
        list[dict]: client-formatted events (see format_event_with_media).
    """
    session = Session()
    try:
        # joinedload works because the event_media relationship is defined
        query = session.query(Event).options(
            joinedload(Event.event_media)
        ).filter(Event.is_active == True)

        non_recurring_overlap = None
        recurring_window = None
        if start and end:
            # Include:
            # 1) Non-recurring events that overlap [start, end]
            # 2) Recurring events whose recurrence window intersects
            #    [start, end]: dtstart (Event.start) <= end and
            #    (recurrence_end is NULL or >= start)
            non_recurring_overlap = and_(
                Event.recurrence_rule == None,
                Event.start < end,
                Event.end > start,
            )
            recurring_window = and_(
                Event.recurrence_rule != None,
                Event.start <= end,
                or_(Event.recurrence_end == None, Event.recurrence_end >= start),
            )
            query = query.filter(or_(non_recurring_overlap, recurring_window))
        if group_id:
            query = query.filter(Event.group_id == group_id)

        # Log base event count before expansion (diagnostics only)
        try:
            base_count = query.count()
            # Additional diagnostics: split counts
            non_rec_q = session.query(Event.id).filter(Event.is_active == True)
            rec_q = session.query(Event.id).filter(Event.is_active == True)
            if start and end:
                non_rec_q = non_rec_q.filter(non_recurring_overlap)
                rec_q = rec_q.filter(recurring_window)
            if group_id:
                non_rec_q = non_rec_q.filter(Event.group_id == group_id)
                rec_q = rec_q.filter(Event.group_id == group_id)
            non_rec_count = non_rec_q.count()
            rec_count = rec_q.count()
            logging.debug(f"[Scheduler] Base events total={base_count} non_recurring_overlap={non_rec_count} recurring_window={rec_count}")
        except Exception:
            base_count = None

        events = query.all()
        logging.debug(f"[Scheduler] Base events fetched: {len(events)} (count={base_count})")
        if len(events) == 0:
            # Quick probe: are there any active events at all?
            try:
                any_active = session.query(Event).filter(Event.is_active == True).count()
                logging.info(f"[Scheduler] Active events in DB (any group, any time): {any_active}")
            except Exception as e:
                logging.warning(f"[Scheduler] Could not count active events: {e}")

        formatted_events = []
        for event in events:
            if event.recurrence_rule:
                try:
                    formatted_events.extend(
                        _expand_recurring_event(session, event, start, end))
                except Exception as e:
                    # On parse error, fall back to single event formatting
                    logging.warning(f"Failed to parse recurrence rule for event {event.id}: {e}")
                    formatted_events.append(format_event_with_media(event))
            else:
                formatted_events.append(format_event_with_media(event))
        return formatted_events
    finally:
        session.close()


def _expand_recurring_event(session, event, start, end):
    """Expand one RRULE event into formatted occurrences within [start, end].

    Applies EventException rows (skip, start/end/title/description
    overrides). Raises if the RRULE cannot be parsed; the caller falls back
    to single-event formatting.
    """
    # Ensure dtstart and query bounds are timezone-aware (UTC if naive)
    dtstart = _as_utc(event.start)
    rule = rrulestr(event.recurrence_rule, dtstart=dtstart)
    win_start = _as_utc(start)
    win_end = _as_utc(end)

    # Clamp occurrence *generation* (not the overlap window) by
    # recurrence_end if present.
    gen_end = win_end
    rec_end = _as_utc(getattr(event, "recurrence_end", None))
    if rec_end and rec_end < gen_end:
        gen_end = rec_end

    # Use a lookback equal to the event's duration to catch occurrences that
    # started before win_start but are still running within the window.
    duration = (event.end - event.start) if (event.end and event.start) else None
    lookback_start = (win_start - duration) if duration else win_start

    instances = []
    for occ_start in rule.between(lookback_start, gen_end, inc=True):
        occ_end = (occ_start + duration) if duration else occ_start

        # Apply exceptions for this occurrence date
        exc = session.query(EventException).filter(
            EventException.event_id == event.id,
            EventException.exception_date == occ_start.date()
        ).first()
        if exc:
            if exc.is_skipped:
                continue
            # Normalize overrides to aware datetimes before comparing
            if exc.override_start:
                occ_start = _as_utc(exc.override_start)
            if exc.override_end:
                occ_end = _as_utc(exc.override_end)

        # Filter out instances that do not overlap the query window.
        # (Compare against aware bounds: the original compared aware
        # occurrences with the possibly-naive start/end parameters, which
        # raises TypeError for naive inputs.)
        if not (occ_start < win_end and occ_end > win_start):
            continue

        inst = format_event_with_media(event)
        # Apply overrides to title/description if provided
        if exc and exc.override_title:
            inst["title"] = exc.override_title
        if exc and exc.override_description:
            inst["description"] = exc.override_description
        inst["start"] = occ_start.isoformat()
        inst["end"] = occ_end.isoformat()
        inst["occurrence_of_id"] = event.id
        instances.append(inst)
    return instances


def get_system_setting_value(key: str, default: str | None = None) -> str | None:
    """Fetch a system setting value by key from DB.

    Returns the setting's string value or the provided default if missing
    or if the lookup fails (best-effort; failures are logged at debug).
    """
    session = Session()
    try:
        setting = session.query(SystemSetting).filter_by(key=key).first()
        return setting.value if setting else default
    except Exception as e:
        logging.debug(f"[Scheduler] Failed to read system setting '{key}': {e}")
        return default
    finally:
        session.close()


def _converted_pdf_url(media):
    """Return the converted-PDF URL for office-document media, or None.

    Only ppt/pptx/odp media are looked up (latest ready Conversion row).
    Decisions — including negative ones — are cached per media id so repeated
    formatting within one scheduler run does not re-query the DB.
    """
    cached = _media_conversion_cache.get(media.id, _CACHE_MISS)
    if cached is not _CACHE_MISS:
        return cached

    pdf_url = None
    media_type = getattr(media.media_type, 'value', str(media.media_type))
    if media_type in ("ppt", "pptx", "odp"):
        from sqlalchemy.orm import scoped_session
        from models.models import Conversion, ConversionStatus
        session = scoped_session(Session)
        try:
            conversion = session.query(Conversion).filter_by(
                source_event_media_id=media.id,
                target_format="pdf",
                status=ConversionStatus.ready
            ).order_by(Conversion.completed_at.desc()).first()
            logging.debug(
                f"[Scheduler] Conversion lookup for media_id={media.id}: found={bool(conversion)}, path={getattr(conversion, 'target_path', None) if conversion else None}")
            if conversion and conversion.target_path:
                pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
        finally:
            session.remove()

    # Cache the decision (even if None) to avoid repeated lookups in the same run
    _media_conversion_cache[media.id] = pdf_url
    return pdf_url


def _format_presentation(event_dict, event, media):
    """Populate event_dict['presentation'] with slideshow config and file URL."""
    event_dict["presentation"] = {
        "type": "slideshow",
        "files": [],
        "slide_interval": event.slideshow_interval or 5000,
        "auto_advance": True,
        "page_progress": getattr(event, "page_progress", True),
        "auto_progress": getattr(event, "auto_progress", True)
    }
    # Prefer a server-side PDF conversion when one is ready
    pdf_url = _converted_pdf_url(media)
    if pdf_url:
        filename = os.path.basename(pdf_url)
        event_dict["presentation"]["files"].append({
            "name": filename,
            "url": pdf_url,
            "checksum": None,
            "size": None
        })
        if media.id not in _media_decision_logged:
            logging.debug(
                f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
            _media_decision_logged.add(media.id)
    elif media.file_path:
        filename = os.path.basename(media.file_path)
        # NOTE(review): the trailing URL segment was garbled in the source;
        # restored as the original filename — confirm against the files API.
        event_dict["presentation"]["files"].append({
            "name": filename,
            "url": f"{API_BASE_URL}/api/files/{media.id}/{filename}",
            "checksum": None,
            "size": None
        })
        if media.id not in _media_decision_logged:
            logging.debug(
                f"[Scheduler] Using original file for event_media_id={media.id}: {media.file_path}")
            _media_decision_logged.add(media.id)


def _probe_stream_head(stream_url, media_id):
    """Best-effort HEAD request against the streaming endpoint.

    Returns (mime_type, size, accept_ranges); probe failures are logged once
    per media and never propagate to the scheduler.
    """
    mime_type = None
    size = None
    accept_ranges = False
    try:
        req = Request(stream_url, method='HEAD')
        with urlopen(req, timeout=2) as resp:
            # getheader returns None if missing
            mime_type = resp.getheader('Content-Type')
            length = resp.getheader('Content-Length')
            if length:
                try:
                    size = int(length)
                except Exception:
                    size = None
            accept_ranges = (resp.getheader('Accept-Ranges') or '').lower() == 'bytes'
    except Exception as e:
        # Don't fail the scheduler for probe errors; log once per media
        if media_id not in _media_decision_logged:
            logging.debug(f"[Scheduler] HEAD probe for media_id={media_id} failed: {e}")
    return mime_type, size, accept_ranges


def _format_video(event_dict, event, media):
    """Populate event_dict['video'] with the streaming URL and metadata."""
    filename = os.path.basename(media.file_path) if media.file_path else "video"
    # Use streaming endpoint for better video playback support.
    # NOTE(review): trailing URL segment restored from garbled source as the
    # filename — confirm against the streaming API route.
    stream_url = f"{API_BASE_URL}/api/eventmedia/stream/{media.id}/{filename}"
    mime_type, size, accept_ranges = _probe_stream_head(stream_url, media.id)
    event_dict["video"] = {
        "type": "media",
        "url": stream_url,
        "autoplay": getattr(event, "autoplay", True),
        "loop": getattr(event, "loop", False),
        "volume": getattr(event, "volume", 0.8),
        # Best-effort metadata to help clients decide how to stream
        "mime_type": mime_type,
        "size": size,
        "accept_ranges": accept_ranges,
        # Optional richer info (may be null if not available):
        # duration (seconds), resolution, bitrate
        "duration": None,
        "resolution": None,
        "bitrate": None,
        "qualities": [],
        "thumbnails": [],
        "checksum": None,
    }
    if media.id not in _media_decision_logged:
        logging.debug(
            f"[Scheduler] Using video streaming URL for event_media_id={media.id}: {stream_url}")
        _media_decision_logged.add(media.id)


def format_event_with_media(event):
    """Transform Event + EventMedia into client-expected format.

    Builds the base event dict and, depending on event_type, attaches a
    'presentation', 'website', or 'video' section from the event's media.
    """
    event_dict = {
        "id": event.id,
        "title": event.title,
        "start": str(event.start),
        "end": str(event.end),
        "group_id": event.group_id,
        "event_type": event.event_type.value if event.event_type else None,
        # Carry recurrence metadata for consumers if needed
        "recurrence_rule": getattr(event, "recurrence_rule", None),
        "recurrence_end": (event.recurrence_end.isoformat()
                           if getattr(event, "recurrence_end", None) else None),
    }

    if not event.event_media:
        return event_dict
    media = event.event_media
    # Guard against a null event_type (the original dereferenced .value
    # unconditionally here and would raise AttributeError).
    etype = event.event_type.value if event.event_type else None

    if etype == "presentation":
        _format_presentation(event_dict, event, media)
    # Handle website and webuntis events (both display a website)
    elif etype in ("website", "webuntis"):
        event_dict["website"] = {
            "type": "browser",
            "url": media.url if media.url else None
        }
        if media.id not in _media_decision_logged:
            logging.debug(
                f"[Scheduler] Using website URL for event_media_id={media.id} (type={etype}): {media.url}")
            _media_decision_logged.add(media.id)
    # Handle video events
    elif etype == "video":
        _format_video(event_dict, event, media)
    # Add other event types (message, etc.) here as needed...
    return event_dict