feat(tv-power): implement server PR1 with tests and documentation

This commit is contained in:
2026-04-01 08:07:18 +00:00
parent b5f5f30005
commit 3fc7d33e43
15 changed files with 1997 additions and 3 deletions

365
test_power_intent_canary.py Normal file
View File

@@ -0,0 +1,365 @@
#!/usr/bin/env python3
"""
Manual canary validation helper for TV power-intent Phase 1 server publishing.
This script demonstrates expected power-intent payloads and validates the
computation and publishing logic without requiring a full broker connection.
Usage:
python test_power_intent_canary.py
"""
import json
import sys
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
sys.path.insert(0, '/workspace')
from scheduler.db_utils import (
compute_group_power_intent_basis,
build_group_power_intent_body,
compute_group_power_intent_fingerprint,
)
def utc_now():
"""Get UTC now."""
return datetime.now(timezone.utc)
def test_scenario_1_no_active_events():
"""Scenario 1: No active events => OFF intent."""
print("\n" + "="*70)
print("SCENARIO 1: No active events => desired_state=OFF")
print("="*70)
now = utc_now()
events = [] # empty group
group_id = 1
intent_basis = compute_group_power_intent_basis(
events=events,
group_id=group_id,
now_utc=now
)
desired_state = intent_basis["desired_state"]
reason = intent_basis["reason"]
assert desired_state == "off", f"Expected 'off', got '{desired_state}'"
assert reason == "no_active_event", f"Expected reason 'no_active_event', got '{reason}'"
payload_body = build_group_power_intent_body(intent_basis, poll_interval_sec=15)
print(f"✓ Desired state: {desired_state}")
print(f"✓ Reason: {reason}")
print(f"✓ Event window: {intent_basis.get('event_window_start')} to {intent_basis.get('event_window_end')}")
print(f"✓ Payload body (pretty-print):")
print(json.dumps(payload_body, indent=2))
# Validate payload structure
assert "desired_state" in payload_body
assert payload_body["desired_state"] == "off"
assert "group_id" in payload_body
print("✓ Payload structure validated")
def test_scenario_2_single_active_event():
"""Scenario 2: One active event now => ON intent."""
print("\n" + "="*70)
print("SCENARIO 2: One active event now => desired_state=ON")
print("="*70)
now = utc_now()
start = now - timedelta(seconds=60)
end = now + timedelta(seconds=300)
group_id = 1
events = [
{
"id": 101,
"group_id": group_id,
"start": start.isoformat(),
"end": end.isoformat(),
"subject": "Morning Announcement",
"event_type": "message",
}
]
intent_basis = compute_group_power_intent_basis(
events=events,
group_id=group_id,
now_utc=now
)
desired_state = intent_basis["desired_state"]
reason = intent_basis["reason"]
assert desired_state == "on", f"Expected 'on', got '{desired_state}'"
assert reason == "active_event", f"Expected reason 'active_event', got '{reason}'"
payload_body = build_group_power_intent_body(intent_basis, poll_interval_sec=15)
print(f"✓ Desired state: {desired_state}")
print(f"✓ Reason: {reason}")
print(f"✓ event_window_start: {intent_basis.get('event_window_start')}")
print(f"✓ event_window_end: {intent_basis.get('event_window_end')}")
print(f"✓ Payload body (pretty-print):")
print(json.dumps(payload_body, indent=2))
assert payload_body["desired_state"] == "on"
print("✓ Payload structure validated")
def test_scenario_3_adjacent_events_no_off_blip():
"""Scenario 3: Adjacent events (no gap) => no OFF blip between them."""
print("\n" + "="*70)
print("SCENARIO 3: Adjacent events => no OFF between them")
print("="*70)
# Event 1: ends at T+300
# Event 2: starts at T+300 (adjacent, no gap)
base = utc_now()
group_id = 2
events_at_boundary = [
{
"id": 201,
"group_id": group_id,
"start": (base + timedelta(seconds=0)).isoformat(),
"end": (base + timedelta(seconds=300)).isoformat(),
"subject": "Event 1",
"event_type": "presentation",
},
{
"id": 202,
"group_id": group_id,
"start": (base + timedelta(seconds=300)).isoformat(),
"end": (base + timedelta(seconds=600)).isoformat(),
"subject": "Event 2",
"event_type": "presentation",
},
]
# Sample times: before, at boundary, and after
scenarios = [
("Before boundary (Event 1 active)", base + timedelta(seconds=150)),
("At boundary (no gap)", base + timedelta(seconds=300)),
("After boundary (Event 2 active)", base + timedelta(seconds=450)),
]
for label, sample_time in scenarios:
intent_basis = compute_group_power_intent_basis(
events=events_at_boundary,
group_id=group_id,
now_utc=sample_time
)
desired_state = intent_basis["desired_state"]
reason = intent_basis["reason"]
print(f"\n {label}:")
print(f" Desired state: {desired_state}")
print(f" Reason: {reason}")
assert desired_state == "on", f"Expected 'on' at {label}, got '{desired_state}'"
print("\n✓ All boundary times show 'on' => no OFF blip between adjacent events")
def test_scenario_4_gap_between_events():
"""Scenario 4: Gap between events => OFF when not covered."""
print("\n" + "="*70)
print("SCENARIO 4: Gap between events => OFF during gap")
print("="*70)
base = utc_now()
group_id = 3
events_with_gap = [
{
"id": 301,
"group_id": group_id,
"start": (base + timedelta(seconds=0)).isoformat(),
"end": (base + timedelta(seconds=300)).isoformat(),
"subject": "Event 1",
"event_type": "presentation",
},
{
"id": 302,
"group_id": group_id,
"start": (base + timedelta(seconds=600)).isoformat(),
"end": (base + timedelta(seconds=900)).isoformat(),
"subject": "Event 2",
"event_type": "presentation",
},
]
# Sample during gap: T+450 is between end(300) and start(600)
gap_time = base + timedelta(seconds=450)
intent_basis = compute_group_power_intent_basis(
events=events_with_gap,
group_id=group_id,
now_utc=gap_time
)
desired_state = intent_basis["desired_state"]
reason = intent_basis["reason"]
print(f"Sample time: {gap_time.isoformat()}")
print(f"Desired state: {desired_state}")
print(f"Reason: {reason}")
assert desired_state == "off", f"Expected 'off' during gap, got '{desired_state}'"
print("✓ Correctly recognizes gap => OFF")
def test_scenario_5_semantic_fingerprint_stable():
"""Scenario 5: Semantic fingerprint is stable for unchanged state."""
print("\n" + "="*70)
print("SCENARIO 5: Semantic fingerprint stability (transition detection)")
print("="*70)
payload_body_1 = {
"schema_version": "1.0",
"group_id": 5,
"desired_state": "on",
"reason": "active_event",
"poll_interval_sec": 15,
"event_window_start": "2026-03-31T20:15:00Z",
"event_window_end": "2026-03-31T20:20:00Z",
"active_event_ids": [501],
}
payload_body_2 = {
"schema_version": "1.0",
"group_id": 5,
"desired_state": "on",
"reason": "active_event",
"poll_interval_sec": 15,
"event_window_start": "2026-03-31T20:15:00Z",
"event_window_end": "2026-03-31T20:20:00Z",
"active_event_ids": [501],
}
payload_body_3_different = {
"schema_version": "1.0",
"group_id": 5,
"desired_state": "off", # Changed
"reason": "no_active_event",
"poll_interval_sec": 15,
"event_window_start": None,
"event_window_end": None,
"active_event_ids": [],
}
fp1 = compute_group_power_intent_fingerprint(payload_body_1)
fp2 = compute_group_power_intent_fingerprint(payload_body_2)
fp3 = compute_group_power_intent_fingerprint(payload_body_3_different)
print(f"Payload 1 (on, event X): {fp1}")
print(f"Payload 2 (on, same event X): {fp2}")
print(f"Payload 3 (off, no event): {fp3}")
assert fp1 == fp2, "Identical payloads should have same fingerprint"
assert fp1 != fp3, "Different desired_state should have different fingerprint"
print("✓ Fingerprint is stable for same state (no spurious transitions)")
print("✓ Fingerprint changes on semantic transition")
def test_scenario_6_timestamp_format_validation():
"""Scenario 6: Payload body contains window start/end in UTC Z format."""
print("\n" + "="*70)
print("SCENARIO 6: Event window timestamp format validation")
print("="*70)
now = utc_now()
group_id = 6
events = [
{
"id": 601,
"group_id": group_id,
"start": (now - timedelta(seconds=60)).isoformat(),
"end": (now + timedelta(seconds=300)).isoformat(),
"subject": "Event",
"event_type": "message",
}
]
intent_basis = compute_group_power_intent_basis(
events=events,
group_id=group_id,
now_utc=now
)
window_start = intent_basis.get("event_window_start")
window_end = intent_basis.get("event_window_end")
print(f"Event window start: {window_start}")
print(f"Event window end: {window_end}")
if window_start:
assert window_start.endswith("Z"), f"event_window_start must end with Z: {window_start}"
if window_end:
assert window_end.endswith("Z"), f"event_window_end must end with Z: {window_end}"
# Validate they are valid RFC3339 timestamps
try:
if window_start:
dt_start = datetime.fromisoformat(window_start.replace("Z", "+00:00"))
if window_end:
dt_end = datetime.fromisoformat(window_end.replace("Z", "+00:00"))
if window_start:
assert dt_end > dt_start, "window_end must be after window_start"
print("✓ Event window timestamps are valid RFC3339 UTC format with Z suffix")
except Exception as e:
print(f"✗ Timestamp parsing failed: {e}")
raise
def main():
"""Run all scenarios."""
print("\n" + "="*70)
print("TV POWER INTENT PHASE-1 SERVER CANARY VALIDATION")
print("="*70)
print("\nThis script validates server-side power-intent computation logic")
print("without requiring an MQTT broker connection.\n")
try:
test_scenario_1_no_active_events()
test_scenario_2_single_active_event()
test_scenario_3_adjacent_events_no_off_blip()
test_scenario_4_gap_between_events()
test_scenario_5_semantic_fingerprint_stable()
test_scenario_6_timestamp_format_validation()
print("\n" + "="*70)
print("ALL CANARY SCENARIOS PASSED ✓")
print("="*70)
print("\nNext steps for full validation:")
print("1. Enable POWER_INTENT_PUBLISH_ENABLED=true in scheduler")
print("2. Subscribe to infoscreen/groups/+/power/intent in MQTT broker")
print("3. Run scheduler and observe:")
print(" - ON payload on event start")
print(" - Same intent_id across heartbeat republishes")
print(" - OFF payload on event end")
print(" - No OFF blip between adjacent events")
print("4. Restart scheduler and verify immediate ON republish")
print("5. Disconnect MQTT broker and verify reconnect republish")
print("\nSee TV_POWER_CANARY_VALIDATION_CHECKLIST.md for full validation matrix.")
return 0
except AssertionError as e:
print(f"\n✗ VALIDATION FAILED: {e}")
return 1
except Exception as e:
print(f"\n✗ UNEXPECTED ERROR: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())