Compare commits

...

2 Commits

Author SHA1 Message Date
RobbStarkAustria
25cf4e3322 feat(mqtt): finalize dashboard screenshot payload v2 and trigger flow
- switch dashboard payload to grouped schema v2.0 in simclient
- support immediate event-triggered screenshot sends via meta.json signaling
- update README and copilot instructions to document v2 payload and trigger behavior
- update migration checklist to reflect completed client/server rollout
2026-03-30 17:53:58 +02:00
RobbStarkAustria
77db2bc565 Way to V2 messaging 2026-03-30 14:18:56 +02:00
6 changed files with 405 additions and 54 deletions

View File

@@ -12,6 +12,9 @@
-**Keep screenshot consent notice in docs** when describing dashboard screenshot feature -**Keep screenshot consent notice in docs** when describing dashboard screenshot feature
-**Event-start/event-stop screenshots must preserve metadata** - See SCREENSHOT_MQTT_FIX.md for critical race condition that was fixed -**Event-start/event-stop screenshots must preserve metadata** - See SCREENSHOT_MQTT_FIX.md for critical race condition that was fixed
-**Screenshot updates must keep `latest.jpg` and `meta.json` in sync** (simclient prefers `latest.jpg`) -**Screenshot updates must keep `latest.jpg` and `meta.json` in sync** (simclient prefers `latest.jpg`)
-**Dashboard payload uses grouped v2 schema** (`message/content/runtime/metadata`, `schema_version="2.0"`)
-**Event-triggered screenshots**: `display_manager` arms a `threading.Timer` after start/stop, captures, writes `meta.json` with `send_immediately=true`; simclient fires within ≤1s
-**Payload assembly is centralized** in `_build_dashboard_payload()` — do not build dashboard JSON at call sites
### Key Files & Locations ### Key Files & Locations
- **Display logic**: `src/display_manager.py` (controls presentations/video/web) - **Display logic**: `src/display_manager.py` (controls presentations/video/web)
@@ -488,31 +491,49 @@ The screenshot capture and transmission system has been implemented with separat
- **Rotation**: Keeps max N files (default 20), deletes older - **Rotation**: Keeps max N files (default 20), deletes older
- **Timing**: Production captures when display process is active (unless `SCREENSHOT_ALWAYS=1`); development allows periodic idle captures to keep dashboard fresh - **Timing**: Production captures when display process is active (unless `SCREENSHOT_ALWAYS=1`); development allows periodic idle captures to keep dashboard fresh
- **Reliability**: Stale/invalid pending trigger metadata is ignored automatically to avoid lock-up of periodic updates - **Reliability**: Stale/invalid pending trigger metadata is ignored automatically to avoid lock-up of periodic updates
- **Event-triggered captures**: `_trigger_event_screenshot(type, delay)` arms a one-shot `threading.Timer` after event start/stop; timer is cancelled and replaced on rapid event switches; default delays: presentation=4s, video=2s, web=5s (env-configurable)
- **IPC signal file** (`screenshots/meta.json`): written atomically by `display_manager` after each capture; contains `type`, `captured_at`, `file`, `send_immediately`; `send_immediately=true` for event-triggered, `false` for periodic
### Transmission Strategy (simclient.py) ### Transmission Strategy (simclient.py)
- **Source**: Prefers `screenshots/latest.jpg` if present, falls back to newest timestamped file - **Source**: Prefers `screenshots/latest.jpg` if present, falls back to newest timestamped file
- **Topic**: `infoscreen/{client_id}/dashboard` - **Topic**: `infoscreen/{client_id}/dashboard`
- **Format**: JSON with base64-encoded image data - **Format**: JSON with base64-encoded image data, grouped v2 schema
- **Payload Structure**: - **Schema version**: `"2.0"` (legacy flat fields removed; all fields grouped)
- **Payload builder**: `_build_dashboard_payload()` in `simclient.py` — single source of truth
- **Payload Structure** (v2):
```json ```json
{ {
"timestamp": "ISO datetime", "message": { "client_id": "UUID", "status": "alive" },
"client_id": "UUID", "content": {
"status": "alive",
"screenshot": { "screenshot": {
"filename": "latest.jpg", "filename": "latest.jpg",
"data": "base64...", "data": "base64...",
"timestamp": "ISO datetime", "timestamp": "ISO datetime",
"size": 12345 "size": 12345
}
}, },
"system_info": { "runtime": {
"hostname": "...", "system_info": { "hostname": "...", "ip": "...", "uptime": 123456.78 },
"ip": "...", "process_health": { "event_type": "...", "process_status": "...", ... }
"uptime": 123456.78 },
"metadata": {
"schema_version": "2.0",
"producer": "simclient",
"published_at": "ISO datetime",
"capture": {
"type": "periodic | event_start | event_stop",
"captured_at": "ISO datetime",
"age_s": 0.9,
"triggered": false,
"send_immediately": false
},
"transport": { "topic": "infoscreen/.../dashboard", "qos": 0, "publisher": "simclient" }
} }
} }
``` ```
- **Logging**: Logs publish success/failure with file size for monitoring - **Capture types**: `periodic` (interval-based), `event_start` (N seconds after event launch), `event_stop` (1s after process killed)
- **Triggered send**: `display_manager` sets `send_immediately=true` in `meta.json`; simclient 1-second tick detects and fires within ≤1s
- **Logging**: `Dashboard published: schema=2.0 type=<type> screenshot=<file> (<bytes>) age=<s>`
### Scalability Considerations ### Scalability Considerations
- **Client-side resize/compress**: Reduces bandwidth and broker load (recommended for 50+ clients) - **Client-side resize/compress**: Reduces bandwidth and broker load (recommended for 50+ clients)

