Files
infoscreen/CLIENT_MONITORING_IMPLEMENTATION_GUIDE.md
Olaf 9c330f984f feat(monitoring): complete monitoring pipeline and fix presentation flag persistence
add superadmin monitoring dashboard with protected route, menu entry, and monitoring data client
add monitoring overview API endpoint and improve log serialization/aggregation for dashboard use
extend listener health/log handling with robust status/event/timestamp normalization and screenshot payload extraction
improve screenshot persistence and retrieval (timestamp-aware uploads, latest screenshot endpoint fallback)
fix page_progress and auto_progress persistence/serialization across create, update, and detached occurrence flows
align technical and project docs to reflect implemented monitoring and no-version-bump backend changes
add documentation sync log entry and include minor compose env indentation cleanup
2026-03-24 11:18:33 +00:00

24 KiB

🚀 Client Monitoring Implementation Guide

Phase-based implementation guide for basic monitoring in development phase


Phase 1: Server-Side Database Foundation

Status: COMPLETE
Dependencies: None - Already implemented
Time estimate: Completed

Step 1.1: Database Migration

File: server/alembic/versions/c1d2e3f4g5h6_add_client_monitoring.py
What it does:

  • Creates client_logs table for centralized logging
  • Adds health monitoring columns to clients table
  • Creates indexes for efficient querying

To apply:

cd /workspace/server
alembic upgrade head

Step 1.2: Update Data Models

File: models/models.py
What was added:

  • New enums: LogLevel, ProcessStatus, ScreenHealthStatus
  • Updated Client model with health tracking fields
  • New ClientLog model for log storage

🔧 Phase 2: Server-Side Backend Logic

Status: COMPLETE
Dependencies: Phase 1 complete
Time estimate: 2-3 hours

Step 2.1: Extend MQTT Listener

File: listener/listener.py
What to add:

# Add new topic subscriptions in on_connect():
client.subscribe("infoscreen/+/logs/error")
client.subscribe("infoscreen/+/logs/warn")
client.subscribe("infoscreen/+/logs/info")  # Dev mode only
client.subscribe("infoscreen/+/health")

# Add new handler in on_message():
def handle_log_message(uuid, level, payload):
    """Store client log in database"""
    from models.models import ClientLog, LogLevel
    from server.database import Session
    import json
    
    session = Session()
    try:
        log_entry = ClientLog(
            client_uuid=uuid,
            timestamp=payload.get('timestamp', datetime.now(timezone.utc)),
            level=LogLevel[level],
            message=payload.get('message', ''),
            context=json.dumps(payload.get('context', {}))
        )
        session.add(log_entry)
        session.commit()
        print(f"[LOG] {uuid} {level}: {payload.get('message', '')}")
    except Exception as e:
        print(f"Error saving log: {e}")
        session.rollback()
    finally:
        session.close()

def handle_health_message(uuid, payload):
    """Update client health status"""
    from models.models import Client, ProcessStatus
    from server.database import Session
    
    session = Session()
    try:
        client = session.query(Client).filter_by(uuid=uuid).first()
        if client:
            client.current_event_id = payload.get('expected_state', {}).get('event_id')
            client.current_process = payload.get('actual_state', {}).get('process')
            
            status_str = payload.get('actual_state', {}).get('status')
            if status_str:
                client.process_status = ProcessStatus[status_str]
            
            client.process_pid = payload.get('actual_state', {}).get('pid')
            session.commit()
    except Exception as e:
        print(f"Error updating health: {e}")
        session.rollback()
    finally:
        session.close()

Topic routing logic:

# In on_message callback, add routing:
if topic.endswith('/logs/error'):
    handle_log_message(uuid, 'ERROR', payload)
elif topic.endswith('/logs/warn'):
    handle_log_message(uuid, 'WARN', payload)
elif topic.endswith('/logs/info'):
    handle_log_message(uuid, 'INFO', payload)
elif topic.endswith('/health'):
    handle_health_message(uuid, payload)

Step 2.2: Create API Routes

File: server/routes/client_logs.py (NEW)

from flask import Blueprint, jsonify, request
from server.database import Session
from server.permissions import admin_or_higher
from models.models import ClientLog, Client
from sqlalchemy import desc
import json

client_logs_bp = Blueprint("client_logs", __name__, url_prefix="/api/client-logs")

