Files
infoscreen-dev/src/display_manager.py
2026-03-29 15:04:09 +02:00

2275 lines
102 KiB
Python

#!/usr/bin/env python3
"""
Display Manager - Monitors events and controls display software
This daemon process:
1. Watches current_event.json for changes
2. Manages lifecycle of display applications (LibreOffice, Chromium, VLC)
3. Respects event timing (start/end times)
4. Handles graceful application transitions
"""
import os
import sys
import json
import time
import logging
import signal
import subprocess
import shutil
from datetime import datetime, timezone
from pathlib import Path
from logging.handlers import RotatingFileHandler
from typing import Optional, Dict, List, IO
from urllib.parse import urlparse, urlsplit, urlunsplit, quote
import requests
import psutil
import json as _json
import threading
import time as _time
from dotenv import load_dotenv
# Load environment
env_paths = [
os.path.join(os.path.dirname(__file__), ".env"),
os.path.join(os.path.expanduser("~"), "infoscreen-dev", ".env"),
]
for env_path in env_paths:
if os.path.exists(env_path):
load_dotenv(env_path)
break
# Best-effort display env bootstrap for non-interactive starts (nohup/systemd/ssh).
# If both Wayland and X11 vars are missing, default to X11 :0 which is the
# common kiosk display on Raspberry Pi deployments.
if not os.environ.get("WAYLAND_DISPLAY") and not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = os.getenv("DISPLAY", ":0")
# X11 capture tools may also require XAUTHORITY when started outside a desktop
# session shell; default to ~/.Xauthority when available.
if os.environ.get("DISPLAY") and not os.environ.get("XAUTHORITY"):
xauth_default = os.path.join(os.path.expanduser("~"), ".Xauthority")
if os.path.exists(xauth_default):
os.environ["XAUTHORITY"] = xauth_default
# Configuration
ENV = os.getenv("ENV", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
# Screenshot capture configuration (distinct from transmission interval used by simclient)
SCREENSHOT_CAPTURE_INTERVAL = int(os.getenv("SCREENSHOT_CAPTURE_INTERVAL", os.getenv("SCREENSHOT_INTERVAL", "30"))) # seconds
SCREENSHOT_MAX_WIDTH = int(os.getenv("SCREENSHOT_MAX_WIDTH", "800")) # Width to downscale (preserve aspect)
SCREENSHOT_JPEG_QUALITY = int(os.getenv("SCREENSHOT_JPEG_QUALITY", "70")) # JPEG quality 1-95
SCREENSHOT_MAX_FILES = int(os.getenv("SCREENSHOT_MAX_FILES", "20")) # Rotate old screenshots
SCREENSHOT_ALWAYS = os.getenv("SCREENSHOT_ALWAYS", "0").lower() in ("1","true","yes")
# Delay (seconds) before triggered screenshot fires after event start/stop
SCREENSHOT_TRIGGER_DELAY_PRESENTATION = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_PRESENTATION", "4"))
SCREENSHOT_TRIGGER_DELAY_VIDEO = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_VIDEO", "2"))
SCREENSHOT_TRIGGER_DELAY_WEB = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_WEB", "5"))
CHECK_INTERVAL = int(os.getenv("DISPLAY_CHECK_INTERVAL", "5")) # seconds
PRESENTATION_DIR = os.path.join(os.path.dirname(__file__), "presentation")
EVENT_FILE = os.path.join(os.path.dirname(__file__), "current_event.json")
CLIENT_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), "config", "client_settings.json")
# HDMI-CEC Configuration
# Note: CEC is automatically disabled in development mode to avoid constantly switching TV on/off
CEC_ENABLED = os.getenv("CEC_ENABLED", "true").lower() in ("true", "1", "yes")
if ENV == "development":
CEC_ENABLED = False # Override: disable CEC in development mode
CEC_DEVICE = os.getenv("CEC_DEVICE", "TV") # Target device name (TV, 0, etc.)
CEC_TURN_OFF_DELAY = int(os.getenv("CEC_TURN_OFF_DELAY", "30")) # seconds after last event ends
CEC_POWER_ON_WAIT = int(os.getenv("CEC_POWER_ON_WAIT", "3")) # seconds to wait after turning TV on
CEC_POWER_OFF_WAIT = int(os.getenv("CEC_POWER_OFF_WAIT", "2")) # seconds to wait after turning TV off
# Setup logging
LOG_PATH = os.path.join(os.path.dirname(__file__), "..", "logs", "display_manager.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logging.basicConfig(
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
format="%(asctime)s.%(msecs)03dZ [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
handlers=[
RotatingFileHandler(LOG_PATH, maxBytes=2*1024*1024, backupCount=5),
logging.StreamHandler()
]
)
# Force all logging timestamps to UTC (affects %(asctime)s in all formatters).
logging.Formatter.converter = time.gmtime
# Log CEC mode after logging is configured
if ENV == "development":
logging.info("[DEV MODE] HDMI-CEC automatically disabled (TV control off)")
elif CEC_ENABLED:
logging.info(f"[CEC] HDMI-CEC enabled: TV control active (device: {CEC_DEVICE})")
else:
logging.info("[CEC] HDMI-CEC disabled in configuration")
# Setup monitoring logger (separate from main display_manager.log for health/crash tracking)
MONITORING_LOG_PATH = os.path.join(os.path.dirname(__file__), "..", "logs", "monitoring.log")
os.makedirs(os.path.dirname(MONITORING_LOG_PATH), exist_ok=True)
monitoring_logger = logging.getLogger("monitoring")
monitoring_logger.setLevel(getattr(logging, LOG_LEVEL.upper(), logging.INFO))
monitoring_handler = RotatingFileHandler(MONITORING_LOG_PATH, maxBytes=5*1024*1024, backupCount=5)
monitoring_handler.setFormatter(logging.Formatter("%(asctime)s.%(msecs)03dZ [%(levelname)s] %(message)s", "%Y-%m-%dT%H:%M:%S"))
monitoring_logger.addHandler(monitoring_handler)
monitoring_logger.propagate = False # Don't duplicate to main logger
logging.info(f"Monitoring logger initialized: {MONITORING_LOG_PATH}")
# Health state file (shared bridge between display_manager and simclient)
HEALTH_STATE_FILE = os.path.join(os.path.dirname(__file__), "current_process_health.json")
class ProcessHealthState:
"""Track and persist process health state for monitoring integration"""
def __init__(self):
self.event_id = None
self.event_type = None
self.process_name = None
self.process_pid = None
self.status = "stopped" # running, crashed, starting, stopped
self.restart_count = 0
self.max_restarts = 3
self.last_update = datetime.now(timezone.utc).isoformat()
def to_dict(self) -> Dict:
return {
"event_id": self.event_id,
"event_type": self.event_type,
"current_process": self.process_name,
"process_pid": self.process_pid,
"process_status": self.status,
"restart_count": self.restart_count,
"timestamp": datetime.now(timezone.utc).isoformat()
}
def save(self):
"""Persist health state to file for simclient to read"""
try:
with open(HEALTH_STATE_FILE, "w") as f:
json.dump(self.to_dict(), f, indent=2)
except Exception as e:
monitoring_logger.error(f"Failed to save health state: {e}")
def update_running(self, event_id: str, event_type: str, process_name: str, pid: Optional[int]):
"""Mark process as running"""
self.event_id = event_id
self.event_type = event_type
self.process_name = process_name
self.process_pid = pid
self.status = "running"
self.restart_count = 0
self.save()
monitoring_logger.info(f"Process started: event_id={event_id} event_type={event_type} process={process_name} pid={pid}")
def update_crashed(self):
"""Mark process as crashed"""
self.status = "crashed"
self.save()
monitoring_logger.warning(f"Process crashed: event_id={self.event_id} event_type={self.event_type} process={self.process_name} restart_count={self.restart_count}/{self.max_restarts}")
def update_restart_attempt(self):
"""Increment restart counter and check if max exceeded"""
self.restart_count += 1
self.save()
if self.restart_count <= self.max_restarts:
monitoring_logger.warning(f"Restarting process: attempt {self.restart_count}/{self.max_restarts} for {self.process_name}")
else:
monitoring_logger.error(f"Max restart attempts exceeded for {self.process_name}: event_id={self.event_id}")
def update_stopped(self):
"""Mark process as stopped (normal event end)"""
self.status = "stopped"
self.event_id = None
self.event_type = None
self.process_name = None
self.process_pid = None
self.restart_count = 0
self.save()
monitoring_logger.info("Process stopped (event ended or no active event)")
class HDMICECController:
"""Controls HDMI-CEC to turn TV on/off automatically
Uses cec-client from libcec to send CEC commands to the connected TV.
Automatically turns TV on when events start and off when events end (with configurable delay).
"""
def __init__(self, enabled: bool = True, device: str = "TV", turn_off_delay: int = 30,
power_on_wait: int = 3, power_off_wait: int = 2):
"""
Args:
enabled: Whether CEC control is enabled
device: Target CEC device (TV, 0, etc.)
turn_off_delay: Seconds to wait after last event ends before turning off TV
power_on_wait: Seconds to wait after sending power ON command (for TV to boot)
power_off_wait: Seconds to wait after sending power OFF command
"""
self.enabled = enabled
self.device = device
self.turn_off_delay = turn_off_delay
self.power_on_wait = power_on_wait
self.power_off_wait = power_off_wait
self.tv_state = None # None = unknown, True = on, False = off
self.turn_off_timer = None
if not self.enabled:
logging.info("HDMI-CEC control disabled")
return
# Check if cec-client is available
if not self._check_cec_available():
logging.warning("cec-client not found - HDMI-CEC control disabled")
logging.info("Install with: sudo apt-get install cec-utils")
self.enabled = False
return
logging.info(f"HDMI-CEC controller initialized (device: {self.device}, turn_off_delay: {self.turn_off_delay}s)")
# Try to detect current TV state
self._detect_tv_state()
def _check_cec_available(self) -> bool:
"""Check if cec-client command is available"""
try:
subprocess.run(
['which', 'cec-client'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
return True
except subprocess.CalledProcessError:
return False
def _run_cec_command(self, command: str, timeout: int = 10) -> bool:
"""Run a CEC command via cec-client
Args:
command: CEC command to send (e.g., 'on 0', 'standby 0')
timeout: Command timeout in seconds
Returns:
True if command succeeded, False otherwise
"""
if not self.enabled:
return False
try:
# Use echo to pipe command to cec-client
# cec-client -s -d 1 means: single command mode, log level 1
result = subprocess.run(
f'echo "{command}" | cec-client -s -d 1',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=False
)
output = result.stdout.decode('utf-8', errors='ignore')
# Check for common success indicators in output
success = (
result.returncode == 0 or
'power status changed' in output.lower() or
'power on' in output.lower() or
'standby' in output.lower()
)
if success:
logging.debug(f"CEC command '{command}' executed successfully")
else:
logging.warning(f"CEC command '{command}' may have failed (rc={result.returncode})")
return success
except subprocess.TimeoutExpired:
logging.error(f"CEC command '{command}' timed out after {timeout}s")
return False
except Exception as e:
logging.error(f"Error running CEC command '{command}': {e}")
return False
def _detect_tv_state(self):
"""Try to detect current TV power state"""
if not self.enabled:
return
try:
# Query power status of device 0 (TV)
result = subprocess.run(
'echo "pow 0" | cec-client -s -d 1',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=5,
check=False
)
output = result.stdout.decode('utf-8', errors='ignore').lower()
if 'power status: on' in output or 'power status: 0' in output:
self.tv_state = True
logging.info("TV detected as ON")
elif 'power status: standby' in output or 'power status: 1' in output:
self.tv_state = False
logging.info("TV detected as STANDBY/OFF")
else:
logging.debug(f"Could not detect TV state. Output: {output[:200]}")
except Exception as e:
logging.debug(f"Could not detect TV state: {e}")
def turn_on(self) -> bool:
"""Turn TV on via HDMI-CEC
Returns:
True if command succeeded or TV was already on
"""
if not self.enabled:
return False
# Cancel any pending turn-off timer
if self.turn_off_timer:
self.turn_off_timer.cancel()
self.turn_off_timer = None
logging.debug("Cancelled pending TV turn-off timer")
# Skip if TV is already on
if self.tv_state is True:
logging.debug("TV already on, skipping CEC command")
return True
logging.info("Turning TV ON via HDMI-CEC...")
# Send power on command to device 0 (TV)
success = self._run_cec_command(f'on {self.device}')
if success:
self.tv_state = True
logging.info("TV turned ON successfully")
# Give TV time to actually power on (TVs can be slow)
if self.power_on_wait > 0:
logging.debug(f"Waiting {self.power_on_wait} seconds for TV to power on...")
import time
time.sleep(self.power_on_wait)
else:
logging.warning("Failed to turn TV ON")
return success
def turn_off(self, delayed: bool = False) -> bool:
"""Turn TV off via HDMI-CEC
Args:
delayed: If True, uses configured delay before turning off
Returns:
True if command succeeded or was scheduled
"""
if not self.enabled:
return False
if delayed and self.turn_off_delay > 0:
# Schedule delayed turn-off
if self.turn_off_timer:
self.turn_off_timer.cancel()
logging.info(f"Scheduling TV turn-off in {self.turn_off_delay}s...")
self.turn_off_timer = threading.Timer(
self.turn_off_delay,
self._turn_off_now
)
self.turn_off_timer.daemon = True
self.turn_off_timer.start()
return True
else:
# Immediate turn-off
return self._turn_off_now()
def _turn_off_now(self) -> bool:
"""Internal method to turn TV off immediately"""
# Skip if TV is already off
if self.tv_state is False:
logging.debug("TV already off, skipping CEC command")
return True
logging.info("Turning TV OFF via HDMI-CEC...")
# Send standby command to device 0 (TV)
success = self._run_cec_command(f'standby {self.device}')
if success:
self.tv_state = False
logging.info("TV turned OFF successfully")
# Give TV time to actually power off
if self.power_off_wait > 0:
logging.debug(f"Waiting {self.power_off_wait} seconds for TV to power off...")
import time
time.sleep(self.power_off_wait)
else:
logging.warning("Failed to turn TV OFF")
return success
def cancel_turn_off(self):
"""Cancel any pending turn-off timer"""
if self.turn_off_timer:
self.turn_off_timer.cancel()
self.turn_off_timer = None
logging.debug("Cancelled TV turn-off timer")
class DisplayProcess:
"""Manages a running display application process"""
def __init__(self, process: Optional[subprocess.Popen] = None, event_type: str = "", event_id: str = "", log_file: Optional[IO[bytes]] = None, log_path: Optional[str] = None, player: Optional[object] = None, volume_pct: Optional[int] = None):
"""process: subprocess.Popen when using external binary
player: python-vlc MediaPlayer or MediaListPlayer when using libvlc
"""
self.process = process
self.player = player
self.event_type = event_type
self.event_id = event_id
self.start_time = datetime.now(timezone.utc)
self.log_file = log_file
self.log_path = log_path
self.volume_pct = volume_pct
def is_running(self) -> bool:
"""Check if the underlying display is still running.
Works for subprocess.Popen-based processes and for python-vlc player objects.
"""
try:
if self.process:
return self.process.poll() is None
if self.player:
# python-vlc MediaPlayer: is_playing() returns 1 while playing
if hasattr(self.player, 'is_playing'):
try:
if bool(self.player.is_playing()):
return True
# A plain is_playing()==0 can happen briefly during opening/
# buffering on HTTP streams. Treat those states as still alive.
state = None
if hasattr(self.player, 'get_state'):
state = self.player.get_state()
if state is not None:
import vlc as _vlc
return state in (
_vlc.State.Opening,
_vlc.State.Buffering,
_vlc.State.Playing,
_vlc.State.Paused,
)
return False
except Exception:
pass
# MediaListPlayer may not have is_playing - try to inspect media player state
try:
state = None
if hasattr(self.player, 'get_state'):
state = self.player.get_state()
elif hasattr(self.player, 'get_media_player'):
mp = self.player.get_media_player()
if mp and hasattr(mp, 'get_state'):
state = mp.get_state()
# Consider ended/stopped states as not running
import vlc as _vlc
if state is None:
return False
return state not in (_vlc.State.Ended, _vlc.State.Stopped, _vlc.State.Error)
except Exception:
return False
return False
except Exception:
return False
def terminate(self, force: bool = False):
"""Terminate the display process or player gracefully or forcefully"""
# Always attempt to cleanup both subprocess and python-vlc player resources.
# Do not return early if is_running() is False; there may still be resources to release.
try:
# If it's an external subprocess, handle as before
if self.process:
pid_info = f" (PID: {getattr(self.process, 'pid', 'unknown')})"
if force:
logging.warning(f"Force killing {self.event_type} process{pid_info}")
try:
self.process.kill()
except Exception:
pass
else:
logging.info(f"Terminating {self.event_type} process gracefully{pid_info}")
try:
self.process.terminate()
except Exception:
pass
# Wait for process to exit (with timeout)
try:
self.process.wait(timeout=5)
logging.info(f"{self.event_type} process terminated successfully")
except subprocess.TimeoutExpired:
if not force:
logging.warning(f"{self.event_type} didn't terminate gracefully, forcing kill")
try:
self.process.kill()
self.process.wait(timeout=2)
except Exception:
pass
# If it's a python-vlc player, stop it
# Attempt to stop and release python-vlc players if present
if self.player:
try:
logging.info(f"Stopping vlc player for {self.event_type}")
# Call stop() if available
stop_fn = getattr(self.player, 'stop', None)
if callable(stop_fn):
try:
stop_fn()
except Exception:
pass
# Try to stop/release underlying media player (MediaListPlayer -> get_media_player)
try:
mp = None
if hasattr(self.player, 'get_media_player'):
mp = self.player.get_media_player()
elif hasattr(self.player, 'get_media'):
# Some wrappers may expose media directly
mp = self.player
if mp:
try:
if hasattr(mp, 'stop'):
mp.stop()
except Exception:
pass
# Release media player if supported
rel = getattr(mp, 'release', None)
if callable(rel):
try:
rel()
except Exception:
pass
except Exception:
pass
# Finally, try to release the top-level player object
try:
rel_top = getattr(self.player, 'release', None)
if callable(rel_top):
rel_top()
except Exception:
pass
# Remove reference to player so GC can collect underlying libvlc resources
self.player = None
except Exception as e:
logging.error(f"Error stopping vlc player: {e}")
except Exception as e:
logging.error(f"Error terminating process/player: {e}")
finally:
# Close log file handle if open
if self.log_file and not self.log_file.closed:
try:
self.log_file.close()
except Exception:
pass
class DisplayManager:
"""Main display manager that orchestrates event display"""
def __init__(self):
self.current_process: Optional[DisplayProcess] = None
self.current_event_data: Optional[Dict] = None
self.last_file_mtime: Optional[float] = None
self.running = True
self.client_settings_mtime: Optional[float] = None
self.client_volume_multiplier = 1.0
self._video_duration_cache: Dict[str, float] = {}
# Initialize health state tracking for process monitoring
self.health = ProcessHealthState()
# Initialize HDMI-CEC controller
self.cec = HDMICECController(
enabled=CEC_ENABLED,
device=CEC_DEVICE,
turn_off_delay=CEC_TURN_OFF_DELAY,
power_on_wait=CEC_POWER_ON_WAIT,
power_off_wait=CEC_POWER_OFF_WAIT
)
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
# Screenshot directory (shared with simclient container via volume)
self.screenshot_dir = os.path.join(os.path.dirname(__file__), 'screenshots')
os.makedirs(self.screenshot_dir, exist_ok=True)
# Start background screenshot thread
self._screenshot_thread = threading.Thread(target=self._screenshot_loop, daemon=True)
self._screenshot_thread.start()
# Pending one-shot timer for event-triggered screenshots (event_start / event_stop)
self._pending_trigger_timer: Optional[threading.Timer] = None
self._load_client_settings(force=True)
def _normalize_volume_level(self, value, default: float = 1.0) -> float:
"""Normalize an event/client volume level to a 0.0-1.0 range."""
try:
level = float(value)
except (TypeError, ValueError):
level = default
return max(0.0, min(1.0, level))
def _extract_client_volume_multiplier(self, settings: Dict) -> float:
"""Read the client-wide video multiplier from persisted dashboard settings."""
if not isinstance(settings, dict):
return 1.0
candidate = settings.get('video_volume_multiplier')
if candidate is None:
candidate = settings.get('volume_multiplier')
audio_settings = settings.get('audio')
if isinstance(audio_settings, dict):
candidate = audio_settings.get('video_volume_multiplier', candidate)
candidate = audio_settings.get('volume_multiplier', candidate)
return self._normalize_volume_level(candidate, default=1.0)
def _load_client_settings(self, force: bool = False):
"""Load dashboard-managed client settings from disk when changed."""
try:
if not os.path.exists(CLIENT_SETTINGS_FILE):
if force or self.client_volume_multiplier != 1.0:
self.client_settings_mtime = None
self.client_volume_multiplier = 1.0
logging.info("No client settings found, using default video multiplier: 1.00")
return
current_mtime = os.path.getmtime(CLIENT_SETTINGS_FILE)
if not force and self.client_settings_mtime == current_mtime:
return
with open(CLIENT_SETTINGS_FILE, 'r', encoding='utf-8') as f:
settings = json.load(f)
self.client_settings_mtime = current_mtime
self.client_volume_multiplier = self._extract_client_volume_multiplier(settings)
logging.info(
"Loaded client settings from %s, video multiplier: %.2f",
CLIENT_SETTINGS_FILE,
self.client_volume_multiplier,
)
except Exception as e:
logging.warning(f"Could not load client settings from {CLIENT_SETTINGS_FILE}: {e}")
def _calculate_video_volume_pct(self, video: Dict) -> int:
"""Combine event volume with the client-level multiplier for VLC."""
if video.get('muted', False):
logging.info("Video volume resolved: muted=True, effective=0%")
return 0
self._load_client_settings()
event_volume = self._normalize_volume_level(video.get('volume'), default=1.0)
effective_volume = event_volume * self.client_volume_multiplier
effective_pct = int(round(max(0.0, min(1.0, effective_volume)) * 100))
logging.info(
"Video volume resolved: event=%.2f client=%.2f effective=%d%%",
event_volume,
self.client_volume_multiplier,
effective_pct,
)
return effective_pct
def _apply_vlc_volume(self, player: object, volume_pct: int, context: str, retries: bool = False):
"""Apply VLC volume, retrying briefly after playback starts when requested."""
if player is None:
return
def _set_volume_once() -> bool:
try:
result = player.audio_set_volume(volume_pct)
current_volume = None
try:
current_volume = player.audio_get_volume()
except Exception:
current_volume = None
if result == -1:
logging.debug("VLC rejected volume %d%% for %s", volume_pct, context)
return False
if current_volume not in (None, -1):
logging.info("Applied VLC volume for %s: %d%%", context, current_volume)
return True
logging.debug("Applied VLC volume for %s: requested %d%%", context, volume_pct)
return True
except Exception as e:
logging.debug(f"Could not set volume on {context}: {e}")
return False
if _set_volume_once() or not retries:
return
def _worker():
for delay in (0.2, 0.7, 1.5):
time.sleep(delay)
if _set_volume_once():
return
logging.warning("Failed to apply VLC volume for %s after playback start", context)
threading.Thread(target=_worker, daemon=True).start()
def _get_video_audio_target(self, display_process: DisplayProcess) -> Optional[object]:
"""Return the VLC object that accepts audio volume changes."""
player = getattr(display_process, 'player', None)
if not player:
return None
if hasattr(player, 'audio_set_volume'):
return player
if hasattr(player, 'get_media_player'):
try:
media_player = player.get_media_player()
if media_player and hasattr(media_player, 'audio_set_volume'):
return media_player
except Exception:
return None
return None
def _apply_runtime_video_settings(self, event: Dict):
"""Apply client-setting volume changes to an already running video."""
if not self.current_process or self.current_process.event_type != 'video':
return
video = event.get('video', {}) if isinstance(event.get('video', {}), dict) else {}
desired_volume_pct = self._calculate_video_volume_pct(video)
if self.current_process.volume_pct == desired_volume_pct:
return
audio_target = self._get_video_audio_target(self.current_process)
if audio_target is None:
logging.debug(
"Skipping live volume update for current video event; no controllable VLC player is attached"
)
self.current_process.volume_pct = desired_volume_pct
return
self._apply_vlc_volume(audio_target, desired_volume_pct, 'active video runtime update')
self.current_process.volume_pct = desired_volume_pct
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
logging.info(f"Received signal {signum}, shutting down gracefully...")
self.running = False
self.stop_current_display()
# Turn off TV when shutting down (delayed)
self.cec.turn_off(delayed=True)
sys.exit(0)
def read_event_file(self) -> Optional[Dict]:
"""Read and parse current_event.json"""
try:
if not os.path.exists(EVENT_FILE):
return None
# Check if file has changed
current_mtime = os.path.getmtime(EVENT_FILE)
if self.last_file_mtime and current_mtime == self.last_file_mtime:
return self.current_event_data # No change
self.last_file_mtime = current_mtime
with open(EVENT_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
logging.info(f"Event file updated, read: {json.dumps(data, indent=2)}")
return data
except json.JSONDecodeError as e:
logging.error(f"Invalid JSON in event file: {e}")
return None
except Exception as e:
logging.error(f"Error reading event file: {e}")
return None
def is_event_active(self, event: Dict) -> bool:
"""Check if event should be displayed based on start/end times
Note: Event times are expected to be in UTC (as sent from server).
We compare them with current UTC time.
"""
# Get current time in UTC
now_utc = datetime.now(timezone.utc)
try:
# Parse start time if present (assume UTC)
if 'start' in event:
start_str = event['start'].replace(' ', 'T')
# Parse as naive datetime and make it UTC-aware
start_time = datetime.fromisoformat(start_str)
if start_time.tzinfo is None:
start_time = start_time.replace(tzinfo=timezone.utc)
if now_utc < start_time:
# Calculate time until start
time_until = (start_time - now_utc).total_seconds()
logging.debug(f"Event not started yet. Start: {start_time} UTC, "
f"Now: {now_utc.strftime('%Y-%m-%d %H:%M:%S')} UTC, "
f"Time until start: {int(time_until)}s")
return False
# Parse end time if present (assume UTC)
if 'end' in event:
end_str = event['end'].replace(' ', 'T')
# Parse as naive datetime and make it UTC-aware
end_time = datetime.fromisoformat(end_str)
if end_time.tzinfo is None:
end_time = end_time.replace(tzinfo=timezone.utc)
if now_utc > end_time:
# Calculate time since end
time_since = (now_utc - end_time).total_seconds()
logging.debug(f"Event has ended. End: {end_time} UTC, "
f"Now: {now_utc.strftime('%Y-%m-%d %H:%M:%S')} UTC, "
f"Time since end: {int(time_since)}s")
return False
# Event is active
logging.debug(f"Event is active. Current time: {now_utc.strftime('%Y-%m-%d %H:%M:%S')} UTC")
return True
except Exception as e:
logging.error(f"Error parsing event times: {e}")
# If we can't parse times, assume event is active
return True
def get_event_identifier(self, event: Dict) -> str:
"""Generate unique identifier for an event"""
# Use event ID if available
if 'id' in event:
return f"event_{event['id']}"
# Prefer explicit event_type when present
etype = event.get('event_type')
if etype:
if etype == 'presentation' and 'presentation' in event:
files = event['presentation'].get('files', [])
if files:
return f"presentation_{files[0].get('name', 'unknown')}"
return f"presentation_unknown"
if etype in ('webpage', 'webuntis', 'website'):
# webuntis/webpage may include website or web key
url = None
if 'website' in event and isinstance(event['website'], dict):
url = event['website'].get('url')
if 'web' in event and isinstance(event['web'], dict):
url = url or event['web'].get('url')
return f"{etype}_{url or 'unknown'}"
# Fallback to previous content-based identifiers
if 'presentation' in event:
files = event['presentation'].get('files', [])
if files:
return f"presentation_{files[0].get('name', 'unknown')}"
elif 'web' in event:
return f"web_{event['web'].get('url', 'unknown')}"
elif 'video' in event:
return f"video_{event['video'].get('url', 'unknown')}"
return f"unknown_{abs(hash(json.dumps(event))) }"
def stop_current_display(self, turn_off_tv: bool = True):
"""Stop the currently running display process
Args:
turn_off_tv: If True, schedule TV turn-off (with delay)
"""
if self.current_process:
logging.info(f"Stopping current display: {self.current_process.event_type}")
self.current_process.terminate()
# If process didn't terminate, force kill
time.sleep(1)
if self.current_process.is_running():
self.current_process.terminate(force=True)
# Update health state to reflect process stopped
self.health.update_stopped()
self.current_process = None
self.current_event_data = None
# Capture a screenshot ~1s after stop so the dashboard shows the cleared screen
self._trigger_event_screenshot("event_stop", 1.0)
# Turn off TV when display stops (with configurable delay)
if turn_off_tv:
self.cec.turn_off(delayed=True)
def start_presentation(self, event: Dict) -> Optional[DisplayProcess]:
"""Start presentation display (PDF/PowerPoint/LibreOffice) using Impressive
This method supports:
1. Native PDF files - used directly as slideshows
2. PPTX/PPT/ODP files - converted to PDF using LibreOffice headless
3. Auto-advance and loop support via Impressive PDF presenter
4. Falls back to evince/okular if Impressive not available
All presentation types support the same slideshow features:
- auto_advance: Enable automatic slide progression
- slide_interval: Seconds between slides (when auto_advance=true)
- loop: Restart presentation after last slide vs. exit
"""
try:
presentation = event.get('presentation', {})
files = presentation.get('files', [])
if not files:
logging.error("No presentation files specified")
return None
# Get first file
file_info = files[0]
filename = file_info.get('name') or file_info.get('filename')
if not filename:
logging.error("No filename in presentation event")
return None
file_path = os.path.join(PRESENTATION_DIR, filename)
if not os.path.exists(file_path):
logging.error(f"Presentation file not found: {file_path}")
return None
logging.info(f"Starting presentation: {filename}")
# Determine file type and get absolute path
file_ext = os.path.splitext(filename)[1].lower()
abs_file_path = os.path.abspath(file_path)
# Get presentation settings
auto_advance = presentation.get('auto_advance', False)
slide_interval = presentation.get('slide_interval', 10)
loop_enabled = presentation.get('loop', False)
# Get scheduler-specific progress display settings
# Prefer values under presentation, fallback to top-level for backward compatibility
page_progress = presentation.get('page_progress', event.get('page_progress', False))
auto_progress = presentation.get('auto_progress', event.get('auto_progress', False))
logging.debug(f"Resolved progress flags: page_progress={page_progress}, auto_progress={auto_progress}")
if auto_progress and not auto_advance:
logging.warning("auto_progress is true but auto_advance is false; Impressive's auto-progress is most useful with --auto")
# For timed events (with end time), default to loop mode to keep showing
# until the event expires, unless explicitly set to not loop
if not loop_enabled and 'end' in event:
logging.info("Event has end time - enabling loop mode to keep presentation active")
loop_enabled = True
# Handle different presentation file types
if file_ext == '.pdf':
# PDF files are used directly (no conversion needed)
logging.info(f"Using PDF file directly: {filename}")
elif file_ext in ['.pptx', '.ppt', '.odp']:
# Convert PPTX/PPT/ODP to PDF for Impressive
logging.info(f"Converting {file_ext} to PDF for Impressive...")
pdf_path = abs_file_path.rsplit('.', 1)[0] + '.pdf'
# Check if PDF already exists and is newer than source
pdf_exists = os.path.exists(pdf_path)
if pdf_exists:
pdf_mtime = os.path.getmtime(pdf_path)
source_mtime = os.path.getmtime(abs_file_path)
if pdf_mtime >= source_mtime:
logging.info(f"Using existing PDF: {os.path.basename(pdf_path)}")
abs_file_path = pdf_path
file_ext = '.pdf'
else:
logging.info("Source file newer than PDF, reconverting...")
pdf_exists = False
# Convert if needed
if not pdf_exists:
try:
convert_cmd = [
'libreoffice',
'--headless',
'--convert-to', 'pdf',
'--outdir', PRESENTATION_DIR,
abs_file_path
]
logging.debug(f"Conversion command: {' '.join(convert_cmd)}")
result = subprocess.run(
convert_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=60,
check=True
)
if os.path.exists(pdf_path):
logging.info(f"Converted to PDF: {os.path.basename(pdf_path)}")
abs_file_path = pdf_path
file_ext = '.pdf'
else:
logging.error("PDF conversion failed - file not created")
return None
except subprocess.TimeoutExpired:
logging.error("PDF conversion timed out after 60s")
return None
except subprocess.CalledProcessError as e:
logging.error(f"PDF conversion failed: {e.stderr.decode()}")
return None
except Exception as e:
logging.error(f"PDF conversion error: {e}")
return None
else:
# Unsupported file format
logging.error(f"Unsupported presentation format: {file_ext}")
logging.info("Supported formats: PDF (native), PPTX, PPT, ODP (converted to PDF)")
return None
# At this point we have a PDF file (either native or converted)
if file_ext != '.pdf':
logging.error(f"Internal error: Expected PDF but got {file_ext}")
return None
# Use impressive with venv environment (where pygame is installed)
if self._command_exists('impressive'):
impressive_bin = shutil.which('impressive') or 'impressive'
cmd = [impressive_bin, '--fullscreen', '--nooverview']
# Add auto-advance if requested
if auto_advance:
cmd.extend(['--auto', str(slide_interval)])
logging.info(f"Auto-advance enabled (interval: {slide_interval}s)")
# Add loop or autoquit based on setting
if loop_enabled:
cmd.append('--wrap')
logging.info("Loop mode enabled (presentation will restart after last slide)")
else:
cmd.append('--autoquit')
logging.info("Auto-quit enabled (will exit after last slide)")
# Add progress bar options from scheduler
if page_progress:
cmd.append('--page-progress')
logging.info("Page progress bar enabled (shows overall position in presentation)")
if auto_progress:
cmd.append('--auto-progress')
logging.info("Auto-progress bar enabled (shows per-page countdown during auto-advance)")
cmd.append(abs_file_path)
logging.info(f"Using Impressive PDF presenter with auto-advance support")
# Fallback to evince or okular without auto-advance
elif self._command_exists('evince'):
cmd = ['evince', '--presentation', abs_file_path]
logging.warning("Impressive not available, using evince (no auto-advance support)")
if auto_advance:
logging.warning(f"Auto-advance requested ({slide_interval}s) but evince doesn't support it")
if loop_enabled:
logging.warning("Loop mode requested but evince doesn't support it")
elif self._command_exists('okular'):
cmd = ['okular', '--presentation', abs_file_path]
logging.warning("Impressive not available, using okular (no auto-advance support)")
if auto_advance:
logging.warning(f"Auto-advance requested ({slide_interval}s) but okular doesn't support it")
if loop_enabled:
logging.warning("Loop mode requested but okular doesn't support it")
else:
logging.error("No PDF viewer found (impressive, evince, or okular)")
logging.info("Install Impressive: sudo apt install impressive")
return None
logging.debug(f"Full command: {' '.join(cmd)}")
# Start the process, redirect output to log file to avoid PIPE buffer issues
impressive_log_path = os.path.join(os.path.dirname(LOG_PATH), 'impressive.out.log')
os.makedirs(os.path.dirname(impressive_log_path), exist_ok=True)
impressive_log = open(impressive_log_path, 'ab', buffering=0)
# Set up environment to use venv where pygame is installed
env_vars = dict(os.environ)
# Ensure venv is activated for impressive (which needs pygame)
venv_path = os.path.join(os.path.dirname(__file__), '..', 'venv')
if os.path.exists(venv_path):
venv_bin = os.path.join(venv_path, 'bin')
# Prepend venv bin to PATH so impressive uses venv's python
current_path = env_vars.get('PATH', '')
env_vars['PATH'] = f"{venv_bin}:{current_path}"
env_vars['VIRTUAL_ENV'] = os.path.abspath(venv_path)
logging.debug(f"Using venv for impressive: {venv_path}")
# Set display
env_vars['DISPLAY'] = os.environ.get('DISPLAY', ':0')
process = subprocess.Popen(
cmd,
stdout=impressive_log,
stderr=subprocess.STDOUT,
env=env_vars,
preexec_fn=os.setsid # Create new process group for better cleanup
)
event_id = self.get_event_identifier(event)
logging.info(f"Presentation started with PID: {process.pid}")
# Update health state for monitoring
self.health.update_running(event_id, 'presentation', 'impressive', process.pid)
return DisplayProcess(process, 'presentation', event_id, log_file=impressive_log, log_path=impressive_log_path)
except Exception as e:
logging.error(f"Error starting presentation: {e}")
return None
def start_video(self, event: Dict) -> Optional[DisplayProcess]:
"""Start video playback"""
try:
video = event.get('video', {})
video_url = video.get('url')
vol_pct = self._calculate_video_volume_pct(video)
if not video_url:
logging.error("No video URL specified")
return None
# Normalize file-server host alias (e.g., http://server:8000/...) -> configured FILE_SERVER
video_url = self._resolve_file_url(video_url)
video_url = self._sanitize_media_url(video_url)
logging.info(f"Starting video: {video_url}")
# Prefer using python-vlc (libvlc) for finer control
try:
import vlc
except Exception:
vlc = None
if not vlc:
# Fallback to launching external vlc binary
if self._command_exists('vlc'):
external_muted = bool(video.get('muted', False) or vol_pct == 0)
external_gain = max(0.0, min(1.0, vol_pct / 100.0))
if external_muted:
logging.info("External VLC fallback: forcing mute via --no-audio")
else:
logging.info(
"External VLC fallback: applying startup gain %.2f from effective volume %d%%",
external_gain,
vol_pct,
)
cmd = [
'vlc',
'--fullscreen',
'--no-video-title-show',
'--loop' if video.get('loop', False) else '--play-and-exit',
video_url
]
if external_muted:
cmd.insert(-1, '--no-audio')
else:
# VLC CLI gain is a linear multiplier where 1.0 is default.
cmd.insert(-1, f'--gain={external_gain:.2f}')
video_log_path = os.path.join(os.path.dirname(LOG_PATH), 'video_player.out.log')
os.makedirs(os.path.dirname(video_log_path), exist_ok=True)
video_log = open(video_log_path, 'ab', buffering=0)
env_vars = {}
for k in ['PATH', 'HOME', 'USER', 'LOGNAME', 'SHELL', 'TERM', 'LANG', 'LC_ALL']:
if k in os.environ:
env_vars[k] = os.environ[k]
env_vars['DISPLAY'] = os.environ.get('DISPLAY', ':0')
process = subprocess.Popen(
cmd,
stdout=video_log,
stderr=subprocess.STDOUT,
env=env_vars,
preexec_fn=os.setsid
)
event_id = self.get_event_identifier(event)
logging.info(f"Video started with PID: {process.pid} (external vlc)")
# Update health state for monitoring
self.health.update_running(event_id, 'video', 'vlc', process.pid)
return DisplayProcess(process=process, event_type='video', event_id=event_id, log_file=video_log, log_path=video_log_path, volume_pct=vol_pct)
else:
logging.error("No video player found (python-vlc or vlc binary)")
return None
# Use libvlc via python-vlc
try:
# Keep python-vlc behavior aligned with external vlc fullscreen mode.
_muted = video.get('muted', False) or vol_pct == 0
_instance_args = ['--fullscreen', '--no-video-title-show']
if _muted:
# Disable audio output entirely at the instance level to avoid
# the race between async audio init and audio_set_volume(0).
_instance_args.append('--no-audio')
instance = vlc.Instance(*_instance_args)
def _force_fullscreen(player_obj, label: str):
"""Retry fullscreen toggle because video outputs may attach asynchronously."""
if not player_obj:
return
def _worker():
for delay in (0.0, 0.4, 1.0):
if delay > 0:
time.sleep(delay)
try:
player_obj.set_fullscreen(True)
except Exception as e:
logging.debug(f"Could not force fullscreen for {label}: {e}")
threading.Thread(target=_worker, daemon=True).start()
autoplay = bool(video.get('autoplay', True))
loop_flag = bool(video.get('loop', False))
if loop_flag:
# Use a plain MediaPlayer with :input-repeat so everything stays on
# our custom `instance` (e.g. with --no-audio when muted).
# MediaListPlayer() would create its own default instance,
# bypassing flags like --no-audio.
mp = instance.media_player_new()
m = instance.media_new(video_url)
m.add_option(':input-repeat=65535') # ~infinite loop
mp.set_media(m)
self._apply_vlc_volume(mp, vol_pct, 'python-vlc loop MediaPlayer')
if autoplay:
try:
mp.play()
self._apply_vlc_volume(mp, vol_pct, 'python-vlc loop MediaPlayer', retries=True)
_force_fullscreen(mp, 'python-vlc loop MediaPlayer')
except Exception as e:
logging.error(f"Failed to start loop media player: {e}")
event_id = self.get_event_identifier(event)
runtime_pid = os.getpid()
logging.info(f"Video started via python-vlc (loop MediaPlayer), runtime PID: {runtime_pid}")
self.health.update_running(event_id, 'video', 'vlc', runtime_pid)
return DisplayProcess(process=None, event_type='video', event_id=event_id, log_file=None, log_path=None, player=mp, volume_pct=vol_pct)
else:
# Single-play MediaPlayer
mp = instance.media_player_new()
media = instance.media_new(video_url)
mp.set_media(media)
self._apply_vlc_volume(mp, vol_pct, 'python-vlc MediaPlayer')
if autoplay:
try:
mp.play()
self._apply_vlc_volume(mp, vol_pct, 'python-vlc MediaPlayer', retries=True)
_force_fullscreen(mp, 'python-vlc MediaPlayer')
except Exception as e:
logging.error(f"Failed to start media player: {e}")
event_id = self.get_event_identifier(event)
runtime_pid = os.getpid()
logging.info(f"Video started via python-vlc (MediaPlayer), runtime PID: {runtime_pid}")
# python-vlc runs in-process (no external vlc child), so publish this process PID.
self.health.update_running(event_id, 'video', 'vlc', runtime_pid)
return DisplayProcess(process=None, event_type='video', event_id=event_id, log_file=None, log_path=None, player=mp, volume_pct=vol_pct)
except Exception as e:
logging.error(f"Error starting video with python-vlc: {e}")
return None
except Exception as e:
logging.error(f"Error starting video: {e}")
return None
def _resolve_file_url(self, url: str) -> str:
"""Resolve URLs that use the special host 'server' to the configured file server.
Priority:
- If FILE_SERVER_BASE_URL is set, use that as the base and append path/query
- Otherwise use FILE_SERVER_HOST (or default to MQTT_BROKER) + FILE_SERVER_PORT + FILE_SERVER_SCHEME
Examples:
http://server:8000/api/... -> http://<FILE_SERVER_HOST>:<PORT>/api/...
"""
try:
if not url:
return url
# Parse the incoming URL
parsed = urlparse(url)
hostname = (parsed.hostname or '').lower()
# Only rewrite when hostname is exactly 'server'
if hostname != 'server':
return url
# Helper to read env var and strip inline comments/whitespace
def _clean_env(name: str) -> Optional[str]:
v = os.getenv(name)
if v is None:
return None
# Remove inline comment starting with '#'
if '#' in v:
v = v.split('#', 1)[0]
v = v.strip()
return v or None
# FILE_SERVER_BASE_URL takes precedence
base = _clean_env('FILE_SERVER_BASE_URL')
if base:
# Ensure no trailing slash on base and preserve path
base = base.rstrip('/')
path = parsed.path or ''
if not path.startswith('/'):
path = '/' + path
new_url = base + path
if parsed.query:
new_url = new_url + '?' + parsed.query
logging.info(f"Rewriting 'server' URL using FILE_SERVER_BASE_URL: {new_url}")
return new_url
# Otherwise build from components
file_host = _clean_env('FILE_SERVER_HOST') or _clean_env('MQTT_BROKER')
file_port = _clean_env('FILE_SERVER_PORT')
file_scheme = _clean_env('FILE_SERVER_SCHEME') or 'http'
if not file_host:
logging.warning("FILE_SERVER_HOST and MQTT_BROKER are not set; leaving URL unchanged")
return url
netloc = file_host
if file_port:
netloc = f"{file_host}:{file_port}"
# Rebuild URL
new_url = f"{file_scheme}://{netloc}{parsed.path or ''}"
if parsed.query:
new_url = new_url + '?' + parsed.query
logging.info(f"Rewriting 'server' URL to: {new_url}")
return new_url
except Exception as e:
logging.debug(f"Error resolving file url '{url}': {e}")
return url
def _sanitize_media_url(self, url: str) -> str:
"""Percent-encode media URLs so VLC/ffmpeg handle spaces/unicode reliably.
Some event payloads include raw spaces or non-ASCII characters in URL paths.
Requests usually tolerates that, but VLC/ffmpeg can fail to open those URLs.
"""
try:
if not url:
return url
parts = urlsplit(url)
if parts.scheme not in ('http', 'https'):
return url
safe_path = quote(parts.path or '/', safe="/%:@!$&'()*+,;=-._~")
safe_query = quote(parts.query or '', safe="=&%:@!$'()*+,;/?-._~")
sanitized = urlunsplit((parts.scheme, parts.netloc, safe_path, safe_query, parts.fragment))
if sanitized != url:
logging.info("Sanitized media URL for VLC compatibility")
logging.debug(f"Media URL before: {url}")
logging.debug(f"Media URL after: {sanitized}")
return sanitized
except Exception as e:
logging.debug(f"Could not sanitize media URL '{url}': {e}")
return url
def start_webpage(self, event: Dict, autoscroll_enabled: bool = False) -> Optional[DisplayProcess]:
"""Start webpage display in kiosk mode"""
try:
# Support both legacy 'web' key and scheduler-provided 'website' object
web = event.get('web', {}) if isinstance(event.get('web', {}), dict) else {}
website = event.get('website', {}) if isinstance(event.get('website', {}), dict) else {}
# website.url takes precedence
url = website.get('url') or web.get('url')
# Resolve any 'server' host placeholders to configured file server
url = self._resolve_file_url(url)
# Normalize URL: if scheme missing, assume http
if url:
parsed = urlparse(url)
if not parsed.scheme:
logging.info(f"Normalizing URL by adding http:// scheme: {url} -> http://{url}")
url = f"http://{url}"
if not url:
logging.error("No web URL specified")
return None
logging.info(f"Starting webpage: {url}")
# Use Chromium in kiosk mode
if self._command_exists('chromium-browser'):
browser = 'chromium-browser'
elif self._command_exists('chromium'):
browser = 'chromium'
elif self._command_exists('google-chrome'):
browser = 'google-chrome'
else:
logging.error("No browser found (chromium or chrome)")
return None
cmd = [browser, '--remote-debugging-port=9222']
# If autoscroll is requested, load the small local extension that injects autoscroll
if autoscroll_enabled:
autoscroll_ext = os.path.join(os.path.dirname(__file__), 'chrome_autoscroll')
cmd.append(f"--load-extension={autoscroll_ext}")
# Common kiosk flags
cmd.extend([
'--kiosk',
'--no-first-run',
'--disable-infobars',
'--disable-session-crashed-bubble',
'--disable-restore-session-state',
url
])
browser_log_path = os.path.join(os.path.dirname(LOG_PATH), 'browser.out.log')
os.makedirs(os.path.dirname(browser_log_path), exist_ok=True)
browser_log = open(browser_log_path, 'ab', buffering=0)
env_vars = {}
# Only keep essential environment variables
for k in ['PATH', 'HOME', 'USER', 'LOGNAME', 'SHELL', 'TERM', 'LANG', 'LC_ALL']:
if k in os.environ:
env_vars[k] = os.environ[k]
# Set display settings
env_vars['DISPLAY'] = os.environ.get('DISPLAY', ':0')
process = subprocess.Popen(
cmd,
stdout=browser_log,
stderr=subprocess.STDOUT,
env=env_vars,
preexec_fn=os.setsid
)
event_id = self.get_event_identifier(event)
event_type = event.get('event_type', 'webpage') or 'webpage'
logging.info(f"Webpage started with PID: {process.pid}")
# Update health state for monitoring (track chromium browser)
self.health.update_running(event_id, event_type, browser, process.pid)
# Inject auto-scroll JS via Chrome DevTools Protocol (CDP) if enabled and available
if autoscroll_enabled:
try:
# Run injection in background thread so it doesn't block the main loop
t = threading.Thread(target=self._inject_autoscroll_cdp, args=(process.pid, url, 60), daemon=True)
t.start()
except Exception as e:
logging.debug(f"Autoscroll injection thread failed to start: {e}")
return DisplayProcess(process, 'webpage', event_id, log_file=browser_log, log_path=browser_log_path)
except Exception as e:
logging.error(f"Error starting webpage: {e}")
return None
def start_display_for_event(self, event: Dict) -> Optional[DisplayProcess]:
"""Start appropriate display software for the given event"""
process = None
handled = False
# First, respect explicit event_type if provided by scheduler
etype = event.get('event_type')
if etype:
etype = etype.lower()
if etype == 'presentation':
process = self.start_presentation(event)
handled = True
elif etype in ('webuntis', 'webpage', 'website'):
# webuntis and webpage both show a browser kiosk
# Ensure the URL is taken from 'website.url' or 'web.url'
# Normalize event to include a 'web' key so start_webpage can use it
if 'website' in event and isinstance(event['website'], dict):
# copy into 'web' for compatibility
event.setdefault('web', {})
if 'url' not in event['web']:
event['web']['url'] = event['website'].get('url')
# Only enable autoscroll for explicit scheduler event_type 'website'
autoscroll_flag = (etype == 'website')
process = self.start_webpage(event, autoscroll_enabled=autoscroll_flag)
handled = True
if not handled:
# Fallback to legacy keys
if 'presentation' in event:
process = self.start_presentation(event)
elif 'video' in event:
process = self.start_video(event)
elif 'web' in event:
process = self.start_webpage(event)
else:
logging.error(f"Unknown event type/structure: {list(event.keys())}")
if process is not None:
delay = self._get_trigger_delay(event)
self._trigger_event_screenshot("event_start", delay)
return process
def _command_exists(self, command: str) -> bool:
"""Check if a command exists in PATH"""
try:
subprocess.run(
['which', command],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
return True
except subprocess.CalledProcessError:
return False
def _inject_autoscroll_cdp(self, browser_pid: int, url: str, duration_seconds: int = 10):
"""Connect to Chrome DevTools Protocol and inject an auto-scroll JS for duration_seconds.
This function assumes Chromium was started with --remote-debugging-port=9222.
It is non-blocking when called from a background thread.
"""
try:
# Lazy import websocket to keep runtime optional
try:
import websocket
except Exception:
logging.info("websocket-client not installed; skipping autoscroll injection")
return
# Discover DevTools targets (retry a few times; Chromium may not have opened the port yet)
targets = None
for attempt in range(5):
try:
resp = requests.get('http://127.0.0.1:9222/json', timeout=2)
targets = resp.json()
if targets:
break
except Exception as e:
logging.debug(f"Attempt {attempt+1}: could not fetch DevTools targets: {e}")
_time.sleep(0.5)
if not targets:
logging.debug('No DevTools targets discovered; skipping autoscroll')
return
# Try to find the matching target by URL (prefer exact match), then by substring, then fallback to first
target_ws = None
for t in targets:
turl = t.get('url', '')
if url and (turl == url or turl.startswith(url) or url in turl):
target_ws = t.get('webSocketDebuggerUrl')
logging.debug(f"Matched DevTools target by url: {turl}")
break
if not target_ws and targets:
target_ws = targets[0].get('webSocketDebuggerUrl')
logging.debug(f"Falling back to first DevTools target: {targets[0].get('url')}")
if not target_ws:
logging.debug('No DevTools websocket URL found; skipping autoscroll')
return
# Build the auto-scroll JS: scroll down over duration_seconds then jump back to top and repeat
dur_ms = int(duration_seconds * 1000)
js_template = (
"(function(){{"
"var duration = {duration_ms};"
"var stepMs = 50;"
"try{{"
" if(window.__autoScrollInterval){{ clearInterval(window.__autoScrollInterval); delete window.__autoScrollInterval; }}"
" var totalScroll = Math.max(document.documentElement.scrollHeight, document.body.scrollHeight) - window.innerHeight;"
" if(totalScroll <= 0){{ return; }}"
" var steps = Math.max(1, Math.round(duration/stepMs));"
" var stepPx = totalScroll/steps;"
" var step = 0;"
" window.__autoScrollInterval = setInterval(function(){{ window.scrollBy(0, stepPx); step++; if(step>=steps){{ window.scrollTo(0,0); step=0; }} }}, stepMs);"
" console.info('Auto-scroll started');"
"}}catch(e){{ console.error('Auto-scroll error', e); }}"
"}})();"
)
js = js_template.format(duration_ms=dur_ms)
# Connect via websocket and send Runtime.enable + Runtime.evaluate
# Some Chromium builds require a sensible Origin header during the websocket handshake
# to avoid a 403 Forbidden response. Use localhost origin which is safe for local control.
try:
ws = websocket.create_connection(target_ws, timeout=5, header=["Origin: http://127.0.0.1"])
except TypeError:
# Older websocket-client versions accept 'origin' keyword instead
ws = websocket.create_connection(target_ws, timeout=5, origin="http://127.0.0.1")
msg_id = 1
def send_recv(method, params=None):
nonlocal msg_id
payload = {"id": msg_id, "method": method}
if params:
payload["params"] = params
ws.send(_json.dumps(payload))
msg_id += 1
try:
return ws.recv()
except Exception as ex:
logging.debug(f"No response from DevTools for {method}: {ex}")
return None
# Enable runtime (some pages may require it)
send_recv('Runtime.enable')
# Evaluate the autoscroll script
resp = send_recv('Runtime.evaluate', {"expression": js, "awaitPromise": False})
logging.debug(f"DevTools evaluate response: {resp}")
try:
ws.close()
except Exception:
pass
logging.info(f"Attempted autoscroll injection for {duration_seconds}s into page {url}")
except Exception as e:
logging.debug(f"Error injecting autoscroll via CDP: {e}")
def process_events(self):
"""Main processing loop - check for event changes and manage display"""
event_data = self.read_event_file()
# No event file or empty event - stop current display
if not event_data:
if self.current_process:
logging.info("No active event - stopping current display")
self.stop_current_display()
return
# Handle event arrays (take first event)
events_to_process = event_data if isinstance(event_data, list) else [event_data]
if not events_to_process:
if self.current_process:
logging.info("Empty event list - stopping current display")
self.stop_current_display()
return
# Process first active event
active_event = None
for event in events_to_process:
if self.is_event_active(event):
active_event = event
break
if not active_event:
if self.current_process:
logging.info("No active events in time window - stopping current display")
self.stop_current_display()
return
# Get event identifier
event_id = self.get_event_identifier(active_event)
# Check if this is a new/different event
if self.current_process:
if self.current_process.event_id == event_id:
# Same event - check if process is still running
if not self.current_process.is_running():
exit_code = None
player_state = None
if getattr(self.current_process, 'process', None):
try:
exit_code = self.current_process.process.returncode
except Exception:
exit_code = None
elif getattr(self.current_process, 'player', None):
try:
player = self.current_process.player
if hasattr(player, 'get_state'):
player_state = player.get_state()
elif hasattr(player, 'get_media_player'):
mp = player.get_media_player()
if mp and hasattr(mp, 'get_state'):
player_state = mp.get_state()
except Exception as e:
logging.debug(f"Could not read VLC player state: {e}")
logging.warning(f"Display process exited (exit code: {exit_code})")
if player_state is not None:
logging.warning(f"VLC player state at exit detection: {player_state}")
# Try to surface last lines of the related log file, if any
if getattr(self.current_process, 'log_path', None):
try:
with open(self.current_process.log_path, 'rb') as lf:
lf.seek(0, os.SEEK_END)
size = lf.tell()
lf.seek(max(0, size - 4096), os.SEEK_SET)
tail = lf.read().decode('utf-8', errors='ignore')
logging.warning("Last output from process log:\n" + tail.splitlines()[-20:][0] if tail else "(no output)")
except Exception as e:
logging.debug(f"Could not read process log tail: {e}")
# Consider exit code 0 as normal if presentation used autoquit explicitly (no loop)
if self.current_process.event_type == 'presentation' and exit_code == 0:
logging.info("Presentation process ended with exit code 0 (likely normal completion).")
self.current_process = None
# Update health state to show normal completion
self.health.update_stopped()
# Don't turn off TV yet - event might still be active
return
# Process crashed unexpectedly
self.health.update_crashed()
self.health.update_restart_attempt()
if self.health.restart_count > self.health.max_restarts:
logging.error(f"Max restart attempts exceeded - giving up on {self.health.process_name}")
self.current_process = None
return
logging.info("Restarting display process...")
self.current_process = None
# Don't turn off TV when restarting same event
else:
# Everything is fine, continue
# Cancel any pending TV turn-off since event is still active
self.cec.cancel_turn_off()
self._apply_runtime_video_settings(active_event)
return
else:
# Different event - stop current and start new
logging.info(f"Event changed from {self.current_process.event_id} to {event_id}")
# Don't turn off TV when switching between events
self.stop_current_display(turn_off_tv=False)
# Start new display
logging.info(f"Starting display for event: {event_id}")
# Log event timing information for debugging
if 'start' in active_event:
logging.info(f" Event start time (UTC): {active_event['start']}")
if 'end' in active_event:
logging.info(f" Event end time (UTC): {active_event['end']}")
# Turn on TV before starting display
self.cec.turn_on()
new_process = self.start_display_for_event(active_event)
if new_process:
self.current_process = new_process
self.current_event_data = active_event
logging.info(f"Display started successfully for {event_id}")
else:
logging.error(f"Failed to start display for {event_id}")
def run(self):
"""Main run loop"""
logging.info("Display Manager starting...")
logging.info(f"Monitoring event file: {EVENT_FILE}")
logging.info(f"Check interval: {CHECK_INTERVAL}s")
logging.info(f"Screenshot capture interval: {SCREENSHOT_CAPTURE_INTERVAL}s (max width {SCREENSHOT_MAX_WIDTH}px, quality {SCREENSHOT_JPEG_QUALITY})")
# Log timezone information for debugging
now_utc = datetime.now(timezone.utc)
logging.info(f"Current time (UTC): {now_utc.strftime('%Y-%m-%d %H:%M:%S %Z')}")
logging.info("Event times are expected in UTC format")
while self.running:
try:
self.process_events()
time.sleep(CHECK_INTERVAL)
except Exception as e:
logging.error(f"Error in main loop: {e}", exc_info=True)
time.sleep(CHECK_INTERVAL)
logging.info("Display Manager stopped")
# -------------------------------------------------------------
# Screenshot capture subsystem
# -------------------------------------------------------------
def _write_screenshot_meta(self, capture_type: str, final_path: str, send_immediately: bool = False):
"""Write screenshots/meta.json atomically so simclient can detect new captures.
IMPORTANT: Protect event-triggered metadata from being overwritten by periodic captures.
If a periodic screenshot is captured while an event-triggered one is still pending
transmission (send_immediately=True), skip writing meta.json to preserve the event's metadata.
Args:
capture_type: 'periodic', 'event_start', or 'event_stop'
final_path: absolute path of the just-written screenshot file
send_immediately: True for triggered (event) captures, False for periodic ones
"""
try:
def _pending_trigger_is_valid(meta: Dict) -> bool:
"""Return True only for fresh, actionable pending trigger metadata.
This prevents a stale/corrupt pending flag from permanently blocking
periodic updates (meta.json/latest.jpg) if simclient was down or test
data left send_immediately=True behind.
"""
try:
if not meta.get('send_immediately'):
return False
mtype = str(meta.get('type') or '')
if mtype not in ('event_start', 'event_stop'):
return False
mfile = str(meta.get('file') or '').strip()
if not mfile:
return False
file_path = os.path.join(self.screenshot_dir, mfile)
if not os.path.exists(file_path):
logging.warning(
f"Ignoring stale pending screenshot meta: missing file '{mfile}'"
)
return False
captured_at_raw = meta.get('captured_at')
if not captured_at_raw:
return False
captured_at = datetime.fromisoformat(str(captured_at_raw).replace('Z', '+00:00'))
age_s = (datetime.now(timezone.utc) - captured_at.astimezone(timezone.utc)).total_seconds()
# Guard against malformed/future timestamps that could lock
# the pipeline by appearing permanently "fresh".
if age_s < -5:
logging.warning(
f"Ignoring invalid pending screenshot meta: future captured_at (age={age_s:.1f}s)"
)
return False
# Triggered screenshots should be consumed quickly (<= 1s). Use a
# generous safety window to avoid false negatives under load.
if age_s > 30:
logging.warning(
f"Ignoring stale pending screenshot meta: type={mtype}, age={age_s:.1f}s"
)
return False
return True
except Exception:
return False
meta_path = os.path.join(self.screenshot_dir, 'meta.json')
# PROTECTION: Don't overwrite pending event-triggered metadata with periodic capture
if not send_immediately and capture_type == "periodic":
try:
if os.path.exists(meta_path):
with open(meta_path, 'r', encoding='utf-8') as f:
existing_meta = json.load(f)
# If there's a pending event-triggered capture, skip this periodic write
if _pending_trigger_is_valid(existing_meta):
logging.debug(f"Skipping periodic meta.json to preserve pending {existing_meta.get('type')} (send_immediately=True)")
return
except Exception:
pass # If we can't read existing meta, proceed with writing new one
meta = {
"captured_at": datetime.now(timezone.utc).isoformat(),
"file": os.path.basename(final_path),
"type": capture_type,
"send_immediately": send_immediately,
}
tmp_path = meta_path + '.tmp'
with open(tmp_path, 'w', encoding='utf-8') as f:
json.dump(meta, f)
os.replace(tmp_path, meta_path)
logging.debug(f"Screenshot meta written: type={capture_type}, send_immediately={send_immediately}")
except Exception as e:
logging.debug(f"Could not write screenshot meta: {e}")
def _get_trigger_delay(self, event: Dict) -> float:
"""Return the post-launch capture delay in seconds appropriate for the event type."""
etype = (event.get('event_type') or '').lower()
if etype == 'presentation' or 'presentation' in event:
return float(SCREENSHOT_TRIGGER_DELAY_PRESENTATION)
if etype in ('webuntis', 'webpage', 'website') or 'web' in event:
return float(SCREENSHOT_TRIGGER_DELAY_WEB)
if 'video' in event:
return float(SCREENSHOT_TRIGGER_DELAY_VIDEO)
return float(SCREENSHOT_TRIGGER_DELAY_PRESENTATION) # safe default
def _trigger_event_screenshot(self, capture_type: str, delay: float):
"""Arm a one-shot timer to capture a triggered screenshot after *delay* seconds.
Cancels any already-pending trigger so rapid event switches only produce
one screenshot after the final transition settles, not one per intermediate state.
"""
if self._pending_trigger_timer is not None:
self._pending_trigger_timer.cancel()
self._pending_trigger_timer = None
def _do_capture():
self._pending_trigger_timer = None
self._capture_screenshot(capture_type)
t = threading.Timer(delay, _do_capture)
t.daemon = True
t.start()
self._pending_trigger_timer = t
logging.debug(f"Screenshot trigger armed: type={capture_type}, delay={delay}s")
def _screenshot_loop(self):
"""Background loop that captures screenshots periodically while an event is active.
Runs in a daemon thread. Only captures when a display process is active and running.
"""
last_capture = 0
while self.running:
try:
if SCREENSHOT_CAPTURE_INTERVAL <= 0:
time.sleep(60)
continue
now = time.time()
if now - last_capture >= SCREENSHOT_CAPTURE_INTERVAL:
process_active = bool(self.current_process and self.current_process.is_running())
# In development we keep dashboard screenshots fresh even when idle,
# otherwise dashboards can look "dead" with stale images.
capture_idle_in_dev = (ENV == "development")
if SCREENSHOT_ALWAYS or process_active or capture_idle_in_dev:
self._capture_screenshot()
last_capture = now
else:
# When no active process we can optionally capture blank screen once every 5 intervals
# but for now skip to avoid noise.
pass
time.sleep(1)
except Exception as e:
logging.debug(f"Screenshot loop error: {e}")
time.sleep(5)
def _capture_screenshot(self, capture_type: str = "periodic"):
"""Capture a screenshot of the current display and store it in the shared screenshots directory.
Strategy:
1. Prefer 'scrot' if available (fast, simple)
2. Fallback to ImageMagick 'import -window root'
3. Fallback to Pillow-based X11 grab using xwd pipe if available
4. If none available, log warning once.
Downscale and JPEG-compress for bandwidth savings.
Maintains a 'latest.jpg' file and rotates older screenshots beyond SCREENSHOT_MAX_FILES.
"""
try:
ts = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
raw_path = os.path.join(self.screenshot_dir, f'screenshot_{ts}.png')
final_path = os.path.join(self.screenshot_dir, f'screenshot_{ts}.jpg')
captured = False
def command_exists(cmd: str) -> bool:
try:
subprocess.run(['which', cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
return True
except Exception:
return False
# Detect session type
session = 'wayland' if os.environ.get('WAYLAND_DISPLAY') else 'x11'
display_env = os.environ.get('DISPLAY')
logging.debug(f"Screenshot session={session} DISPLAY={display_env} WAYLAND_DISPLAY={os.environ.get('WAYLAND_DISPLAY')}")
# Video planes can be black in compositor screenshots; try direct frame extraction first.
if self._capture_video_frame(raw_path):
captured = True
logging.debug("Screenshot source: direct video frame extraction")
if session == 'wayland':
# 1W: grim (wayland/sway, wlroots) captures root
if not captured and command_exists('grim'):
cmd = ['grim', raw_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=15)
if result.returncode == 0 and os.path.exists(raw_path):
captured = True
else:
logging.debug(f"grim failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:120]}")
# 2W: gnome-screenshot (GNOME Wayland)
if not captured and command_exists('gnome-screenshot'):
cmd = ['gnome-screenshot', '-f', raw_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20)
if result.returncode == 0 and os.path.exists(raw_path):
captured = True
else:
logging.debug(f"gnome-screenshot failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:120]}")
# 3W: spectacle (KDE) if available
if not captured and command_exists('spectacle'):
cmd = ['spectacle', '--noninteractive', '--fullscreen', '--output', raw_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20)
if result.returncode == 0 and os.path.exists(raw_path):
captured = True
else:
logging.debug(f"spectacle failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:120]}")
else:
# X11 path
# 1X: scrot
if not captured and command_exists('scrot'):
cmd = ['scrot', raw_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=15)
if result.returncode == 0 and os.path.exists(raw_path):
captured = True
else:
logging.debug(f"scrot failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:120]}")
# 2X: import (ImageMagick)
if not captured and command_exists('import'):
cmd = ['import', '-window', 'root', raw_path]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=15)
if result.returncode == 0 and os.path.exists(raw_path):
captured = True
else:
logging.debug(f"import failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:120]}")
# 3X: Try xwd pipe -> convert via ImageMagick if available
if not captured and command_exists('xwd') and command_exists('convert'):
xwd_file = os.path.join(self.screenshot_dir, f'xwd_{ts}.xwd')
try:
r1 = subprocess.run(['xwd', '-root', '-silent', '-out', xwd_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=15)
if r1.returncode == 0 and os.path.exists(xwd_file):
r2 = subprocess.run(['convert', xwd_file, raw_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30)
if r2.returncode == 0 and os.path.exists(raw_path):
captured = True
if os.path.exists(xwd_file):
try:
os.remove(xwd_file)
except Exception:
pass
except Exception as e:
logging.debug(f"xwd/convert pipeline failed: {e}")
if not captured:
# Capture can fail in headless/TTY sessions even when tools exist.
logging.warning(
"Screenshot capture failed for current session "
f"(DISPLAY={os.environ.get('DISPLAY')}, "
f"WAYLAND_DISPLAY={os.environ.get('WAYLAND_DISPLAY')}, "
f"XDG_SESSION_TYPE={os.environ.get('XDG_SESSION_TYPE')}). "
"Ensure display-manager runs in a desktop session or exports DISPLAY/XAUTHORITY. "
"For X11 install/use 'scrot' or ImageMagick; for Wayland use 'grim' or 'gnome-screenshot'."
)
return
# Open image and downscale/compress
try:
from PIL import Image
with Image.open(raw_path) as im:
width, height = im.size
if width > SCREENSHOT_MAX_WIDTH > 0:
new_height = int(height * (SCREENSHOT_MAX_WIDTH / width))
im = im.resize((SCREENSHOT_MAX_WIDTH, new_height), Image.LANCZOS)
im = im.convert('RGB') # Ensure JPEG compatible
im.save(final_path, 'JPEG', quality=SCREENSHOT_JPEG_QUALITY, optimize=True)
except Exception as e:
logging.debug(f"Resize/compress error: {e}; keeping raw PNG")
final_path = raw_path # Fallback
# Remove raw PNG if we produced JPEG
if final_path != raw_path and os.path.exists(raw_path):
try:
os.remove(raw_path)
except Exception:
pass
# Maintain latest.jpg as an atomic copy so readers never see a missing
# or broken pointer while a new screenshot is being published.
# PROTECTION: Don't update latest.jpg for periodic captures if event-triggered is pending
should_update_latest = True
if capture_type == "periodic":
try:
meta_path = os.path.join(self.screenshot_dir, 'meta.json')
if os.path.exists(meta_path):
with open(meta_path, 'r', encoding='utf-8') as f:
existing_meta = json.load(f)
# If there's a pending event-triggered capture, don't update latest.jpg
if _pending_trigger_is_valid(existing_meta):
should_update_latest = False
logging.debug(f"Skipping latest.jpg update to preserve pending {existing_meta.get('type')} screenshot")
except Exception:
pass # If we can't read meta, proceed with updating latest.jpg
latest_link = os.path.join(self.screenshot_dir, 'latest.jpg')
if should_update_latest:
try:
latest_tmp = os.path.join(self.screenshot_dir, 'latest.jpg.tmp')
shutil.copyfile(final_path, latest_tmp)
os.replace(latest_tmp, latest_link)
except Exception as e:
logging.debug(f"Could not update latest.jpg: {e}")
# Rotate old screenshots
try:
files = sorted([f for f in os.listdir(self.screenshot_dir) if f.lower().startswith('screenshot_')], reverse=True)
if len(files) > SCREENSHOT_MAX_FILES:
for old in files[SCREENSHOT_MAX_FILES:]:
try:
os.remove(os.path.join(self.screenshot_dir, old))
except Exception:
pass
except Exception:
pass
size = None
try:
size = os.path.getsize(final_path)
except Exception:
pass
logged_size = size if size is not None else 'unknown'
self._write_screenshot_meta(capture_type, final_path, send_immediately=(capture_type != "periodic"))
logging.info(f"Screenshot captured: {os.path.basename(final_path)} ({logged_size} bytes) type={capture_type}")
except Exception as e:
logging.debug(f"Screenshot capture failure: {e}")
def _capture_video_frame(self, target_path: str) -> bool:
"""Capture a frame from the active video event directly.
Returns True when a frame was successfully written to target_path.
"""
try:
if not self.current_process or self.current_process.event_type != 'video':
return False
# Prefer python-vlc direct snapshot when available.
player = getattr(self.current_process, 'player', None)
if player:
media_player = None
if hasattr(player, 'video_take_snapshot'):
media_player = player
elif hasattr(player, 'get_media_player'):
try:
media_player = player.get_media_player()
except Exception:
media_player = None
if media_player and hasattr(media_player, 'video_take_snapshot'):
try:
rc = media_player.video_take_snapshot(0, target_path, 0, 0)
if rc == 0 and os.path.exists(target_path) and os.path.getsize(target_path) > 0:
return True
logging.debug(f"python-vlc snapshot failed with rc={rc}")
except Exception as e:
logging.debug(f"python-vlc snapshot error: {e}")
# Fallback: extract one frame from source URL using ffmpeg.
# This covers external VLC fallback mode where no python-vlc player object exists.
if not self._command_exists('ffmpeg'):
return False
video = self.current_event_data.get('video', {}) if isinstance(self.current_event_data, dict) else {}
source_url = video.get('url') if isinstance(video, dict) else None
source_url = self._resolve_file_url(source_url) if source_url else None
source_url = self._sanitize_media_url(source_url) if source_url else None
if not source_url:
return False
loop_enabled = bool(video.get('loop', False)) if isinstance(video, dict) else False
elapsed_seconds = 0.0
try:
elapsed_seconds = max(
0.0,
(datetime.now(timezone.utc) - self.current_process.start_time).total_seconds()
)
except Exception:
elapsed_seconds = 0.0
# Use a playback-relative seek point so repeated captures are not the same frame.
seek_seconds = max(0.2, elapsed_seconds)
duration_seconds = self._get_video_duration_seconds(source_url)
if duration_seconds and duration_seconds > 1.0:
safe_span = max(0.5, duration_seconds - 0.5)
if loop_enabled:
seek_seconds = 0.25 + (elapsed_seconds % safe_span)
else:
seek_seconds = min(seek_seconds, safe_span)
cmd = [
'ffmpeg',
'-y',
'-hide_banner',
'-loglevel', 'error',
'-ss', f'{seek_seconds:.3f}',
'-i', source_url,
'-frames:v', '1',
target_path,
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20)
if result.returncode == 0 and os.path.exists(target_path) and os.path.getsize(target_path) > 0:
logging.debug(
"Video frame extracted via ffmpeg: seek=%.3fs elapsed=%.3fs duration=%s loop=%s",
seek_seconds,
elapsed_seconds,
f"{duration_seconds:.3f}s" if duration_seconds else "unknown",
loop_enabled,
)
return True
logging.debug(f"ffmpeg frame capture failed rc={result.returncode}: {result.stderr.decode(errors='ignore')[:160]}")
return False
except Exception as e:
logging.debug(f"Direct video frame capture failed: {e}")
return False
def _get_video_duration_seconds(self, source_url: str) -> Optional[float]:
"""Return media duration in seconds for a video URL, using a small in-memory cache."""
try:
cached = self._video_duration_cache.get(source_url)
if cached and cached > 0:
return cached
if not self._command_exists('ffprobe'):
return None
cmd = [
'ffprobe',
'-v', 'error',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
source_url,
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
if result.returncode != 0:
return None
raw = result.stdout.decode('utf-8', errors='ignore').strip()
duration = float(raw)
if duration > 0:
self._video_duration_cache[source_url] = duration
return duration
return None
except Exception:
return None
def main():
"""Entry point"""
manager = DisplayManager()
manager.run()
if __name__ == "__main__":
main()