View File

@@ -0,0 +1,61 @@
# MQTT Payload Migration Checklist (One Page)
Use this checklist to migrate from legacy flat dashboard payload to grouped v2 payload.
## A. Client Implementation
- [x] Create branch for migration work.
- [x] Capture one baseline message from MQTT (legacy format).
- [x] Implement one canonical payload builder function.
- [x] Emit grouped blocks in this order: `message`, `content`, `runtime`, `metadata`.
- [x] Add `metadata.schema_version = "2.0"`.
- [x] Add `metadata.producer = "simclient"`.
- [x] Add `metadata.published_at` in UTC ISO format.
- [x] Map capture type to `metadata.capture.type` (`periodic`, `event_start`, `event_stop`).
- [x] Map screenshot freshness to `metadata.capture.age_s`.
- [x] Keep screenshot object unchanged in semantics (`filename`, `data`, `timestamp`, `size`).
- [x] Keep trigger behavior unchanged (periodic and triggered sends still work).
- [x] Add publish log fields: schema version, capture type, age.
- [x] Validate all 3 paths end-to-end:
- [x] periodic
- [x] event_start
- [x] event_stop
## B. Server Migration
- [x] Add grouped v2 parser (`message/content/runtime/metadata`).
- [x] Add temporary legacy fallback parser.
- [x] Normalize both parsers into one internal server model.
- [x] Mark required fields:
- [x] `message.client_id`
- [x] `message.status`
- [x] `metadata.schema_version`
- [x] `metadata.capture.type`
- [x] Keep optional fields tolerated (`runtime.process_health`, `content.screenshot`).
- [x] Update dashboard consumers to use normalized model (not raw legacy keys).
- [x] Add migration counters:
- [x] v2 parse success
- [x] legacy fallback usage
- [x] parse failures
- [x] Test compatibility matrix:
- [x] new client -> new server
- [x] legacy client -> new server
- [x] Run short soak in dev.
## C. Cutover and Cleanup
- [ ] Set v2 as primary parser path on server.
- [ ] Confirm fallback usage is near zero for agreed window.
- [ ] Remove legacy parser/fallback.
- [ ] Remove client-side temporary compatibility fields (if used).
- [ ] Keep one canonical schema sample in repo.
- [ ] Close migration ticket with final validation evidence.
## Quick Go/No-Go Gate
Go only if all are true:
- [ ] No parse failures in dev soak
- [ ] All 3 capture types visible in dashboard
- [ ] Screenshot payload integrity unchanged
- [ ] Metadata group present and complete

View File

