diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 98870f0..59dfd01 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -69,6 +69,10 @@ Keep docs synced with code. When you change services/MQTT/API/UTC/env or dev/pro - API stores typed screenshots, tracks latest/priority metadata, and serves priority images via `GET /screenshots//priority`. - Monitoring overview exposes screenshot priority state (`latestScreenshotType`, `priorityScreenshotType`, `priorityScreenshotReceivedAt`, `hasActivePriorityScreenshot`) and `summary.activePriorityScreenshots`. - Monitoring UI shows screenshot type badges and switches to faster refresh while priority screenshots are active. + - **MQTT Dashboard Payload v2 Cutover (no version bump)**: + - Dashboard payload parsing in `listener/listener.py` is now v2-only (`message`, `content`, `runtime`, `metadata`). + - Legacy top-level dashboard fallback was removed after migration soak (`legacy_fallback=0`). + - Listener observability summarizes parser health using `v2_success` and `parse_failures` counters. - **Presentation Flags Persistence Fix**: - Fixed persistence for presentation `page_progress` and `auto_progress` to ensure values are reliably stored and returned across create/update paths and detached occurrences @@ -146,6 +150,7 @@ Keep docs synced with code. When you change services/MQTT/API/UTC/env or dev/pro - Per-client group assignment (retained): `infoscreen/{uuid}/group_id` via `server/mqtt_helper.py`. - Client logs: `infoscreen/{uuid}/logs/{error|warn|info}` with JSON payload (timestamp, message, context); QoS 1 for ERROR/WARN, QoS 0 for INFO. - Client health: `infoscreen/{uuid}/health` with metrics (expected_state, actual_state, health_metrics); QoS 0, published every 5 seconds. + - Dashboard screenshots: `infoscreen/{uuid}/dashboard` uses grouped v2 payload blocks (`message`, `content`, `runtime`, `metadata`); listener reads screenshot data from `content.screenshot` and capture type from `metadata.capture.type`. - Screenshots: server-side folder `server/screenshots/`; API serves `/screenshots/{uuid}.jpg` (latest) and `/screenshots/{uuid}/priority` (active high-priority fallback to latest). - Dev Container guidance: If extensions reappear inside the container, remove UI-only extensions from `devcontainer.json` `extensions` and map them in `remote.extensionKind` as `"ui"`. diff --git a/MQTT_PAYLOAD_MIGRATION_GUIDE.md b/MQTT_PAYLOAD_MIGRATION_GUIDE.md new file mode 100644 index 0000000..d53719f --- /dev/null +++ b/MQTT_PAYLOAD_MIGRATION_GUIDE.md @@ -0,0 +1,194 @@ +# MQTT Payload Migration Guide + +## Purpose +This guide describes a practical migration from the current dashboard screenshot payload to a grouped schema, with client-side implementation first and server-side migration second. + +## Scope +- Environment: development and alpha systems (no production installs) +- Message topic: infoscreen//dashboard +- Capture types to preserve: periodic, event_start, event_stop + +## Target Schema (v2) +The canonical message should be grouped into four logical blocks in this order: + +1. message +2. content +3. runtime +4. metadata + +Example shape: + +```json +{ + "message": { + "client_id": "", + "status": "alive" + }, + "content": { + "screenshot": { + "filename": "latest.jpg", + "data": "", + "timestamp": "2026-03-30T10:15:41.123456+00:00", + "size": 183245 + } + }, + "runtime": { + "system_info": { + "hostname": "pi-display-01", + "ip": "192.168.1.42", + "uptime": 123456.7 + }, + "process_health": { + "event_id": "evt-123", + "event_type": "presentation", + "current_process": "impressive", + "process_pid": 4123, + "process_status": "running", + "restart_count": 0 + } + }, + "metadata": { + "schema_version": "2.0", + "producer": "simclient", + "published_at": "2026-03-30T10:15:42.004321+00:00", + "capture": { + "type": "periodic", + "captured_at": "2026-03-30T10:15:41.123456+00:00", + "age_s": 0.9, + "triggered": false, + "send_immediately": false + }, + "transport": { + "qos": 0, + "publisher": "simclient" + } + } +} +``` + +## Step-by-Step: Client-Side First + +1. Create a migration branch. +- Example: feature/payload-v2 + +2. Freeze a baseline sample from MQTT. +- Capture one payload via mosquitto_sub and store it for comparison. + +3. Implement one canonical payload builder. +- Centralize JSON assembly in one function only. +- Do not duplicate payload construction across code paths. + +4. Add versioned metadata. +- Set metadata.schema_version = "2.0". +- Add metadata.producer = "simclient". +- Add metadata.published_at in UTC ISO format. + +5. Map existing data into grouped blocks. +- client_id/status -> message +- screenshot object -> content.screenshot +- system_info/process_health -> runtime +- capture mode and freshness -> metadata.capture + +6. Preserve existing capture semantics. +- Keep type values unchanged: periodic, event_start, event_stop. +- Keep UTC ISO timestamps. +- Keep screenshot encoding and size behavior unchanged. + +7. Optional short-term compatibility mode (recommended for one sprint). +- Either: + - Keep current legacy fields in parallel, or + - Add a legacy block with old field names. +- Goal: prevent immediate server breakage while parser updates are merged. + +8. Improve publish logs for verification. +- Log schema_version, metadata.capture.type, metadata.capture.age_s. + +9. Validate all three capture paths end-to-end. +- periodic capture +- event_start trigger capture +- event_stop trigger capture + +10. Lock the client contract. +- Save one validated JSON sample per capture type. +- Use those samples in server parser tests. + +## Step-by-Step: Server-Side Migration + +1. Add support for grouped v2 parsing. +- Parse from message/content/runtime/metadata first. + +2. Add fallback parser for legacy payload (temporary). +- If grouped keys are absent, parse old top-level keys. + +3. Normalize to one internal server model. +- Convert both parser paths into one DTO/entity used by dashboard logic. + +4. Validate required fields. +- Required: + - message.client_id + - message.status + - metadata.schema_version + - metadata.capture.type +- Optional: + - runtime.process_health + - content.screenshot (if no screenshot available) + +5. Update dashboard consumers. +- Read grouped fields from internal model (not raw old keys). + +6. Add migration observability. +- Counters: + - v2 parse success + - legacy fallback usage + - parse failures +- Warning log for unknown schema_version. + +7. Run mixed-format integration tests. +- New client -> new server +- Legacy client -> new server (fallback path) + +8. Cut over to v2 preferred. +- Keep fallback for short soak period only. + +9. Remove fallback and legacy assumptions. +- After stability window, remove old parser path. + +10. Final cleanup. +- Keep one schema doc and test fixtures. +- Remove temporary compatibility switches. + +## Legacy to v2 Field Mapping + +| Legacy field | v2 field | +|---|---| +| client_id | message.client_id | +| status | message.status | +| screenshot | content.screenshot | +| screenshot_type | metadata.capture.type | +| screenshot_age_s | metadata.capture.age_s | +| timestamp | metadata.published_at | +| system_info | runtime.system_info | +| process_health | runtime.process_health | + +## Acceptance Criteria + +1. All capture types parse and display correctly. +- periodic +- event_start +- event_stop + +2. Screenshot payload integrity is unchanged. +- filename, data, timestamp, size remain valid. + +3. Metadata is centrally visible at message end. +- schema_version, capture metadata, transport metadata all inside metadata. + +4. No regression in dashboard update timing. +- Triggered screenshots still publish quickly. + +## Suggested Timeline (Dev Only) + +1. Day 1: client v2 payload implementation + local tests +2. Day 2: server v2 parser + fallback +3. Day 3-5: soak in dev, monitor parse metrics +4. Day 6+: remove fallback and finalize v2-only diff --git a/README.md b/README.md index b46f3ba..147c347 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ A comprehensive multi-service digital signage solution for educational instituti Data flow summary: - Listener: consumes discovery and heartbeat messages from the MQTT Broker and updates the API Server (client registration/heartbeats). -- Listener screenshot flow: consumes `infoscreen/{uuid}/screenshot` and `infoscreen/{uuid}/dashboard`, extracts `image`/`timestamp`/`screenshot_type` (`periodic`, `event_start`, `event_stop`) and forwards to `POST /api/clients/{uuid}/screenshot`. +- Listener screenshot flow: consumes `infoscreen/{uuid}/screenshot` and `infoscreen/{uuid}/dashboard`. Dashboard messages use grouped v2 schema (`message`, `content`, `runtime`, `metadata`); screenshot data is read from `content.screenshot`, capture type from `metadata.capture.type`, and forwarded to `POST /api/clients/{uuid}/screenshot`. - Scheduler: reads events from the API Server and publishes only currently active content to the MQTT Broker (retained topics per group). When a group has no active events, the scheduler clears its retained topic by publishing an empty list. All time comparisons are done in UTC; any naive timestamps are normalized. - Clients: send discovery/heartbeat via the MQTT Broker (handled by the Listener) and receive content from the Scheduler via MQTT. - Worker: receives conversion commands directly from the API Server and reports results/status back to the API (no MQTT involved). @@ -228,6 +228,7 @@ For detailed deployment instructions, see: - Monitoring system: End-to-end monitoring is now implemented. The listener ingests `logs/*` and `health` MQTT topics, the API exposes monitoring endpoints (`/api/client-logs/monitoring-overview`, `/api/client-logs/recent-errors`, `/api/client-logs//logs`), and the superadmin dashboard page shows live client status, screenshots, and recent errors. - Screenshot priority flow: Screenshot payloads now support `screenshot_type` (`periodic`, `event_start`, `event_stop`). `event_start` and `event_stop` are treated as high-priority screenshots; the API stores typed screenshots, maintains priority metadata, and serves active priority screenshots through `/screenshots/{uuid}/priority`. +- MQTT dashboard payload v2 cutover: Listener parsing is now v2-only for dashboard JSON payloads (`message/content/runtime/metadata`). Legacy top-level dashboard fallback has been removed after migration completion; parser observability tracks `v2_success` and `parse_failures`. - Presentation persistence fix: Fixed persistence of presentation flags so `page_progress` and `auto_progress` are reliably stored and returned for create/update flows and detached occurrences. - Additional improvements: Video/streaming, scheduler metadata, settings defaults, and UI refinements remain documented in the detailed sections below. diff --git a/listener/listener.py b/listener/listener.py index 63c8dce..d730e6d 100644 --- a/listener/listener.py +++ b/listener/listener.py @@ -32,6 +32,13 @@ Session = sessionmaker(bind=engine) # API configuration API_BASE_URL = os.getenv("API_BASE_URL", "http://server:8000") +# Dashboard payload migration observability +DASHBOARD_METRICS_LOG_EVERY = int(os.getenv("DASHBOARD_METRICS_LOG_EVERY", "5")) +DASHBOARD_PARSE_METRICS = { + "v2_success": 0, + "parse_failures": 0, +} + def normalize_process_status(value): if value is None: @@ -155,48 +162,157 @@ def apply_monitoring_update(client_obj, *, event_id=None, process_name=None, pro client_obj.last_screenshot_analyzed = candidate -def _extract_image_and_timestamp(data): - image_value = None - timestamp_value = None - screenshot_type = None +def _normalize_screenshot_type(raw_type): + if raw_type is None: + return None + normalized = str(raw_type).strip().lower() + if normalized in ("periodic", "event_start", "event_stop"): + return normalized + return None + + +def _classify_dashboard_payload(data): + """ + Classify dashboard payload into migration categories for observability. + """ if not isinstance(data, dict): - return None, None, None + return "parse_failures", None - screenshot_obj = data.get("screenshot") if isinstance(data.get("screenshot"), dict) else None + message_obj = data.get("message") if isinstance(data.get("message"), dict) else None + content_obj = data.get("content") if isinstance(data.get("content"), dict) else None metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else None - screenshot_meta_obj = screenshot_obj.get("metadata") if screenshot_obj and isinstance(screenshot_obj.get("metadata"), dict) else None + schema_version = metadata_obj.get("schema_version") if metadata_obj else None - for container in (data, screenshot_obj, metadata_obj, screenshot_meta_obj): + # v2 detection: grouped blocks available with metadata. + if message_obj is not None and content_obj is not None and metadata_obj is not None: + return "v2_success", schema_version + + return "parse_failures", schema_version + + +def _record_dashboard_parse_metric(mode, uuid, schema_version=None, reason=None): + if mode not in DASHBOARD_PARSE_METRICS: + mode = "parse_failures" + + DASHBOARD_PARSE_METRICS[mode] += 1 + total = sum(DASHBOARD_PARSE_METRICS.values()) + + if mode == "v2_success": + if schema_version is None: + logging.warning(f"Dashboard payload from {uuid}: missing metadata.schema_version for grouped payload") + else: + version_text = str(schema_version).strip() + if not version_text.startswith("2"): + logging.warning(f"Dashboard payload from {uuid}: unknown schema_version={version_text}") + + if mode == "parse_failures": + if reason: + logging.warning(f"Dashboard payload parse failure for {uuid}: {reason}") + else: + logging.warning(f"Dashboard payload parse failure for {uuid}") + + if DASHBOARD_METRICS_LOG_EVERY > 0 and total % DASHBOARD_METRICS_LOG_EVERY == 0: + logging.info( + "Dashboard payload metrics: " + f"total={total}, " + f"v2_success={DASHBOARD_PARSE_METRICS['v2_success']}, " + f"parse_failures={DASHBOARD_PARSE_METRICS['parse_failures']}" + ) + + +def _validate_v2_required_fields(data, uuid): + """ + Soft validation of required v2 fields for grouped dashboard payloads. + Logs a WARNING for each missing field. Never drops the message. + """ + message_obj = data.get("message") if isinstance(data.get("message"), dict) else {} + metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + capture_obj = metadata_obj.get("capture") if isinstance(metadata_obj.get("capture"), dict) else {} + + missing = [] + if not message_obj.get("client_id"): + missing.append("message.client_id") + if not message_obj.get("status"): + missing.append("message.status") + if not metadata_obj.get("schema_version"): + missing.append("metadata.schema_version") + if not capture_obj.get("type"): + missing.append("metadata.capture.type") + + if missing: + logging.warning( + f"Dashboard v2 payload from {uuid} missing required fields: {', '.join(missing)}" + ) + + +def _extract_dashboard_payload_fields(data): + """ + Parse dashboard payload fields from the grouped v2 schema only. + """ + if not isinstance(data, dict): + return { + "image": None, + "timestamp": None, + "screenshot_type": None, + "status": None, + "process_health": {}, + } + + # v2 grouped payload blocks + message_obj = data.get("message") if isinstance(data.get("message"), dict) else None + content_obj = data.get("content") if isinstance(data.get("content"), dict) else None + runtime_obj = data.get("runtime") if isinstance(data.get("runtime"), dict) else None + metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else None + + screenshot_obj = None + if isinstance(content_obj, dict) and isinstance(content_obj.get("screenshot"), dict): + screenshot_obj = content_obj.get("screenshot") + + capture_obj = metadata_obj.get("capture") if metadata_obj and isinstance(metadata_obj.get("capture"), dict) else None + + # Screenshot type comes from v2 metadata.capture.type. + screenshot_type = _normalize_screenshot_type(capture_obj.get("type") if capture_obj else None) + + # Image from v2 content.screenshot. + image_value = None + for container in (screenshot_obj,): if not isinstance(container, dict): continue - raw_type = container.get("screenshot_type") or container.get("screenshotType") - if raw_type is not None: - normalized_type = str(raw_type).strip().lower() - if normalized_type in ("periodic", "event_start", "event_stop"): - screenshot_type = normalized_type - break - - for key in ("image", "data"): - if isinstance(data.get(key), str) and data.get(key): - image_value = data.get(key) - break - if image_value is None and screenshot_obj is not None: - for key in ("image", "data"): - if isinstance(screenshot_obj.get(key), str) and screenshot_obj.get(key): - image_value = screenshot_obj.get(key) - break - - for container in (data, screenshot_obj, metadata_obj, screenshot_meta_obj): - if not isinstance(container, dict): - continue - for key in ("timestamp", "captured_at", "capture_time", "created_at"): + for key in ("data", "image"): value = container.get(key) - if value is not None: - timestamp_value = value - return image_value, timestamp_value, screenshot_type + if isinstance(value, str) and value: + image_value = value + break + if image_value is not None: + break - return image_value, timestamp_value, screenshot_type + # Timestamp precedence: v2 screenshot.timestamp -> capture.captured_at -> metadata.published_at + timestamp_value = None + timestamp_candidates = [ + screenshot_obj.get("timestamp") if screenshot_obj else None, + capture_obj.get("captured_at") if capture_obj else None, + metadata_obj.get("published_at") if metadata_obj else None, + ] + + for value in timestamp_candidates: + if value is not None: + timestamp_value = value + break + + # Monitoring fields from v2 message/runtime. + status_value = (message_obj or {}).get("status") + process_health = (runtime_obj or {}).get("process_health") + if not isinstance(process_health, dict): + process_health = {} + + return { + "image": image_value, + "timestamp": timestamp_value, + "screenshot_type": screenshot_type, + "status": status_value, + "process_health": process_health, + } def handle_screenshot(uuid, payload): @@ -208,7 +324,10 @@ def handle_screenshot(uuid, payload): # Try to parse as JSON first try: data = json.loads(payload.decode()) - image_b64, timestamp_value, screenshot_type = _extract_image_and_timestamp(data) + extracted = _extract_dashboard_payload_fields(data) + image_b64 = extracted["image"] + timestamp_value = extracted["timestamp"] + screenshot_type = extracted["screenshot_type"] if image_b64: # Payload is JSON with base64 image api_payload = {"image": image_b64} @@ -274,22 +393,25 @@ def on_message(client, userdata, msg): try: payload_text = msg.payload.decode() data = json.loads(payload_text) - image_b64, ts_value, screenshot_type = _extract_image_and_timestamp(data) + parse_mode, schema_version = _classify_dashboard_payload(data) + _record_dashboard_parse_metric(parse_mode, uuid, schema_version=schema_version) + if parse_mode == "v2_success": + _validate_v2_required_fields(data, uuid) + + extracted = _extract_dashboard_payload_fields(data) + image_b64 = extracted["image"] + ts_value = extracted["timestamp"] + screenshot_type = extracted["screenshot_type"] if image_b64: logging.debug(f"Dashboard enthält Screenshot für {uuid}; Weiterleitung an API") - dashboard_payload = {"image": image_b64} - if ts_value is not None: - dashboard_payload["timestamp"] = ts_value - if screenshot_type: - dashboard_payload["screenshot_type"] = screenshot_type - api_payload = json.dumps(dashboard_payload).encode("utf-8") - handle_screenshot(uuid, api_payload) + # Forward original v2 payload so handle_screenshot can parse grouped fields. + handle_screenshot(uuid, msg.payload) # Update last_alive if status present - if data.get("status") == "alive": + if extracted["status"] == "alive": session = Session() client_obj = session.query(Client).filter_by(uuid=uuid).first() if client_obj: - process_health = data.get('process_health') or {} + process_health = extracted["process_health"] apply_monitoring_update( client_obj, last_seen=datetime.datetime.now(datetime.UTC), @@ -301,6 +423,7 @@ def on_message(client, userdata, msg): session.commit() session.close() except Exception as e: + _record_dashboard_parse_metric("parse_failures", uuid, reason=str(e)) logging.error(f"Fehler beim Verarbeiten des Dashboard-Payloads von {uuid}: {e}") return diff --git a/listener/test_listener_parser.py b/listener/test_listener_parser.py new file mode 100644 index 0000000..b1f901a --- /dev/null +++ b/listener/test_listener_parser.py @@ -0,0 +1,378 @@ +""" +Mixed-format integration tests for the dashboard payload parser. + +Tests cover: + - Legacy top-level payload is rejected (v2-only mode) + - v2 grouped payload: periodic capture + - v2 grouped payload: event_start capture + - v2 grouped payload: event_stop capture + - Classification into v2_success / parse_failures + - Soft required-field validation (v2 only, never drops message) + - Edge cases: missing image, missing status, non-dict payload +""" + +import sys +import os +import logging +import importlib.util + +# listener/ has no __init__.py — load the module directly from its file path +os.environ.setdefault("DB_CONN", "sqlite:///:memory:") # prevent DB engine errors on import +_LISTENER_PATH = os.path.join(os.path.dirname(__file__), "listener.py") +_spec = importlib.util.spec_from_file_location("listener_module", _LISTENER_PATH) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) + +_extract_dashboard_payload_fields = _mod._extract_dashboard_payload_fields +_classify_dashboard_payload = _mod._classify_dashboard_payload +_validate_v2_required_fields = _mod._validate_v2_required_fields +_normalize_screenshot_type = _mod._normalize_screenshot_type +DASHBOARD_PARSE_METRICS = _mod.DASHBOARD_PARSE_METRICS + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +IMAGE_B64 = "aGVsbG8=" # base64("hello") + +LEGACY_PAYLOAD = { + "client_id": "uuid-legacy", + "status": "alive", + "screenshot": { + "data": IMAGE_B64, + "timestamp": "2026-03-30T10:00:00+00:00", + }, + "screenshot_type": "periodic", + "process_health": { + "current_process": "impressive", + "process_pid": 1234, + "process_status": "running", + "event_id": 42, + }, +} + +def _make_v2(capture_type): + return { + "message": { + "client_id": "uuid-v2", + "status": "alive", + }, + "content": { + "screenshot": { + "filename": "latest.jpg", + "data": IMAGE_B64, + "timestamp": "2026-03-30T10:15:41.123456+00:00", + "size": 6, + } + }, + "runtime": { + "system_info": { + "hostname": "pi-display-01", + "ip": "192.168.1.42", + "uptime": 12345.0, + }, + "process_health": { + "event_id": "evt-7", + "event_type": "presentation", + "current_process": "impressive", + "process_pid": 4123, + "process_status": "running", + "restart_count": 0, + }, + }, + "metadata": { + "schema_version": "2.0", + "producer": "simclient", + "published_at": "2026-03-30T10:15:42.004321+00:00", + "capture": { + "type": capture_type, + "captured_at": "2026-03-30T10:15:41.123456+00:00", + "age_s": 0.9, + "triggered": capture_type != "periodic", + "send_immediately": capture_type != "periodic", + }, + "transport": {"qos": 0, "publisher": "simclient"}, + }, + } + +V2_PERIODIC = _make_v2("periodic") +V2_EVT_START = _make_v2("event_start") +V2_EVT_STOP = _make_v2("event_stop") + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def assert_eq(label, actual, expected): + assert actual == expected, f"FAIL [{label}]: expected {expected!r}, got {actual!r}" + +def assert_not_none(label, actual): + assert actual is not None, f"FAIL [{label}]: expected non-None, got None" + +def assert_none(label, actual): + assert actual is None, f"FAIL [{label}]: expected None, got {actual!r}" + +def assert_warns(label, fn, substring): + """Assert that fn() emits a logging.WARNING containing substring.""" + records = [] + handler = logging.handlers_collector(records) + logger = logging.getLogger() + logger.addHandler(handler) + try: + fn() + finally: + logger.removeHandler(handler) + warnings = [r.getMessage() for r in records if r.levelno == logging.WARNING] + assert any(substring in w for w in warnings), ( + f"FAIL [{label}]: no WARNING containing {substring!r} found in {warnings}" + ) + + +class _CapturingHandler(logging.Handler): + def __init__(self, records): + super().__init__() + self._records = records + + def emit(self, record): + self._records.append(record) + + +def capture_warnings(fn): + """Run fn(), return list of WARNING message strings.""" + records = [] + handler = _CapturingHandler(records) + logger = logging.getLogger() + logger.addHandler(handler) + try: + fn() + finally: + logger.removeHandler(handler) + return [r.getMessage() for r in records if r.levelno == logging.WARNING] + + +# --------------------------------------------------------------------------- +# Tests: _normalize_screenshot_type +# --------------------------------------------------------------------------- + +def test_normalize_known_types(): + for t in ("periodic", "event_start", "event_stop"): + assert_eq(f"normalize_{t}", _normalize_screenshot_type(t), t) + assert_eq(f"normalize_{t}_upper", _normalize_screenshot_type(t.upper()), t) + +def test_normalize_unknown_returns_none(): + assert_none("normalize_unknown", _normalize_screenshot_type("unknown")) + assert_none("normalize_none", _normalize_screenshot_type(None)) + assert_none("normalize_empty", _normalize_screenshot_type("")) + +# --------------------------------------------------------------------------- +# Tests: _classify_dashboard_payload +# --------------------------------------------------------------------------- + +def test_classify_legacy(): + mode, ver = _classify_dashboard_payload(LEGACY_PAYLOAD) + assert_eq("classify_legacy_mode", mode, "parse_failures") + assert_none("classify_legacy_version", ver) + +def test_classify_v2_periodic(): + mode, ver = _classify_dashboard_payload(V2_PERIODIC) + assert_eq("classify_v2_periodic_mode", mode, "v2_success") + assert_eq("classify_v2_periodic_version", ver, "2.0") + +def test_classify_v2_event_start(): + mode, ver = _classify_dashboard_payload(V2_EVT_START) + assert_eq("classify_v2_event_start_mode", mode, "v2_success") + +def test_classify_v2_event_stop(): + mode, ver = _classify_dashboard_payload(V2_EVT_STOP) + assert_eq("classify_v2_event_stop_mode", mode, "v2_success") + +def test_classify_non_dict(): + mode, ver = _classify_dashboard_payload("not a dict") + assert_eq("classify_non_dict", mode, "parse_failures") + +def test_classify_empty_dict(): + mode, ver = _classify_dashboard_payload({}) + assert_eq("classify_empty_dict", mode, "parse_failures") + +# --------------------------------------------------------------------------- +# Tests: _extract_dashboard_payload_fields — legacy payload rejected in v2-only mode +# --------------------------------------------------------------------------- + +def test_legacy_image_not_extracted(): + r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD) + assert_none("legacy_image", r["image"]) + +def test_legacy_screenshot_type_missing(): + r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD) + assert_none("legacy_screenshot_type", r["screenshot_type"]) + +def test_legacy_status_missing(): + r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD) + assert_none("legacy_status", r["status"]) + +def test_legacy_process_health_empty(): + r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD) + assert_eq("legacy_process_health", r["process_health"], {}) + +def test_legacy_timestamp_missing(): + r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD) + assert_none("legacy_timestamp", r["timestamp"]) + +# --------------------------------------------------------------------------- +# Tests: _extract_dashboard_payload_fields — v2 periodic +# --------------------------------------------------------------------------- + +def test_v2_periodic_image(): + r = _extract_dashboard_payload_fields(V2_PERIODIC) + assert_eq("v2_periodic_image", r["image"], IMAGE_B64) + +def test_v2_periodic_screenshot_type(): + r = _extract_dashboard_payload_fields(V2_PERIODIC) + assert_eq("v2_periodic_type", r["screenshot_type"], "periodic") + +def test_v2_periodic_status(): + r = _extract_dashboard_payload_fields(V2_PERIODIC) + assert_eq("v2_periodic_status", r["status"], "alive") + +def test_v2_periodic_process_health(): + r = _extract_dashboard_payload_fields(V2_PERIODIC) + assert_eq("v2_periodic_pid", r["process_health"]["process_pid"], 4123) + assert_eq("v2_periodic_process", r["process_health"]["current_process"], "impressive") + +def test_v2_periodic_timestamp_prefers_screenshot(): + r = _extract_dashboard_payload_fields(V2_PERIODIC) + # screenshot.timestamp must take precedence over capture.captured_at / published_at + assert_eq("v2_periodic_ts", r["timestamp"], "2026-03-30T10:15:41.123456+00:00") + +# --------------------------------------------------------------------------- +# Tests: _extract_dashboard_payload_fields — v2 event_start +# --------------------------------------------------------------------------- + +def test_v2_event_start_type(): + r = _extract_dashboard_payload_fields(V2_EVT_START) + assert_eq("v2_event_start_type", r["screenshot_type"], "event_start") + +def test_v2_event_start_image(): + r = _extract_dashboard_payload_fields(V2_EVT_START) + assert_eq("v2_event_start_image", r["image"], IMAGE_B64) + +# --------------------------------------------------------------------------- +# Tests: _extract_dashboard_payload_fields — v2 event_stop +# --------------------------------------------------------------------------- + +def test_v2_event_stop_type(): + r = _extract_dashboard_payload_fields(V2_EVT_STOP) + assert_eq("v2_event_stop_type", r["screenshot_type"], "event_stop") + +def test_v2_event_stop_image(): + r = _extract_dashboard_payload_fields(V2_EVT_STOP) + assert_eq("v2_event_stop_image", r["image"], IMAGE_B64) + +# --------------------------------------------------------------------------- +# Tests: _extract_dashboard_payload_fields — edge cases +# --------------------------------------------------------------------------- + +def test_non_dict_returns_nulls(): + r = _extract_dashboard_payload_fields("bad") + assert_none("non_dict_image", r["image"]) + assert_none("non_dict_type", r["screenshot_type"]) + assert_none("non_dict_status", r["status"]) + +def test_missing_image_returns_none(): + payload = {**V2_PERIODIC, "content": {"screenshot": {"timestamp": "2026-03-30T10:00:00+00:00"}}} + r = _extract_dashboard_payload_fields(payload) + assert_none("missing_image", r["image"]) + +def test_missing_process_health_returns_empty_dict(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["runtime"]["process_health"] + r = _extract_dashboard_payload_fields(payload) + assert_eq("missing_ph", r["process_health"], {}) + +def test_timestamp_fallback_to_captured_at_when_no_screenshot_ts(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["content"]["screenshot"]["timestamp"] + r = _extract_dashboard_payload_fields(payload) + assert_eq("ts_fallback_captured_at", r["timestamp"], "2026-03-30T10:15:41.123456+00:00") + +def test_timestamp_fallback_to_published_at_when_no_capture_ts(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["content"]["screenshot"]["timestamp"] + del payload["metadata"]["capture"]["captured_at"] + r = _extract_dashboard_payload_fields(payload) + assert_eq("ts_fallback_published_at", r["timestamp"], "2026-03-30T10:15:42.004321+00:00") + +# --------------------------------------------------------------------------- +# Tests: _validate_v2_required_fields (soft — never raises) +# --------------------------------------------------------------------------- + +def test_v2_valid_payload_no_warnings(): + warnings = capture_warnings(lambda: _validate_v2_required_fields(V2_PERIODIC, "uuid-v2")) + assert warnings == [], f"FAIL: unexpected warnings for valid payload: {warnings}" + +def test_v2_missing_client_id_warns(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["message"]["client_id"] + warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2")) + assert any("message.client_id" in w for w in warnings), f"FAIL: {warnings}" + +def test_v2_missing_status_warns(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["message"]["status"] + warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2")) + assert any("message.status" in w for w in warnings), f"FAIL: {warnings}" + +def test_v2_missing_schema_version_warns(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["metadata"]["schema_version"] + warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2")) + assert any("metadata.schema_version" in w for w in warnings), f"FAIL: {warnings}" + +def test_v2_missing_capture_type_warns(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["metadata"]["capture"]["type"] + warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2")) + assert any("metadata.capture.type" in w for w in warnings), f"FAIL: {warnings}" + +def test_v2_multiple_missing_fields_all_reported(): + import copy + payload = copy.deepcopy(V2_PERIODIC) + del payload["message"]["client_id"] + del payload["metadata"]["capture"]["type"] + warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2")) + assert len(warnings) == 1, f"FAIL: expected 1 combined warning, got {warnings}" + assert "message.client_id" in warnings[0], f"FAIL: {warnings}" + assert "metadata.capture.type" in warnings[0], f"FAIL: {warnings}" + +# --------------------------------------------------------------------------- +# Runner +# --------------------------------------------------------------------------- + +def run_all(): + tests = {k: v for k, v in globals().items() if k.startswith("test_") and callable(v)} + passed = failed = 0 + for name, fn in sorted(tests.items()): + try: + fn() + print(f" PASS {name}") + passed += 1 + except AssertionError as e: + print(f" FAIL {name}: {e}") + failed += 1 + except Exception as e: + print(f" ERROR {name}: {type(e).__name__}: {e}") + failed += 1 + print(f"\n{passed} passed, {failed} failed out of {passed + failed} tests") + return failed == 0 + + +if __name__ == "__main__": + ok = run_all() + sys.exit(0 if ok else 1)