# scheduler/db_utils.py
from dotenv import load_dotenv
import os
from datetime import datetime, timedelta, timezone
import hashlib
import json
import logging
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import create_engine, or_, and_, text
import uuid as _uuid_mod
from models.models import Event, EventMedia, EventException, SystemSetting, Client, ClientCommand, ProcessStatus
from dateutil.rrule import rrulestr
from urllib.request import Request, urlopen

# Load .env only in development to mirror server/database.py behavior
if os.getenv("ENV", "development") == "development":
    # Expect .env at workspace root
    load_dotenv('/workspace/.env')

# DB URL from environment variable, or assembled from parts like the server does.
DB_URL = os.environ.get("DB_CONN")
if not DB_URL:
    DB_USER = os.environ.get("DB_USER", "infoscreen_admin")
    # SECURITY NOTE(review): hardcoded fallback password kept for backward
    # compatibility; remove once all deployments provide DB_PASSWORD/DB_CONN.
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "KqtpM7wmNd&mFKs")
    DB_HOST = os.environ.get("DB_HOST", "db")
    DB_NAME = os.environ.get("DB_NAME", "infoscreen_by_taa")
    DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"

# Log the DB URL with the credential part masked so passwords never land in logs.
_display_url = DB_URL
if "://" in _display_url and "@" in _display_url:
    _scheme, _rest = _display_url.split("://", 1)
    _display_url = f"{_scheme}://***@{_rest.rsplit('@', 1)[-1]}"
print(f"[Scheduler] Using DB_URL: {_display_url}")

engine = create_engine(DB_URL)

# Proactive connectivity check to surface errors early
try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("[Scheduler] DB connectivity OK")
except Exception as db_exc:
    print(f"[Scheduler] DB connectivity FAILED: {db_exc}")

Session = sessionmaker(bind=engine)

# Base URL from .env for file URLs
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")

# Cache conversion decisions per media to avoid repeated lookups/logs within
# the scheduler's lifetime.
_media_conversion_cache = {}  # media_id -> pdf_url or None
_media_decision_logged = set()  # media_id(s) already logged