@@ -0,0 +1,194 @@
# MQTT Payload Migration Guide
## Purpose
This guide describes a practical migration from the current dashboard screenshot payload to a grouped schema, with client-side implementation first and server-side migration second.
## Scope
- Environment: development and alpha systems (no production installs)
- Message topic: infoscreen/<client_id>/dashboard
- Capture types to preserve: periodic, event_start, event_stop
## Target Schema (v2)
The canonical message should be grouped into four logical blocks in this order:
1. message
2. content
3. runtime
4. metadata
Example shape:
```json
{
"message": {
"client_id": "<uuid>",
"status": "alive"
},
"content": {
"screenshot": {
"filename": "latest.jpg",
"data": "<base64>",
"timestamp": "2026-03-30T10:15:41.123456+00:00",
"size": 183245
}
},
"runtime": {
"system_info": {
"hostname": "pi-display-01",
"ip": "192.168.1.42",
"uptime": 123456.7
},
"process_health": {
"event_id": "evt-123",
"event_type": "presentation",
"current_process": "impressive",
"process_pid": 4123,
"process_status": "running",
"restart_count": 0
}
},
"metadata": {
"schema_version": "2.0",
"producer": "simclient",
"published_at": "2026-03-30T10:15:42.004321+00:00",
"capture": {
"type": "periodic",
"captured_at": "2026-03-30T10:15:41.123456+00:00",
"age_s": 0.9,
"triggered": false,
"send_immediately": false
},
"transport": {
"qos": 0,
"publisher": "simclient"
}
}
}
```
## Step-by-Step: Client-Side First
1. Create a migration branch.
- Example: feature/payload-v2
2. Freeze a baseline sample from MQTT.
- Capture one payload via mosquitto_sub and store it for comparison.
3. Implement one canonical payload builder.
- Centralize JSON assembly in one function only.
- Do not duplicate payload construction across code paths.
4. Add versioned metadata.
- Set metadata.schema_version = "2.0".
- Add metadata.producer = "simclient".
- Add metadata.published_at in UTC ISO format.
5. Map existing data into grouped blocks.
- client_id/status -> message
- screenshot object -> content.screenshot
- system_info/process_health -> runtime
- capture mode and freshness -> metadata.capture
6. Preserve existing capture semantics.
- Keep type values unchanged: periodic, event_start, event_stop.
- Keep UTC ISO timestamps.
- Keep screenshot encoding and size behavior unchanged.
7. Optional short-term compatibility mode (recommended for one sprint).
- Either:
- Keep current legacy fields in parallel, or
- Add a legacy block with old field names.
- Goal: prevent immediate server breakage while parser updates are merged.
8. Improve publish logs for verification.
- Log schema_version, metadata.capture.type, metadata.capture.age_s.
9. Validate all three capture paths end-to-end.
- periodic capture
- event_start trigger capture
- event_stop trigger capture
10. Lock the client contract.
- Save one validated JSON sample per capture type.
- Use those samples in server parser tests.
## Step-by-Step: Server-Side Migration
1. Add support for grouped v2 parsing.
- Parse from message/content/runtime/metadata first.
2. Add fallback parser for legacy payload (temporary).
- If grouped keys are absent, parse old top-level keys.
3. Normalize to one internal server model.
- Convert both parser paths into one DTO/entity used by dashboard logic.
4. Validate required fields.
- Required:
- message.client_id
- message.status
- metadata.schema_version
- metadata.capture.type
- Optional:
- runtime.process_health
- content.screenshot (if no screenshot available)
5. Update dashboard consumers.
- Read grouped fields from internal model (not raw old keys).
6. Add migration observability.
- Counters:
- v2 parse success
- legacy fallback usage
- parse failures
- Warning log for unknown schema_version.
7. Run mixed-format integration tests.
- New client -> new server
- Legacy client -> new server (fallback path)
8. Cut over to v2 preferred.
- Keep fallback for short soak period only.
9. Remove fallback and legacy assumptions.
- After stability window, remove old parser path.
10. Final cleanup.
- Keep one schema doc and test fixtures.
- Remove temporary compatibility switches.
## Legacy to v2 Field Mapping
| Legacy field | v2 field |
|---|---|
| client_id | message.client_id |
| status | message.status |
| screenshot | content.screenshot |
| screenshot_type | metadata.capture.type |
| screenshot_age_s | metadata.capture.age_s |
| timestamp | metadata.published_at |
| system_info | runtime.system_info |
| process_health | runtime.process_health |
## Acceptance Criteria
1. All capture types parse and display correctly.
- periodic
- event_start
- event_stop
2. Screenshot payload integrity is unchanged.
- filename, data, timestamp, size remain valid.
3. Metadata is centrally visible at message end.
- schema_version, capture metadata, transport metadata all inside metadata.
4. No regression in dashboard update timing.
- Triggered screenshots still publish quickly.
## Suggested Timeline (Dev Only)
1. Day 1: client v2 payload implementation + local tests
2. Day 2: server v2 parser + fallback
3. Day 3-5: soak in dev, monitor parse metrics
4. Day 6+: remove fallback and finalize v2-only

