feat(monitoring): add server-side client logging and health infrastructure
- add Alembic migration c1d2e3f4g5h6 for client monitoring:
- create client_logs table with FK to clients.uuid and performance indexes
- extend clients with process/health tracking fields
- extend data model with ClientLog, LogLevel, ProcessStatus, and ScreenHealthStatus
- enhance listener MQTT handling:
- subscribe to logs and health topics
- persist client logs from infoscreen/{uuid}/logs/{level}
- process health payloads and enrich heartbeat-derived client state
- add monitoring API blueprint server/routes/client_logs.py:
- GET /api/client-logs/<uuid>/logs
- GET /api/client-logs/summary
- GET /api/client-logs/recent-errors
- GET /api/client-logs/test
- register client_logs blueprint in server/wsgi.py
- align compose/dev runtime for listener live-code execution
- add client-side implementation docs:
- CLIENT_MONITORING_SPECIFICATION.md
- CLIENT_MONITORING_IMPLEMENTATION_GUIDE.md
- update TECH-CHANGELOG.md and copilot-instructions.md:
- document monitoring changes
- codify post-release technical-notes/no-version-bump convention
This commit is contained in:
@@ -7,7 +7,7 @@ import requests
|
||||
import paho.mqtt.client as mqtt
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from models.models import Client
|
||||
from models.models import Client, ClientLog, LogLevel, ProcessStatus
|
||||
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(message)s')
|
||||
|
||||
# Load .env in development
|
||||
@@ -78,7 +78,14 @@ def on_connect(client, userdata, flags, reasonCode, properties):
|
||||
client.subscribe("infoscreen/+/heartbeat")
|
||||
client.subscribe("infoscreen/+/screenshot")
|
||||
client.subscribe("infoscreen/+/dashboard")
|
||||
logging.info(f"MQTT connected (reasonCode: {reasonCode}); (re)subscribed to discovery, heartbeats, screenshots, and dashboards")
|
||||
|
||||
# Subscribe to monitoring topics
|
||||
client.subscribe("infoscreen/+/logs/error")
|
||||
client.subscribe("infoscreen/+/logs/warn")
|
||||
client.subscribe("infoscreen/+/logs/info")
|
||||
client.subscribe("infoscreen/+/health")
|
||||
|
||||
logging.info(f"MQTT connected (reasonCode: {reasonCode}); (re)subscribed to discovery, heartbeats, screenshots, dashboards, logs, and health")
|
||||
except Exception as e:
|
||||
logging.error(f"Subscribe failed on connect: {e}")
|
||||
|
||||
@@ -124,15 +131,123 @@ def on_message(client, userdata, msg):
|
||||
# Heartbeat-Handling
|
||||
if topic.startswith("infoscreen/") and topic.endswith("/heartbeat"):
|
||||
uuid = topic.split("/")[1]
|
||||
try:
|
||||
# Parse payload to get optional health data
|
||||
payload_data = json.loads(msg.payload.decode())
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
payload_data = {}
|
||||
|
||||
session = Session()
|
||||
client_obj = session.query(Client).filter_by(uuid=uuid).first()
|
||||
if client_obj:
|
||||
client_obj.last_alive = datetime.datetime.now(datetime.UTC)
|
||||
|
||||
# Update health fields if present in heartbeat
|
||||
if 'process_status' in payload_data:
|
||||
try:
|
||||
client_obj.process_status = ProcessStatus[payload_data['process_status']]
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
if 'current_process' in payload_data:
|
||||
client_obj.current_process = payload_data.get('current_process')
|
||||
|
||||
if 'process_pid' in payload_data:
|
||||
client_obj.process_pid = payload_data.get('process_pid')
|
||||
|
||||
if 'current_event_id' in payload_data:
|
||||
client_obj.current_event_id = payload_data.get('current_event_id')
|
||||
|
||||
session.commit()
|
||||
logging.info(
|
||||
f"Heartbeat von {uuid} empfangen, last_alive (UTC) aktualisiert.")
|
||||
logging.info(f"Heartbeat von {uuid} empfangen, last_alive (UTC) aktualisiert.")
|
||||
session.close()
|
||||
return
|
||||
|
||||
# Log-Handling (ERROR, WARN, INFO)
|
||||
if topic.startswith("infoscreen/") and "/logs/" in topic:
|
||||
parts = topic.split("/")
|
||||
if len(parts) >= 4:
|
||||
uuid = parts[1]
|
||||
level_str = parts[3].upper() # 'error', 'warn', 'info' -> 'ERROR', 'WARN', 'INFO'
|
||||
|
||||
try:
|
||||
payload_data = json.loads(msg.payload.decode())
|
||||
message = payload_data.get('message', '')
|
||||
timestamp_str = payload_data.get('timestamp')
|
||||
context = payload_data.get('context', {})
|
||||
|
||||
# Parse timestamp or use current time
|
||||
if timestamp_str:
|
||||
try:
|
||||
log_timestamp = datetime.datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
|
||||
if log_timestamp.tzinfo is None:
|
||||
log_timestamp = log_timestamp.replace(tzinfo=datetime.UTC)
|
||||
except ValueError:
|
||||
log_timestamp = datetime.datetime.now(datetime.UTC)
|
||||
else:
|
||||
log_timestamp = datetime.datetime.now(datetime.UTC)
|
||||
|
||||
# Store in database
|
||||
session = Session()
|
||||
try:
|
||||
log_level = LogLevel[level_str]
|
||||
log_entry = ClientLog(
|
||||
client_uuid=uuid,
|
||||
timestamp=log_timestamp,
|
||||
level=log_level,
|
||||
message=message,
|
||||
context=json.dumps(context) if context else None
|
||||
)
|
||||
session.add(log_entry)
|
||||
session.commit()
|
||||
logging.info(f"[{level_str}] {uuid}: {message}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error saving log from {uuid}: {e}")
|
||||
session.rollback()
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
||||
logging.error(f"Could not parse log payload from {uuid}: {e}")
|
||||
return
|
||||
|
||||
# Health-Handling
|
||||
if topic.startswith("infoscreen/") and topic.endswith("/health"):
|
||||
uuid = topic.split("/")[1]
|
||||
try:
|
||||
payload_data = json.loads(msg.payload.decode())
|
||||
|
||||
session = Session()
|
||||
client_obj = session.query(Client).filter_by(uuid=uuid).first()
|
||||
if client_obj:
|
||||
# Update expected state
|
||||
expected = payload_data.get('expected_state', {})
|
||||
if 'event_id' in expected:
|
||||
client_obj.current_event_id = expected['event_id']
|
||||
|
||||
# Update actual state
|
||||
actual = payload_data.get('actual_state', {})
|
||||
if 'process' in actual:
|
||||
client_obj.current_process = actual['process']
|
||||
|
||||
if 'pid' in actual:
|
||||
client_obj.process_pid = actual['pid']
|
||||
|
||||
if 'status' in actual:
|
||||
try:
|
||||
client_obj.process_status = ProcessStatus[actual['status']]
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
|
||||
session.commit()
|
||||
logging.debug(f"Health update from {uuid}: {actual.get('process')} ({actual.get('status')})")
|
||||
session.close()
|
||||
|
||||
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
||||
logging.error(f"Could not parse health payload from {uuid}: {e}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing health from {uuid}: {e}")
|
||||
return
|
||||
|
||||
# Discovery-Handling
|
||||
if topic == "infoscreen/discovery":
|
||||
|
||||
Reference in New Issue
Block a user