def get_active_events(start: datetime, end: datetime, group_id: int = None):
    """Return active events overlapping [start, end], formatted for clients.

    Non-recurring events are included when they overlap the window; recurring
    events (RRULE) are expanded into concrete occurrences within the window,
    honoring per-date EventException rows (skip / override start, end, title,
    description). Naive datetimes are treated as UTC.

    Args:
        start: Window start (naive values are assumed UTC).
        end: Window end (naive values are assumed UTC).
        group_id: Optional group filter; None means all groups.

    Returns:
        list[dict]: Formatted events (see format_event_with_media). Expanded
        recurrence instances additionally carry "occurrence_of_id" and have
        "start"/"end" replaced by the occurrence's ISO timestamps.
    """
    session = Session()
    try:
        query = session.query(Event).options(
            joinedload(Event.event_media)
        ).filter(Event.is_active == True)

        # Keep the filter expressions around for the diagnostic counts below;
        # they stay None when no time window was supplied.
        non_recurring_overlap = None
        recurring_window = None
        if start and end:
            # Include:
            # 1) Non-recurring events that overlap [start, end]
            # 2) Recurring events whose recurrence window intersects [start, end]
            #    (dtstart (Event.start) <= end and recurrence_end is NULL or >= start)
            non_recurring_overlap = and_(
                Event.recurrence_rule == None,
                Event.start < end,
                Event.end > start,
            )
            recurring_window = and_(
                Event.recurrence_rule != None,
                Event.start <= end,
                or_(Event.recurrence_end == None, Event.recurrence_end >= start),
            )
            query = query.filter(or_(non_recurring_overlap, recurring_window))
        # Explicit None check so a (theoretical) group id of 0 is not ignored.
        if group_id is not None:
            query = query.filter(Event.group_id == group_id)

        # Log base event count before expansion (best-effort diagnostics only).
        try:
            base_count = query.count()
            # Additional diagnostics: split counts
            non_rec_q = session.query(Event.id).filter(Event.is_active == True)
            rec_q = session.query(Event.id).filter(Event.is_active == True)
            if non_recurring_overlap is not None and recurring_window is not None:
                non_rec_q = non_rec_q.filter(non_recurring_overlap)
                rec_q = rec_q.filter(recurring_window)
            if group_id is not None:
                non_rec_q = non_rec_q.filter(Event.group_id == group_id)
                rec_q = rec_q.filter(Event.group_id == group_id)
            non_rec_count = non_rec_q.count()
            rec_count = rec_q.count()
            logging.debug(f"[Scheduler] Base events total={base_count} non_recurring_overlap={non_rec_count} recurring_window={rec_count}")
        except Exception:
            base_count = None

        events = query.all()
        logging.debug(f"[Scheduler] Base events fetched: {len(events)} (count={base_count})")
        if len(events) == 0:
            # Quick probe: are there any active events at all?
            try:
                any_active = session.query(Event).filter(Event.is_active == True).count()
                logging.info(f"[Scheduler] Active events in DB (any group, any time): {any_active}")
            except Exception as e:
                logging.warning(f"[Scheduler] Could not count active events: {e}")

        formatted_events = []
        for event in events:
            # If event has RRULE, expand into instances within [start, end]
            if event.recurrence_rule:
                try:
                    # Ensure dtstart is timezone-aware (UTC if naive)
                    dtstart = event.start
                    if dtstart.tzinfo is None:
                        dtstart = dtstart.replace(tzinfo=timezone.utc)
                    r = rrulestr(event.recurrence_rule, dtstart=dtstart)
                    # Ensure query bounds are timezone-aware
                    query_start = start if start.tzinfo else start.replace(tzinfo=timezone.utc)
                    query_end = end if end.tzinfo else end.replace(tzinfo=timezone.utc)
                    # Un-clamped window end kept for the final overlap test below.
                    # (Fix: the old code compared aware occurrence times against
                    # the raw, possibly naive, start/end arguments.)
                    window_end = query_end
                    # Clamp by recurrence_end if present
                    if getattr(event, "recurrence_end", None):
                        rec_end = event.recurrence_end
                        if rec_end and rec_end.tzinfo is None:
                            rec_end = rec_end.replace(tzinfo=timezone.utc)
                        if rec_end and rec_end < query_end:
                            query_end = rec_end
                    # Iterate occurrences within range. Use a lookback equal to
                    # the event's duration to catch occurrences that started
                    # before query_start but are still running within the window.
                    duration = (event.end - event.start) if (event.end and event.start) else None
                    lookback_start = query_start
                    if duration:
                        lookback_start = query_start - duration
                    occ_starts = r.between(lookback_start, query_end, inc=True)
                    for occ_start in occ_starts:
                        occ_end = (occ_start + duration) if duration else occ_start
                        # Apply exceptions
                        exc = session.query(EventException).filter(
                            EventException.event_id == event.id,
                            EventException.exception_date == occ_start.date()
                        ).first()
                        if exc:
                            if exc.is_skipped:
                                continue
                            # Overrides may be stored naive; normalize to UTC so
                            # the overlap comparison below cannot raise TypeError.
                            if exc.override_start:
                                occ_start = exc.override_start
                                if occ_start.tzinfo is None:
                                    occ_start = occ_start.replace(tzinfo=timezone.utc)
                            if exc.override_end:
                                occ_end = exc.override_end
                                if occ_end.tzinfo is None:
                                    occ_end = occ_end.replace(tzinfo=timezone.utc)
                        # Filter out instances that do not overlap [start, end]
                        if not (occ_start < window_end and occ_end > query_start):
                            continue
                        inst = format_event_with_media(event)
                        # Apply overrides to title/description if provided
                        if exc and exc.override_title:
                            inst["title"] = exc.override_title
                        if exc and exc.override_description:
                            inst["description"] = exc.override_description
                        inst["start"] = occ_start.isoformat()
                        inst["end"] = occ_end.isoformat()
                        inst["occurrence_of_id"] = event.id
                        formatted_events.append(inst)
                except Exception as e:
                    # On parse error, fall back to single event formatting
                    logging.warning(f"Failed to parse recurrence rule for event {event.id}: {e}")
                    formatted_events.append(format_event_with_media(event))
            else:
                formatted_events.append(format_event_with_media(event))
        return formatted_events
    finally:
        session.close()