View File

@@ -394,7 +394,7 @@ The MQTT client ([src/simclient.py](src/simclient.py)) downloads presentation fi
#### Client → Server #### Client → Server
- `infoscreen/discovery` - Initial client announcement - `infoscreen/discovery` - Initial client announcement
- `infoscreen/{client_id}/heartbeat` - Regular status updates - `infoscreen/{client_id}/heartbeat` - Regular status updates
- `infoscreen/{client_id}/dashboard` - Screenshot images (base64) - `infoscreen/{client_id}/dashboard` - Dashboard payload v2 (grouped schema: message/content/runtime/metadata, includes screenshot base64, capture type, schema version)
- `infoscreen/{client_id}/health` - Process health state (`event_id`, process, pid, status) - `infoscreen/{client_id}/health` - Process health state (`event_id`, process, pid, status)
- `infoscreen/{client_id}/logs/error` - Forwarded client error logs - `infoscreen/{client_id}/logs/error` - Forwarded client error logs
- `infoscreen/{client_id}/logs/warn` - Forwarded client warning logs - `infoscreen/{client_id}/logs/warn` - Forwarded client warning logs
@@ -587,7 +587,8 @@ stat src/screenshots/latest.jpg
**Verify simclient is reading screenshots:** **Verify simclient is reading screenshots:**
```bash ```bash
tail -f logs/simclient.log | grep -i screenshot tail -f logs/simclient.log | grep -i screenshot
# Should show: "Dashboard heartbeat sent with screenshot: latest.jpg" # Should show: "Dashboard published: schema=2.0 type=periodic screenshot=latest.jpg"
# For event transitions: "Dashboard published: schema=2.0 type=event_start ..."
``` ```
## 📚 Documentation ## 📚 Documentation
@@ -771,3 +772,8 @@ For issues or questions:
- Stale/invalid pending trigger metadata now self-heals instead of blocking periodic updates. - Stale/invalid pending trigger metadata now self-heals instead of blocking periodic updates.
- Display environment fallbacks (`DISPLAY=:0`, `XAUTHORITY`) improved for non-interactive starts. - Display environment fallbacks (`DISPLAY=:0`, `XAUTHORITY`) improved for non-interactive starts.
- Development mode allows periodic idle captures to keep dashboard previews fresh when no event is active. - Development mode allows periodic idle captures to keep dashboard previews fresh when no event is active.
- Event-triggered screenshots added: `display_manager` captures a screenshot shortly after every event start and stop and signals `simclient` via `meta.json` (`send_immediately=true`). Capture delays are content-type aware (presentation: 4s, video: 2s, web: 5s, configurable via `.env`).
- `simclient` screenshot service thread now runs on a 1-second tick instead of a blocking sleep, so triggered sends fire within ≤1s of the `meta.json` signal.
- Dashboard payload migrated to grouped v2 schema (`message`, `content`, `runtime`, `metadata`). Legacy flat fields removed. `metadata.schema_version` is `"2.0"`. Payload assembly centralized in `_build_dashboard_payload()`.
- Tunable trigger delays added: `SCREENSHOT_TRIGGER_DELAY_PRESENTATION`, `SCREENSHOT_TRIGGER_DELAY_VIDEO`, `SCREENSHOT_TRIGGER_DELAY_WEB`.
- Rapid event switches handled safely: pending trigger timer is cancelled and replaced when a new event starts before the delay expires.

File diff suppressed because one or more lines are too long

View File

