feat(monitoring): complete monitoring pipeline and fix presentation flag persistence

add superadmin monitoring dashboard with protected route, menu entry, and monitoring data client
add monitoring overview API endpoint and improve log serialization/aggregation for dashboard use
extend listener health/log handling with robust status/event/timestamp normalization and screenshot payload extraction
improve screenshot persistence and retrieval (timestamp-aware uploads, latest screenshot endpoint fallback)
fix page_progress and auto_progress persistence/serialization across create, update, and detached occurrence flows
align technical and project docs to reflect implemented monitoring and no-version-bump backend changes
add documentation sync log entry and include minor compose env indentation cleanup
This commit is contained in:
2026-03-24 11:18:33 +00:00
parent 3107d0f671
commit 9c330f984f
18 changed files with 2095 additions and 104 deletions

View File

@@ -3,15 +3,17 @@ import json
import logging
import datetime
import base64
import re
import requests
import paho.mqtt.client as mqtt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models.models import Client, ClientLog, LogLevel, ProcessStatus
from models.models import Client, ClientLog, LogLevel, ProcessStatus, ScreenHealthStatus
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(message)s')
# Load .env in development
if os.getenv("ENV", "development") == "development":
# Load .env only when not already configured by Docker (API_BASE_URL not set by compose means we're outside a container)
_api_already_set = bool(os.environ.get("API_BASE_URL"))
if not _api_already_set and os.getenv("ENV", "development") == "development":
try:
from dotenv import load_dotenv
load_dotenv(".env")
@@ -31,6 +33,161 @@ Session = sessionmaker(bind=engine)
API_BASE_URL = os.getenv("API_BASE_URL", "http://server:8000")
def normalize_process_status(value):
    """Coerce a loosely-typed status label into a ``ProcessStatus`` member.

    Args:
        value: ``None``, an existing ``ProcessStatus`` member, or any object
            whose ``str()`` form names a status (surrounding whitespace and
            letter case are ignored).

    Returns:
        The matching ``ProcessStatus`` member, or ``None`` when *value* is
        ``None``, blank, or does not correspond to any member.
    """
    if value is None:
        return None
    if isinstance(value, ProcessStatus):
        return value

    normalized = str(value).strip().lower()
    if not normalized:
        return None

    # Preferred path: the label equals a member's value.
    try:
        return ProcessStatus(normalized)
    except ValueError:
        pass

    # Fallback: accept the member *name* as well (case-insensitively), so
    # payloads that spell the status by name are not silently discarded.
    for member in ProcessStatus:
        if member.name.lower() == normalized:
            return member
    return None
def normalize_event_id(value):
    """Coerce an event identifier of unknown shape into an ``int``.

    Args:
        value: The raw identifier. Integers are returned unchanged, floats
            are truncated towards zero, and any other object is converted
            with ``str()`` and searched for a trailing run of digits
            (so ``"event-42"`` yields ``42``).

    Returns:
        The integer identifier, or ``None`` when *value* is ``None``, a
        boolean, a non-finite float, blank, or does not end in digits.
    """
    # bool is a subclass of int; True/False are never meaningful ids.
    if value is None or isinstance(value, bool):
        return None
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # NaN raises ValueError, +/-inf raises OverflowError.
            return None

    # A single regex covers both the all-digit case and the "prefix-123"
    # case. Unlike str.isdigit(), \d only matches characters that int()
    # can actually convert, so inputs such as "²" no longer raise.
    match = re.search(r"(\d+)$", str(value).strip())
    return int(match.group(1)) if match else None
