feat(listener): migrate dashboard MQTT payload to v2-only grouped schema

- Replace _extract_image_and_timestamp() with v2-only _extract_dashboard_payload_fields()
- Add _classify_dashboard_payload() + parse metrics (v2_success, parse_failures)
- Add soft _validate_v2_required_fields() for warning-only field checks
- Remove legacy fallback after soak confirmed legacy_fallback=0
- Fix: forward msg.payload directly to handle_screenshot() to avoid re-wrap bug
- Add 33 parser tests in listener/test_listener_parser.py
- Add MQTT_PAYLOAD_MIGRATION_GUIDE.md documenting the 10-step migration process
- Update README.md and copilot-instructions.md to reflect v2-only schema
This commit is contained in:
2026-03-30 14:18:34 +00:00
parent 90ccbdf920
commit a58e9d3fca
5 changed files with 746 additions and 45 deletions

View File

@@ -32,6 +32,13 @@ Session = sessionmaker(bind=engine)
# API configuration
API_BASE_URL = os.getenv("API_BASE_URL", "http://server:8000")
# Dashboard payload migration observability
DASHBOARD_METRICS_LOG_EVERY = int(os.getenv("DASHBOARD_METRICS_LOG_EVERY", "5"))
DASHBOARD_PARSE_METRICS = {
"v2_success": 0,
"parse_failures": 0,
}
def normalize_process_status(value):
if value is None:
@@ -155,48 +162,157 @@ def apply_monitoring_update(client_obj, *, event_id=None, process_name=None, pro
client_obj.last_screenshot_analyzed = candidate
def _extract_image_and_timestamp(data):
image_value = None
timestamp_value = None
screenshot_type = None
def _normalize_screenshot_type(raw_type):
if raw_type is None:
return None
normalized = str(raw_type).strip().lower()
if normalized in ("periodic", "event_start", "event_stop"):
return normalized
return None
def _classify_dashboard_payload(data):
"""
Classify dashboard payload into migration categories for observability.
"""
if not isinstance(data, dict):
return None, None, None
return "parse_failures", None
screenshot_obj = data.get("screenshot") if isinstance(data.get("screenshot"), dict) else None
message_obj = data.get("message") if isinstance(data.get("message"), dict) else None
content_obj = data.get("content") if isinstance(data.get("content"), dict) else None
metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else None
screenshot_meta_obj = screenshot_obj.get("metadata") if screenshot_obj and isinstance(screenshot_obj.get("metadata"), dict) else None
schema_version = metadata_obj.get("schema_version") if metadata_obj else None
for container in (data, screenshot_obj, metadata_obj, screenshot_meta_obj):
# v2 detection: grouped blocks available with metadata.
if message_obj is not None and content_obj is not None and metadata_obj is not None:
return "v2_success", schema_version
return "parse_failures", schema_version
def _record_dashboard_parse_metric(mode, uuid, schema_version=None, reason=None):
if mode not in DASHBOARD_PARSE_METRICS:
mode = "parse_failures"
DASHBOARD_PARSE_METRICS[mode] += 1
total = sum(DASHBOARD_PARSE_METRICS.values())
if mode == "v2_success":
if schema_version is None:
logging.warning(f"Dashboard payload from {uuid}: missing metadata.schema_version for grouped payload")
else:
version_text = str(schema_version).strip()
if not version_text.startswith("2"):
logging.warning(f"Dashboard payload from {uuid}: unknown schema_version={version_text}")
if mode == "parse_failures":
if reason:
logging.warning(f"Dashboard payload parse failure for {uuid}: {reason}")
else:
logging.warning(f"Dashboard payload parse failure for {uuid}")
if DASHBOARD_METRICS_LOG_EVERY > 0 and total % DASHBOARD_METRICS_LOG_EVERY == 0:
logging.info(
"Dashboard payload metrics: "
f"total={total}, "
f"v2_success={DASHBOARD_PARSE_METRICS['v2_success']}, "
f"parse_failures={DASHBOARD_PARSE_METRICS['parse_failures']}"
)
def _validate_v2_required_fields(data, uuid):
"""
Soft validation of required v2 fields for grouped dashboard payloads.
Logs a WARNING for each missing field. Never drops the message.
"""
message_obj = data.get("message") if isinstance(data.get("message"), dict) else {}
metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else {}
capture_obj = metadata_obj.get("capture") if isinstance(metadata_obj.get("capture"), dict) else {}
missing = []
if not message_obj.get("client_id"):
missing.append("message.client_id")
if not message_obj.get("status"):
missing.append("message.status")
if not metadata_obj.get("schema_version"):
missing.append("metadata.schema_version")
if not capture_obj.get("type"):
missing.append("metadata.capture.type")
if missing:
logging.warning(
f"Dashboard v2 payload from {uuid} missing required fields: {', '.join(missing)}"
)
def _extract_dashboard_payload_fields(data):
"""
Parse dashboard payload fields from the grouped v2 schema only.
"""
if not isinstance(data, dict):
return {
"image": None,
"timestamp": None,
"screenshot_type": None,
"status": None,
"process_health": {},
}
# v2 grouped payload blocks
message_obj = data.get("message") if isinstance(data.get("message"), dict) else None
content_obj = data.get("content") if isinstance(data.get("content"), dict) else None
runtime_obj = data.get("runtime") if isinstance(data.get("runtime"), dict) else None
metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else None
screenshot_obj = None
if isinstance(content_obj, dict) and isinstance(content_obj.get("screenshot"), dict):
screenshot_obj = content_obj.get("screenshot")
capture_obj = metadata_obj.get("capture") if metadata_obj and isinstance(metadata_obj.get("capture"), dict) else None
# Screenshot type comes from v2 metadata.capture.type.
screenshot_type = _normalize_screenshot_type(capture_obj.get("type") if capture_obj else None)
# Image from v2 content.screenshot.
image_value = None
for container in (screenshot_obj,):
if not isinstance(container, dict):
continue
raw_type = container.get("screenshot_type") or container.get("screenshotType")
if raw_type is not None:
normalized_type = str(raw_type).strip().lower()
if normalized_type in ("periodic", "event_start", "event_stop"):
screenshot_type = normalized_type
break
for key in ("image", "data"):
if isinstance(data.get(key), str) and data.get(key):
image_value = data.get(key)
break
if image_value is None and screenshot_obj is not None:
for key in ("image", "data"):
if isinstance(screenshot_obj.get(key), str) and screenshot_obj.get(key):
image_value = screenshot_obj.get(key)
break
for container in (data, screenshot_obj, metadata_obj, screenshot_meta_obj):
if not isinstance(container, dict):
continue
for key in ("timestamp", "captured_at", "capture_time", "created_at"):
for key in ("data", "image"):
value = container.get(key)
if value is not None:
timestamp_value = value
return image_value, timestamp_value, screenshot_type
if isinstance(value, str) and value:
image_value = value
break
if image_value is not None:
break
return image_value, timestamp_value, screenshot_type
# Timestamp precedence: v2 screenshot.timestamp -> capture.captured_at -> metadata.published_at
timestamp_value = None
timestamp_candidates = [
screenshot_obj.get("timestamp") if screenshot_obj else None,
capture_obj.get("captured_at") if capture_obj else None,
metadata_obj.get("published_at") if metadata_obj else None,
]
for value in timestamp_candidates:
if value is not None:
timestamp_value = value
break
# Monitoring fields from v2 message/runtime.
status_value = (message_obj or {}).get("status")
process_health = (runtime_obj or {}).get("process_health")
if not isinstance(process_health, dict):
process_health = {}
return {
"image": image_value,
"timestamp": timestamp_value,
"screenshot_type": screenshot_type,
"status": status_value,
"process_health": process_health,
}
def handle_screenshot(uuid, payload):
@@ -208,7 +324,10 @@ def handle_screenshot(uuid, payload):
# Try to parse as JSON first
try:
data = json.loads(payload.decode())
image_b64, timestamp_value, screenshot_type = _extract_image_and_timestamp(data)
extracted = _extract_dashboard_payload_fields(data)
image_b64 = extracted["image"]
timestamp_value = extracted["timestamp"]
screenshot_type = extracted["screenshot_type"]
if image_b64:
# Payload is JSON with base64 image
api_payload = {"image": image_b64}
@@ -274,22 +393,25 @@ def on_message(client, userdata, msg):
try:
payload_text = msg.payload.decode()
data = json.loads(payload_text)
image_b64, ts_value, screenshot_type = _extract_image_and_timestamp(data)
parse_mode, schema_version = _classify_dashboard_payload(data)
_record_dashboard_parse_metric(parse_mode, uuid, schema_version=schema_version)
if parse_mode == "v2_success":
_validate_v2_required_fields(data, uuid)
extracted = _extract_dashboard_payload_fields(data)
image_b64 = extracted["image"]
ts_value = extracted["timestamp"]
screenshot_type = extracted["screenshot_type"]
if image_b64:
logging.debug(f"Dashboard enthält Screenshot für {uuid}; Weiterleitung an API")
dashboard_payload = {"image": image_b64}
if ts_value is not None:
dashboard_payload["timestamp"] = ts_value
if screenshot_type:
dashboard_payload["screenshot_type"] = screenshot_type
api_payload = json.dumps(dashboard_payload).encode("utf-8")
handle_screenshot(uuid, api_payload)
# Forward original v2 payload so handle_screenshot can parse grouped fields.
handle_screenshot(uuid, msg.payload)
# Update last_alive if status present
if data.get("status") == "alive":
if extracted["status"] == "alive":
session = Session()
client_obj = session.query(Client).filter_by(uuid=uuid).first()
if client_obj:
process_health = data.get('process_health') or {}
process_health = extracted["process_health"]
apply_monitoring_update(
client_obj,
last_seen=datetime.datetime.now(datetime.UTC),
@@ -301,6 +423,7 @@ def on_message(client, userdata, msg):
session.commit()
session.close()
except Exception as e:
_record_dashboard_parse_metric("parse_failures", uuid, reason=str(e))
logging.error(f"Fehler beim Verarbeiten des Dashboard-Payloads von {uuid}: {e}")
return

View File

@@ -0,0 +1,378 @@
"""
Mixed-format integration tests for the dashboard payload parser.
Tests cover:
- Legacy top-level payload is rejected (v2-only mode)
- v2 grouped payload: periodic capture
- v2 grouped payload: event_start capture
- v2 grouped payload: event_stop capture
- Classification into v2_success / parse_failures
- Soft required-field validation (v2 only, never drops message)
- Edge cases: missing image, missing status, non-dict payload
"""
import sys
import os
import logging
import importlib.util
# listener/ has no __init__.py — load the module directly from its file path
os.environ.setdefault("DB_CONN", "sqlite:///:memory:") # prevent DB engine errors on import
_LISTENER_PATH = os.path.join(os.path.dirname(__file__), "listener.py")
_spec = importlib.util.spec_from_file_location("listener_module", _LISTENER_PATH)
_mod = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
_extract_dashboard_payload_fields = _mod._extract_dashboard_payload_fields
_classify_dashboard_payload = _mod._classify_dashboard_payload
_validate_v2_required_fields = _mod._validate_v2_required_fields
_normalize_screenshot_type = _mod._normalize_screenshot_type
DASHBOARD_PARSE_METRICS = _mod.DASHBOARD_PARSE_METRICS
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
IMAGE_B64 = "aGVsbG8=" # base64("hello")
LEGACY_PAYLOAD = {
"client_id": "uuid-legacy",
"status": "alive",
"screenshot": {
"data": IMAGE_B64,
"timestamp": "2026-03-30T10:00:00+00:00",
},
"screenshot_type": "periodic",
"process_health": {
"current_process": "impressive",
"process_pid": 1234,
"process_status": "running",
"event_id": 42,
},
}
def _make_v2(capture_type):
return {
"message": {
"client_id": "uuid-v2",
"status": "alive",
},
"content": {
"screenshot": {
"filename": "latest.jpg",
"data": IMAGE_B64,
"timestamp": "2026-03-30T10:15:41.123456+00:00",
"size": 6,
}
},
"runtime": {
"system_info": {
"hostname": "pi-display-01",
"ip": "192.168.1.42",
"uptime": 12345.0,
},
"process_health": {
"event_id": "evt-7",
"event_type": "presentation",
"current_process": "impressive",
"process_pid": 4123,
"process_status": "running",
"restart_count": 0,
},
},
"metadata": {
"schema_version": "2.0",
"producer": "simclient",
"published_at": "2026-03-30T10:15:42.004321+00:00",
"capture": {
"type": capture_type,
"captured_at": "2026-03-30T10:15:41.123456+00:00",
"age_s": 0.9,
"triggered": capture_type != "periodic",
"send_immediately": capture_type != "periodic",
},
"transport": {"qos": 0, "publisher": "simclient"},
},
}
V2_PERIODIC = _make_v2("periodic")
V2_EVT_START = _make_v2("event_start")
V2_EVT_STOP = _make_v2("event_stop")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def assert_eq(label, actual, expected):
assert actual == expected, f"FAIL [{label}]: expected {expected!r}, got {actual!r}"
def assert_not_none(label, actual):
assert actual is not None, f"FAIL [{label}]: expected non-None, got None"
def assert_none(label, actual):
assert actual is None, f"FAIL [{label}]: expected None, got {actual!r}"
def assert_warns(label, fn, substring):
"""Assert that fn() emits a logging.WARNING containing substring."""
records = []
handler = logging.handlers_collector(records)
logger = logging.getLogger()
logger.addHandler(handler)
try:
fn()
finally:
logger.removeHandler(handler)
warnings = [r.getMessage() for r in records if r.levelno == logging.WARNING]
assert any(substring in w for w in warnings), (
f"FAIL [{label}]: no WARNING containing {substring!r} found in {warnings}"
)
class _CapturingHandler(logging.Handler):
def __init__(self, records):
super().__init__()
self._records = records
def emit(self, record):
self._records.append(record)
def capture_warnings(fn):
"""Run fn(), return list of WARNING message strings."""
records = []
handler = _CapturingHandler(records)
logger = logging.getLogger()
logger.addHandler(handler)
try:
fn()
finally:
logger.removeHandler(handler)
return [r.getMessage() for r in records if r.levelno == logging.WARNING]
# ---------------------------------------------------------------------------
# Tests: _normalize_screenshot_type
# ---------------------------------------------------------------------------
def test_normalize_known_types():
for t in ("periodic", "event_start", "event_stop"):
assert_eq(f"normalize_{t}", _normalize_screenshot_type(t), t)
assert_eq(f"normalize_{t}_upper", _normalize_screenshot_type(t.upper()), t)
def test_normalize_unknown_returns_none():
assert_none("normalize_unknown", _normalize_screenshot_type("unknown"))
assert_none("normalize_none", _normalize_screenshot_type(None))
assert_none("normalize_empty", _normalize_screenshot_type(""))
# ---------------------------------------------------------------------------
# Tests: _classify_dashboard_payload
# ---------------------------------------------------------------------------
def test_classify_legacy():
mode, ver = _classify_dashboard_payload(LEGACY_PAYLOAD)
assert_eq("classify_legacy_mode", mode, "parse_failures")
assert_none("classify_legacy_version", ver)
def test_classify_v2_periodic():
mode, ver = _classify_dashboard_payload(V2_PERIODIC)
assert_eq("classify_v2_periodic_mode", mode, "v2_success")
assert_eq("classify_v2_periodic_version", ver, "2.0")
def test_classify_v2_event_start():
mode, ver = _classify_dashboard_payload(V2_EVT_START)
assert_eq("classify_v2_event_start_mode", mode, "v2_success")
def test_classify_v2_event_stop():
mode, ver = _classify_dashboard_payload(V2_EVT_STOP)
assert_eq("classify_v2_event_stop_mode", mode, "v2_success")
def test_classify_non_dict():
mode, ver = _classify_dashboard_payload("not a dict")
assert_eq("classify_non_dict", mode, "parse_failures")
def test_classify_empty_dict():
mode, ver = _classify_dashboard_payload({})
assert_eq("classify_empty_dict", mode, "parse_failures")
# ---------------------------------------------------------------------------
# Tests: _extract_dashboard_payload_fields — legacy payload rejected in v2-only mode
# ---------------------------------------------------------------------------
def test_legacy_image_not_extracted():
r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD)
assert_none("legacy_image", r["image"])
def test_legacy_screenshot_type_missing():
r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD)
assert_none("legacy_screenshot_type", r["screenshot_type"])
def test_legacy_status_missing():
r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD)
assert_none("legacy_status", r["status"])
def test_legacy_process_health_empty():
r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD)
assert_eq("legacy_process_health", r["process_health"], {})
def test_legacy_timestamp_missing():
r = _extract_dashboard_payload_fields(LEGACY_PAYLOAD)
assert_none("legacy_timestamp", r["timestamp"])
# ---------------------------------------------------------------------------
# Tests: _extract_dashboard_payload_fields — v2 periodic
# ---------------------------------------------------------------------------
def test_v2_periodic_image():
r = _extract_dashboard_payload_fields(V2_PERIODIC)
assert_eq("v2_periodic_image", r["image"], IMAGE_B64)
def test_v2_periodic_screenshot_type():
r = _extract_dashboard_payload_fields(V2_PERIODIC)
assert_eq("v2_periodic_type", r["screenshot_type"], "periodic")
def test_v2_periodic_status():
r = _extract_dashboard_payload_fields(V2_PERIODIC)
assert_eq("v2_periodic_status", r["status"], "alive")
def test_v2_periodic_process_health():
r = _extract_dashboard_payload_fields(V2_PERIODIC)
assert_eq("v2_periodic_pid", r["process_health"]["process_pid"], 4123)
assert_eq("v2_periodic_process", r["process_health"]["current_process"], "impressive")
def test_v2_periodic_timestamp_prefers_screenshot():
r = _extract_dashboard_payload_fields(V2_PERIODIC)
# screenshot.timestamp must take precedence over capture.captured_at / published_at
assert_eq("v2_periodic_ts", r["timestamp"], "2026-03-30T10:15:41.123456+00:00")
# ---------------------------------------------------------------------------
# Tests: _extract_dashboard_payload_fields — v2 event_start
# ---------------------------------------------------------------------------
def test_v2_event_start_type():
r = _extract_dashboard_payload_fields(V2_EVT_START)
assert_eq("v2_event_start_type", r["screenshot_type"], "event_start")
def test_v2_event_start_image():
r = _extract_dashboard_payload_fields(V2_EVT_START)
assert_eq("v2_event_start_image", r["image"], IMAGE_B64)
# ---------------------------------------------------------------------------
# Tests: _extract_dashboard_payload_fields — v2 event_stop
# ---------------------------------------------------------------------------
def test_v2_event_stop_type():
r = _extract_dashboard_payload_fields(V2_EVT_STOP)
assert_eq("v2_event_stop_type", r["screenshot_type"], "event_stop")
def test_v2_event_stop_image():
r = _extract_dashboard_payload_fields(V2_EVT_STOP)
assert_eq("v2_event_stop_image", r["image"], IMAGE_B64)
# ---------------------------------------------------------------------------
# Tests: _extract_dashboard_payload_fields — edge cases
# ---------------------------------------------------------------------------
def test_non_dict_returns_nulls():
r = _extract_dashboard_payload_fields("bad")
assert_none("non_dict_image", r["image"])
assert_none("non_dict_type", r["screenshot_type"])
assert_none("non_dict_status", r["status"])
def test_missing_image_returns_none():
payload = {**V2_PERIODIC, "content": {"screenshot": {"timestamp": "2026-03-30T10:00:00+00:00"}}}
r = _extract_dashboard_payload_fields(payload)
assert_none("missing_image", r["image"])
def test_missing_process_health_returns_empty_dict():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["runtime"]["process_health"]
r = _extract_dashboard_payload_fields(payload)
assert_eq("missing_ph", r["process_health"], {})
def test_timestamp_fallback_to_captured_at_when_no_screenshot_ts():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["content"]["screenshot"]["timestamp"]
r = _extract_dashboard_payload_fields(payload)
assert_eq("ts_fallback_captured_at", r["timestamp"], "2026-03-30T10:15:41.123456+00:00")
def test_timestamp_fallback_to_published_at_when_no_capture_ts():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["content"]["screenshot"]["timestamp"]
del payload["metadata"]["capture"]["captured_at"]
r = _extract_dashboard_payload_fields(payload)
assert_eq("ts_fallback_published_at", r["timestamp"], "2026-03-30T10:15:42.004321+00:00")
# ---------------------------------------------------------------------------
# Tests: _validate_v2_required_fields (soft — never raises)
# ---------------------------------------------------------------------------
def test_v2_valid_payload_no_warnings():
warnings = capture_warnings(lambda: _validate_v2_required_fields(V2_PERIODIC, "uuid-v2"))
assert warnings == [], f"FAIL: unexpected warnings for valid payload: {warnings}"
def test_v2_missing_client_id_warns():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["message"]["client_id"]
warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2"))
assert any("message.client_id" in w for w in warnings), f"FAIL: {warnings}"
def test_v2_missing_status_warns():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["message"]["status"]
warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2"))
assert any("message.status" in w for w in warnings), f"FAIL: {warnings}"
def test_v2_missing_schema_version_warns():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["metadata"]["schema_version"]
warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2"))
assert any("metadata.schema_version" in w for w in warnings), f"FAIL: {warnings}"
def test_v2_missing_capture_type_warns():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["metadata"]["capture"]["type"]
warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2"))
assert any("metadata.capture.type" in w for w in warnings), f"FAIL: {warnings}"
def test_v2_multiple_missing_fields_all_reported():
import copy
payload = copy.deepcopy(V2_PERIODIC)
del payload["message"]["client_id"]
del payload["metadata"]["capture"]["type"]
warnings = capture_warnings(lambda: _validate_v2_required_fields(payload, "uuid-v2"))
assert len(warnings) == 1, f"FAIL: expected 1 combined warning, got {warnings}"
assert "message.client_id" in warnings[0], f"FAIL: {warnings}"
assert "metadata.capture.type" in warnings[0], f"FAIL: {warnings}"
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def run_all():
tests = {k: v for k, v in globals().items() if k.startswith("test_") and callable(v)}
passed = failed = 0
for name, fn in sorted(tests.items()):
try:
fn()
print(f" PASS {name}")
passed += 1
except AssertionError as e:
print(f" FAIL {name}: {e}")
failed += 1
except Exception as e:
print(f" ERROR {name}: {type(e).__name__}: {e}")
failed += 1
print(f"\n{passed} passed, {failed} failed out of {passed + failed} tests")
return failed == 0
if __name__ == "__main__":
ok = run_all()
sys.exit(0 if ok else 1)