fix(screenshots): harden event-triggered MQTT screenshot flow and cleanup docs

- fix race where periodic captures could overwrite pending event_start and event_stop metadata before simclient published
- keep latest.jpg and meta.json synchronized so triggered screenshots are not lost
- add stale pending trigger self-healing to recover from old or invalid metadata states
- improve non-interactive capture reliability with DISPLAY and XAUTHORITY fallbacks
- allow periodic idle captures in development mode so dashboard previews stay fresh without active events
- add deeper simclient screenshot diagnostics for trigger and metadata handling
- add regression test script for metadata preservation and trigger delivery
- add root-cause and fix documentation for the screenshot MQTT issue
- align and deduplicate README screenshot and troubleshooting sections; update release notes to March 2026
- fix scripts/start-dev.sh .env loading to ignore comments safely and remove export invalid identifier warnings
This commit is contained in:
RobbStarkAustria
2026-03-29 10:38:29 +02:00
parent cda126018f
commit d6090a6179
7 changed files with 556 additions and 244 deletions

View File

@@ -39,6 +39,19 @@ for env_path in env_paths:
load_dotenv(env_path)
break
# Best-effort display env bootstrap for non-interactive starts (nohup/systemd/ssh).
# If both Wayland and X11 vars are missing, default to X11 :0 which is the
# common kiosk display on Raspberry Pi deployments.
if not os.environ.get("WAYLAND_DISPLAY") and not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = os.getenv("DISPLAY", ":0")
# X11 capture tools may also require XAUTHORITY when started outside a desktop
# session shell; default to ~/.Xauthority when available.
if os.environ.get("DISPLAY") and not os.environ.get("XAUTHORITY"):
xauth_default = os.path.join(os.path.expanduser("~"), ".Xauthority")
if os.path.exists(xauth_default):
os.environ["XAUTHORITY"] = xauth_default
# Configuration
ENV = os.getenv("ENV", "development")
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
@@ -48,6 +61,10 @@ SCREENSHOT_MAX_WIDTH = int(os.getenv("SCREENSHOT_MAX_WIDTH", "800")) # Width to
SCREENSHOT_JPEG_QUALITY = int(os.getenv("SCREENSHOT_JPEG_QUALITY", "70")) # JPEG quality 1-95
SCREENSHOT_MAX_FILES = int(os.getenv("SCREENSHOT_MAX_FILES", "20")) # Rotate old screenshots
SCREENSHOT_ALWAYS = os.getenv("SCREENSHOT_ALWAYS", "0").lower() in ("1","true","yes")
# Delay (seconds) before triggered screenshot fires after event start/stop
SCREENSHOT_TRIGGER_DELAY_PRESENTATION = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_PRESENTATION", "4"))
SCREENSHOT_TRIGGER_DELAY_VIDEO = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_VIDEO", "2"))
SCREENSHOT_TRIGGER_DELAY_WEB = int(os.getenv("SCREENSHOT_TRIGGER_DELAY_WEB", "5"))
CHECK_INTERVAL = int(os.getenv("DISPLAY_CHECK_INTERVAL", "5")) # seconds
PRESENTATION_DIR = os.path.join(os.path.dirname(__file__), "presentation")
EVENT_FILE = os.path.join(os.path.dirname(__file__), "current_event.json")
@@ -590,6 +607,9 @@ class DisplayManager:
self._screenshot_thread = threading.Thread(target=self._screenshot_loop, daemon=True)
self._screenshot_thread.start()
# Pending one-shot timer for event-triggered screenshots (event_start / event_stop)
self._pending_trigger_timer: Optional[threading.Timer] = None
self._load_client_settings(force=True)
def _normalize_volume_level(self, value, default: float = 1.0) -> float:
@@ -880,7 +900,9 @@ class DisplayManager:
self.health.update_stopped()
self.current_process = None
self.current_event_data = None
# Capture a screenshot ~1s after stop so the dashboard shows the cleared screen
self._trigger_event_screenshot("event_stop", 1.0)
# Turn off TV when display stops (with configurable delay)
if turn_off_tv:
self.cec.turn_off(delayed=True)
@@ -1431,13 +1453,17 @@ class DisplayManager:
def start_display_for_event(self, event: Dict) -> Optional[DisplayProcess]:
"""Start appropriate display software for the given event"""
process = None
handled = False
# First, respect explicit event_type if provided by scheduler
etype = event.get('event_type')
if etype:
etype = etype.lower()
if etype == 'presentation':
return self.start_presentation(event)
if etype in ('webuntis', 'webpage', 'website'):
process = self.start_presentation(event)
handled = True
elif etype in ('webuntis', 'webpage', 'website'):
# webuntis and webpage both show a browser kiosk
# Ensure the URL is taken from 'website.url' or 'web.url'
# Normalize event to include a 'web' key so start_webpage can use it
@@ -1448,18 +1474,25 @@ class DisplayManager:
event['web']['url'] = event['website'].get('url')
# Only enable autoscroll for explicit scheduler event_type 'website'
autoscroll_flag = (etype == 'website')
return self.start_webpage(event, autoscroll_enabled=autoscroll_flag)
process = self.start_webpage(event, autoscroll_enabled=autoscroll_flag)
handled = True
# Fallback to legacy keys
if 'presentation' in event:
return self.start_presentation(event)
elif 'video' in event:
return self.start_video(event)
elif 'web' in event:
return self.start_webpage(event)
else:
logging.error(f"Unknown event type/structure: {list(event.keys())}")
return None
if not handled:
# Fallback to legacy keys
if 'presentation' in event:
process = self.start_presentation(event)
elif 'video' in event:
process = self.start_video(event)
elif 'web' in event:
process = self.start_webpage(event)
else:
logging.error(f"Unknown event type/structure: {list(event.keys())}")
if process is not None:
delay = self._get_trigger_delay(event)
self._trigger_event_screenshot("event_start", delay)
return process
def _command_exists(self, command: str) -> bool:
"""Check if a command exists in PATH"""
@@ -1718,6 +1751,130 @@ class DisplayManager:
# -------------------------------------------------------------
# Screenshot capture subsystem
# -------------------------------------------------------------
def _write_screenshot_meta(self, capture_type: str, final_path: str, send_immediately: bool = False):
"""Write screenshots/meta.json atomically so simclient can detect new captures.
IMPORTANT: Protect event-triggered metadata from being overwritten by periodic captures.
If a periodic screenshot is captured while an event-triggered one is still pending
transmission (send_immediately=True), skip writing meta.json to preserve the event's metadata.
Args:
capture_type: 'periodic', 'event_start', or 'event_stop'
final_path: absolute path of the just-written screenshot file
send_immediately: True for triggered (event) captures, False for periodic ones
"""
try:
def _pending_trigger_is_valid(meta: Dict) -> bool:
"""Return True only for fresh, actionable pending trigger metadata.
This prevents a stale/corrupt pending flag from permanently blocking
periodic updates (meta.json/latest.jpg) if simclient was down or test
data left send_immediately=True behind.
"""
try:
if not meta.get('send_immediately'):
return False
mtype = str(meta.get('type') or '')
if mtype not in ('event_start', 'event_stop'):
return False
mfile = str(meta.get('file') or '').strip()
if not mfile:
return False
file_path = os.path.join(self.screenshot_dir, mfile)
if not os.path.exists(file_path):
logging.warning(
f"Ignoring stale pending screenshot meta: missing file '{mfile}'"
)
return False
captured_at_raw = meta.get('captured_at')
if not captured_at_raw:
return False
captured_at = datetime.fromisoformat(str(captured_at_raw).replace('Z', '+00:00'))
age_s = (datetime.now(timezone.utc) - captured_at.astimezone(timezone.utc)).total_seconds()
# Guard against malformed/future timestamps that could lock
# the pipeline by appearing permanently "fresh".
if age_s < -5:
logging.warning(
f"Ignoring invalid pending screenshot meta: future captured_at (age={age_s:.1f}s)"
)
return False
# Triggered screenshots should be consumed quickly (<= 1s). Use a
# generous safety window to avoid false negatives under load.
if age_s > 30:
logging.warning(
f"Ignoring stale pending screenshot meta: type={mtype}, age={age_s:.1f}s"
)
return False
return True
except Exception:
return False
meta_path = os.path.join(self.screenshot_dir, 'meta.json')
# PROTECTION: Don't overwrite pending event-triggered metadata with periodic capture
if not send_immediately and capture_type == "periodic":
try:
if os.path.exists(meta_path):
with open(meta_path, 'r', encoding='utf-8') as f:
existing_meta = json.load(f)
# If there's a pending event-triggered capture, skip this periodic write
if _pending_trigger_is_valid(existing_meta):
logging.debug(f"Skipping periodic meta.json to preserve pending {existing_meta.get('type')} (send_immediately=True)")
return
except Exception:
pass # If we can't read existing meta, proceed with writing new one
meta = {
"captured_at": datetime.now(timezone.utc).isoformat(),
"file": os.path.basename(final_path),
"type": capture_type,
"send_immediately": send_immediately,
}
tmp_path = meta_path + '.tmp'
with open(tmp_path, 'w', encoding='utf-8') as f:
json.dump(meta, f)
os.replace(tmp_path, meta_path)
logging.debug(f"Screenshot meta written: type={capture_type}, send_immediately={send_immediately}")
except Exception as e:
logging.debug(f"Could not write screenshot meta: {e}")
def _get_trigger_delay(self, event: Dict) -> float:
"""Return the post-launch capture delay in seconds appropriate for the event type."""
etype = (event.get('event_type') or '').lower()
if etype == 'presentation' or 'presentation' in event:
return float(SCREENSHOT_TRIGGER_DELAY_PRESENTATION)
if etype in ('webuntis', 'webpage', 'website') or 'web' in event:
return float(SCREENSHOT_TRIGGER_DELAY_WEB)
if 'video' in event:
return float(SCREENSHOT_TRIGGER_DELAY_VIDEO)
return float(SCREENSHOT_TRIGGER_DELAY_PRESENTATION) # safe default
def _trigger_event_screenshot(self, capture_type: str, delay: float):
"""Arm a one-shot timer to capture a triggered screenshot after *delay* seconds.
Cancels any already-pending trigger so rapid event switches only produce
one screenshot after the final transition settles, not one per intermediate state.
"""
if self._pending_trigger_timer is not None:
self._pending_trigger_timer.cancel()
self._pending_trigger_timer = None
def _do_capture():
self._pending_trigger_timer = None
self._capture_screenshot(capture_type)
t = threading.Timer(delay, _do_capture)
t.daemon = True
t.start()
self._pending_trigger_timer = t
logging.debug(f"Screenshot trigger armed: type={capture_type}, delay={delay}s")
def _screenshot_loop(self):
"""Background loop that captures screenshots periodically while an event is active.
@@ -1731,7 +1888,12 @@ class DisplayManager:
continue
now = time.time()
if now - last_capture >= SCREENSHOT_CAPTURE_INTERVAL:
if SCREENSHOT_ALWAYS or (self.current_process and self.current_process.is_running()):
process_active = bool(self.current_process and self.current_process.is_running())
# In development we keep dashboard screenshots fresh even when idle,
# otherwise dashboards can look "dead" with stale images.
capture_idle_in_dev = (ENV == "development")
if SCREENSHOT_ALWAYS or process_active or capture_idle_in_dev:
self._capture_screenshot()
last_capture = now
else:
@@ -1743,7 +1905,7 @@ class DisplayManager:
logging.debug(f"Screenshot loop error: {e}")
time.sleep(5)
def _capture_screenshot(self):
def _capture_screenshot(self, capture_type: str = "periodic"):
"""Capture a screenshot of the current display and store it in the shared screenshots directory.
Strategy:
@@ -1841,8 +2003,15 @@ class DisplayManager:
logging.debug(f"xwd/convert pipeline failed: {e}")
if not captured:
# Warn only occasionally
logging.warning("No screenshot tool available for current session. For X11, install 'scrot' or ImageMagick. For Wayland, install 'grim' or 'gnome-screenshot'.")
# Capture can fail in headless/TTY sessions even when tools exist.
logging.warning(
"Screenshot capture failed for current session "
f"(DISPLAY={os.environ.get('DISPLAY')}, "
f"WAYLAND_DISPLAY={os.environ.get('WAYLAND_DISPLAY')}, "
f"XDG_SESSION_TYPE={os.environ.get('XDG_SESSION_TYPE')}). "
"Ensure display-manager runs in a desktop session or exports DISPLAY/XAUTHORITY. "
"For X11 install/use 'scrot' or ImageMagick; for Wayland use 'grim' or 'gnome-screenshot'."
)
return
# Open image and downscale/compress
@@ -1868,13 +2037,29 @@ class DisplayManager:
# Maintain latest.jpg as an atomic copy so readers never see a missing
# or broken pointer while a new screenshot is being published.
latest_link = os.path.join(self.screenshot_dir, 'latest.jpg')
try:
latest_tmp = os.path.join(self.screenshot_dir, 'latest.jpg.tmp')
shutil.copyfile(final_path, latest_tmp)
os.replace(latest_tmp, latest_link)
except Exception as e:
logging.debug(f"Could not update latest.jpg: {e}")
# PROTECTION: Don't update latest.jpg for periodic captures if event-triggered is pending
should_update_latest = True
if capture_type == "periodic":
try:
meta_path = os.path.join(self.screenshot_dir, 'meta.json')
if os.path.exists(meta_path):
with open(meta_path, 'r', encoding='utf-8') as f:
existing_meta = json.load(f)
# If there's a pending event-triggered capture, don't update latest.jpg
if _pending_trigger_is_valid(existing_meta):
should_update_latest = False
logging.debug(f"Skipping latest.jpg update to preserve pending {existing_meta.get('type')} screenshot")
except Exception:
pass # If we can't read meta, proceed with updating latest.jpg
latest_link = os.path.join(self.screenshot_dir, 'latest.jpg')
if should_update_latest:
try:
latest_tmp = os.path.join(self.screenshot_dir, 'latest.jpg.tmp')
shutil.copyfile(final_path, latest_tmp)
os.replace(latest_tmp, latest_link)
except Exception as e:
logging.debug(f"Could not update latest.jpg: {e}")
# Rotate old screenshots
try:
@@ -1894,7 +2079,8 @@ class DisplayManager:
except Exception:
pass
logged_size = size if size is not None else 'unknown'
logging.info(f"Screenshot captured: {os.path.basename(final_path)} ({logged_size} bytes)")
self._write_screenshot_meta(capture_type, final_path, send_immediately=(capture_type != "periodic"))
logging.info(f"Screenshot captured: {os.path.basename(final_path)} ({logged_size} bytes) type={capture_type}")
except Exception as e:
logging.debug(f"Screenshot capture failure: {e}")