def parse_timestamp(value):
if not value:
return None
if isinstance(value, (int, float)):
try:
ts_value = float(value)
if ts_value > 1e12:
ts_value = ts_value / 1000.0
return datetime.datetime.fromtimestamp(ts_value, datetime.UTC)
except (TypeError, ValueError, OverflowError):
return None
try:
value_str = str(value).strip()
if value_str.isdigit():
ts_value = float(value_str)
if ts_value > 1e12:
ts_value = ts_value / 1000.0
return datetime.datetime.fromtimestamp(ts_value, datetime.UTC)
parsed = datetime.datetime.fromisoformat(value_str.replace('Z', '+00:00'))
if parsed.tzinfo is None:
return parsed.replace(tzinfo=datetime.UTC)
return parsed.astimezone(datetime.UTC)
except ValueError:
return None
def infer_screen_health_status(payload_data, *, frozen_after_seconds=30):
    """Determine a ``ScreenHealthStatus`` for a health payload.

    An explicit ``screen_health_status`` entry wins when it names a
    ``ScreenHealthStatus`` member (case-insensitive, whitespace ignored).
    Otherwise the status is inferred from ``health_metrics``:

    * ``screen_on`` is exactly ``False``  -> ``BLACK``
    * ``last_frame_update`` older than *frozen_after_seconds* -> ``FROZEN``
    * ``last_frame_update`` present and recent enough -> ``OK``

    Args:
        payload_data: Decoded health payload (expected to be a dict).
        frozen_after_seconds: Age of the last frame update, in seconds,
            above which the screen is reported as frozen. Defaults to 30.

    Returns:
        A ``ScreenHealthStatus`` member, or ``None`` when the payload
        contains nothing from which a status can be derived.
    """
    if not isinstance(payload_data, dict):
        return None

    explicit = payload_data.get('screen_health_status')
    if explicit:
        try:
            return ScreenHealthStatus[str(explicit).strip().upper()]
        except KeyError:
            # Unknown label: fall through and infer from the metrics instead.
            pass

    metrics = payload_data.get('health_metrics')
    if not isinstance(metrics, dict):
        # Missing or malformed metrics give nothing to infer from.
        return None

    # Only an explicit False counts; a missing key must not imply BLACK.
    if metrics.get('screen_on') is False:
        return ScreenHealthStatus.BLACK

    last_frame_update = parse_timestamp(metrics.get('last_frame_update'))
    if last_frame_update is None:
        return None

    age_seconds = (datetime.datetime.now(datetime.UTC) - last_frame_update).total_seconds()
    if age_seconds > frozen_after_seconds:
        return ScreenHealthStatus.FROZEN
    return ScreenHealthStatus.OK
def apply_monitoring_update(client_obj, *, event_id=None, process_name=None, process_pid=None,
                            process_status=None, last_seen=None, screen_health_status=None,
                            last_screenshot_analyzed=None):
    """Copy the supplied monitoring values onto *client_obj*.

    Only values that are actually provided are written; attributes whose
    argument is missing (or cannot be normalized) are left untouched.

    Args:
        client_obj: Object receiving the attribute updates.
        event_id: Raw event id; stored in ``current_event_id`` after
            ``normalize_event_id`` if that yields a value.
        process_name: Stored in ``current_process`` when not ``None``.
        process_pid: Stored in ``process_pid`` when not ``None``.
        process_status: Raw status; stored in ``process_status`` after
            ``normalize_process_status`` if that yields a value.
        last_seen: Stored in ``last_alive`` when truthy.
        screen_health_status: Stored in ``screen_health_status`` when not
            ``None``.
        last_screenshot_analyzed: Datetime stored in
            ``last_screenshot_analyzed`` unless the object already holds a
            strictly newer one. Naive datetimes are treated as UTC.
    """
    if last_seen:
        client_obj.last_alive = last_seen

    resolved_event_id = normalize_event_id(event_id)
    if resolved_event_id is not None:
        client_obj.current_event_id = resolved_event_id

    if process_name is not None:
        client_obj.current_process = process_name

    if process_pid is not None:
        client_obj.process_pid = process_pid

    resolved_status = normalize_process_status(process_status)
    if resolved_status is not None:
        client_obj.process_status = resolved_status

    if screen_health_status is not None:
        client_obj.screen_health_status = screen_health_status

    if last_screenshot_analyzed is None:
        return

    def _as_utc(moment):
        # Naive values are assumed to be UTC so both sides are comparable.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=datetime.UTC)
        return moment

    stored = client_obj.last_screenshot_analyzed
    if stored is not None:
        stored = _as_utc(stored)
    incoming = _as_utc(last_screenshot_analyzed)

    # Never move the marker backwards: out-of-order messages are ignored.
    if stored is None or incoming >= stored:
        client_obj.last_screenshot_analyzed = incoming