@client_logs_bp.route("/<uuid>/logs", methods=["GET"])
@admin_or_higher
def get_client_logs(uuid):
    """
    Get logs for a specific client
    Query params:
      - level: ERROR, WARN, INFO, DEBUG (optional)
      - limit: number of entries (default 50, max 500)
      - since: ISO timestamp (optional)
    """
    session = Session()
    try:
        level = request.args.get('level')
        limit = min(int(request.args.get('limit', 50)), 500)
        since = request.args.get('since')
        
        query = session.query(ClientLog).filter_by(client_uuid=uuid)
        
        if level:
            from models.models import LogLevel
            query = query.filter_by(level=LogLevel[level])
        
        if since:
            from datetime import datetime
            since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
            query = query.filter(ClientLog.timestamp >= since_dt)
        
        logs = query.order_by(desc(ClientLog.timestamp)).limit(limit).all()
        
        result = []
        for log in logs:
            result.append({
                "id": log.id,
                "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                "level": log.level.value if log.level else None,
                "message": log.message,
                "context": json.loads(log.context) if log.context else {}
            })
        
        session.close()
        return jsonify({"logs": result, "count": len(result)})
    
    except Exception as e:
        session.close()
        return jsonify({"error": str(e)}), 500

@client_logs_bp.route("/summary", methods=["GET"])
@admin_or_higher
def get_logs_summary():
    """Get summary of errors/warnings across all clients"""
    session = Session()
    try:
        from sqlalchemy import func
        from models.models import LogLevel
        from datetime import datetime, timedelta
        
        # Last 24 hours
        since = datetime.utcnow() - timedelta(hours=24)
        
        stats = session.query(
            ClientLog.client_uuid,
            ClientLog.level,
            func.count(ClientLog.id).label('count')
        ).filter(
            ClientLog.timestamp >= since
        ).group_by(
            ClientLog.client_uuid,
            ClientLog.level
        ).all()
        
        result = {}
        for stat in stats:
            uuid = stat.client_uuid
            if uuid not in result:
                result[uuid] = {"ERROR": 0, "WARN": 0, "INFO": 0}
            result[uuid][stat.level.value] = stat.count
        
        session.close()
        return jsonify({"summary": result, "period_hours": 24})
    
    except Exception as e:
        session.close()
        return jsonify({"error": str(e)}), 500

Register in server/wsgi.py:

from server.routes.client_logs import client_logs_bp
app.register_blueprint(client_logs_bp)

Step 2.3: Add Health Data to Heartbeat Handler

File: listener/listener.py (extend existing heartbeat handler)

# Modify existing heartbeat handler to capture health data
def on_message(client, userdata, message):
    topic = message.topic
    
    # Existing heartbeat logic...
    if '/heartbeat' in topic:
        uuid = extract_uuid_from_topic(topic)
        try:
            payload = json.loads(message.payload.decode())
            
            # Update last_alive (existing)
            session = Session()
            client_obj = session.query(Client).filter_by(uuid=uuid).first()
            if client_obj:
                client_obj.last_alive = datetime.now(timezone.utc)
                
                # NEW: Update health data if present in heartbeat
                if 'process_status' in payload:
                    client_obj.process_status = ProcessStatus[payload['process_status']]
                if 'current_process' in payload:
                    client_obj.current_process = payload['current_process']
                if 'process_pid' in payload:
                    client_obj.process_pid = payload['process_pid']
                if 'current_event_id' in payload:
                    client_obj.current_event_id = payload['current_event_id']
                
                session.commit()
            session.close()
        except Exception as e:
            print(f"Error processing heartbeat: {e}")

🖥️ Phase 3: Client-Side Implementation

Status: COMPLETE
Dependencies: Phase 2 complete
Time estimate: 3-4 hours

Step 3.1: Create Client Watchdog Script

File: client/watchdog.py (NEW - on client device)

#!/usr/bin/env python3
"""
Client-side process watchdog
Monitors VLC, Chromium, PDF viewer and reports health to server
"""
import psutil
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime, timezone
import sys
import os