@@ -664,15 +664,12 @@ def _read_and_clear_meta():
return None return None
def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic"): def _build_dashboard_payload(client_id: str, screenshot_info: dict, health: dict, capture_type: str, trigger_meta: dict = None) -> dict:
"""Send heartbeat with screenshot to server for dashboard monitoring""" """Build the dashboard payload in one canonical place.
try:
screenshot_info = get_latest_screenshot()
# Also read health state and include in heartbeat Keeping payload assembly centralized avoids schema drift across call sites.
health = read_health_state() """
published_at = datetime.now(timezone.utc).isoformat()
# Compute screenshot age so the server can flag stale images
screenshot_age_s = None screenshot_age_s = None
if screenshot_info: if screenshot_info:
try: try:
@@ -681,23 +678,17 @@ def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic")
except Exception: except Exception:
pass pass
heartbeat_data = { capture_meta = {
"timestamp": datetime.now(timezone.utc).isoformat(), "type": capture_type,
"client_id": client_id, "captured_at": (trigger_meta or {}).get("captured_at") or (screenshot_info or {}).get("timestamp"),
"status": "alive", "age_s": screenshot_age_s,
"screenshot_type": capture_type, "triggered": bool(trigger_meta and trigger_meta.get("send_immediately")),
"screenshot": screenshot_info, "send_immediately": bool(trigger_meta and trigger_meta.get("send_immediately")),
"screenshot_age_s": screenshot_age_s,
"system_info": {
"hostname": socket.gethostname(),
"ip": get_ip(),
"uptime": time.time() # Could be replaced with actual uptime
}
} }
# Include health info if available (from display_manager) process_health_payload = None
if health: if health:
heartbeat_data["process_health"] = { process_health_payload = {
"event_id": health.get("event_id"), "event_id": health.get("event_id"),
"event_type": health.get("event_type"), "event_type": health.get("event_type"),
"current_process": health.get("current_process"), "current_process": health.get("current_process"),
@@ -706,15 +697,67 @@ def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic")
"restart_count": health.get("restart_count", 0) "restart_count": health.get("restart_count", 0)
} }
payload = {
"message": {
"client_id": client_id,
"status": "alive",
},
"content": {
"screenshot": screenshot_info,
},
"runtime": {
"system_info": {
"hostname": socket.gethostname(),
"ip": get_ip(),
"uptime": time.time(),
},
"process_health": process_health_payload,
},
"metadata": {
"schema_version": "2.0",
"producer": "simclient",
"published_at": published_at,
"capture": capture_meta,
"transport": {
"topic": f"infoscreen/{client_id}/dashboard",
"qos": 0,
"publisher": "simclient",
},
},
}
return payload
def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic", trigger_meta: dict = None):
"""Send heartbeat with screenshot to server for dashboard monitoring"""
try:
screenshot_info = get_latest_screenshot()
# Also read health state and include in heartbeat
health = read_health_state()
heartbeat_data = _build_dashboard_payload(
client_id=client_id,
screenshot_info=screenshot_info,
health=health,
capture_type=capture_type,
trigger_meta=trigger_meta,
)
# Send to dashboard monitoring topic # Send to dashboard monitoring topic
dashboard_topic = f"infoscreen/{client_id}/dashboard" dashboard_topic = f"infoscreen/{client_id}/dashboard"
payload = json.dumps(heartbeat_data) payload = json.dumps(heartbeat_data)
res = client.publish(dashboard_topic, payload, qos=0) res = client.publish(dashboard_topic, payload, qos=0)
if res.rc == mqtt.MQTT_ERR_SUCCESS: if res.rc == mqtt.MQTT_ERR_SUCCESS:
age_str = f", age={heartbeat_data['metadata']['capture']['age_s']}s" if heartbeat_data['metadata']['capture']['age_s'] is not None else ""
if screenshot_info: if screenshot_info:
logging.info(f"Dashboard heartbeat sent with screenshot: {screenshot_info['filename']} ({screenshot_info['size']} bytes)") logging.info(
f"Dashboard published: schema=2.0 type={capture_type}"
f" screenshot={screenshot_info['filename']} ({screenshot_info['size']} bytes){age_str}"
)
else: else:
logging.info("Dashboard heartbeat sent (no screenshot available)") logging.info(f"Dashboard published: schema=2.0 type={capture_type} (no screenshot)")
elif res.rc == mqtt.MQTT_ERR_NO_CONN: elif res.rc == mqtt.MQTT_ERR_NO_CONN:
logging.warning("Dashboard heartbeat publish returned NO_CONN; will retry on next interval") logging.warning("Dashboard heartbeat publish returned NO_CONN; will retry on next interval")
else: else:
@@ -757,7 +800,7 @@ def screenshot_service_thread(client, client_id):
capture_type = meta['type'] if (triggered and meta) else "periodic" capture_type = meta['type'] if (triggered and meta) else "periodic"
if triggered: if triggered:
logging.info(f"Sending triggered screenshot: type={capture_type}") logging.info(f"Sending triggered screenshot: type={capture_type}")
send_screenshot_heartbeat(client, client_id, capture_type) send_screenshot_heartbeat(client, client_id, capture_type, trigger_meta=meta)
last_sent = now last_sent = now
except Exception as e: except Exception as e:
logging.error(f"Screenshot service error: {e}") logging.error(f"Screenshot service error: {e}")