def _extract_image_and_timestamp(data):
image_value = None
timestamp_value = None
if not isinstance(data, dict):
return None, None
screenshot_obj = data.get("screenshot") if isinstance(data.get("screenshot"), dict) else None
metadata_obj = data.get("metadata") if isinstance(data.get("metadata"), dict) else None
screenshot_meta_obj = screenshot_obj.get("metadata") if screenshot_obj and isinstance(screenshot_obj.get("metadata"), dict) else None
for key in ("image", "data"):
if isinstance(data.get(key), str) and data.get(key):
image_value = data.get(key)
break
if image_value is None and screenshot_obj is not None:
for key in ("image", "data"):
if isinstance(screenshot_obj.get(key), str) and screenshot_obj.get(key):
image_value = screenshot_obj.get(key)
break
for container in (data, screenshot_obj, metadata_obj, screenshot_meta_obj):
if not isinstance(container, dict):
continue
for key in ("timestamp", "captured_at", "capture_time", "created_at"):
value = container.get(key)
if value is not None:
timestamp_value = value
return image_value, timestamp_value
return image_value, timestamp_value
def handle_screenshot(uuid, payload):
"""
Handle screenshot data received via MQTT and forward to API.
@@ -40,13 +197,16 @@ def handle_screenshot(uuid, payload):
# Try to parse as JSON first
try:
data = json.loads(payload.decode())
if "image" in data:
image_b64, timestamp_value = _extract_image_and_timestamp(data)
if image_b64:
# Payload is JSON with base64 image
api_payload = {"image": data["image"]}
api_payload = {"image": image_b64}
if timestamp_value is not None:
api_payload["timestamp"] = timestamp_value
headers = {"Content-Type": "application/json"}
logging.debug(f"Forwarding base64 screenshot from {uuid} to API")
else:
logging.warning(f"Screenshot JSON from {uuid} missing 'image' field")
logging.warning(f"Screenshot JSON from {uuid} missing image/data field")
return
except (json.JSONDecodeError, UnicodeDecodeError):
# Payload is raw binary image data - encode to base64 for API
@@ -101,21 +261,28 @@ def on_message(client, userdata, msg):
try:
payload_text = msg.payload.decode()
data = json.loads(payload_text)
shot = data.get("screenshot")
if isinstance(shot, dict):
# Prefer 'data' field (base64) inside screenshot object
image_b64 = shot.get("data")
if image_b64:
logging.debug(f"Dashboard enthält Screenshot für {uuid}; Weiterleitung an API")
# Build a lightweight JSON with image field for API handler
api_payload = json.dumps({"image": image_b64}).encode("utf-8")
handle_screenshot(uuid, api_payload)
image_b64, ts_value = _extract_image_and_timestamp(data)
if image_b64:
logging.debug(f"Dashboard enthält Screenshot für {uuid}; Weiterleitung an API")
dashboard_payload = {"image": image_b64}
if ts_value is not None:
dashboard_payload["timestamp"] = ts_value
api_payload = json.dumps(dashboard_payload).encode("utf-8")
handle_screenshot(uuid, api_payload)
# Update last_alive if status present
if data.get("status") == "alive":
session = Session()
client_obj = session.query(Client).filter_by(uuid=uuid).first()
if client_obj:
client_obj.last_alive = datetime.datetime.now(datetime.UTC)
process_health = data.get('process_health') or {}
apply_monitoring_update(
client_obj,
last_seen=datetime.datetime.now(datetime.UTC),
event_id=process_health.get('event_id'),
process_name=process_health.get('current_process') or process_health.get('process'),
process_pid=process_health.get('process_pid') or process_health.get('pid'),
process_status=process_health.get('process_status') or process_health.get('status'),
)
session.commit()
session.close()
except Exception as e:
@@ -140,24 +307,14 @@ def on_message(client, userdata, msg):
session = Session()
client_obj = session.query(Client).filter_by(uuid=uuid).first()
if client_obj:
client_obj.last_alive = datetime.datetime.now(datetime.UTC)
# Update health fields if present in heartbeat
if 'process_status' in payload_data:
try:
client_obj.process_status = ProcessStatus[payload_data['process_status']]
except (KeyError, TypeError):
pass
if 'current_process' in payload_data:
client_obj.current_process = payload_data.get('current_process')
if 'process_pid' in payload_data:
client_obj.process_pid = payload_data.get('process_pid')
if 'current_event_id' in payload_data:
client_obj.current_event_id = payload_data.get('current_event_id')
apply_monitoring_update(
client_obj,
last_seen=datetime.datetime.now(datetime.UTC),
event_id=payload_data.get('current_event_id'),
process_name=payload_data.get('current_process'),
process_pid=payload_data.get('process_pid'),
process_status=payload_data.get('process_status'),
)
session.commit()
logging.info(f"Heartbeat von {uuid} empfangen, last_alive (UTC) aktualisiert.")
session.close()
@@ -222,23 +379,20 @@ def on_message(client, userdata, msg):
if client_obj:
# Update expected state
expected = payload_data.get('expected_state', {})
if 'event_id' in expected:
client_obj.current_event_id = expected['event_id']
# Update actual state
actual = payload_data.get('actual_state', {})
if 'process' in actual:
client_obj.current_process = actual['process']
if 'pid' in actual:
client_obj.process_pid = actual['pid']
if 'status' in actual:
try:
client_obj.process_status = ProcessStatus[actual['status']]
except (KeyError, TypeError):
pass
screen_health_status = infer_screen_health_status(payload_data)
apply_monitoring_update(
client_obj,
last_seen=datetime.datetime.now(datetime.UTC),
event_id=expected.get('event_id'),
process_name=actual.get('process'),
process_pid=actual.get('pid'),
process_status=actual.get('status'),
screen_health_status=screen_health_status,
last_screenshot_analyzed=parse_timestamp((payload_data.get('health_metrics') or {}).get('last_frame_update')),
)
session.commit()
logging.debug(f"Health update from {uuid}: {actual.get('process')} ({actual.get('status')})")
session.close()