class MediaWatchdog:
    def __init__(self, client_uuid, mqtt_broker, mqtt_port=1883):
        self.uuid = client_uuid
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.connect(mqtt_broker, mqtt_port, 60)
        self.mqtt_client.loop_start()
        
        self.current_process = None
        self.current_event_id = None
        self.restart_attempts = 0
        self.MAX_RESTARTS = 3
    
    def send_log(self, level, message, context=None):
        """Send log message to server via MQTT"""
        topic = f"infoscreen/{self.uuid}/logs/{level.lower()}"
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": message,
            "context": context or {}
        }
        self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
        print(f"[{level}] {message}")
    
    def send_health(self, process_name, pid, status, event_id=None):
        """Send health status to server"""
        topic = f"infoscreen/{self.uuid}/health"
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "expected_state": {
                "event_id": event_id
            },
            "actual_state": {
                "process": process_name,
                "pid": pid,
                "status": status  # 'running', 'crashed', 'starting', 'stopped'
            }
        }
        self.mqtt_client.publish(topic, json.dumps(payload), qos=1, retain=False)
    
    def is_process_running(self, process_name):
        """Check if a process is running"""
        for proc in psutil.process_iter(['name', 'pid']):
            try:
                if process_name.lower() in proc.info['name'].lower():
                    return proc.info['pid']
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return None
    
    def monitor_loop(self):
        """Main monitoring loop"""
        print(f"Watchdog started for client {self.uuid}")
        self.send_log("INFO", "Watchdog service started", {"uuid": self.uuid})
        
        while True:
            try:
                # Check expected process (would be set by main event handler)
                if self.current_process:
                    pid = self.is_process_running(self.current_process)
                    
                    if pid:
                        # Process is running
                        self.send_health(
                            self.current_process,
                            pid,
                            "running",
                            self.current_event_id
                        )
                        self.restart_attempts = 0  # Reset on success
                    else:
                        # Process crashed
                        self.send_log(
                            "ERROR",
                            f"Process {self.current_process} crashed or stopped",
                            {
                                "event_id": self.current_event_id,
                                "process": self.current_process,
                                "restart_attempt": self.restart_attempts
                            }
                        )
                        
                        if self.restart_attempts < self.MAX_RESTARTS:
                            self.send_log("WARN", f"Attempting restart ({self.restart_attempts + 1}/{self.MAX_RESTARTS})")
                            self.restart_attempts += 1
                            # TODO: Implement restart logic (call event handler)
                        else:
                            self.send_log("ERROR", "Max restart attempts exceeded", {
                                "event_id": self.current_event_id
                            })
                
                time.sleep(5)  # Check every 5 seconds
                
            except KeyboardInterrupt:
                print("Watchdog stopped by user")
                break
            except Exception as e:
                self.send_log("ERROR", f"Watchdog error: {str(e)}", {
                    "exception": str(e),
                    "traceback": str(sys.exc_info())
                })
                time.sleep(10)  # Wait longer on error

if __name__ == "__main__":
    import sys
    if len(sys.argv) < 3:
        print("Usage: python watchdog.py <client_uuid> <mqtt_broker>")
        sys.exit(1)
    
    uuid = sys.argv[1]
    broker = sys.argv[2]
    
    watchdog = MediaWatchdog(uuid, broker)
    watchdog.monitor_loop()

Step 3.2: Integrate with Existing Event Handler

File: client/event_handler.py (modify existing)

# When starting a new event, notify watchdog
def play_event(event_data):
    event_type = event_data.get('event_type')
    event_id = event_data.get('id')
    
    if event_type == 'video':
        process_name = 'vlc'
        # Start VLC...
    elif event_type == 'website':
        process_name = 'chromium'
        # Start Chromium...
    elif event_type == 'presentation':
        process_name = 'pdf_viewer'  # or your PDF tool
        # Start PDF viewer...
    
    # Notify watchdog about expected process
    watchdog.current_process = process_name
    watchdog.current_event_id = event_id
    watchdog.restart_attempts = 0

Step 3.3: Enhanced Heartbeat Payload

File: client/heartbeat.py (modify existing)

# Modify existing heartbeat to include process status
def send_heartbeat(mqtt_client, uuid):
    # Get current process status
    current_process = None
    process_pid = None
    process_status = "stopped"
    
    # Check if expected process is running
    if watchdog.current_process:
        pid = watchdog.is_process_running(watchdog.current_process)
        if pid:
            current_process = watchdog.current_process
            process_pid = pid
            process_status = "running"
    
    payload = {
        "uuid": uuid,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # Existing fields...
        # NEW health fields:
        "current_process": current_process,
        "process_pid": process_pid,
        "process_status": process_status,
        "current_event_id": watchdog.current_event_id
    }
    
    mqtt_client.publish(f"infoscreen/{uuid}/heartbeat", json.dumps(payload))

🎨 Phase 4: Dashboard UI Integration

Status: COMPLETE
Dependencies: Phases 2 & 3 complete
Time estimate: 2-3 hours

Step 4.1: Create Log Viewer Component

File: dashboard/src/ClientLogs.tsx (NEW)