""" session = Session() try: setting = session.query(SystemSetting).filter_by(key=key).first() return setting.value if setting else default except Exception as e: logging.debug(f"[Scheduler] Failed to read system setting '{key}': {e}") return default finally: session.close() def _parse_utc_datetime(value): """Parse datetime-like values and normalize to timezone-aware UTC.""" if value is None: return None if isinstance(value, datetime): dt = value else: try: dt = datetime.fromisoformat(str(value)) except Exception: return None if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def _normalize_group_id(group_id): try: return int(group_id) except (TypeError, ValueError): return None def _event_range_from_dict(event): start = _parse_utc_datetime(event.get("start")) end = _parse_utc_datetime(event.get("end")) if start is None or end is None or end <= start: return None return start, end def _merge_ranges(ranges, adjacency_seconds=0): """Merge overlapping or adjacent [start, end] ranges.""" if not ranges: return [] ranges_sorted = sorted(ranges, key=lambda r: (r[0], r[1])) merged = [ranges_sorted[0]] adjacency_delta = max(0, int(adjacency_seconds)) for current_start, current_end in ranges_sorted[1:]: last_start, last_end = merged[-1] if current_start <= last_end or (current_start - last_end).total_seconds() <= adjacency_delta: if current_end > last_end: merged[-1] = (last_start, current_end) else: merged.append((current_start, current_end)) return merged def compute_group_power_intent_basis(events, group_id, now_utc=None, adjacency_seconds=0): """Return pure, deterministic power intent basis for one group at a point in time. The returned mapping intentionally excludes volatile fields such as intent_id, issued_at and expires_at. 
""" normalized_gid = _normalize_group_id(group_id) effective_now = _parse_utc_datetime(now_utc) or datetime.now(timezone.utc) ranges = [] active_event_ids = [] for event in events or []: if _normalize_group_id(event.get("group_id")) != normalized_gid: continue parsed_range = _event_range_from_dict(event) if parsed_range is None: continue start, end = parsed_range ranges.append((start, end)) if start <= effective_now < end: event_id = event.get("id") if event_id is not None: active_event_ids.append(event_id) merged_ranges = _merge_ranges(ranges, adjacency_seconds=adjacency_seconds) active_window_start = None active_window_end = None for window_start, window_end in merged_ranges: if window_start <= effective_now < window_end: active_window_start = window_start active_window_end = window_end break desired_state = "on" if active_window_start is not None else "off" reason = "active_event" if desired_state == "on" else "no_active_event" return { "schema_version": "1.0", "group_id": normalized_gid, "desired_state": desired_state, "reason": reason, "poll_interval_sec": None, "event_window_start": active_window_start.isoformat().replace("+00:00", "Z") if active_window_start else None, "event_window_end": active_window_end.isoformat().replace("+00:00", "Z") if active_window_end else None, "active_event_ids": sorted(set(active_event_ids)), } def build_group_power_intent_body(intent_basis, poll_interval_sec): """Build deterministic payload body (without intent_id/issued_at/expires_at).""" body = { "schema_version": intent_basis.get("schema_version", "1.0"), "group_id": intent_basis.get("group_id"), "desired_state": intent_basis.get("desired_state", "off"), "reason": intent_basis.get("reason", "no_active_event"), "poll_interval_sec": int(poll_interval_sec), "event_window_start": intent_basis.get("event_window_start"), "event_window_end": intent_basis.get("event_window_end"), "active_event_ids": list(intent_basis.get("active_event_ids", [])), } return body def 
def compute_group_power_intent_fingerprint(intent_body):
    """Create a stable hash for semantic transition detection.

    The body is serialized to canonical JSON (sorted keys, compact separators)
    so logically-equal payloads always produce the same fingerprint.
    """
    canonical_json = json.dumps(intent_body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical_json.encode("utf-8")).hexdigest()


def format_event_with_media(event):
    """Transform Event + EventMedia into the client-expected dict format.

    Adds a type-specific payload ("presentation", "website" or "video") when
    the event has media attached. Conversion lookups and per-media log lines
    are cached in module-level state so repeated calls stay cheap and quiet.
    """
    event_dict = {
        "id": event.id,
        "title": event.title,
        "start": str(event.start),
        "end": str(event.end),
        "group_id": event.group_id,
        "event_type": event.event_type.value if event.event_type else None,
        # Carry recurrence metadata for consumers if needed
        "recurrence_rule": getattr(event, "recurrence_rule", None),
        "recurrence_end": (event.recurrence_end.isoformat()
                           if getattr(event, "recurrence_end", None) else None),
    }

    if not event.event_media:
        # Add other event types (message, etc.) here as needed...
        return event_dict

    media = event.event_media
    # Guard against a missing event_type (the old code dereferenced
    # event.event_type.value unconditionally here and could raise).
    event_type = event.event_type.value if event.event_type else None

    if event_type == "presentation":
        event_dict["presentation"] = {
            "type": "slideshow",
            "files": [],
            "slide_interval": event.slideshow_interval or 5000,
            "auto_advance": True,
            "page_progress": getattr(event, "page_progress", True),
            "auto_progress": getattr(event, "auto_progress", True)
        }
        # Decide file URL with caching to avoid repeated DB lookups/logs.
        # Fix: use a membership test so a cached "no conversion" (None) also
        # short-circuits; .get(media.id, None) made None indistinguishable
        # from a cache miss and re-queried the DB on every call.
        if media.id in _media_conversion_cache:
            pdf_url = _media_conversion_cache[media.id]
        else:
            pdf_url = None
            if getattr(media.media_type, 'value', str(media.media_type)) in ("ppt", "pptx", "odp"):
                from sqlalchemy.orm import scoped_session
                from models.models import Conversion, ConversionStatus
                session = scoped_session(Session)
                try:
                    conversion = session.query(Conversion).filter_by(
                        source_event_media_id=media.id,
                        target_format="pdf",
                        status=ConversionStatus.ready
                    ).order_by(Conversion.completed_at.desc()).first()
                    logging.debug(
                        f"[Scheduler] Conversion lookup for media_id={media.id}: found={bool(conversion)}, path={getattr(conversion, 'target_path', None) if conversion else None}")
                    if conversion and conversion.target_path:
                        pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
                finally:
                    session.remove()
            # Cache the decision (even if None) to avoid repeated lookups
            _media_conversion_cache[media.id] = pdf_url

        # Build file entry and log decision only once per media
        if pdf_url:
            filename = os.path.basename(pdf_url)
            event_dict["presentation"]["files"].append({
                "name": filename,
                "url": pdf_url,
                "checksum": None,
                "size": None
            })
            if media.id not in _media_decision_logged:
                logging.debug(
                    f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
                _media_decision_logged.add(media.id)
        elif media.file_path:
            filename = os.path.basename(media.file_path)
            # Fix: the URL previously ended in a literal "(unknown)" while the
            # computed filename was left unused.
            file_url = f"{API_BASE_URL}/api/files/{media.id}/{filename}"
            event_dict["presentation"]["files"].append({
                "name": filename,
                "url": file_url,
                "checksum": None,
                "size": None
            })
            if media.id not in _media_decision_logged:
                logging.debug(
                    f"[Scheduler] Using original file for event_media_id={media.id}: {file_url}")
                _media_decision_logged.add(media.id)

    # Handle website and webuntis events (both display a website)
    elif event_type in ("website", "webuntis"):
        event_dict["website"] = {
            "type": "browser",
            "url": media.url if media.url else None
        }
        if media.id not in _media_decision_logged:
            logging.debug(
                f"[Scheduler] Using website URL for event_media_id={media.id} (type={event_type}): {media.url}")
            _media_decision_logged.add(media.id)

    # Handle video events
    elif event_type == "video":
        filename = os.path.basename(media.file_path) if media.file_path else "video"
        # Use streaming endpoint for better video playback support.
        # Fix: the URL previously ended in a literal "(unknown)".
        stream_url = f"{API_BASE_URL}/api/eventmedia/stream/{media.id}/{filename}"
        # Best-effort: probe the streaming endpoint for cheap metadata (HEAD)
        mime_type = None
        size = None
        accept_ranges = False
        try:
            req = Request(stream_url, method='HEAD')
            with urlopen(req, timeout=2) as resp:
                # getheader returns None if the header is missing
                mime_type = resp.getheader('Content-Type')
                length = resp.getheader('Content-Length')
                if length:
                    try:
                        size = int(length)
                    except Exception:
                        size = None
                accept_ranges = (resp.getheader('Accept-Ranges') or '').lower() == 'bytes'
        except Exception as e:
            # Don't fail the scheduler for probe errors; log once per media
            if media.id not in _media_decision_logged:
                logging.debug(f"[Scheduler] HEAD probe for media_id={media.id} failed: {e}")
        event_dict["video"] = {
            "type": "media",
            "url": stream_url,
            "autoplay": getattr(event, "autoplay", True),
            "loop": getattr(event, "loop", False),
            "volume": getattr(event, "volume", 0.8),
            "muted": getattr(event, "muted", False),
            # Best-effort metadata to help clients decide how to stream
            "mime_type": mime_type,
            "size": size,
            "accept_ranges": accept_ranges,
            # Optional richer info (may be null if not available):
            # duration (seconds), resolution, bitrate
            "duration": None,
            "resolution": None,
            "bitrate": None,
            "qualities": [],
            "thumbnails": [],
            "checksum": None,
        }
        if media.id not in _media_decision_logged:
            logging.debug(
                f"[Scheduler] Using video streaming URL for event_media_id={media.id}: {stream_url}")
            _media_decision_logged.add(media.id)

    # Add other event types (message, etc.) here as needed...
    return event_dict


# ---------------------------------------------------------------------------
# Crash detection / auto-recovery helpers
# ---------------------------------------------------------------------------
_CRASH_RECOVERY_SCHEMA_VERSION = "1.0"
_CRASH_COMMAND_TOPIC = "infoscreen/{uuid}/commands"
_CRASH_COMMAND_COMPAT_TOPIC = "infoscreen/{uuid}/command"
# Lifetime of a recovery command and the per-client re-issue lockout window.
_CRASH_RECOVERY_EXPIRY_SECONDS = int(os.getenv("CRASH_RECOVERY_COMMAND_EXPIRY_SECONDS", "240"))
_CRASH_RECOVERY_LOCKOUT_MINUTES = int(os.getenv("CRASH_RECOVERY_LOCKOUT_MINUTES", "15"))
""" session = Session() try: now = datetime.now(timezone.utc) stale_cutoff = now - timedelta(seconds=heartbeat_grace_seconds) lockout_cutoff = now - timedelta(minutes=_CRASH_RECOVERY_LOCKOUT_MINUTES) candidates = ( session.query(Client) .filter(Client.is_active == True) .filter( or_( Client.process_status == ProcessStatus.crashed, Client.last_alive < stale_cutoff, ) ) .all() ) result = [] for c in candidates: recent = ( session.query(ClientCommand) .filter(ClientCommand.client_uuid == c.uuid) .filter(ClientCommand.created_at >= lockout_cutoff) .filter(ClientCommand.action.in_(["reboot_host", "restart_app"])) .first() ) if recent: continue crash_reason = ( "process_crashed" if c.process_status == ProcessStatus.crashed else "heartbeat_stale" ) result.append({ "uuid": c.uuid, "reason": crash_reason, "process_status": c.process_status.value if c.process_status else None, "last_alive": c.last_alive, }) return result finally: session.close() def issue_crash_recovery_command(client_uuid: str, reason: str) -> tuple: """ Writes a ClientCommand (reboot_host) for crash recovery to the DB. Returns (command_id, payload_dict) for the caller to publish over MQTT. Also returns the MQTT topic strings. 
""" session = Session() try: now = datetime.now(timezone.utc) expires_at = now + timedelta(seconds=_CRASH_RECOVERY_EXPIRY_SECONDS) command_id = str(_uuid_mod.uuid4()) command = ClientCommand( command_id=command_id, client_uuid=client_uuid, action="reboot_host", status="queued", reason=reason, requested_by=None, issued_at=now, expires_at=expires_at, ) session.add(command) session.commit() command.status = "publish_in_progress" session.commit() payload = { "schema_version": _CRASH_RECOVERY_SCHEMA_VERSION, "command_id": command_id, "client_uuid": client_uuid, "action": "reboot_host", "issued_at": now.isoformat().replace("+00:00", "Z"), "expires_at": expires_at.isoformat().replace("+00:00", "Z"), "requested_by": None, "reason": reason, } topic = _CRASH_COMMAND_TOPIC.format(uuid=client_uuid) compat_topic = _CRASH_COMMAND_COMPAT_TOPIC.format(uuid=client_uuid) return command_id, payload, topic, compat_topic except Exception: session.rollback() raise finally: session.close() def finalize_crash_recovery_command(command_id: str, published: bool, error: str = None) -> None: """Updates command status after MQTT publish attempt.""" session = Session() try: cmd = session.query(ClientCommand).filter_by(command_id=command_id).first() if not cmd: return now = datetime.now(timezone.utc) if published: cmd.status = "published" cmd.published_at = now else: cmd.status = "failed" cmd.failed_at = now cmd.error_code = "mqtt_publish_failed" cmd.error_message = error or "Unknown publish error" session.commit() finally: session.close() _TERMINAL_COMMAND_STATUSES = {"completed", "failed", "expired", "canceled", "blocked_safety"} def sweep_expired_commands() -> int: """Marks non-terminal commands whose expires_at has passed as expired. Returns the number of commands updated. 
""" session = Session() try: now = datetime.now(timezone.utc) commands = ( session.query(ClientCommand) .filter( ClientCommand.expires_at < now, ClientCommand.status.notin_(_TERMINAL_COMMAND_STATUSES), ) .all() ) if not commands: return 0 for cmd in commands: cmd.status = "expired" cmd.failed_at = now cmd.error_code = "expired" cmd.error_message = "Command expired before terminal state was reached." session.commit() return len(commands) except Exception: session.rollback() raise finally: session.close()