Files
infoscreen/scheduler/db_utils.py
RobbStarkAustria e6c19c189f feat(events): add webuntis event, unify website payload, bump UI to alpha.13
- Add `webuntis` event type; event creation resolves URL from system `supplement_table_url`
- Consolidate settings: remove separate webuntis-url endpoints; use GET/POST /api/system-settings/supplement-table
- Scheduler: emit top-level `event_type` and unified `website` payload (`{ "type":"browser","url":"..." }`) for website/webuntis
- Preserve presentation payloads (page_progress/auto_progress) — presentation messages remain backwards-compatible
- Update defaults (`init_defaults.py`) and remove duplicate webuntis setting
- Docs & metadata: bump program-info to 2025.1.0-alpha.13; update README, copilot-instructions, DEV- and TECH-CHANGELOGs; add MQTT_EVENT_PAYLOAD_GUIDE.md and WEBUNTIS_EVENT_IMPLEMENTATION.md
2025-10-19 11:35:41 +00:00

262 lines
12 KiB
Python

# scheduler/db_utils.py
from dotenv import load_dotenv
import os
from datetime import datetime
import logging
from sqlalchemy.orm import sessionmaker, joinedload
from sqlalchemy import create_engine, or_, and_, text
from models.models import Event, EventMedia, EventException
from dateutil.rrule import rrulestr
from datetime import timezone
# Load .env only in development to mirror server/database.py behavior
if os.getenv("ENV", "development") == "development":
    # Expect .env at workspace root
    load_dotenv('/workspace/.env')

# DB URL from environment, with a server-matching fallback assembled from parts.
DB_URL = os.environ.get("DB_CONN")
if not DB_URL:
    # NOTE(review): hardcoded credential fallback should live in env/secret
    # storage, not in source control — confirm and rotate this password.
    DB_USER = os.environ.get("DB_USER", "infoscreen_admin")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "KqtpM7wmNd&mFKs")
    DB_HOST = os.environ.get("DB_HOST", "db")
    DB_NAME = os.environ.get("DB_NAME", "infoscreen_by_taa")
    DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"

engine = create_engine(DB_URL)
# SECURITY FIX: the original printed the full DSN — password included — to
# stdout. Log it with the password masked instead.
print(f"[Scheduler] Using DB_URL: {engine.url.render_as_string(hide_password=True)}")

# Proactive connectivity check to surface configuration errors at startup.
try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    print("[Scheduler] DB connectivity OK")
except Exception as db_exc:
    # Best-effort probe: report and continue so the scheduler can still start.
    print(f"[Scheduler] DB connectivity FAILED: {db_exc}")

Session = sessionmaker(bind=engine)

# Base URL used to build absolute file URLs in event payloads.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")

# Caches to avoid repeated conversion lookups / duplicate log lines within the
# scheduler process lifetime.
_media_conversion_cache = {}    # media_id -> converted-PDF URL, or None if no conversion exists
_media_decision_logged = set()  # media_id values whose URL decision was already logged
def _tz_aware(dt):
    """Return *dt* as a timezone-aware datetime (tagged UTC if naive).

    ``None`` passes through unchanged so optional columns can be normalized
    without extra guards at the call site.
    """
    if dt is not None and dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt


def _expand_recurrences(session, event, start, end):
    """Expand one recurring event into concrete occurrences overlapping [start, end].

    Applies per-date EventException rows (skip / override start, end, title,
    description). Raises on an unparseable RRULE; the caller handles fallback.
    Returns a list of formatted event dicts.
    """
    dtstart = _tz_aware(event.start)
    rule = rrulestr(event.recurrence_rule, dtstart=dtstart)

    # Normalize all comparison bounds to tz-aware UTC up front.
    query_start = _tz_aware(start)
    query_end = _tz_aware(end)

    # Clamp the expansion window by recurrence_end, if present.
    rec_end = _tz_aware(getattr(event, "recurrence_end", None))
    if rec_end and rec_end < query_end:
        query_end = rec_end

    # Look back by the event's duration so occurrences that started before
    # query_start but are still running inside the window are included.
    duration = (event.end - event.start) if (event.end and event.start) else None
    lookback_start = (query_start - duration) if duration else query_start

    instances = []
    for occ_start in rule.between(lookback_start, query_end, inc=True):
        occ_end = (occ_start + duration) if duration else occ_start

        # Per-occurrence exception (skip or override) keyed by calendar date.
        exc = session.query(EventException).filter(
            EventException.event_id == event.id,
            EventException.exception_date == occ_start.date()
        ).first()
        if exc:
            if exc.is_skipped:
                continue
            # Normalize overrides too — DB values may be naive.
            if exc.override_start:
                occ_start = _tz_aware(exc.override_start)
            if exc.override_end:
                occ_end = _tz_aware(exc.override_end)

        # BUG FIX: compare against the tz-aware bounds. The original compared
        # aware occurrence times to the raw (possibly naive) start/end args,
        # raising TypeError and silently collapsing the recurring event to a
        # single instance via the caller's fallback.
        if not (occ_start < query_end and occ_end > query_start):
            continue

        inst = format_event_with_media(event)
        if exc and exc.override_title:
            inst["title"] = exc.override_title
        if exc and exc.override_description:
            inst["description"] = exc.override_description
        inst["start"] = occ_start.isoformat()
        inst["end"] = occ_end.isoformat()
        inst["occurrence_of_id"] = event.id
        instances.append(inst)
    return instances


def get_active_events(start: datetime, end: datetime, group_id: int = None):
    """Return active events overlapping [start, end], formatted for clients.

    Non-recurring events are included when their span overlaps the window;
    recurring events are expanded into concrete occurrences (honoring
    EventException skips/overrides). When start/end are falsy, all active
    events are returned unexpanded. ``group_id`` optionally narrows to one group.
    """
    session = Session()
    try:
        query = session.query(Event).options(
            joinedload(Event.event_media)
        ).filter(Event.is_active == True)

        non_recurring_overlap = None
        recurring_window = None
        if start and end:
            # 1) Non-recurring events that overlap [start, end]
            # 2) Recurring events whose recurrence window intersects [start, end]:
            #    dtstart (Event.start) <= end and (recurrence_end is NULL or >= start)
            non_recurring_overlap = and_(
                Event.recurrence_rule == None,
                Event.start < end,
                Event.end > start,
            )
            recurring_window = and_(
                Event.recurrence_rule != None,
                Event.start <= end,
                or_(Event.recurrence_end == None, Event.recurrence_end >= start),
            )
            query = query.filter(or_(non_recurring_overlap, recurring_window))
        if group_id:
            query = query.filter(Event.group_id == group_id)

        # Diagnostics: base count plus recurring/non-recurring split (best effort).
        try:
            base_count = query.count()
            non_rec_q = session.query(Event.id).filter(Event.is_active == True)
            rec_q = session.query(Event.id).filter(Event.is_active == True)
            if start and end:
                non_rec_q = non_rec_q.filter(non_recurring_overlap)
                rec_q = rec_q.filter(recurring_window)
            if group_id:
                non_rec_q = non_rec_q.filter(Event.group_id == group_id)
                rec_q = rec_q.filter(Event.group_id == group_id)
            non_rec_count = non_rec_q.count()
            rec_count = rec_q.count()
            logging.debug(f"[Scheduler] Base events total={base_count} non_recurring_overlap={non_rec_count} recurring_window={rec_count}")
        except Exception:
            base_count = None

        events = query.all()
        logging.debug(f"[Scheduler] Base events fetched: {len(events)} (count={base_count})")
        if len(events) == 0:
            # Quick probe: are there any active events at all?
            try:
                any_active = session.query(Event).filter(Event.is_active == True).count()
                logging.info(f"[Scheduler] Active events in DB (any group, any time): {any_active}")
            except Exception as e:
                logging.warning(f"[Scheduler] Could not count active events: {e}")

        formatted_events = []
        for event in events:
            # Expansion needs a concrete window; without one (or on RRULE
            # parse failure) fall back to formatting the base event once.
            if event.recurrence_rule and start and end:
                try:
                    formatted_events.extend(
                        _expand_recurrences(session, event, start, end))
                except Exception as e:
                    logging.warning(f"Failed to parse recurrence rule for event {event.id}: {e}")
                    formatted_events.append(format_event_with_media(event))
            else:
                formatted_events.append(format_event_with_media(event))
        return formatted_events
    finally:
        session.close()