import React from 'react';
import { GridComponent, ColumnsDirective, ColumnDirective, Page, Inject } from '@syncfusion/ej2-react-grids';

interface LogEntry {
  id: number;
  timestamp: string;
  level: 'ERROR' | 'WARN' | 'INFO' | 'DEBUG';
  message: string;
  context: any;
}

interface ClientLogsProps {
  clientUuid: string;
}

export const ClientLogs: React.FC<ClientLogsProps> = ({ clientUuid }) => {
  const [logs, setLogs] = React.useState<LogEntry[]>([]);
  const [loading, setLoading] = React.useState(false);

  const loadLogs = async (level?: string) => {
    setLoading(true);
    try {
      const params = new URLSearchParams({ limit: '50' });
      if (level) params.append('level', level);
      
      const response = await fetch(`/api/client-logs/${clientUuid}/logs?${params}`);
      const data = await response.json();
      setLogs(data.logs);
    } catch (err) {
      console.error('Failed to load logs:', err);
    } finally {
      setLoading(false);
    }
  };

  React.useEffect(() => {
    loadLogs();
    const interval = setInterval(() => loadLogs(), 30000); // Refresh every 30s
    return () => clearInterval(interval);
  }, [clientUuid]);

  const levelTemplate = (props: any) => {
    const colors = {
      ERROR: 'text-red-600 bg-red-100',
      WARN: 'text-yellow-600 bg-yellow-100',
      INFO: 'text-blue-600 bg-blue-100',
      DEBUG: 'text-gray-600 bg-gray-100'
    };
    return (
      <span className={`px-2 py-1 rounded ${colors[props.level as keyof typeof colors]}`}>
        {props.level}
      </span>
    );
  };

  return (
    <div>
      <div className="mb-4 flex gap-2">
        <button onClick={() => loadLogs()} className="e-btn e-primary">All</button>
        <button onClick={() => loadLogs('ERROR')} className="e-btn e-danger">Errors</button>
        <button onClick={() => loadLogs('WARN')} className="e-btn e-warning">Warnings</button>
        <button onClick={() => loadLogs('INFO')} className="e-btn e-info">Info</button>
      </div>

      <GridComponent
        dataSource={logs}
        allowPaging={true}
        pageSettings={{ pageSize: 20 }}
      >
        <ColumnsDirective>
          <ColumnDirective field='timestamp' headerText='Time' width='180' format='yMd HH:mm:ss' />
          <ColumnDirective field='level' headerText='Level' width='100' template={levelTemplate} />
          <ColumnDirective field='message' headerText='Message' width='400' />
        </ColumnsDirective>
        <Inject services={[Page]} />
      </GridComponent>
    </div>
  );
};

Step 4.2: Add Health Indicators to Client Cards

File: dashboard/src/clients.tsx (modify existing)

// Add health indicator to client card
const getHealthBadge = (client: Client) => {
  if (!client.process_status) {
    return <span className="badge badge-secondary">Unknown</span>;
  }
  
  const badges = {
    running: <span className="badge badge-success"> Running</span>,
    crashed: <span className="badge badge-danger"> Crashed</span>,
    starting: <span className="badge badge-warning"> Starting</span>,
    stopped: <span className="badge badge-secondary"> Stopped</span>
  };
  
  return badges[client.process_status] || null;
};

// In client card render:
<div className="client-card">
  <h3>{client.hostname || client.uuid}</h3>
  <div>Status: {getHealthBadge(client)}</div>
  <div>Process: {client.current_process || 'None'}</div>
  <div>Event ID: {client.current_event_id || 'None'}</div>
  <button onClick={() => showLogs(client.uuid)}>View Logs</button>
</div>

Step 4.3: Add System Health Dashboard (Superadmin)

File: dashboard/src/SystemMonitor.tsx (NEW)

import React from 'react';
import { ClientLogs } from './ClientLogs';

export const SystemMonitor: React.FC = () => {
  const [summary, setSummary] = React.useState<any>({});

  const loadSummary = async () => {
    const response = await fetch('/api/client-logs/summary');
    const data = await response.json();
    setSummary(data.summary);
  };

  React.useEffect(() => {
    loadSummary();
    const interval = setInterval(loadSummary, 30000);
    return () => clearInterval(interval);
  }, []);

  return (
    <div className="system-monitor">
      <h2>System Health Monitor (Superadmin)</h2>
      
      <div className="alert-panel">
        <h3>Active Issues</h3>
        {Object.entries(summary).map(([uuid, stats]: [string, any]) => (
          stats.ERROR > 0 || stats.WARN > 5 ? (
            <div key={uuid} className="alert">
              🔴 {uuid}: {stats.ERROR} errors, {stats.WARN} warnings (24h)
            </div>
          ) : null
        ))}
      </div>
      
      {/* Real-time log stream */}
      <div className="log-stream">
        <h3>Recent Logs (All Clients)</h3>
        {/* Implement real-time log aggregation */}
      </div>
    </div>
  );
};