View File

@@ -144,6 +144,9 @@ logging.info(f"Monitoring logger initialized: {MONITORING_LOG_PATH}")
# Health state file (written by display_manager, read by simclient)
HEALTH_STATE_FILE = os.path.join(os.path.dirname(__file__), "current_process_health.json")
CLIENT_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), "config", "client_settings.json")
# Screenshot IPC (written by display_manager, polled by simclient)
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), "screenshots")
SCREENSHOT_META_FILE = os.path.join(SCREENSHOT_DIR, "meta.json")
discovered = False
@@ -635,19 +638,56 @@ def publish_log_message(client, client_id, level: str, message: str, context: di
logging.debug(f"Error publishing log: {e}")
def send_screenshot_heartbeat(client, client_id):
def _read_and_clear_meta():
"""Read screenshots/meta.json and atomically clear the send_immediately flag.
Returns the parsed dict (with the *original* send_immediately value) if the
file exists and is valid JSON, else None. The flag is cleared on disk before
returning so a crash between read and publish does not re-send on the next tick.
"""
try:
if not os.path.exists(SCREENSHOT_META_FILE):
return None
with open(SCREENSHOT_META_FILE, 'r', encoding='utf-8') as f:
meta = json.load(f)
if meta.get('send_immediately'):
# Write cleared copy atomically so the flag is gone before we return
cleared = dict(meta)
cleared['send_immediately'] = False
tmp_path = SCREENSHOT_META_FILE + '.tmp'
with open(tmp_path, 'w', encoding='utf-8') as f:
json.dump(cleared, f)
os.replace(tmp_path, SCREENSHOT_META_FILE)
return meta # original dict; send_immediately is True if it was set
except Exception as e:
logging.debug(f"Could not read screenshot meta: {e}")
return None
def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic"):
"""Send heartbeat with screenshot to server for dashboard monitoring"""
try:
screenshot_info = get_latest_screenshot()
# Also read health state and include in heartbeat
health = read_health_state()
# Compute screenshot age so the server can flag stale images
screenshot_age_s = None
if screenshot_info:
try:
ts = datetime.fromisoformat(screenshot_info["timestamp"])
screenshot_age_s = round((datetime.now(timezone.utc) - ts).total_seconds(), 1)
except Exception:
pass
heartbeat_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"client_id": client_id,
"status": "alive",
"screenshot_type": capture_type,
"screenshot": screenshot_info,
"screenshot_age_s": screenshot_age_s,
"system_info": {
"hostname": socket.gethostname(),
"ip": get_ip(),
@@ -685,18 +725,46 @@ def send_screenshot_heartbeat(client, client_id):
def screenshot_service_thread(client, client_id):
"""Background thread for screenshot monitoring and transmission"""
logging.info(f"Screenshot service started with {SCREENSHOT_INTERVAL}s interval")
"""Background thread for screenshot monitoring and transmission.
Runs on a 1-second tick. A heartbeat is sent when either:
- display_manager set send_immediately=True in screenshots/meta.json
(event_start / event_stop triggered captures); fired within <=1 second, OR
- the periodic SCREENSHOT_INTERVAL has elapsed since the last send.
The interval timer resets on every send, so a triggered send pushes out the
next periodic heartbeat rather than causing a double-send shortly after.
"""
logging.info(f"Screenshot service started with {SCREENSHOT_INTERVAL}s periodic interval")
last_sent = 0.0
last_meta_type = None
while True:
try:
send_screenshot_heartbeat(client, client_id)
time.sleep(SCREENSHOT_INTERVAL)
time.sleep(1)
now = time.time()
meta = _read_and_clear_meta()
triggered = bool(meta and meta.get('send_immediately'))
interval_due = (now - last_sent) >= SCREENSHOT_INTERVAL
if meta:
current_type = meta.get('type', 'unknown')
if current_type != last_meta_type:
logging.debug(f"Meta.json detected: type={current_type}, send_immediately={meta.get('send_immediately')}, file={meta.get('file')}")
last_meta_type = current_type
if triggered or interval_due:
capture_type = meta['type'] if (triggered and meta) else "periodic"
if triggered:
logging.info(f"Sending triggered screenshot: type={capture_type}")
send_screenshot_heartbeat(client, client_id, capture_type)
last_sent = now
except Exception as e:
logging.error(f"Screenshot service error: {e}")
time.sleep(60) # Wait a minute before retrying
def main():
global discovered
print(f"[{datetime.now(timezone.utc).isoformat()}] simclient.py: program started")