def _resolve_converted_pdf_url(media):
    """Return the converted-PDF URL for ppt/pptx/odp media, else None.

    The decision (including a negative one) is cached per media id in
    ``_media_conversion_cache`` so repeated scheduler passes do not re-query.
    """
    # BUG FIX: the original used cache.get(media.id, None) and re-ran the DB
    # lookup whenever the cached decision was None — i.e. for every media
    # without a ready conversion, defeating the cache. Membership check fixes it.
    if media.id in _media_conversion_cache:
        return _media_conversion_cache[media.id]

    pdf_url = None
    media_type = getattr(media.media_type, 'value', str(media.media_type))
    if media_type in ("ppt", "pptx", "odp"):
        from models.models import Conversion, ConversionStatus
        session = Session()
        try:
            # Newest ready PDF conversion wins.
            conversion = session.query(Conversion).filter_by(
                source_event_media_id=media.id,
                target_format="pdf",
                status=ConversionStatus.ready
            ).order_by(Conversion.completed_at.desc()).first()
            logging.debug(
                f"[Scheduler] Conversion lookup for media_id={media.id}: found={bool(conversion)}, path={getattr(conversion, 'target_path', None) if conversion else None}")
            if conversion and conversion.target_path:
                pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
        finally:
            session.close()

    # Cache the decision (even None) for subsequent calls in this process.
    _media_conversion_cache[media.id] = pdf_url
    return pdf_url


def format_event_with_media(event):
    """Transform Event + EventMedia into the client-expected payload dict.

    Always emits the base fields (id, title, start/end, group_id, event_type,
    recurrence metadata). When media is attached, adds a ``presentation``
    payload (preferring a converted PDF over the original file) or a
    ``website`` payload for website/webuntis events.
    """
    # BUG FIX: resolve the type once with a None-guard; the original guarded
    # here but then dereferenced event.event_type.value unguarded below,
    # crashing on media-bearing events with no type set.
    event_type = event.event_type.value if event.event_type else None

    event_dict = {
        "id": event.id,
        "title": event.title,
        "start": str(event.start),
        "end": str(event.end),
        "group_id": event.group_id,
        "event_type": event_type,
        # Carry recurrence metadata for consumers if needed
        "recurrence_rule": getattr(event, "recurrence_rule", None),
        "recurrence_end": (event.recurrence_end.isoformat() if getattr(event, "recurrence_end", None) else None),
    }

    if event.event_media:
        media = event.event_media
        if event_type == "presentation":
            event_dict["presentation"] = {
                "type": "slideshow",
                "files": [],
                "slide_interval": event.slideshow_interval or 5000,
                "auto_advance": True,
                "page_progress": getattr(event, "page_progress", True),
                "auto_progress": getattr(event, "auto_progress", True)
            }
            # Prefer a ready converted PDF; fall back to the original upload.
            pdf_url = _resolve_converted_pdf_url(media)
            if pdf_url:
                filename = os.path.basename(pdf_url)
                event_dict["presentation"]["files"].append({
                    "name": filename,
                    "url": pdf_url,
                    "checksum": None,
                    "size": None
                })
                if media.id not in _media_decision_logged:
                    logging.debug(
                        f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
                    _media_decision_logged.add(media.id)
            elif media.file_path:
                filename = os.path.basename(media.file_path)
                event_dict["presentation"]["files"].append({
                    "name": filename,
                    # NOTE(review): reconstructed from an extraction placeholder
                    # "(unknown)" — confirm the served route is /api/files/{id}/{filename}.
                    "url": f"{API_BASE_URL}/api/files/{media.id}/{filename}",
                    "checksum": None,
                    "size": None
                })
                if media.id not in _media_decision_logged:
                    logging.debug(
                        f"[Scheduler] Using original file for event_media_id={media.id}: {media.file_path}")
                    _media_decision_logged.add(media.id)
        # Handle website and webuntis events (both display a website)
        elif event_type in ("website", "webuntis"):
            event_dict["website"] = {
                "type": "browser",
                "url": media.url if media.url else None
            }
            if media.id not in _media_decision_logged:
                logging.debug(
                    f"[Scheduler] Using website URL for event_media_id={media.id} (type={event_type}): {media.url}")
                _media_decision_logged.add(media.id)
        # Add other event types (video, message, etc.) here as needed...
    return event_dict