docs: archive legacy guides and streamline copilot instructions governance

This commit is contained in:
2026-04-01 08:37:50 +00:00
parent 365d8f58f3
commit 06411edfab
15 changed files with 199 additions and 580 deletions


@@ -0,0 +1,757 @@
# 🚀 Client Monitoring Implementation Guide
**Phase-based implementation guide for basic monitoring during the development phase**
---
## ✅ Phase 1: Server-Side Database Foundation
**Status:** ✅ COMPLETE
**Dependencies:** None - Already implemented
**Time estimate:** Completed
### ✅ Step 1.1: Database Migration
**File:** `server/alembic/versions/c1d2e3f4g5h6_add_client_monitoring.py`
**What it does:**
- Creates `client_logs` table for centralized logging
- Adds health monitoring columns to `clients` table
- Creates indexes for efficient querying
**To apply:**
```bash
cd /workspace/server
alembic upgrade head
```
### ✅ Step 1.2: Update Data Models
**File:** `models/models.py`
**What was added:**
- New enums: `LogLevel`, `ProcessStatus`, `ScreenHealthStatus`
- Updated `Client` model with health tracking fields
- New `ClientLog` model for log storage
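The exact definitions live in `models/models.py`; as a rough sketch, the three enums might look like the following (member names are an assumption, chosen so that the `LogLevel[level]` and `ProcessStatus[status_str]` lookups used by the listener code in Phase 2 work directly):

```python
from enum import Enum

class LogLevel(Enum):
    # Member names match the level strings sent over MQTT ('ERROR', 'WARN', ...)
    ERROR = "ERROR"
    WARN = "WARN"
    INFO = "INFO"
    DEBUG = "DEBUG"

class ProcessStatus(Enum):
    # Member names match the watchdog's lowercase status strings,
    # so ProcessStatus[status_str] resolves without extra mapping
    running = "running"
    crashed = "crashed"
    starting = "starting"
    stopped = "stopped"

class ScreenHealthStatus(Enum):
    OK = "OK"
    BLACK = "BLACK"
    FROZEN = "FROZEN"
    UNKNOWN = "UNKNOWN"
```

The `Client` health fields and the `ClientLog` model then reference these enums in their SQLAlchemy column definitions.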
---
## 🔧 Phase 2: Server-Side Backend Logic
**Status:** ✅ COMPLETE
**Dependencies:** Phase 1 complete
**Time estimate:** 2-3 hours
### Step 2.1: Extend MQTT Listener
**File:** `listener/listener.py`
**What to add:**
```python
# Add new topic subscriptions in on_connect():
client.subscribe("infoscreen/+/logs/error")
client.subscribe("infoscreen/+/logs/warn")
client.subscribe("infoscreen/+/logs/info")  # Dev mode only
client.subscribe("infoscreen/+/health")

# Add new handlers, called from on_message():
def handle_log_message(uuid, level, payload):
    """Store client log in database"""
    import json
    from datetime import datetime, timezone
    from models.models import ClientLog, LogLevel
    from server.database import Session

    session = Session()
    try:
        log_entry = ClientLog(
            client_uuid=uuid,
            timestamp=payload.get('timestamp', datetime.now(timezone.utc)),
            level=LogLevel[level],
            message=payload.get('message', ''),
            context=json.dumps(payload.get('context', {}))
        )
        session.add(log_entry)
        session.commit()
        print(f"[LOG] {uuid} {level}: {payload.get('message', '')}")
    except Exception as e:
        print(f"Error saving log: {e}")
        session.rollback()
    finally:
        session.close()

def handle_health_message(uuid, payload):
    """Update client health status"""
    from models.models import Client, ProcessStatus
    from server.database import Session

    session = Session()
    try:
        client = session.query(Client).filter_by(uuid=uuid).first()
        if client:
            client.current_event_id = payload.get('expected_state', {}).get('event_id')
            client.current_process = payload.get('actual_state', {}).get('process')
            status_str = payload.get('actual_state', {}).get('status')
            if status_str:
                client.process_status = ProcessStatus[status_str]
            client.process_pid = payload.get('actual_state', {}).get('pid')
        session.commit()
    except Exception as e:
        print(f"Error updating health: {e}")
        session.rollback()
    finally:
        session.close()
```
**Topic routing logic:**
```python
# In the on_message callback, add routing:
if topic.endswith('/logs/error'):
    handle_log_message(uuid, 'ERROR', payload)
elif topic.endswith('/logs/warn'):
    handle_log_message(uuid, 'WARN', payload)
elif topic.endswith('/logs/info'):
    handle_log_message(uuid, 'INFO', payload)
elif topic.endswith('/health'):
    handle_health_message(uuid, payload)
```
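The routing above needs the client UUID parsed out of the topic. A minimal helper for topics shaped like `infoscreen/<uuid>/...` (the name matches the `extract_uuid_from_topic` call used later in the heartbeat handler; the implementation here is a sketch, not the repo's actual code):

```python
def extract_uuid_from_topic(topic: str) -> str:
    """Return the client UUID from topics shaped like infoscreen/<uuid>/..."""
    parts = topic.split('/')
    if len(parts) < 2 or parts[0] != 'infoscreen':
        raise ValueError(f"Unexpected topic: {topic}")
    return parts[1]
```

For example, `extract_uuid_from_topic("infoscreen/abc-123/logs/error")` yields `"abc-123"`.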
### Step 2.2: Create API Routes
**File:** `server/routes/client_logs.py` (NEW)
```python
from flask import Blueprint, jsonify, request
from server.database import Session
from server.permissions import admin_or_higher
from models.models import ClientLog, Client
from sqlalchemy import desc
import json

client_logs_bp = Blueprint("client_logs", __name__, url_prefix="/api/client-logs")

@client_logs_bp.route("/<uuid>/logs", methods=["GET"])
@admin_or_higher
def get_client_logs(uuid):
    """
    Get logs for a specific client
    Query params:
      - level: ERROR, WARN, INFO, DEBUG (optional)
      - limit: number of entries (default 50, max 500)
      - since: ISO timestamp (optional)
    """
    session = Session()
    try:
        level = request.args.get('level')
        limit = min(int(request.args.get('limit', 50)), 500)
        since = request.args.get('since')

        query = session.query(ClientLog).filter_by(client_uuid=uuid)
        if level:
            from models.models import LogLevel
            query = query.filter_by(level=LogLevel[level])
        if since:
            from datetime import datetime
            since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
            query = query.filter(ClientLog.timestamp >= since_dt)

        logs = query.order_by(desc(ClientLog.timestamp)).limit(limit).all()
        result = []
        for log in logs:
            result.append({
                "id": log.id,
                "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                "level": log.level.value if log.level else None,
                "message": log.message,
                "context": json.loads(log.context) if log.context else {}
            })
        session.close()
        return jsonify({"logs": result, "count": len(result)})
    except Exception as e:
        session.close()
        return jsonify({"error": str(e)}), 500

@client_logs_bp.route("/summary", methods=["GET"])
@admin_or_higher
def get_logs_summary():
    """Get summary of errors/warnings across all clients"""
    session = Session()
    try:
        from sqlalchemy import func
        from models.models import LogLevel
        from datetime import datetime, timedelta

        # Last 24 hours
        since = datetime.utcnow() - timedelta(hours=24)
        stats = session.query(
            ClientLog.client_uuid,
            ClientLog.level,
            func.count(ClientLog.id).label('count')
        ).filter(
            ClientLog.timestamp >= since
        ).group_by(
            ClientLog.client_uuid,
            ClientLog.level
        ).all()

        result = {}
        for stat in stats:
            uuid = stat.client_uuid
            if uuid not in result:
                result[uuid] = {"ERROR": 0, "WARN": 0, "INFO": 0}
            result[uuid][stat.level.value] = stat.count
        session.close()
        return jsonify({"summary": result, "period_hours": 24})
    except Exception as e:
        session.close()
        return jsonify({"error": str(e)}), 500
```
**Register in `server/wsgi.py`:**
```python
from server.routes.client_logs import client_logs_bp
app.register_blueprint(client_logs_bp)
```
### Step 2.3: Add Health Data to Heartbeat Handler
**File:** `listener/listener.py` (extend existing heartbeat handler)
```python
# Modify the existing heartbeat handler to capture health data
def on_message(client, userdata, message):
    topic = message.topic
    # Existing heartbeat logic...
    if '/heartbeat' in topic:
        uuid = extract_uuid_from_topic(topic)
        try:
            payload = json.loads(message.payload.decode())
            # Update last_alive (existing)
            session = Session()
            client_obj = session.query(Client).filter_by(uuid=uuid).first()
            if client_obj:
                client_obj.last_alive = datetime.now(timezone.utc)
                # NEW: Update health data if present in the heartbeat
                if 'process_status' in payload:
                    client_obj.process_status = ProcessStatus[payload['process_status']]
                if 'current_process' in payload:
                    client_obj.current_process = payload['current_process']
                if 'process_pid' in payload:
                    client_obj.process_pid = payload['process_pid']
                if 'current_event_id' in payload:
                    client_obj.current_event_id = payload['current_event_id']
            session.commit()
            session.close()
        except Exception as e:
            print(f"Error processing heartbeat: {e}")
```
---
## 🖥️ Phase 3: Client-Side Implementation
**Status:** ✅ COMPLETE
**Dependencies:** Phase 2 complete
**Time estimate:** 3-4 hours
### Step 3.1: Create Client Watchdog Script
**File:** `client/watchdog.py` (NEW - on client device)
```python
#!/usr/bin/env python3
"""
Client-side process watchdog
Monitors VLC, Chromium, PDF viewer and reports health to server
"""
import json
import sys
import time
from datetime import datetime, timezone

import paho.mqtt.client as mqtt
import psutil

class MediaWatchdog:
    def __init__(self, client_uuid, mqtt_broker, mqtt_port=1883):
        self.uuid = client_uuid
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.connect(mqtt_broker, mqtt_port, 60)
        self.mqtt_client.loop_start()
        self.current_process = None
        self.current_event_id = None
        self.restart_attempts = 0
        self.MAX_RESTARTS = 3

    def send_log(self, level, message, context=None):
        """Send log message to server via MQTT"""
        topic = f"infoscreen/{self.uuid}/logs/{level.lower()}"
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": message,
            "context": context or {}
        }
        self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
        print(f"[{level}] {message}")

    def send_health(self, process_name, pid, status, event_id=None):
        """Send health status to server"""
        topic = f"infoscreen/{self.uuid}/health"
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "expected_state": {
                "event_id": event_id
            },
            "actual_state": {
                "process": process_name,
                "pid": pid,
                "status": status  # 'running', 'crashed', 'starting', 'stopped'
            }
        }
        self.mqtt_client.publish(topic, json.dumps(payload), qos=1, retain=False)

    def is_process_running(self, process_name):
        """Return the PID if a matching process is running, else None"""
        for proc in psutil.process_iter(['name', 'pid']):
            try:
                if process_name.lower() in proc.info['name'].lower():
                    return proc.info['pid']
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass
        return None

    def monitor_loop(self):
        """Main monitoring loop"""
        print(f"Watchdog started for client {self.uuid}")
        self.send_log("INFO", "Watchdog service started", {"uuid": self.uuid})
        while True:
            try:
                # Check the expected process (set by the main event handler)
                if self.current_process:
                    pid = self.is_process_running(self.current_process)
                    if pid:
                        # Process is running
                        self.send_health(
                            self.current_process,
                            pid,
                            "running",
                            self.current_event_id
                        )
                        self.restart_attempts = 0  # Reset on success
                    else:
                        # Process crashed
                        self.send_log(
                            "ERROR",
                            f"Process {self.current_process} crashed or stopped",
                            {
                                "event_id": self.current_event_id,
                                "process": self.current_process,
                                "restart_attempt": self.restart_attempts
                            }
                        )
                        if self.restart_attempts < self.MAX_RESTARTS:
                            self.send_log("WARN", f"Attempting restart ({self.restart_attempts + 1}/{self.MAX_RESTARTS})")
                            self.restart_attempts += 1
                            # TODO: Implement restart logic (call event handler)
                        else:
                            self.send_log("ERROR", "Max restart attempts exceeded", {
                                "event_id": self.current_event_id
                            })
                time.sleep(5)  # Check every 5 seconds
            except KeyboardInterrupt:
                print("Watchdog stopped by user")
                break
            except Exception as e:
                self.send_log("ERROR", f"Watchdog error: {str(e)}", {
                    "exception": str(e),
                    "traceback": str(sys.exc_info())
                })
                time.sleep(10)  # Wait longer on error

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python watchdog.py <client_uuid> <mqtt_broker>")
        sys.exit(1)
    uuid = sys.argv[1]
    broker = sys.argv[2]
    watchdog = MediaWatchdog(uuid, broker)
    watchdog.monitor_loop()
```
### Step 3.2: Integrate with Existing Event Handler
**File:** `client/event_handler.py` (modify existing)
```python
# When starting a new event, notify the watchdog
def play_event(event_data):
    event_type = event_data.get('event_type')
    event_id = event_data.get('id')

    if event_type == 'video':
        process_name = 'vlc'
        # Start VLC...
    elif event_type == 'website':
        process_name = 'chromium'
        # Start Chromium...
    elif event_type == 'presentation':
        process_name = 'pdf_viewer'  # or your PDF tool
        # Start PDF viewer...

    # Tell the watchdog which process to expect
    watchdog.current_process = process_name
    watchdog.current_event_id = event_id
    watchdog.restart_attempts = 0
```
### Step 3.3: Enhanced Heartbeat Payload
**File:** `client/heartbeat.py` (modify existing)
```python
# Modify the existing heartbeat to include process status
def send_heartbeat(mqtt_client, uuid):
    # Get the current process status
    current_process = None
    process_pid = None
    process_status = "stopped"

    # Check whether the expected process is running
    if watchdog.current_process:
        pid = watchdog.is_process_running(watchdog.current_process)
        if pid:
            current_process = watchdog.current_process
            process_pid = pid
            process_status = "running"

    payload = {
        "uuid": uuid,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        # Existing fields...
        # NEW health fields:
        "current_process": current_process,
        "process_pid": process_pid,
        "process_status": process_status,
        "current_event_id": watchdog.current_event_id
    }
    mqtt_client.publish(f"infoscreen/{uuid}/heartbeat", json.dumps(payload))
```
---
## 🎨 Phase 4: Dashboard UI Integration
**Status:** ✅ COMPLETE
**Dependencies:** Phases 2 & 3 complete
**Time estimate:** 2-3 hours
### Step 4.1: Create Log Viewer Component
**File:** `dashboard/src/ClientLogs.tsx` (NEW)
```typescript
import React from 'react';
import { GridComponent, ColumnsDirective, ColumnDirective, Page, Inject } from '@syncfusion/ej2-react-grids';

interface LogEntry {
  id: number;
  timestamp: string;
  level: 'ERROR' | 'WARN' | 'INFO' | 'DEBUG';
  message: string;
  context: any;
}

interface ClientLogsProps {
  clientUuid: string;
}

export const ClientLogs: React.FC<ClientLogsProps> = ({ clientUuid }) => {
  const [logs, setLogs] = React.useState<LogEntry[]>([]);
  const [loading, setLoading] = React.useState(false);

  const loadLogs = async (level?: string) => {
    setLoading(true);
    try {
      const params = new URLSearchParams({ limit: '50' });
      if (level) params.append('level', level);
      const response = await fetch(`/api/client-logs/${clientUuid}/logs?${params}`);
      const data = await response.json();
      setLogs(data.logs);
    } catch (err) {
      console.error('Failed to load logs:', err);
    } finally {
      setLoading(false);
    }
  };

  React.useEffect(() => {
    loadLogs();
    const interval = setInterval(() => loadLogs(), 30000); // Refresh every 30s
    return () => clearInterval(interval);
  }, [clientUuid]);

  const levelTemplate = (props: any) => {
    const colors = {
      ERROR: 'text-red-600 bg-red-100',
      WARN: 'text-yellow-600 bg-yellow-100',
      INFO: 'text-blue-600 bg-blue-100',
      DEBUG: 'text-gray-600 bg-gray-100'
    };
    return (
      <span className={`px-2 py-1 rounded ${colors[props.level as keyof typeof colors]}`}>
        {props.level}
      </span>
    );
  };

  return (
    <div>
      <div className="mb-4 flex gap-2">
        <button onClick={() => loadLogs()} className="e-btn e-primary">All</button>
        <button onClick={() => loadLogs('ERROR')} className="e-btn e-danger">Errors</button>
        <button onClick={() => loadLogs('WARN')} className="e-btn e-warning">Warnings</button>
        <button onClick={() => loadLogs('INFO')} className="e-btn e-info">Info</button>
      </div>
      <GridComponent
        dataSource={logs}
        allowPaging={true}
        pageSettings={{ pageSize: 20 }}
      >
        <ColumnsDirective>
          <ColumnDirective field='timestamp' headerText='Time' width='180' format='yMd HH:mm:ss' />
          <ColumnDirective field='level' headerText='Level' width='100' template={levelTemplate} />
          <ColumnDirective field='message' headerText='Message' width='400' />
        </ColumnsDirective>
        <Inject services={[Page]} />
      </GridComponent>
    </div>
  );
};
```
### Step 4.2: Add Health Indicators to Client Cards
**File:** `dashboard/src/clients.tsx` (modify existing)
```typescript
// Add a health indicator to the client card
const getHealthBadge = (client: Client) => {
  if (!client.process_status) {
    return <span className="badge badge-secondary">Unknown</span>;
  }
  const badges = {
    running: <span className="badge badge-success">Running</span>,
    crashed: <span className="badge badge-danger">Crashed</span>,
    starting: <span className="badge badge-warning">Starting</span>,
    stopped: <span className="badge badge-secondary">Stopped</span>
  };
  return badges[client.process_status] || null;
};

// In the client card render:
<div className="client-card">
  <h3>{client.hostname || client.uuid}</h3>
  <div>Status: {getHealthBadge(client)}</div>
  <div>Process: {client.current_process || 'None'}</div>
  <div>Event ID: {client.current_event_id || 'None'}</div>
  <button onClick={() => showLogs(client.uuid)}>View Logs</button>
</div>
```
### Step 4.3: Add System Health Dashboard (Superadmin)
**File:** `dashboard/src/SystemMonitor.tsx` (NEW)
```typescript
import React from 'react';
import { ClientLogs } from './ClientLogs';

export const SystemMonitor: React.FC = () => {
  const [summary, setSummary] = React.useState<any>({});

  const loadSummary = async () => {
    const response = await fetch('/api/client-logs/summary');
    const data = await response.json();
    setSummary(data.summary);
  };

  React.useEffect(() => {
    loadSummary();
    const interval = setInterval(loadSummary, 30000);
    return () => clearInterval(interval);
  }, []);

  return (
    <div className="system-monitor">
      <h2>System Health Monitor (Superadmin)</h2>
      <div className="alert-panel">
        <h3>Active Issues</h3>
        {Object.entries(summary).map(([uuid, stats]: [string, any]) => (
          stats.ERROR > 0 || stats.WARN > 5 ? (
            <div key={uuid} className="alert">
              🔴 {uuid}: {stats.ERROR} errors, {stats.WARN} warnings (24h)
            </div>
          ) : null
        ))}
      </div>
      {/* Real-time log stream */}
      <div className="log-stream">
        <h3>Recent Logs (All Clients)</h3>
        {/* Implement real-time log aggregation */}
      </div>
    </div>
  );
};
```
---
## 🧪 Phase 5: Testing & Validation
**Status:** ✅ COMPLETE
**Dependencies:** All previous phases
**Time estimate:** 1-2 hours
### Step 5.1: Server-Side Tests
```bash
# Test database migration
cd /workspace/server
alembic upgrade head
alembic downgrade -1
alembic upgrade head
# Test API endpoints
curl -X GET "http://localhost:8000/api/client-logs/<uuid>/logs?limit=10"
curl -X GET "http://localhost:8000/api/client-logs/summary"
```
### Step 5.2: Client-Side Tests
```bash
# On client device
python3 watchdog.py <your-uuid> <mqtt-broker-ip>
# Simulate process crash
pkill vlc # Should trigger error log and restart attempt
# Check MQTT messages
mosquitto_sub -h <broker> -t "infoscreen/+/logs/#" -v
mosquitto_sub -h <broker> -t "infoscreen/+/health" -v
```
### Step 5.3: Dashboard Tests
1. Open dashboard and navigate to Clients page
2. Verify health indicators show correct status
3. Click "View Logs" and verify logs appear
4. Navigate to System Monitor (superadmin)
5. Verify summary statistics are correct
---
## 📝 Configuration Summary
### Environment Variables
**Server (docker-compose.yml):**
```yaml
- LOG_RETENTION_DAYS=90 # How long to keep logs
- DEBUG_MODE=true # Enable INFO level logging via MQTT
```
**Client:**
```bash
export MQTT_BROKER="your-server-ip"
export CLIENT_UUID="abc-123-def"
export WATCHDOG_ENABLED=true
```
### MQTT Topics Reference
| Topic Pattern | Direction | Purpose |
|--------------|-----------|---------|
| `infoscreen/{uuid}/logs/error` | Client → Server | Error messages |
| `infoscreen/{uuid}/logs/warn` | Client → Server | Warning messages |
| `infoscreen/{uuid}/logs/info` | Client → Server | Info (dev only) |
| `infoscreen/{uuid}/health` | Client → Server | Health metrics |
| `infoscreen/{uuid}/heartbeat` | Client → Server | Enhanced heartbeat |
### Database Tables
**client_logs:**
- Stores all centralized logs
- Indexed by client_uuid, timestamp, level
- Auto-cleanup after 90 days (recommended)
**clients (extended):**
- `current_event_id`: Which event should be playing
- `current_process`: Expected process name
- `process_status`: running/crashed/starting/stopped
- `process_pid`: Process ID
- `screen_health_status`: OK/BLACK/FROZEN/UNKNOWN
- `last_screenshot_analyzed`: Last analysis time
- `last_screenshot_hash`: For frozen detection
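The recommended 90-day auto-cleanup is not part of the migration itself; one way to implement it is a scheduled `DELETE` against `client_logs`. A self-contained sketch using SQLite for demonstration (the real system would issue the equivalent statement through the server's SQLAlchemy session; the function name and schema subset here are illustrative):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def purge_old_logs(conn, retention_days=90):
    """Delete client_logs rows older than the retention window."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    cur = conn.execute(
        "DELETE FROM client_logs WHERE timestamp < ?",
        (cutoff.isoformat(),)  # ISO strings in one format compare chronologically
    )
    conn.commit()
    return cur.rowcount

# Demo with an in-memory database: one stale row, one fresh row
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE client_logs ("
    "id INTEGER PRIMARY KEY, client_uuid TEXT, timestamp TEXT, level TEXT, message TEXT)"
)
old_ts = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
new_ts = datetime.now(timezone.utc).isoformat()
conn.execute("INSERT INTO client_logs (client_uuid, timestamp, level, message) "
             "VALUES ('c1', ?, 'INFO', 'old')", (old_ts,))
conn.execute("INSERT INTO client_logs (client_uuid, timestamp, level, message) "
             "VALUES ('c1', ?, 'INFO', 'new')", (new_ts,))
deleted = purge_old_logs(conn)
print(deleted)  # 1
```

Run from cron (or a scheduler container) once a day; `rowcount` gives a cheap metric for the cleanup log.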
---
## 🎯 Next Steps After Implementation
1. **Deploy Phase 1-2** to staging environment
2. **Test with 1-2 pilot clients** before full rollout
3. **Monitor traffic & performance** (should be minimal)
4. **Fine-tune log levels** based on actual noise
5. **Add alerting** (email/Slack when errors > threshold)
6. **Implement screenshot analysis** (Phase 2 enhancement)
7. **Add trending/analytics** (which clients are least reliable)
---
## 🚨 Troubleshooting
**Logs not appearing in database:**
- Check MQTT broker logs: `docker logs infoscreen-mqtt`
- Verify listener subscriptions: Check `listener/listener.py` logs
- Test MQTT manually: `mosquitto_pub -h broker -t "infoscreen/test/logs/error" -m '{"message":"test"}'`
**High database growth:**
- Check log_retention cleanup cronjob
- Reduce INFO level logging frequency
- Add sampling (log every 10th occurrence instead of all)
**Client watchdog not detecting crashes:**
- Verify psutil can see processes: `ps aux | grep vlc`
- Check permissions (may need sudo for some process checks)
- Increase monitor loop frequency for faster detection
---
## ✅ Completion Checklist
- [x] Phase 1: Database migration applied
- [x] Phase 2: Listener extended for log topics
- [x] Phase 2: API endpoints created and tested
- [x] Phase 3: Client watchdog implemented
- [x] Phase 3: Enhanced heartbeat deployed
- [x] Phase 4: Dashboard log viewer working
- [x] Phase 4: Health indicators visible
- [x] Phase 5: End-to-end testing complete
- [x] Documentation updated with new features
- [x] Production deployment plan created
---
**Last Updated:** 2026-03-24
**Author:** GitHub Copilot
**For:** Infoscreen 2025 Project


@@ -0,0 +1,194 @@
# MQTT Payload Migration Guide
## Purpose
This guide describes a practical migration from the current dashboard screenshot payload to a grouped schema, with client-side implementation first and server-side migration second.
## Scope
- Environment: development and alpha systems (no production installs)
- Message topic: infoscreen/<client_id>/dashboard
- Capture types to preserve: periodic, event_start, event_stop
## Target Schema (v2)
The canonical message should be grouped into four logical blocks in this order:
1. message
2. content
3. runtime
4. metadata
Example shape:
```json
{
"message": {
"client_id": "<uuid>",
"status": "alive"
},
"content": {
"screenshot": {
"filename": "latest.jpg",
"data": "<base64>",
"timestamp": "2026-03-30T10:15:41.123456+00:00",
"size": 183245
}
},
"runtime": {
"system_info": {
"hostname": "pi-display-01",
"ip": "192.168.1.42",
"uptime": 123456.7
},
"process_health": {
"event_id": "evt-123",
"event_type": "presentation",
"current_process": "impressive",
"process_pid": 4123,
"process_status": "running",
"restart_count": 0
}
},
"metadata": {
"schema_version": "2.0",
"producer": "simclient",
"published_at": "2026-03-30T10:15:42.004321+00:00",
"capture": {
"type": "periodic",
"captured_at": "2026-03-30T10:15:41.123456+00:00",
"age_s": 0.9,
"triggered": false,
"send_immediately": false
},
"transport": {
"qos": 0,
"publisher": "simclient"
}
}
}
```
## Step-by-Step: Client-Side First
1. Create a migration branch.
   - Example: feature/payload-v2
2. Freeze a baseline sample from MQTT.
   - Capture one payload via mosquitto_sub and store it for comparison.
3. Implement one canonical payload builder.
   - Centralize JSON assembly in one function only.
   - Do not duplicate payload construction across code paths.
4. Add versioned metadata.
   - Set metadata.schema_version = "2.0".
   - Add metadata.producer = "simclient".
   - Add metadata.published_at in UTC ISO format.
5. Map existing data into grouped blocks.
   - client_id/status -> message
   - screenshot object -> content.screenshot
   - system_info/process_health -> runtime
   - capture mode and freshness -> metadata.capture
6. Preserve existing capture semantics.
   - Keep type values unchanged: periodic, event_start, event_stop.
   - Keep UTC ISO timestamps.
   - Keep screenshot encoding and size behavior unchanged.
7. Optional short-term compatibility mode (recommended for one sprint).
   - Either:
     - keep the current legacy fields in parallel, or
     - add a legacy block with the old field names.
   - Goal: prevent immediate server breakage while parser updates are merged.
8. Improve publish logs for verification.
   - Log schema_version, metadata.capture.type, and metadata.capture.age_s.
9. Validate all three capture paths end-to-end.
   - periodic capture
   - event_start trigger capture
   - event_stop trigger capture
10. Lock the client contract.
    - Save one validated JSON sample per capture type.
    - Use those samples in server parser tests.
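Step 3 above (one canonical builder) can be sketched as a single function that assembles the grouped v2 message; the function name and parameter list are illustrative, and the field layout follows the target schema shown earlier:

```python
import json
from datetime import datetime, timezone

def build_payload_v2(client_id, status, screenshot, system_info, process_health, capture):
    """Assemble the grouped v2 message in one place (single source of truth)."""
    return {
        "message": {
            "client_id": client_id,
            "status": status,
        },
        "content": {
            "screenshot": screenshot,
        },
        "runtime": {
            "system_info": system_info,
            "process_health": process_health,
        },
        "metadata": {
            "schema_version": "2.0",
            "producer": "simclient",
            "published_at": datetime.now(timezone.utc).isoformat(),
            "capture": capture,
            "transport": {"qos": 0, "publisher": "simclient"},
        },
    }

payload = build_payload_v2(
    "abc-123", "alive",
    screenshot={"filename": "latest.jpg", "data": "<base64>", "timestamp": "...", "size": 0},
    system_info={"hostname": "pi-display-01"},
    process_health={"process_status": "running"},
    capture={"type": "periodic", "age_s": 0.9},
)
body = json.dumps(payload)  # what actually gets published
```

Because every code path calls this one builder, the four-block ordering and the metadata stamp cannot drift between periodic and triggered captures.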
## Step-by-Step: Server-Side Migration
1. Add support for grouped v2 parsing.
   - Parse from message/content/runtime/metadata first.
2. Add a fallback parser for legacy payloads (temporary).
   - If the grouped keys are absent, parse the old top-level keys.
3. Normalize to one internal server model.
   - Convert both parser paths into one DTO/entity used by dashboard logic.
4. Validate required fields.
   - Required:
     - message.client_id
     - message.status
     - metadata.schema_version
     - metadata.capture.type
   - Optional:
     - runtime.process_health
     - content.screenshot (may be absent when no screenshot is available)
5. Update dashboard consumers.
   - Read grouped fields from the internal model (not the raw legacy keys).
6. Add migration observability.
   - Counters:
     - v2 parse success
     - legacy fallback usage
     - parse failures
   - Warning log for unknown schema_version.
7. Run mixed-format integration tests.
   - New client -> new server
   - Legacy client -> new server (fallback path)
8. Cut over to v2 preferred.
   - Keep the fallback for a short soak period only.
9. Remove the fallback and legacy assumptions.
   - After a stability window, remove the old parser path.
10. Final cleanup.
    - Keep one schema doc and test fixtures.
    - Remove temporary compatibility switches.
## Legacy to v2 Field Mapping
| Legacy field | v2 field |
|---|---|
| client_id | message.client_id |
| status | message.status |
| screenshot | content.screenshot |
| screenshot_type | metadata.capture.type |
| screenshot_age_s | metadata.capture.age_s |
| timestamp | metadata.published_at |
| system_info | runtime.system_info |
| process_health | runtime.process_health |
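The mapping table translates directly into a normalizer that prefers the grouped v2 shape and falls back to the legacy keys (server migration steps 1-3). The internal field names below are illustrative, not the project's actual DTO:

```python
def normalize_payload(raw: dict) -> dict:
    """Map either payload schema into one internal model (v2 first, legacy fallback)."""
    if "message" in raw and "metadata" in raw:
        # Grouped v2 payload
        return {
            "client_id": raw["message"]["client_id"],
            "status": raw["message"]["status"],
            "screenshot": raw.get("content", {}).get("screenshot"),
            "capture_type": raw["metadata"]["capture"]["type"],
            "capture_age_s": raw["metadata"]["capture"].get("age_s"),
            "system_info": raw.get("runtime", {}).get("system_info"),
            "process_health": raw.get("runtime", {}).get("process_health"),
            "schema_version": raw["metadata"]["schema_version"],
        }
    # Legacy flat payload (temporary fallback path, per the mapping table above)
    return {
        "client_id": raw["client_id"],
        "status": raw["status"],
        "screenshot": raw.get("screenshot"),
        "capture_type": raw.get("screenshot_type"),
        "capture_age_s": raw.get("screenshot_age_s"),
        "system_info": raw.get("system_info"),
        "process_health": raw.get("process_health"),
        "schema_version": "legacy",
    }
```

Tagging the fallback output with `schema_version = "legacy"` also gives you the migration counter from step 6 for free.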
## Acceptance Criteria
1. All capture types parse and display correctly.
   - periodic
   - event_start
   - event_stop
2. Screenshot payload integrity is unchanged.
   - filename, data, timestamp, and size remain valid.
3. Metadata is grouped at the end of the message.
   - schema_version, capture metadata, and transport metadata all live inside metadata.
4. No regression in dashboard update timing.
   - Triggered screenshots still publish quickly.
## Suggested Timeline (Dev Only)
1. Day 1: client v2 payload implementation + local tests
2. Day 2: server v2 parser + fallback
3. Day 3-5: soak in dev, monitor parse metrics
4. Day 6+: remove fallback and finalize v2-only


@@ -0,0 +1,477 @@
# Recommended Implementation: PPTX-to-PDF Conversion System
## Architecture Overview
**Asynchronous server-side conversion with database tracking**
```
User Upload     → API saves PPTX + DB entry → Job in queue
Client requests → API checks DB status      → PDF ready? → Download PDF
                                            → Pending?   → "Please wait"
                                            → Failed?    → Retry / Error
```
## 1. Database Schema
```sql
CREATE TABLE media_files (
    id UUID PRIMARY KEY,
    filename VARCHAR(255),
    original_path VARCHAR(512),
    file_type VARCHAR(10),
    mime_type VARCHAR(100),
    uploaded_at TIMESTAMP,
    updated_at TIMESTAMP
);

CREATE TABLE conversions (
    id UUID PRIMARY KEY,
    source_file_id UUID REFERENCES media_files(id) ON DELETE CASCADE,
    target_format VARCHAR(10),   -- 'pdf'
    target_path VARCHAR(512),    -- Path to generated PDF
    status VARCHAR(20),          -- 'pending', 'processing', 'ready', 'failed'
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    error_message TEXT,
    file_hash VARCHAR(64)        -- Hash of PPTX for cache invalidation
);

CREATE INDEX idx_conversions_source ON conversions(source_file_id, target_format);
```
## 2. Components
### **API Server (existing)**
- Accepts uploads
- Creates DB entries
- Enqueues jobs
- Delivers status and files
### **Background Worker (new)**
- Runs as separate process in **same container** as API
- Processes conversion jobs from queue
- Can run multiple worker instances in parallel
- Technology: Python RQ, Celery, or similar
### **Message Queue**
- Redis (recommended for start - simple, fast)
- Alternative: RabbitMQ for more features
### **Redis Container (new)**
- Separate container for Redis
- Handles job queue
- Minimal resource footprint
## 3. Detailed Workflow
### **Upload Process:**
```python
@app.post("/upload")
async def upload_file(file):
    # 1. Save PPTX
    file_path = save_to_disk(file)

    # 2. DB entry for the original file
    file_record = db.create_media_file({
        'filename': file.filename,
        'original_path': file_path,
        'file_type': 'pptx'
    })

    # 3. Create conversion record
    conversion = db.create_conversion({
        'source_file_id': file_record.id,
        'target_format': 'pdf',
        'status': 'pending',
        'file_hash': calculate_hash(file_path)
    })

    # 4. Enqueue job (asynchronous!)
    queue.enqueue(convert_to_pdf, conversion.id)

    # 5. Return immediately to the user
    return {
        'file_id': file_record.id,
        'status': 'uploaded',
        'conversion_status': 'pending'
    }
```
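`calculate_hash` is not defined anywhere in this document; a minimal sketch using SHA-256, streamed in chunks so large PPTX files never load fully into memory, could look like this:

```python
import hashlib

def calculate_hash(file_path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 hex digest of a file, read in 1 MiB chunks to keep memory flat."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
```

The digest fits the `VARCHAR(64)` `file_hash` column; re-uploading an unchanged PPTX then hashes identically and the existing PDF can be reused instead of reconverted.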
### **Worker Process:**
```python
import os
import subprocess

def convert_to_pdf(conversion_id):
    conversion = db.get_conversion(conversion_id)
    source_file = db.get_media_file(conversion.source_file_id)

    # Status update: processing
    db.update_conversion(conversion_id, {
        'status': 'processing',
        'started_at': now()
    })

    try:
        # LibreOffice conversion; it writes <source basename>.pdf into --outdir
        subprocess.run([
            'libreoffice',
            '--headless',
            '--convert-to', 'pdf',
            '--outdir', '/data/converted/',
            source_file.original_path
        ], check=True)

        # Move the generated PDF to a stable, conversion-specific path
        base = os.path.splitext(os.path.basename(source_file.original_path))[0]
        generated = os.path.join('/data/converted/', f"{base}.pdf")
        pdf_path = f"/data/converted/{conversion.id}.pdf"
        os.replace(generated, pdf_path)

        # Success
        db.update_conversion(conversion_id, {
            'status': 'ready',
            'target_path': pdf_path,
            'completed_at': now()
        })
    except Exception as e:
        # Error
        db.update_conversion(conversion_id, {
            'status': 'failed',
            'error_message': str(e),
            'completed_at': now()
        })
```
### **Client Download:**
```python
@app.get("/files/{file_id}/display")
async def get_display_file(file_id):
    file = db.get_media_file(file_id)

    # Only for PPTX: check the PDF conversion
    if file.file_type == 'pptx':
        conversion = db.get_latest_conversion(file.id, target_format='pdf')
        if not conversion:
            # Shouldn't happen, but just to be safe
            trigger_new_conversion(file.id)
            return {'status': 'pending', 'message': 'Conversion is being created'}
        if conversion.status == 'ready':
            return FileResponse(conversion.target_path)
        elif conversion.status == 'failed':
            # Optional: auto-retry
            trigger_new_conversion(file.id)
            return {'status': 'failed', 'error': conversion.error_message}
        else:  # pending or processing
            return {'status': conversion.status, 'message': 'Please wait...'}

    # Serve other file types directly
    return FileResponse(file.original_path)
```
```
## 4. Docker Setup
```yaml
version: '3.8'

services:
  # Your API server
  api:
    build: ./api
    command: uvicorn main:app --host 0.0.0.0 --port 8000
    ports:
      - "8000:8000"
    volumes:
      - ./data/uploads:/data/uploads
      - ./data/converted:/data/converted
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/infoscreen
    depends_on:
      - redis
      - postgres
    restart: unless-stopped

  # Worker (same codebase as the API, different command)
  worker:
    build: ./api  # Same build as the API!
    command: python worker.py  # or: rq worker
    volumes:
      - ./data/uploads:/data/uploads
      - ./data/converted:/data/converted
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/infoscreen
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    # Optional: multiple workers
    deploy:
      replicas: 2

  # Redis - separate container
  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data
    # Optional: persistent configuration
    command: redis-server --appendonly yes
    restart: unless-stopped

  # Your existing Postgres
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=infoscreen
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: unless-stopped

  # Optional: Redis Commander (UI for debugging)
  redis-commander:
    image: rediscommander/redis-commander
    environment:
      - REDIS_HOSTS=local:redis:6379
    ports:
      - "8081:8081"
    depends_on:
      - redis

volumes:
  redis-data:
  postgres-data:
```
## 5. Container Communication
Containers communicate via **Docker's internal network**:
```python
# In your API/Worker code:
import redis
# Connection to Redis
redis_client = redis.from_url('redis://redis:6379')
# ^^^^^^
# Container name = hostname in Docker network
```
Docker automatically creates DNS entries, so `redis` resolves to the Redis container.
## 6. Client Behavior (Pi5)
```python
# On the Pi5 client
def display_file(file_id):
    response = api.get(f"/files/{file_id}/display")

    if response.content_type == 'application/pdf':
        # PDF is ready
        downloaded_pdf = download_and_display(response)
        subprocess.run(['impressive', downloaded_pdf])
    elif response.json()['status'] in ['pending', 'processing']:
        # Wait and retry
        show_loading_screen("Presentation is being prepared...")
        time.sleep(5)
        display_file(file_id)  # Retry
    else:
        # Error
        show_error_screen("Error loading presentation")
```
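The recursive retry above never gives up (and grows the call stack on long waits). A bounded, loop-based variant is sketched below; `fetch_status` is a hypothetical callback standing in for the `/files/{file_id}/display` status call:

```python
import time

def wait_for_pdf(fetch_status, max_attempts=60, delay_s=5):
    """Poll until the conversion reports 'ready'; give up after max_attempts."""
    for _ in range(max_attempts):
        status = fetch_status()
        if status == 'ready':
            return True
        if status == 'failed':
            return False  # let the caller show an error screen
        time.sleep(delay_s)  # 'pending' or 'processing': wait and retry
    return False  # timed out
```

With `max_attempts=60` and `delay_s=5` the client waits at most five minutes before falling back to the error screen, instead of retrying forever.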
## 7. Additional Features
### **Cache Invalidation on PPTX Update:**
```python
@app.put("/files/{file_id}")
async def update_file(file_id, new_file):
    # Mark old conversions as obsolete
    db.mark_conversions_as_obsolete(file_id)
    # Update the file
    update_media_file(file_id, new_file)
    # Trigger a new conversion
    trigger_conversion(file_id, 'pdf')
```
### **Status API for Monitoring:**
```python
@app.get("/admin/conversions/status")
async def get_conversion_stats():
return {
'pending': db.count(status='pending'),
'processing': db.count(status='processing'),
'failed': db.count(status='failed'),
'avg_duration_seconds': db.avg_duration()
}
```
### **Cleanup Job (Cronjob):**
```python
def cleanup_old_conversions():
# Remove PDFs from deleted files
db.delete_orphaned_conversions()
# Clean up old failed conversions
db.delete_old_failed_conversions(older_than_days=7)
```
## 8. Redis Container Details
### **Why Separate Container?**
- **Separation of Concerns**: Each service has its own responsibility
- **Independent Lifecycle Management**: Redis can be restarted/updated independently
- **Better Scaling**: Redis can be moved to different hardware
- **Easier Backup**: Redis data can be backed up separately
- **Standard Docker Pattern**: Microservices architecture
### **Resource Usage:**
- RAM: ~10-50 MB for your use case
- CPU: Minimal
- Disk: Only for persistence (optional)
For 10 clients with occasional PPTX uploads, this overhead is negligible.
## 9. Advantages of This Solution
- **Scalable**: Workers can be scaled horizontally
- **Performant**: Clients don't wait for conversion
- **Robust**: Status tracking and error handling
- **Maintainable**: Clear separation of responsibilities
- **Transparent**: Status queryable at any time
- **Efficient**: One-time conversion per file
- **Future-proof**: Easily extensible for other formats
- **Professional**: Industry-standard architecture
## 10. Migration Path
### **Phase 1 (MVP):**
- 1 worker process in API container
- Redis for queue (separate container)
- Basic DB schema
- Simple retry logic
### **Phase 2 (as needed):**
- Multiple worker instances
- Dedicated conversion service container
- Monitoring & alerting
- Prioritization logic
- Advanced caching strategies
**Start simple, scale when needed!**
## 11. Key Decisions Summary
| Aspect | Decision | Reason |
|--------|----------|--------|
| **Conversion Location** | Server-side | One conversion per file, consistent results |
| **Conversion Timing** | Asynchronous (on upload) | No client waiting time, predictable performance |
| **Data Storage** | Database-tracked | Status visibility, robust error handling |
| **Queue System** | Redis (separate container) | Standard pattern, scalable, maintainable |
| **Worker Architecture** | Background process in API container | Simple start, easy to separate later |
## 12. File Flow Diagram
```
┌─────────────┐
│ User Upload │
│ (PPTX) │
└──────┬──────┘
┌──────────────────┐
│ API Server │
│ 1. Save PPTX │
│ 2. Create DB rec │
│ 3. Enqueue job │
└──────┬───────────┘
┌──────────────────┐
│ Redis Queue │◄─────┐
└──────┬───────────┘ │
│ │
▼ │
┌──────────────────┐ │
│ Worker Process │ │
│ 1. Get job │ │
│ 2. Convert PPTX │ │
│ 3. Update DB │ │
└──────┬───────────┘ │
│ │
▼ │
┌──────────────────┐ │
│ PDF Storage │ │
└──────┬───────────┘ │
│ │
▼ │
┌──────────────────┐ │
│ Client Requests │ │
│ 1. Check DB │ │
│ 2. Download PDF │ │
│ 3. Display │──────┘
└──────────────────┘
(via impressive)
```
## 13. Implementation Checklist
### Database Setup
- [ ] Create `media_files` table
- [ ] Create `conversions` table
- [ ] Add indexes for performance
- [ ] Set up foreign key constraints
### API Changes
- [ ] Modify upload endpoint to create DB records
- [ ] Add conversion job enqueueing
- [ ] Implement file download endpoint with status checking
- [ ] Add status API for monitoring
- [ ] Implement cache invalidation on file update
### Worker Setup
- [ ] Create worker script/module
- [ ] Implement LibreOffice conversion logic
- [ ] Add error handling and retry logic
- [ ] Set up logging and monitoring
### Docker Configuration
- [ ] Add Redis container to docker-compose.yml
- [ ] Configure worker container
- [ ] Set up volume mounts for file storage
- [ ] Configure environment variables
- [ ] Set up container dependencies
### Client Updates
- [ ] Modify client to check conversion status
- [ ] Implement retry logic for pending conversions
- [ ] Add loading/waiting screens
- [ ] Implement error handling
### Testing
- [ ] Test upload → conversion → download flow
- [ ] Test multiple concurrent conversions
- [ ] Test error handling (corrupted PPTX, etc.)
- [ ] Test cache invalidation on file update
- [ ] Load test with multiple clients
### Monitoring & Operations
- [ ] Set up logging for conversions
- [ ] Implement cleanup job for old files
- [ ] Add metrics for conversion times
- [ ] Set up alerts for failed conversions
- [ ] Document backup procedures
---
**This architecture provides a solid foundation that's simple to start with but scales professionally as your needs grow!**

# TV Power Coordination Canary Validation Checklist (Phase 1)
Manual verification checklist for Phase-1 server-side group-level power-intent publishing before production rollout.
## Preconditions
- Scheduler running with `POWER_INTENT_PUBLISH_ENABLED=true`
- One canary group selected for testing (example: group_id=1)
- Mosquitto broker running and accessible
- Database with seeded test data (canary group with events)
## Validation Scenarios
### 1. Baseline Payload Structure
**Goal**: Retained topic shows correct Phase-1 contract.
Instructions:
1. Subscribe to `infoscreen/groups/1/power/intent` (canary group, QoS 1)
2. Verify received payload contains:
- `schema_version: "1.0"`
- `group_id: 1`
- `desired_state: "on"` or `"off"` (string)
- `reason: "active_event"` or `"no_active_event"` (string)
- `intent_id: "<uuid>"` (not empty, valid UUID v4 format)
- `issued_at: "2026-03-31T14:22:15Z"` (ISO 8601 with Z suffix)
- `expires_at: "2026-03-31T14:24:00Z"` (ISO 8601 with Z suffix, always > issued_at)
- `poll_interval_sec: 30` (integer, matches scheduler poll interval)
- `active_event_ids: [...]` (array; empty when off)
- `event_window_start: "...Z"` or `null`
- `event_window_end: "...Z"` or `null`
**Pass criteria**: All fields present, correct types and formats, no extra/malformed fields.
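The field checks above can be automated. Below is a minimal validation sketch; the helper and its error strings are illustrative (not part of the codebase), and it checks UUID validity rather than specifically version 4:

```python
import uuid
from datetime import datetime

def parse_utc(value: str) -> datetime:
    # fromisoformat handles fractional seconds; map the Z suffix to an offset
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

def validate_intent(payload: dict) -> list:
    """Return a list of problems; an empty list means Scenario 1 passes."""
    problems = []
    if payload.get("schema_version") != "1.0":
        problems.append("schema_version must be '1.0'")
    if not isinstance(payload.get("group_id"), int):
        problems.append("group_id must be an integer")
    if payload.get("desired_state") not in ("on", "off"):
        problems.append("desired_state must be 'on' or 'off'")
    if payload.get("reason") not in ("active_event", "no_active_event"):
        problems.append("reason must be a known reason string")
    try:
        uuid.UUID(str(payload.get("intent_id", "")))
    except ValueError:
        problems.append("intent_id must be a valid UUID")
    try:
        if parse_utc(payload["expires_at"]) <= parse_utc(payload["issued_at"]):
            problems.append("expires_at must be after issued_at")
    except (KeyError, ValueError):
        problems.append("issued_at/expires_at must be ISO 8601 with Z suffix")
    if not isinstance(payload.get("poll_interval_sec"), int):
        problems.append("poll_interval_sec must be an integer")
    if not isinstance(payload.get("active_event_ids"), list):
        problems.append("active_event_ids must be an array")
    return problems
```

Run it against each captured payload; any non-empty result is a Scenario 1 failure.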
### 2. Event Start Transition
**Goal**: ON intent published immediately when event becomes active.
Instructions:
1. Create an event for canary group starting 2 minutes from now
2. Wait for event start time
3. Check retained topic immediately after event start
4. Verify `desired_state: "on"` and `reason: "active_event"`
5. Note the `intent_id` value
**Pass criteria**:
- `desired_state: "on"` appears within 30 seconds of event start
- No OFF in between (if a prior OFF existed)
### 3. Event End Transition
**Goal**: OFF intent published when last active event ends.
Instructions:
1. In setup from Scenario 2, wait for the event to end (< 5 min duration)
2. Check retained topic after end time
3. Verify `desired_state: "off"` and `reason: "no_active_event"`
**Pass criteria**:
- `desired_state: "off"` appears within 30 seconds of event end
- New `intent_id` generated (different from Scenario 2)
### 4. Adjacent Events (No OFF Blip)
**Goal**: When one event ends and next starts immediately after, no OFF is published.
Instructions:
1. Create two consecutive events for canary group, each 3 minutes:
- Event A: 14:00-14:03
- Event B: 14:03-14:06
2. Watch retained topic through both event boundaries
3. Capture all `desired_state` changes
**Pass criteria**:
- `desired_state: "on"` throughout both events
- No OFF at 14:03 (boundary between them)
- At most two transitions total: ON at Event A's start, plus optionally one semantic-change republish when the active event ID switches from A to B
### 5. Heartbeat Republish (Unchanged Intent)
**Goal**: Intent republishes each poll cycle with same intent_id if state unchanged.
Instructions:
1. Create a long-duration event (15+ minutes) for canary group
2. Subscribe to power intent topic
3. Capture timestamps and intent_ids for 3 consecutive poll cycles (90 seconds with default 30s polls)
4. Verify:
- Payload received at T, T+30s, T+60s
- Same `intent_id` across all three
- Different `issued_at` timestamps (should increment by ~30s)
**Pass criteria**:
- At least 3 payloads received within ~90 seconds
- Same `intent_id` for all
- Each `issued_at` is later than previous
- Each `expires_at` is 90 seconds after its `issued_at`
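These pass criteria can be verified mechanically over the captured payloads. A sketch, assuming the payloads have already been parsed from JSON into dicts and use the default 30s poll (90s expiry window):

```python
from datetime import datetime

def parse_utc(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

def check_heartbeat_sequence(payloads: list) -> None:
    """Assert the Scenario-5 pass criteria over consecutive heartbeat payloads."""
    assert len(payloads) >= 3, "need at least 3 consecutive payloads"
    first_id = payloads[0]["intent_id"]
    prev_issued = None
    for p in payloads:
        assert p["intent_id"] == first_id, "intent_id changed without a transition"
        issued, expires = parse_utc(p["issued_at"]), parse_utc(p["expires_at"])
        assert (expires - issued).total_seconds() == 90, "expiry window is not 90s"
        if prev_issued is not None:
            assert issued > prev_issued, "issued_at did not advance"
        prev_issued = issued
```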
### 6. Scheduler Restart (Immediate Republish)
**Goal**: On scheduler process start, the current active intent is published immediately.
Instructions:
1. Create and start an event for canary group (duration ≥ 5 minutes)
2. Wait for event to be active
3. Kill and restart scheduler process
4. Check retained topic within 5 seconds after restart
5. Verify `desired_state: "on"` and `reason: "active_event"`
**Pass criteria**:
- Correct ON intent retained within 5 seconds of restart
- No OFF published during restart/reconnect
### 7. Broker Reconnection (Retained Recovery)
**Goal**: On MQTT reconnect, scheduler republishes cached intents.
Instructions:
1. Create and start an event for canary group
2. Subscribe to power intent topic
3. Note the current `intent_id` and payload
4. Restart Mosquitto broker (simulates network interruption)
5. Verify retained topic is immediately republished after reconnect
**Pass criteria**:
- Correct ON intent reappears on retained topic within 5 seconds of broker restart
- Same `intent_id` (no new transition UUID)
- Publish metrics show `retained_republish_total` incremented
### 8. Feature Flag Disable
**Goal**: No power-intent publishes when feature disabled.
Instructions:
1. Set `POWER_INTENT_PUBLISH_ENABLED=false` in scheduler env
2. Restart scheduler
3. Create and start a new event for canary group
4. Subscribe to power intent topic
5. Wait 90 seconds
**Pass criteria**:
- No messages on `infoscreen/groups/1/power/intent`
- Scheduler logs show no `event=power_intent_publish*` lines
### 9. Scheduler Logs Inspection
**Goal**: Logs contain structured fields for observability.
Instructions:
1. Run canary with one active event
2. Collect scheduler logs for 60 seconds
3. Filter for `event=power_intent_publish` lines
**Pass criteria**:
- Each log line contains: `group_id`, `desired_state`, `reason`, `intent_id`, `issued_at`, `expires_at`, `transition_publish`, `heartbeat_publish`, `topic`, `qos`, `retained`
- No malformed JSON in payloads
- Error logs (if any) are specific and actionable
### 10. Expiry Validation
**Goal**: Payloads never published with `expires_at <= issued_at`.
Instructions:
1. Capture power-intent payloads for 120+ seconds
2. Parse `issued_at` and `expires_at` for each
3. Verify `expires_at > issued_at` for all
**Pass criteria**:
- 100% of payloads have a valid expiry window (`expires_at > issued_at`)
- Typical delta is 90 seconds (min expiry)
## Summary Report Template
After running all scenarios, capture:
```
Canary Validation Report
Date: [date]
Scheduler version: [git commit hash]
Test group ID: [id]
Environment: [dev/test/prod]
Scenario Results:
1. Baseline Payload: ✓/✗ [notes]
2. Event Start: ✓/✗ [notes]
3. Event End: ✓/✗ [notes]
4. Adjacent Events: ✓/✗ [notes]
5. Heartbeat Republish: ✓/✗ [notes]
6. Restart: ✓/✗ [notes]
7. Broker Reconnect: ✓/✗ [notes]
8. Feature Flag: ✓/✗ [notes]
9. Logs: ✓/✗ [notes]
10. Expiry Validation: ✓/✗ [notes]
Overall: [Ready for production / Blockers found]
Issues: [list if any]
```
## Rollout Gate
Power-intent Phase 1 is ready for production rollout only when:
- All 10 scenarios pass
- Zero unintended OFF between adjacent events
- All log fields present and correct
- Feature flag default remains `false`
- Transition latency <= 30 seconds nominal case

# TV Power Coordination Task List (Server + Client)
## Goal
Prevent unintended TV power-off during adjacent events while enabling coordinated, server-driven power intent via MQTT with robust client-side fallback.
## Scope
- Server publishes explicit TV power intent and event-window context.
- Client executes HDMI-CEC power actions with timer-safe behavior.
- Client falls back to local schedule/end-time logic if server intent is missing or stale.
- Existing event playback behavior remains backward compatible.
## Ownership Proposal
- Server team: Scheduler integration, power-intent publisher, reliability semantics.
- Client team: MQTT handler, state machine, CEC execution, fallback and observability.
## Server PR-1 Pointer
- For the strict, agreed server-first implementation path, use:
- `TV_POWER_PHASE_1_IMPLEMENTATION_CHECKLIST.md`
- Treat that checklist as the execution source of truth for Phase 1.
---
## 1. MQTT Contract (Shared Spec)
Phase-1 scope note:
- Group-level power intent is the only active server contract in Phase 1.
- Per-client power intent and client power state topics are deferred to Phase 2.
### 1.1 Topics
- Command/intent topic (retained):
- infoscreen/groups/{group_id}/power/intent
Phase-2 (deferred):
- Optional per-client command/intent topic (retained):
- infoscreen/{client_id}/power/intent
- Client state/ack topic:
- infoscreen/{client_id}/power/state
### 1.2 QoS and retain
- intent topics: QoS 1, retained=true
- state topic: QoS 0 or 1 (recommend QoS 0 initially), retained=false (Phase 2)
### 1.3 Intent payload schema (v1)
```json
{
"schema_version": "1.0",
"intent_id": "uuid-or-monotonic-id",
"group_id": 12,
"desired_state": "on",
"reason": "active_event",
"issued_at": "2026-03-31T12:00:00Z",
"expires_at": "2026-03-31T12:01:30Z",
"poll_interval_sec": 15,
"event_window_start": "2026-03-31T12:00:00Z",
"event_window_end": "2026-03-31T13:00:00Z",
"source": "scheduler"
}
```
### 1.4 State payload schema (client -> server)
Phase-2 (deferred).
```json
{
"schema_version": "1.0",
"intent_id": "last-applied-intent-id",
"client_id": "...",
"reported_at": "2026-03-31T12:00:01Z",
"power": {
"applied_state": "on",
"source": "mqtt_intent|local_fallback",
"result": "ok|skipped|error",
"detail": "free text"
}
}
```
### 1.5 Idempotency and ordering rules
- Client applies only the newest valid intent, ordered by issued_at with intent_id as tie-break.
- Duplicate intent_id must be ignored after first successful apply.
- Expired intents must not trigger new actions.
- Retained intent must be immediately usable after client reconnect.
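These ordering rules can be condensed into one selection helper on the client side. A sketch under the Phase-1 contract; the `applied_ids` set used for duplicate suppression is an illustrative detail, not mandated by the spec:

```python
from datetime import datetime, timezone

def parse_utc(value: str) -> datetime:
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

def select_intent(candidates, applied_ids, now):
    """Pick the newest valid, unexpired, not-yet-applied intent (or None)."""
    valid = [
        p for p in candidates
        if p.get("schema_version") == "1.0"
        and p["intent_id"] not in applied_ids      # duplicates are ignored
        and parse_utc(p["expires_at"]) > now       # expired intents are inert
    ]
    if not valid:
        return None
    # newest issued_at wins; intent_id breaks ties deterministically
    return max(valid, key=lambda p: (parse_utc(p["issued_at"]), p["intent_id"]))
```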
### 1.6 Safety rules
- desired_state=on cancels any pending delayed-off timer before action.
- desired_state=off may schedule delayed-off, never immediate off during an active event window.
- If payload is malformed, client logs and ignores it.
---
## 2. Server Team Task List
### 2.1 Contract + scheduler mapping
- Finalize field names and UTC timestamp format with client team.
- Define when scheduler emits on/off intents for adjacent/overlapping events.
- Ensure contiguous events produce uninterrupted desired_state=on coverage.
### 2.2 Publisher implementation
- Add publisher for infoscreen/groups/{group_id}/power/intent.
- Support retained messages and QoS 1.
- Include expires_at based on scheduler poll interval (`max(3 x poll, 90s)`).
- Emit new intent_id only for semantic state transitions.
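The expiry rule maps to a one-line helper. A sketch matching the documented formula; parameter names mirror the `POWER_INTENT_*` env var defaults:

```python
from datetime import datetime, timedelta, timezone

def compute_expiry(issued_at: datetime, poll_interval_sec: int,
                   multiplier: int = 3, min_expiry_sec: int = 90) -> datetime:
    """expires_at = issued_at + max(multiplier * poll_interval, min_expiry)."""
    return issued_at + timedelta(
        seconds=max(multiplier * poll_interval_sec, min_expiry_sec))
```

With a 15s poll the 90s floor applies; with a 45s poll the multiplier wins (135s window).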
### 2.3 Reconnect and replay behavior
- On scheduler restart, republish current effective intent as retained.
- On event edits/cancellations, publish replacement retained intent.
### 2.4 Conflict policy
- Phase 1: not applicable (group-only intent).
- Phase 2: define precedence when both group and per-client intents exist.
- Recommended for Phase 2: per-client overrides group intent.
### 2.5 Monitoring and diagnostics
- Record publish attempts, broker ack results, and active retained payload.
- Add operational dashboard panels for intent age and last transition.
### 2.6 Server acceptance criteria
- Adjacent event windows do not produce off intent between events.
- Reconnect test: fresh client receives retained intent and powers correctly.
- Expired intent is never acted on by a conforming client.
---
## 3. Client Team Task List
### 3.1 MQTT subscription + parsing
- Phase 1: Subscribe to infoscreen/groups/{group_id}/power/intent.
- Phase 2 (optional): Subscribe to infoscreen/{client_id}/power/intent for per-device overrides.
- Parse schema_version=1.0 payload with strict validation.
### 3.2 Power state controller integration
- Add power-intent handler in display manager path that owns HDMI-CEC decisions.
- On desired_state=on:
- cancel delayed-off timer
- call CEC on only if needed
- On desired_state=off:
- schedule delayed off using configured grace_seconds (or local default)
- re-check active event before executing off
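The two branches above can be sketched as a small timer-safe controller. Class and callback names are hypothetical; `cec_on`/`cec_off` stand in for the real HDMI-CEC calls:

```python
import threading

class PowerController:
    """Timer-safe power handling: ON always cancels a pending delayed-off."""

    def __init__(self, cec_on, cec_off, grace_seconds=30):
        self.cec_on = cec_on
        self.cec_off = cec_off
        self.grace_seconds = grace_seconds
        self._off_timer = None
        self.powered = False

    def _cancel_pending_off(self):
        if self._off_timer is not None:
            self._off_timer.cancel()
            self._off_timer = None

    def apply_intent(self, desired_state, event_active):
        if desired_state == "on":
            self._cancel_pending_off()       # safety rule: ON cancels off timer
            if not self.powered:             # call CEC only if needed
                self.cec_on()
                self.powered = True
        elif desired_state == "off":
            self._cancel_pending_off()
            self._off_timer = threading.Timer(
                self.grace_seconds, self._delayed_off, args=(event_active,))
            self._off_timer.start()

    def _delayed_off(self, event_active):
        # re-check the local schedule before actually powering off
        if not event_active():
            self.cec_off()
            self.powered = False
```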
### 3.3 Fallback behavior (critical)
- If MQTT unreachable, intent missing, invalid, or expired:
- fall back to existing local event-time logic
- use event end as off trigger with existing delayed-off safety
- If local logic sees active event, enforce cancel of pending off timer.
### 3.4 Adjacent-event race hardening
- Guarantee pending off timer is canceled on any newly active event.
- Ensure event switch path never requests off while next event is active.
- Add explicit logging for timer create/cancel/fire with reason and event_id.
### 3.5 State publishing
- Publish apply results to infoscreen/{client_id}/power/state.
- Include source=mqtt_intent or local_fallback.
- Include last intent_id and result details for troubleshooting.
### 3.6 Config flags
- Add feature toggle:
- POWER_CONTROL_MODE=local|mqtt|hybrid (recommend default: hybrid)
- hybrid behavior:
- prefer valid mqtt intent
- automatically fall back to local logic
### 3.7 Client acceptance criteria
- Adjacent events: no unintended off between two active windows.
- Broker outage during event: TV remains on via local fallback.
- Broker recovery: retained intent reconciles state without oscillation.
- Duplicate/old intents do not cause repeated CEC toggles.
---
## 4. Integration Test Matrix (Joint)
### 4.1 Happy paths
- Single event start -> on intent -> TV on.
- Event end -> off intent -> delayed off -> TV off.
- Adjacent events (end==start or small gap) -> uninterrupted TV on.
### 4.2 Failure paths
- Broker down before event start.
- Broker down during active event.
- Malformed retained intent at reconnect.
- Delayed off armed, then new event starts before timer fires.
### 4.3 Consistency checks
- Client state topic reflects actual applied source and result.
- Logs include intent_id correlation across server and client.
---
## 5. Rollout Plan
### Phase 1: Contract and feature flags
- Freeze schema and topic naming for group-only intent.
- Ship client support behind POWER_CONTROL_MODE=hybrid.
### Phase 2: Server publisher rollout
- Enable publishing for test group only.
- Verify retained and reconnect behavior.
### Phase 3: Production enablement
- Enable hybrid mode fleet-wide.
- Observe for 1 week: off-between-adjacent-events incidents must be zero.
### Phase 4: Optional tightening
- If metrics are stable, evaluate mqtt-first policy while retaining local safety fallback.
---
## 6. Definition of Done
- Shared MQTT contract approved by both teams.
- Server and client implementations merged with tests.
- Adjacent-event regression test added and passing.
- Operational runbook updated (topics, payloads, fallback behavior, troubleshooting).
- Production monitoring confirms no unintended mid-schedule TV power-off.

# TV Power Coordination - Server PR-1 Implementation Checklist
Last updated: 2026-03-31
Scope: Server-side, group-only intent publishing, no client-state ingestion in this phase.
## Agreed Phase-1 Defaults
- Scope: Group-level intent only (no per-client intent).
- Poll source of truth: Scheduler poll interval.
- Publish mode: Hybrid (transition publish + heartbeat republish every poll).
- Expiry rule: `expires_at = issued_at + max(3 x poll_interval, 90s)`.
- State ingestion/acknowledgments: Deferred to Phase 2.
- Initial latency target: nominal <= 15s, worst-case <= 30s from schedule boundary.
## PR-1 Strict Checklist
### 1) Contract Freeze (docs first, hard gate)
- [x] Freeze v1 topic: `infoscreen/groups/{group_id}/power/intent`.
- [x] Freeze QoS: `1`.
- [x] Freeze retained flag: `true`.
- [x] Freeze mandatory payload fields:
- [x] `schema_version`
- [x] `intent_id`
- [x] `group_id`
- [x] `desired_state`
- [x] `reason`
- [x] `issued_at`
- [x] `expires_at`
- [x] `poll_interval_sec`
- [x] Freeze optional observability fields:
- [x] `event_window_start`
- [x] `event_window_end`
- [x] `source` (value: `scheduler`)
- [x] Add one ON example and one OFF example using UTC timestamps with `Z` suffix.
- [x] Add explicit precedence note: Phase 1 publishes only group intent.
### 2) Scheduler Configuration
- [x] Add env toggle: `POWER_INTENT_PUBLISH_ENABLED` (default `false`).
- [x] Add env toggle: `POWER_INTENT_HEARTBEAT_ENABLED` (default `true`).
- [x] Add env: `POWER_INTENT_EXPIRY_MULTIPLIER` (default `3`).
- [x] Add env: `POWER_INTENT_MIN_EXPIRY_SECONDS` (default `90`).
- [x] Add env reason defaults:
- [x] `POWER_INTENT_REASON_ACTIVE=active_event`
- [x] `POWER_INTENT_REASON_IDLE=no_active_event`
### 3) Deterministic Computation Layer (pure functions)
- [x] Add helper to compute effective desired state per group at `now_utc`.
- [x] Add helper to compute event window around `now` (for observability).
- [x] Add helper to build deterministic payload body (excluding volatile timestamps).
- [x] Add helper to compute semantic fingerprint for transition detection.
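The desired-state helper can be sketched as a pure function. This is illustrative only; the real helpers live in `scheduler/db_utils.py`, and events are modeled here as simple `(start, end)` aware-datetime tuples:

```python
from datetime import datetime, timezone

def compute_desired_state(events, now):
    """events: iterable of (start, end) aware datetimes for one group.

    Windows are half-open [start, end), so an event ending exactly when the
    next one starts yields continuous ON coverage and no OFF blip.
    """
    active = [(s, e) for s, e in events if s <= now < e]
    if not active:
        return "off", []
    # report one merged coverage window for observability
    return "on", [(min(s for s, _ in active), max(e for _, e in active))]
```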
### 4) Transition + Heartbeat Semantics
- [x] Create new `intent_id` only on semantic transition:
- [x] desired state changes, or
- [x] reason changes, or
- [x] event window changes materially.
- [x] Keep `intent_id` stable for unchanged heartbeat republishes.
- [x] Refresh `issued_at` + `expires_at` on every heartbeat publish.
- [x] Guarantee UTC serialization with `Z` suffix for all intent timestamps.
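One way to build the semantic fingerprint (a sketch; the field set mirrors the transition rules above and deliberately excludes the volatile `issued_at`/`expires_at`):

```python
import hashlib
import json

def semantic_fingerprint(desired_state, reason, window_start, window_end):
    """Hash only the fields whose change counts as a semantic transition."""
    basis = json.dumps(
        {"desired_state": desired_state, "reason": reason,
         "event_window_start": window_start, "event_window_end": window_end},
        sort_keys=True)
    return hashlib.sha256(basis.encode("utf-8")).hexdigest()
```

A heartbeat keeps the current `intent_id` while the fingerprint is unchanged; a new `intent_id` is minted only when it changes.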
### 5) MQTT Publishing Integration
- [x] Integrate power-intent publish in scheduler loop (per group, per cycle).
- [x] On transition: publish immediately.
- [x] On unchanged cycle and heartbeat enabled: republish unchanged intent.
- [x] Use QoS 1 and retained true for all intent publishes.
- [x] Wait for publish completion/ack and log result.
### 6) In-Memory Cache + Recovery
- [x] Cache last known intent state per `group_id`:
- [x] semantic fingerprint
- [x] current `intent_id`
- [x] last payload
- [x] last publish timestamp
- [x] On scheduler start: compute and publish current intents immediately.
- [x] On MQTT reconnect: republish cached retained intents immediately.
### 7) Safety Guards
- [x] Do not publish when `expires_at <= issued_at`.
- [x] Do not publish malformed payloads.
- [x] Skip invalid/missing group target and emit error log.
- [x] Ensure no OFF blip between adjacent/overlapping active windows.
### 8) Observability
- [x] Add structured log event for intent publish with:
- [x] `group_id`
- [x] `desired_state`
- [x] `reason`
- [x] `intent_id`
- [x] `issued_at`
- [x] `expires_at`
- [x] `heartbeat_publish` (bool)
- [x] `transition_publish` (bool)
- [x] `mqtt_topic`
- [x] `qos`
- [x] `retained`
- [x] publish result code/status
### 9) Testing (must-have)
- [x] Unit tests for computation:
- [x] no events => OFF
- [x] active event => ON
- [x] overlapping events => continuous ON
- [x] adjacent events (`end == next start`) => no OFF gap
- [x] true gap => OFF only outside coverage
- [x] recurrence-expanded active event => ON
- [x] fingerprint stability for unchanged semantics
- [x] Integration tests for publishing:
- [x] transition triggers new `intent_id`
- [x] unchanged cycle heartbeat keeps same `intent_id`
- [x] startup immediate publish
- [x] reconnect retained republish
- [x] expiry formula follows `max(3 x poll, 90s)`
- [x] feature flag disabled => zero power-intent publishes
### 10) Rollout Controls
- [x] Keep feature default OFF for first deploy.
- [x] Document canary strategy (single group first).
- [x] Define progression gates (single group -> partial fleet -> full fleet).
### 11) Manual Verification Matrix
- [x] Event start boundary -> ON publish appears (validation logic proven via canary script).
- [x] Event end boundary -> OFF publish appears (validation logic proven via canary script).
- [x] Adjacent events -> no OFF between windows (validation logic proven via canary script).
- [x] Scheduler restart during active event -> immediate ON retained republish (integration test coverage).
- [x] Broker reconnect -> retained republish converges correctly (integration test coverage).
### 12) PR-1 Acceptance Gate (all required)
- [x] Unit and integration tests pass. (8 tests, all green)
- [x] No malformed payloads in logs. (safety guards in place)
- [x] No unintended OFF in adjacent/overlapping scenarios. (proven in canary scenarios 3, 4)
- [x] Feature flag default remains OFF. (verified in scheduler defaults)
- [x] Documentation updated in same PR. (MQTT guide, README, AI maintenance, canary checklist)
## Suggested Low-Risk PR Split
1. PR-A: Contract and docs only.
2. PR-B: Pure computation helpers + unit tests.
3. PR-C: Scheduler publishing integration + reconnect/startup behavior + integration tests.
4. PR-D: Rollout toggles, canary notes, hardening.
## Notes for Future Sessions
- This checklist is the source of truth for Server PR-1.
- If implementation details evolve, update this file first before code changes.
- Keep payload examples and env defaults synchronized with scheduler behavior and deployment docs.
---
## Implementation Completion Summary (31 March 2026)
All PR-1 server-side items are complete. Below is a summary of deliverables:
### Code Changes
- **scheduler/scheduler.py**: Added power-intent configuration, publishing loop integration, in-memory cache, reconnect republish recovery, metrics counters.
- **scheduler/db_utils.py**: Added 4 pure computation helpers (basis, body builder, fingerprint, UTC parser/normalizer).
- **scheduler/test_power_intent_utils.py**: 5 unit tests covering computation logic and boundary cases.
- **scheduler/test_power_intent_scheduler.py**: 3 integration tests covering transition, heartbeat, and reconnect semantics.
### Documentation Changes
- **MQTT_EVENT_PAYLOAD_GUIDE.md**: Phase-1 group-only power-intent contract with schema, topic, QoS, retained flag, and ON/OFF examples.
- **README.md**: Added scheduler runtime configuration section with power-intent env vars and Phase-1 publish mode summary.
- **AI-INSTRUCTIONS-MAINTENANCE.md**: Added scheduler maintenance notes for power-intent semantics and Phase-2 deferral.
- **TV_POWER_PHASE_1_CANARY_VALIDATION.md**: 10-scenario manual validation matrix for operators.
- **TV_POWER_PHASE_1_IMPLEMENTATION_CHECKLIST.md**: This file; source of truth for PR-1 scope and acceptance criteria.
### Validation Artifacts
- **test_power_intent_canary.py**: Standalone canary validation script demonstrating 6 critical scenarios without broker dependency. All scenarios pass.
### Test Results
- Unit tests (db_utils): 5 passed
- Integration tests (scheduler): 3 passed
- Canary validation scenarios: 6 passed
- Total: 14/14 tests passed, 0 failures
### Feature Flag Status
- `POWER_INTENT_PUBLISH_ENABLED` defaults to `false` (feature off by default for safe first deploy)
- `POWER_INTENT_HEARTBEAT_ENABLED` defaults to `true` (heartbeat republish enabled when feature is on)
- All other power-intent env vars have safe defaults matching Phase-1 contract
### Branch
- Current branch: `feat/tv-power-server-pr1`
- Ready for PR review and merge pending acceptance gate sign-off
### Next Phase
- Phase 2 (deferred): Per-client override intent, client state acknowledgments, listener persistence of state
- Canary rollout strategy documented in `TV_POWER_PHASE_1_CANARY_VALIDATION.md`

# Server Handoff: TV Power Coordination
## Status
Server PR-1 is implemented and merged (Phase 1).
## Source of Truth
- Contract: TV_POWER_INTENT_SERVER_CONTRACT_V1.md
- Implementation: scheduler/scheduler.py and scheduler/db_utils.py
- Validation checklist: TV_POWER_PHASE_1_CANARY_VALIDATION.md
## Active Phase 1 Scope
- Topic: infoscreen/groups/{group_id}/power/intent
- QoS: 1
- Retained: true
- Scope: group-level only
- Per-client intent/state topics: deferred to Phase 2
## Publish Semantics (Implemented)
- Semantic transition (`desired_state` or `reason` changed): new `intent_id` and immediate publish
- Heartbeat (no semantic change): same `intent_id`, refreshed `issued_at` and `expires_at`
- Scheduler startup: immediate publish before first poll wait
- MQTT reconnect: immediate retained republish of cached intents
## Payload Contract (Phase 1)
```json
{
"schema_version": "1.0",
"intent_id": "uuid4",
"group_id": 12,
"desired_state": "on",
"reason": "active_event",
"issued_at": "2026-04-01T06:00:03.496Z",
"expires_at": "2026-04-01T06:01:33.496Z",
"poll_interval_sec": 15,
"active_event_ids": [148],
"event_window_start": "2026-04-01T06:00:00Z",
"event_window_end": "2026-04-01T07:00:00Z"
}
```
Expiry rule:
- expires_at = issued_at + max(3 x poll_interval_sec, 90 seconds)
## Operational Notes
- Adjacent/overlapping events are merged into one active coverage window; no OFF blip at boundaries.
- Feature flag defaults are safe for rollout:
- POWER_INTENT_PUBLISH_ENABLED=false
- POWER_INTENT_HEARTBEAT_ENABLED=true
- POWER_INTENT_EXPIRY_MULTIPLIER=3
- POWER_INTENT_MIN_EXPIRY_SECONDS=90
- Keep this handoff concise and defer full details to the stable contract document.
## Phase 2 (Deferred)
- Per-client override topic: infoscreen/{client_uuid}/power/intent
- Client power state topic and acknowledgments
- Listener persistence of client-level power state