🧪 Phase 5: Testing & Validation

Status: COMPLETE
Dependencies: All previous phases
Time estimate: 1-2 hours

Step 5.1: Server-Side Tests

# Test database migration
cd /workspace/server
alembic upgrade head
alembic downgrade -1
alembic upgrade head

# Test API endpoints
curl -X GET "http://localhost:8000/api/client-logs/<uuid>/logs?limit=10"
curl -X GET "http://localhost:8000/api/client-logs/summary"

Step 5.2: Client-Side Tests

# On client device
python3 watchdog.py <your-uuid> <mqtt-broker-ip>

# Simulate process crash
pkill vlc  # Should trigger error log and restart attempt

# Check MQTT messages
mosquitto_sub -h <broker> -t "infoscreen/+/logs/#" -v
mosquitto_sub -h <broker> -t "infoscreen/+/health" -v

Step 5.3: Dashboard Tests

  1. Open dashboard and navigate to Clients page
  2. Verify health indicators show correct status
  3. Click "View Logs" and verify logs appear
  4. Navigate to System Monitor (superadmin)
  5. Verify summary statistics are correct

📝 Configuration Summary

Environment Variables

Server (docker-compose.yml):

- LOG_RETENTION_DAYS=90  # How long to keep logs
- DEBUG_MODE=true        # Enable INFO level logging via MQTT

Client:

export MQTT_BROKER="your-server-ip"
export CLIENT_UUID="abc-123-def"
export WATCHDOG_ENABLED=true

MQTT Topics Reference

Topic Pattern Direction Purpose
infoscreen/{uuid}/logs/error Client → Server Error messages
infoscreen/{uuid}/logs/warn Client → Server Warning messages
infoscreen/{uuid}/logs/info Client → Server Info (dev only)
infoscreen/{uuid}/health Client → Server Health metrics
infoscreen/{uuid}/heartbeat Client → Server Enhanced heartbeat

Database Tables

client_logs:

  • Stores all centralized logs
  • Indexed by client_uuid, timestamp, level
  • Auto-cleanup after 90 days (recommended)

clients (extended):

  • current_event_id: Which event should be playing
  • current_process: Expected process name
  • process_status: running/crashed/starting/stopped
  • process_pid: Process ID
  • screen_health_status: OK/BLACK/FROZEN/UNKNOWN
  • last_screenshot_analyzed: Last analysis time
  • last_screenshot_hash: For frozen detection

🎯 Next Steps After Implementation

  1. Deploy Phase 1-2 to staging environment
  2. Test with 1-2 pilot clients before full rollout
  3. Monitor traffic & performance (should be minimal)
  4. Fine-tune log levels based on actual noise
  5. Add alerting (email/Slack when errors > threshold)
  6. Implement screenshot analysis (Phase 2 enhancement)
  7. Add trending/analytics (which clients are least reliable)

🚨 Troubleshooting

Logs not appearing in database:

  • Check MQTT broker logs: docker logs infoscreen-mqtt
  • Verify listener subscriptions: Check listener/listener.py logs
  • Test MQTT manually: mosquitto_pub -h broker -t "infoscreen/test/logs/error" -m '{"message":"test"}'

High database growth:

  • Check log_retention cleanup cronjob
  • Reduce INFO level logging frequency
  • Add sampling (log every 10th occurrence instead of all)

Client watchdog not detecting crashes:

  • Verify psutil can see processes: ps aux | grep vlc
  • Check permissions (may need sudo for some process checks)
  • Increase monitor loop frequency for faster detection

Completion Checklist

  • Phase 1: Database migration applied
  • Phase 2: Listener extended for log topics
  • Phase 2: API endpoints created and tested
  • Phase 3: Client watchdog implemented
  • Phase 3: Enhanced heartbeat deployed
  • Phase 4: Dashboard log viewer working
  • Phase 4: Health indicators visible
  • Phase 5: End-to-end testing complete
  • Documentation updated with new features
  • Production deployment plan created

Last Updated: 2026-03-24
Author: GitHub Copilot
For: Infoscreen 2025 Project