feat: improve scheduler recurrence, DB config, and docs
- Broaden scheduler query window to next N days for proper recurring event expansion (scheduler.py) - Update DB connection logic for consistent .env loading and fallback (database.py) - Harden timezone handling and logging in scheduler and DB utils - Stop auto-deactivating recurring events before recurrence_end (API/events) - Update documentation to reflect new scheduler, API, and logging behavior
This commit is contained in:
@@ -2,22 +2,45 @@
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from sqlalchemy.orm import sessionmaker, joinedload
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, or_, and_, text
|
||||
from models.models import Event, EventMedia, EventException
|
||||
from dateutil.rrule import rrulestr
|
||||
from datetime import timezone
|
||||
|
||||
load_dotenv('/workspace/.env')
|
||||
# Load .env only in development to mirror server/database.py behavior
|
||||
if os.getenv("ENV", "development") == "development":
|
||||
# Expect .env at workspace root
|
||||
load_dotenv('/workspace/.env')
|
||||
|
||||
# DB-URL aus Umgebungsvariable oder Fallback
|
||||
DB_CONN = os.environ.get("DB_CONN", "mysql+pymysql://user:password@db/dbname")
|
||||
engine = create_engine(DB_CONN)
|
||||
# DB-URL aus Umgebungsvariable oder Fallback wie im Server
|
||||
DB_URL = os.environ.get("DB_CONN")
|
||||
if not DB_URL:
|
||||
DB_USER = os.environ.get("DB_USER", "infoscreen_admin")
|
||||
DB_PASSWORD = os.environ.get("DB_PASSWORD", "KqtpM7wmNd&mFKs")
|
||||
DB_HOST = os.environ.get("DB_HOST", "db")
|
||||
DB_NAME = os.environ.get("DB_NAME", "infoscreen_by_taa")
|
||||
DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
|
||||
|
||||
print(f"[Scheduler] Using DB_URL: {DB_URL}")
|
||||
engine = create_engine(DB_URL)
|
||||
# Proactive connectivity check to surface errors early
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
conn.execute(text("SELECT 1"))
|
||||
print("[Scheduler] DB connectivity OK")
|
||||
except Exception as db_exc:
|
||||
print(f"[Scheduler] DB connectivity FAILED: {db_exc}")
|
||||
Session = sessionmaker(bind=engine)
|
||||
|
||||
# Base URL from .env for file URLs
|
||||
API_BASE_URL = os.environ.get("API_BASE_URL", "http://server:8000")
|
||||
|
||||
# Cache conversion decisions per media to avoid repeated lookups/logs within the scheduler lifetime
|
||||
_media_conversion_cache = {} # media_id -> pdf_url or None
|
||||
_media_decision_logged = set() # media_id(s) already logged
|
||||
|
||||
|
||||
def get_active_events(start: datetime, end: datetime, group_id: int = None):
|
||||
session = Session()
|
||||
@@ -28,21 +51,83 @@ def get_active_events(start: datetime, end: datetime, group_id: int = None):
|
||||
).filter(Event.is_active == True)
|
||||
|
||||
if start and end:
|
||||
query = query.filter(Event.start < end, Event.end > start)
|
||||
# Include:
|
||||
# 1) Non-recurring events that overlap [start, end]
|
||||
# 2) Recurring events whose recurrence window intersects [start, end]
|
||||
# We consider dtstart (Event.start) <= end and (recurrence_end is NULL or >= start)
|
||||
non_recurring_overlap = and_(
|
||||
Event.recurrence_rule == None,
|
||||
Event.start < end,
|
||||
Event.end > start,
|
||||
)
|
||||
recurring_window = and_(
|
||||
Event.recurrence_rule != None,
|
||||
Event.start <= end,
|
||||
or_(Event.recurrence_end == None, Event.recurrence_end >= start),
|
||||
)
|
||||
query = query.filter(or_(non_recurring_overlap, recurring_window))
|
||||
if group_id:
|
||||
query = query.filter(Event.group_id == group_id)
|
||||
|
||||
# Log base event count before expansion
|
||||
try:
|
||||
base_count = query.count()
|
||||
# Additional diagnostics: split counts
|
||||
non_rec_q = session.query(Event.id).filter(Event.is_active == True)
|
||||
rec_q = session.query(Event.id).filter(Event.is_active == True)
|
||||
if start and end:
|
||||
non_rec_q = non_rec_q.filter(non_recurring_overlap)
|
||||
rec_q = rec_q.filter(recurring_window)
|
||||
if group_id:
|
||||
non_rec_q = non_rec_q.filter(Event.group_id == group_id)
|
||||
rec_q = rec_q.filter(Event.group_id == group_id)
|
||||
non_rec_count = non_rec_q.count()
|
||||
rec_count = rec_q.count()
|
||||
logging.debug(f"[Scheduler] Base events total={base_count} non_recurring_overlap={non_rec_count} recurring_window={rec_count}")
|
||||
except Exception:
|
||||
base_count = None
|
||||
events = query.all()
|
||||
logging.debug(f"[Scheduler] Base events fetched: {len(events)} (count={base_count})")
|
||||
if len(events) == 0:
|
||||
# Quick probe: are there any active events at all?
|
||||
try:
|
||||
any_active = session.query(Event).filter(Event.is_active == True).count()
|
||||
logging.info(f"[Scheduler] Active events in DB (any group, any time): {any_active}")
|
||||
except Exception as e:
|
||||
logging.warning(f"[Scheduler] Could not count active events: {e}")
|
||||
|
||||
formatted_events = []
|
||||
for event in events:
|
||||
# If event has RRULE, expand into instances within [start, end]
|
||||
if event.recurrence_rule:
|
||||
try:
|
||||
r = rrulestr(event.recurrence_rule, dtstart=event.start)
|
||||
# Ensure dtstart is timezone-aware (UTC if naive)
|
||||
dtstart = event.start
|
||||
if dtstart.tzinfo is None:
|
||||
dtstart = dtstart.replace(tzinfo=timezone.utc)
|
||||
|
||||
r = rrulestr(event.recurrence_rule, dtstart=dtstart)
|
||||
|
||||
# Ensure query bounds are timezone-aware
|
||||
query_start = start if start.tzinfo else start.replace(tzinfo=timezone.utc)
|
||||
query_end = end if end.tzinfo else end.replace(tzinfo=timezone.utc)
|
||||
|
||||
# Clamp by recurrence_end if present
|
||||
if getattr(event, "recurrence_end", None):
|
||||
rec_end = event.recurrence_end
|
||||
if rec_end and rec_end.tzinfo is None:
|
||||
rec_end = rec_end.replace(tzinfo=timezone.utc)
|
||||
if rec_end and rec_end < query_end:
|
||||
query_end = rec_end
|
||||
|
||||
# iterate occurrences within range
|
||||
occ_starts = r.between(start, end, inc=True)
|
||||
# Use a lookback equal to the event's duration to catch occurrences that started
|
||||
# before query_start but are still running within the window.
|
||||
duration = (event.end - event.start) if (event.end and event.start) else None
|
||||
lookback_start = query_start
|
||||
if duration:
|
||||
lookback_start = query_start - duration
|
||||
occ_starts = r.between(lookback_start, query_end, inc=True)
|
||||
for occ_start in occ_starts:
|
||||
occ_end = (occ_start + duration) if duration else occ_start
|
||||
# Apply exceptions
|
||||
@@ -57,13 +142,22 @@ def get_active_events(start: datetime, end: datetime, group_id: int = None):
|
||||
occ_start = exc.override_start
|
||||
if exc.override_end:
|
||||
occ_end = exc.override_end
|
||||
# Filter out instances that do not overlap [start, end]
|
||||
if not (occ_start < end and occ_end > start):
|
||||
continue
|
||||
inst = format_event_with_media(event)
|
||||
# Apply overrides to title/description if provided
|
||||
if exc and exc.override_title:
|
||||
inst["title"] = exc.override_title
|
||||
if exc and exc.override_description:
|
||||
inst["description"] = exc.override_description
|
||||
inst["start"] = occ_start.isoformat()
|
||||
inst["end"] = occ_end.isoformat()
|
||||
inst["occurrence_of_id"] = event.id
|
||||
formatted_events.append(inst)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
# On parse error, fall back to single event formatting
|
||||
logging.warning(f"Failed to parse recurrence rule for event {event.id}: {e}")
|
||||
formatted_events.append(format_event_with_media(event))
|
||||
else:
|
||||
formatted_events.append(format_event_with_media(event))
|
||||
@@ -87,7 +181,6 @@ def format_event_with_media(event):
|
||||
}
|
||||
|
||||
# Now you can directly access event.event_media
|
||||
import logging
|
||||
if event.event_media:
|
||||
media = event.event_media
|
||||
|
||||
@@ -99,28 +192,31 @@ def format_event_with_media(event):
|
||||
"auto_advance": True
|
||||
}
|
||||
|
||||
# Debug: log media_type
|
||||
logging.debug(
|
||||
f"[Scheduler] EventMedia id={media.id} media_type={getattr(media.media_type, 'value', str(media.media_type))}")
|
||||
# Avoid per-call media-type debug to reduce log noise
|
||||
|
||||
# Check for PDF conversion for ppt/pptx/odp
|
||||
from sqlalchemy.orm import scoped_session
|
||||
from models.models import Conversion, ConversionStatus
|
||||
session = scoped_session(Session)
|
||||
pdf_url = None
|
||||
if getattr(media.media_type, 'value', str(media.media_type)) in ("ppt", "pptx", "odp"):
|
||||
conversion = session.query(Conversion).filter_by(
|
||||
source_event_media_id=media.id,
|
||||
target_format="pdf",
|
||||
status=ConversionStatus.ready
|
||||
).order_by(Conversion.completed_at.desc()).first()
|
||||
logging.debug(
|
||||
f"[Scheduler] Conversion lookup for media_id={media.id}: found={bool(conversion)}, path={getattr(conversion, 'target_path', None) if conversion else None}")
|
||||
if conversion and conversion.target_path:
|
||||
# Serve via /api/files/converted/<path>
|
||||
pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
|
||||
session.remove()
|
||||
# Decide file URL with caching to avoid repeated DB lookups/logs
|
||||
pdf_url = _media_conversion_cache.get(media.id, None)
|
||||
|
||||
if pdf_url is None and getattr(media.media_type, 'value', str(media.media_type)) in ("ppt", "pptx", "odp"):
|
||||
from sqlalchemy.orm import scoped_session
|
||||
from models.models import Conversion, ConversionStatus
|
||||
session = scoped_session(Session)
|
||||
try:
|
||||
conversion = session.query(Conversion).filter_by(
|
||||
source_event_media_id=media.id,
|
||||
target_format="pdf",
|
||||
status=ConversionStatus.ready
|
||||
).order_by(Conversion.completed_at.desc()).first()
|
||||
logging.debug(
|
||||
f"[Scheduler] Conversion lookup for media_id={media.id}: found={bool(conversion)}, path={getattr(conversion, 'target_path', None) if conversion else None}")
|
||||
if conversion and conversion.target_path:
|
||||
pdf_url = f"{API_BASE_URL}/api/files/converted/{conversion.target_path}"
|
||||
finally:
|
||||
session.remove()
|
||||
# Cache the decision (even if None) to avoid repeated lookups in the same run
|
||||
_media_conversion_cache[media.id] = pdf_url
|
||||
|
||||
# Build file entry and log decision only once per media
|
||||
if pdf_url:
|
||||
filename = os.path.basename(pdf_url)
|
||||
event_dict["presentation"]["files"].append({
|
||||
@@ -129,8 +225,10 @@ def format_event_with_media(event):
|
||||
"checksum": None,
|
||||
"size": None
|
||||
})
|
||||
logging.info(
|
||||
f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
|
||||
if media.id not in _media_decision_logged:
|
||||
logging.debug(
|
||||
f"[Scheduler] Using converted PDF for event_media_id={media.id}: {pdf_url}")
|
||||
_media_decision_logged.add(media.id)
|
||||
elif media.file_path:
|
||||
filename = os.path.basename(media.file_path)
|
||||
event_dict["presentation"]["files"].append({
|
||||
@@ -139,8 +237,10 @@ def format_event_with_media(event):
|
||||
"checksum": None,
|
||||
"size": None
|
||||
})
|
||||
logging.info(
|
||||
f"[Scheduler] Using original file for event_media_id={media.id}: {filename}")
|
||||
if media.id not in _media_decision_logged:
|
||||
logging.debug(
|
||||
f"[Scheduler] Using original file for event_media_id={media.id}: {filename}")
|
||||
_media_decision_logged.add(media.id)
|
||||
|
||||
# Add other event types...
|
||||
|
||||
|
||||
@@ -37,6 +37,8 @@ def main():
|
||||
POLL_INTERVAL = 30 # Sekunden, Empfehlung für seltene Änderungen
|
||||
# 0 = aus; z.B. 600 für alle 10 Min
|
||||
REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "0"))
|
||||
# Konfigurierbares Zeitfenster in Tagen (Standard: 7)
|
||||
WINDOW_DAYS = int(os.getenv("EVENTS_WINDOW_DAYS", "7"))
|
||||
last_payloads = {} # group_id -> payload
|
||||
last_published_at = {} # group_id -> epoch seconds
|
||||
|
||||
@@ -55,8 +57,17 @@ def main():
|
||||
|
||||
while True:
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
# Query window: next N days to capture upcoming events and recurring instances
|
||||
# Clients need to know what's coming, not just what's active right now
|
||||
end_window = now + datetime.timedelta(days=WINDOW_DAYS)
|
||||
logging.debug(f"Fetching events window start={now.isoformat()} end={end_window.isoformat()} (days={WINDOW_DAYS})")
|
||||
# Hole alle aktiven Events (bereits formatierte Dictionaries)
|
||||
events = get_active_events(now, now)
|
||||
try:
|
||||
events = get_active_events(now, end_window)
|
||||
logging.debug(f"Fetched {len(events)} events for publishing window")
|
||||
except Exception as e:
|
||||
logging.exception(f"Error while fetching events: {e}")
|
||||
events = []
|
||||
|
||||
# Gruppiere Events nach group_id
|
||||
groups = {}
|
||||
@@ -67,6 +78,9 @@ def main():
|
||||
# Event ist bereits ein Dictionary im gewünschten Format
|
||||
groups[gid].append(event)
|
||||
|
||||
if not groups:
|
||||
logging.debug("No events grouped for any client group in current window")
|
||||
|
||||
# Sende pro Gruppe die Eventliste als retained Message, nur bei Änderung
|
||||
for gid, event_list in groups.items():
|
||||
# stabile Reihenfolge, um unnötige Publishes zu vermeiden
|
||||
@@ -87,7 +101,7 @@ def main():
|
||||
logging.error(
|
||||
f"Fehler beim Publish für Gruppe {gid}: {mqtt.error_string(result.rc)}")
|
||||
else:
|
||||
logging.info(f"Events für Gruppe {gid} gesendet")
|
||||
logging.info(f"Events für Gruppe {gid} gesendet (count={len(event_list)})")
|
||||
last_payloads[gid] = payload
|
||||
last_published_at[gid] = time.time()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user