Way to V2 messaging
61
MQTT_PAYLOAD_MIGRATION_CHECKLIST.md
Normal file
@@ -0,0 +1,61 @@
# MQTT Payload Migration Checklist (One Page)

Use this checklist to migrate from legacy flat dashboard payload to grouped v2 payload.

## A. Client Implementation

- [ ] Create branch for migration work.
- [ ] Capture one baseline message from MQTT (legacy format).
- [ ] Implement one canonical payload builder function.
- [ ] Emit grouped blocks in this order: `message`, `content`, `runtime`, `metadata`.
- [ ] Add `metadata.schema_version = "2.0"`.
- [ ] Add `metadata.producer = "simclient"`.
- [ ] Add `metadata.published_at` in UTC ISO format.
- [ ] Map capture type to `metadata.capture.type` (`periodic`, `event_start`, `event_stop`).
- [ ] Map screenshot freshness to `metadata.capture.age_s`.
- [ ] Keep screenshot object unchanged in semantics (`filename`, `data`, `timestamp`, `size`).
- [ ] Keep trigger behavior unchanged (periodic and triggered sends still work).
- [ ] Add publish log fields: schema version, capture type, age.
- [ ] Validate all 3 paths end-to-end:
  - [ ] periodic
  - [ ] event_start
  - [ ] event_stop

## B. Server Migration

- [ ] Add grouped v2 parser (`message/content/runtime/metadata`).
- [ ] Add temporary legacy fallback parser.
- [ ] Normalize both parsers into one internal server model.
- [ ] Mark required fields:
  - [ ] `message.client_id`
  - [ ] `message.status`
  - [ ] `metadata.schema_version`
  - [ ] `metadata.capture.type`
- [ ] Keep optional fields tolerated (`runtime.process_health`, `content.screenshot`).
- [ ] Update dashboard consumers to use normalized model (not raw legacy keys).
- [ ] Add migration counters:
  - [ ] v2 parse success
  - [ ] legacy fallback usage
  - [ ] parse failures
- [ ] Test compatibility matrix:
  - [ ] new client -> new server
  - [ ] legacy client -> new server
- [ ] Run short soak in dev.

## C. Cutover and Cleanup

- [ ] Set v2 as primary parser path on server.
- [ ] Confirm fallback usage is near zero for agreed window.
- [ ] Remove legacy parser/fallback.
- [ ] Remove client-side temporary compatibility fields (if used).
- [ ] Keep one canonical schema sample in repo.
- [ ] Close migration ticket with final validation evidence.

## Quick Go/No-Go Gate

Go only if all are true:

- [ ] No parse failures in dev soak
- [ ] All 3 capture types visible in dashboard
- [ ] Screenshot payload integrity unchanged
- [ ] Metadata group present and complete
194
MQTT_PAYLOAD_MIGRATION_GUIDE.md
Normal file
@@ -0,0 +1,194 @@
# MQTT Payload Migration Guide

## Purpose
This guide describes a practical migration from the current dashboard screenshot payload to a grouped schema, with client-side implementation first and server-side migration second.

## Scope
- Environment: development and alpha systems (no production installs)
- Message topic: `infoscreen/<client_id>/dashboard`
- Capture types to preserve: periodic, event_start, event_stop

## Target Schema (v2)
The canonical message should be grouped into four logical blocks in this order:

1. message
2. content
3. runtime
4. metadata

Example shape:

```json
{
  "message": {
    "client_id": "<uuid>",
    "status": "alive"
  },
  "content": {
    "screenshot": {
      "filename": "latest.jpg",
      "data": "<base64>",
      "timestamp": "2026-03-30T10:15:41.123456+00:00",
      "size": 183245
    }
  },
  "runtime": {
    "system_info": {
      "hostname": "pi-display-01",
      "ip": "192.168.1.42",
      "uptime": 123456.7
    },
    "process_health": {
      "event_id": "evt-123",
      "event_type": "presentation",
      "current_process": "impressive",
      "process_pid": 4123,
      "process_status": "running",
      "restart_count": 0
    }
  },
  "metadata": {
    "schema_version": "2.0",
    "producer": "simclient",
    "published_at": "2026-03-30T10:15:42.004321+00:00",
    "capture": {
      "type": "periodic",
      "captured_at": "2026-03-30T10:15:41.123456+00:00",
      "age_s": 0.9,
      "triggered": false,
      "send_immediately": false
    },
    "transport": {
      "qos": 0,
      "publisher": "simclient"
    }
  }
}
```

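As a quick consistency check on the example shape, `metadata.capture.age_s` can be reproduced from the two timestamps it contains. The sketch below assumes `age_s` is simply the gap between `captured_at` and `published_at`; the client may measure freshness differently, and the helper name `capture_age_s` is hypothetical.

```python
from datetime import datetime


def capture_age_s(captured_at: str, published_at: str) -> float:
    """Seconds between capture and publish, both ISO 8601 strings with a UTC offset."""
    captured = datetime.fromisoformat(captured_at)
    published = datetime.fromisoformat(published_at)
    return (published - captured).total_seconds()


# Timestamps copied from the example payload.
age = capture_age_s(
    "2026-03-30T10:15:41.123456+00:00",
    "2026-03-30T10:15:42.004321+00:00",
)
print(round(age, 1))  # 0.9, the age_s value shown in the example
```

Because both strings carry an explicit `+00:00` offset, the subtraction is done on timezone-aware values and does not depend on the local timezone of the machine running the check.
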
## Step-by-Step: Client-Side First

1. Create a migration branch.
   - Example: feature/payload-v2

2. Freeze a baseline sample from MQTT.
   - Capture one payload via mosquitto_sub and store it for comparison.

3. Implement one canonical payload builder.
   - Centralize JSON assembly in one function only.
   - Do not duplicate payload construction across code paths.

4. Add versioned metadata.
   - Set metadata.schema_version = "2.0".
   - Add metadata.producer = "simclient".
   - Add metadata.published_at in UTC ISO format.

5. Map existing data into grouped blocks.
   - client_id/status -> message
   - screenshot object -> content.screenshot
   - system_info/process_health -> runtime
   - capture mode and freshness -> metadata.capture

6. Preserve existing capture semantics.
   - Keep type values unchanged: periodic, event_start, event_stop.
   - Keep UTC ISO timestamps.
   - Keep screenshot encoding and size behavior unchanged.

7. Optional short-term compatibility mode (recommended for one sprint).
   - Either:
     - Keep current legacy fields in parallel, or
     - Add a legacy block with old field names.
   - Goal: prevent immediate server breakage while parser updates are merged.

8. Improve publish logs for verification.
   - Log schema_version, metadata.capture.type, metadata.capture.age_s.

9. Validate all three capture paths end-to-end.
   - periodic capture
   - event_start trigger capture
   - event_stop trigger capture

10. Lock the client contract.
    - Save one validated JSON sample per capture type.
    - Use those samples in server parser tests.

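The publish log from step 8 could look roughly like the sketch below. It is only an illustration: the function name `format_publish_log`, the logger name, and the exact log wording are assumptions, not part of the client code.

```python
import logging

logger = logging.getLogger("simclient.publish")  # hypothetical logger name


def format_publish_log(payload: dict) -> str:
    """Summarize the three fields named in step 8 for one published payload."""
    metadata = payload.get("metadata") or {}
    capture = metadata.get("capture") or {}
    age = capture.get("age_s")
    # age_s can be missing when no screenshot is available yet.
    age_text = "n/a" if age is None else f"{age:.1f}"
    return (
        f"dashboard publish: schema_version={metadata.get('schema_version')} "
        f"capture_type={capture.get('type')} age_s={age_text}"
    )


sample = {
    "metadata": {
        "schema_version": "2.0",
        "capture": {"type": "event_start", "age_s": 0.4},
    }
}
logger.info(format_publish_log(sample))
```

Keeping the formatting in a pure function makes the log line easy to assert on when validating the three capture paths in step 9.
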
## Step-by-Step: Server-Side Migration

1. Add support for grouped v2 parsing.
   - Parse from message/content/runtime/metadata first.

2. Add fallback parser for legacy payload (temporary).
   - If grouped keys are absent, parse old top-level keys.

3. Normalize to one internal server model.
   - Convert both parser paths into one DTO/entity used by dashboard logic.

4. Validate required fields.
   - Required:
     - message.client_id
     - message.status
     - metadata.schema_version
     - metadata.capture.type
   - Optional:
     - runtime.process_health
     - content.screenshot (if no screenshot available)

5. Update dashboard consumers.
   - Read grouped fields from internal model (not raw old keys).

6. Add migration observability.
   - Counters:
     - v2 parse success
     - legacy fallback usage
     - parse failures
   - Warning log for unknown schema_version.

7. Run mixed-format integration tests.
   - New client -> new server
   - Legacy client -> new server (fallback path)

8. Cut over to v2 preferred.
   - Keep fallback for short soak period only.

9. Remove fallback and legacy assumptions.
   - After stability window, remove old parser path.

10. Final cleanup.
    - Keep one schema doc and test fixtures.
    - Remove temporary compatibility switches.

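Steps 1 to 6 can be sketched as a single parse function that tries the grouped layout first, falls back to the legacy keys, and normalizes both into one model. Everything named here (`DashboardUpdate`, `parse_dashboard_payload`, the counter keys, the `"legacy"` version marker, and the `"periodic"` default for legacy payloads without `screenshot_type`) is an assumption for illustration; the real server model may differ.

```python
import logging
from collections import Counter
from dataclasses import dataclass
from typing import Optional

V2_BLOCKS = ("message", "content", "runtime", "metadata")
# "2.0-compat" is what the client emits while legacy fields are still included.
KNOWN_SCHEMA_VERSIONS = {"2.0", "2.0-compat"}
counters = Counter()  # stand-in for real metrics


@dataclass
class DashboardUpdate:
    client_id: str
    status: str
    capture_type: str
    schema_version: str
    screenshot: Optional[dict] = None
    process_health: Optional[dict] = None


def _require(block: dict, key: str):
    """Return block[key], treating an absent or null value as a missing field."""
    value = block[key]
    if value is None:
        raise KeyError(key)
    return value


def parse_dashboard_payload(raw: dict) -> Optional[DashboardUpdate]:
    try:
        if all(block in raw for block in V2_BLOCKS):
            meta = raw["metadata"]
            update = DashboardUpdate(
                client_id=_require(raw["message"], "client_id"),
                status=_require(raw["message"], "status"),
                capture_type=_require(meta["capture"], "type"),
                schema_version=str(_require(meta, "schema_version")),
                screenshot=raw["content"].get("screenshot"),
                process_health=raw["runtime"].get("process_health"),
            )
            if update.schema_version not in KNOWN_SCHEMA_VERSIONS:
                logging.warning("unknown schema_version: %s", update.schema_version)
            counters["v2_parse_success"] += 1
        else:
            # Temporary fallback: old top-level keys.
            update = DashboardUpdate(
                client_id=_require(raw, "client_id"),
                status=_require(raw, "status"),
                capture_type=raw.get("screenshot_type") or "periodic",  # assumed default
                schema_version="legacy",
                screenshot=raw.get("screenshot"),
                process_health=raw.get("process_health"),
            )
            counters["legacy_fallback"] += 1
        return update
    except (KeyError, TypeError, AttributeError):
        counters["parse_failure"] += 1
        return None
```

A payload that carries both legacy and grouped keys takes the v2 path, so the legacy fallback counter only rises for clients that have not been updated, which is the signal needed before removing the fallback.
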
## Legacy to v2 Field Mapping

| Legacy field | v2 field |
|---|---|
| client_id | message.client_id |
| status | message.status |
| screenshot | content.screenshot |
| screenshot_type | metadata.capture.type |
| screenshot_age_s | metadata.capture.age_s |
| timestamp | metadata.published_at |
| system_info | runtime.system_info |
| process_health | runtime.process_health |

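The mapping table can also be expressed as data, which keeps test fixtures and any one-off conversion of the stored baseline sample in sync with the table. This is a minimal sketch; `LEGACY_TO_V2` and `upgrade_legacy` are hypothetical names, and the function deliberately does not invent a `schema_version` for converted payloads.

```python
# Each legacy key maps to its path inside the grouped v2 payload.
LEGACY_TO_V2 = {
    "client_id": ("message", "client_id"),
    "status": ("message", "status"),
    "screenshot": ("content", "screenshot"),
    "screenshot_type": ("metadata", "capture", "type"),
    "screenshot_age_s": ("metadata", "capture", "age_s"),
    "timestamp": ("metadata", "published_at"),
    "system_info": ("runtime", "system_info"),
    "process_health": ("runtime", "process_health"),
}


def upgrade_legacy(legacy: dict) -> dict:
    """Re-home legacy top-level fields into the four grouped blocks."""
    # Create the blocks up front so they appear in the canonical order.
    v2 = {"message": {}, "content": {}, "runtime": {}, "metadata": {}}
    for old_key, path in LEGACY_TO_V2.items():
        if old_key not in legacy:
            continue
        target = v2
        for part in path[:-1]:
            target = target.setdefault(part, {})
        target[path[-1]] = legacy[old_key]
    return v2


converted = upgrade_legacy(
    {"client_id": "abc", "status": "alive", "screenshot_type": "event_stop"}
)
print(list(converted))  # ['message', 'content', 'runtime', 'metadata']
```

Python dictionaries preserve insertion order, so serializing the result with `json.dumps` keeps the block order the guide asks for.
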
## Acceptance Criteria

1. All capture types parse and display correctly.
   - periodic
   - event_start
   - event_stop

2. Screenshot payload integrity is unchanged.
   - filename, data, timestamp, size remain valid.

3. Metadata is grouped in one block at the end of the message.
   - schema_version, capture metadata, transport metadata all inside metadata.

4. No regression in dashboard update timing.
   - Triggered screenshots still publish quickly.

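Criterion 2 can be checked mechanically on each saved sample. The sketch below is one possible check under stated assumptions: it only verifies that the four keys exist, that `data` decodes as base64, that `timestamp` parses as ISO 8601, and that `size` is a positive integer. It does not assume any particular relationship between `size` and the decoded data, and the name `screenshot_problems` is hypothetical.

```python
import base64
import binascii
from datetime import datetime

REQUIRED_SCREENSHOT_KEYS = ("filename", "data", "timestamp", "size")


def screenshot_problems(screenshot: dict) -> list:
    """Return a list of human-readable problems; an empty list means the object looks valid."""
    problems = [f"missing key: {key}" for key in REQUIRED_SCREENSHOT_KEYS if key not in screenshot]
    if problems:
        return problems
    try:
        base64.b64decode(screenshot["data"], validate=True)
    except (binascii.Error, ValueError, TypeError):
        problems.append("data is not valid base64")
    try:
        datetime.fromisoformat(screenshot["timestamp"])
    except (ValueError, TypeError):
        problems.append("timestamp is not ISO 8601")
    size = screenshot["size"]
    if not isinstance(size, int) or size <= 0:
        problems.append("size is not a positive integer")
    return problems
```

Running the same check against the legacy baseline sample and the new v2 samples gives a simple before/after comparison for the go/no-go decision.
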
## Suggested Timeline (Dev Only)

1. Day 1: client v2 payload implementation + local tests
2. Day 2: server v2 parser + fallback
3. Day 3-5: soak in dev, monitor parse metrics
4. Day 6+: remove fallback and finalize v2-only
26
mqqt-message baseline.json
Normal file
File diff suppressed because one or more lines are too long
@@ -664,15 +664,12 @@ def _read_and_clear_meta():
     return None
 
 
-def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic"):
-    """Send heartbeat with screenshot to server for dashboard monitoring"""
-    try:
-        screenshot_info = get_latest_screenshot()
+def _build_dashboard_payload(client_id: str, screenshot_info: dict, health: dict, capture_type: str, trigger_meta: dict = None) -> dict:
+    """Build the dashboard payload in one canonical place.
 
-        # Also read health state and include in heartbeat
-        health = read_health_state()
-
-        # Compute screenshot age so the server can flag stale images
+    Keeping payload assembly centralized avoids schema drift across call sites.
+    """
+    published_at = datetime.now(timezone.utc).isoformat()
     screenshot_age_s = None
     if screenshot_info:
         try:
@@ -681,8 +678,28 @@ def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic")
         except Exception:
             pass
 
-        heartbeat_data = {
-            "timestamp": datetime.now(timezone.utc).isoformat(),
+    capture_meta = {
+        "type": capture_type,
+        "captured_at": (trigger_meta or {}).get("captured_at") or (screenshot_info or {}).get("timestamp"),
+        "age_s": screenshot_age_s,
+        "triggered": bool(trigger_meta and trigger_meta.get("send_immediately")),
+        "send_immediately": bool(trigger_meta and trigger_meta.get("send_immediately")),
+    }
+
+    process_health_payload = None
+    if health:
+        process_health_payload = {
+            "event_id": health.get("event_id"),
+            "event_type": health.get("event_type"),
+            "current_process": health.get("current_process"),
+            "process_pid": health.get("process_pid"),
+            "process_status": health.get("process_status"),
+            "restart_count": health.get("restart_count", 0)
+        }
+
+    payload = {
+        # Legacy fields kept during migration so existing server parsing remains intact.
+        "timestamp": published_at,
         "client_id": client_id,
         "status": "alive",
         "screenshot_type": capture_type,
@@ -692,19 +709,57 @@ def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic")
             "hostname": socket.gethostname(),
             "ip": get_ip(),
             "uptime": time.time()  # Could be replaced with actual uptime
-            }
+        },
+        # New grouped schema (v2-compat)
+        "message": {
+            "client_id": client_id,
+            "status": "alive",
+        },
+        "content": {
+            "screenshot": screenshot_info,
+        },
+        "runtime": {
+            "system_info": {
+                "hostname": socket.gethostname(),
+                "ip": get_ip(),
+                "uptime": time.time(),
+            },
+            "process_health": process_health_payload,
+        },
+        "metadata": {
+            "schema_version": "2.0-compat",
+            "producer": "simclient",
+            "published_at": published_at,
+            "capture": capture_meta,
+            "transport": {
+                "topic": f"infoscreen/{client_id}/dashboard",
+                "qos": 0,
+                "publisher": "simclient",
+            },
+        },
     }
 
-        # Include health info if available (from display_manager)
-        if health:
-            heartbeat_data["process_health"] = {
-                "event_id": health.get("event_id"),
-                "event_type": health.get("event_type"),
-                "current_process": health.get("current_process"),
-                "process_pid": health.get("process_pid"),
-                "process_status": health.get("process_status"),
-                "restart_count": health.get("restart_count", 0)
-            }
+    if process_health_payload:
+        payload["process_health"] = process_health_payload
+
+    return payload
+
+
+def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic", trigger_meta: dict = None):
+    """Send heartbeat with screenshot to server for dashboard monitoring"""
+    try:
+        screenshot_info = get_latest_screenshot()
+
+        # Also read health state and include in heartbeat
+        health = read_health_state()
+
+        heartbeat_data = _build_dashboard_payload(
+            client_id=client_id,
+            screenshot_info=screenshot_info,
+            health=health,
+            capture_type=capture_type,
+            trigger_meta=trigger_meta,
+        )
 
         # Send to dashboard monitoring topic
         dashboard_topic = f"infoscreen/{client_id}/dashboard"
@@ -757,7 +812,7 @@ def screenshot_service_thread(client, client_id):
             capture_type = meta['type'] if (triggered and meta) else "periodic"
             if triggered:
                 logging.info(f"Sending triggered screenshot: type={capture_type}")
-            send_screenshot_heartbeat(client, client_id, capture_type)
+            send_screenshot_heartbeat(client, client_id, capture_type, trigger_meta=meta)
             last_sent = now
         except Exception as e:
             logging.error(f"Screenshot service error: {e}")