"""Unit tests for the scheduler's power-intent publishing helpers.

The tests drive ``_publish_group_power_intents`` and
``_republish_cached_power_intents`` against an in-memory MQTT client double
and inspect the recorded publish calls and the metrics counters.
"""

import json
import unittest
from datetime import datetime, timedelta, timezone

from scheduler.scheduler import (
    _publish_group_power_intents,
    _republish_cached_power_intents,
)

# Counter names every test starts from; all are initialised to zero.
_METRIC_NAMES = (
    "intent_transitions_total",
    "publish_success_total",
    "publish_error_total",
    "heartbeat_republish_total",
    "retained_republish_total",
)


def _zeroed_metrics():
    """Return a fresh metrics dict with every counter set to 0."""
    return dict.fromkeys(_METRIC_NAMES, 0)


def _half_hour_event(event_id, group_id):
    """Build a new event dict covering 10:00-10:30 UTC on 2026-03-31."""
    return {
        "id": event_id,
        "group_id": group_id,
        "start": "2026-03-31T10:00:00+00:00",
        "end": "2026-03-31T10:30:00+00:00",
    }


def _run_poll(client, events, now, intents, metrics):
    """Invoke ``_publish_group_power_intents`` with the settings shared by all tests."""
    _publish_group_power_intents(
        client=client,
        events=events,
        now=now,
        poll_interval=15,
        heartbeat_enabled=True,
        expiry_multiplier=3,
        min_expiry_seconds=90,
        last_power_intents=intents,
        power_intent_metrics=metrics,
    )


class _FakePublishResult:
    """Publish-result double: carries ``rc`` and records the wait timeout."""

    def __init__(self, rc=0):
        self.rc = rc
        self.wait_timeout = None

    def wait_for_publish(self, timeout=None):
        """Remember the requested timeout; never blocks."""
        self.wait_timeout = timeout


class _FakeMqttClient:
    """MQTT client double that logs every ``publish`` call in ``self.calls``."""

    def __init__(self, rc=0):
        self.rc = rc
        self.calls = []

    def publish(self, topic, payload, qos=0, retain=False):
        """Record the call and hand back a ``_FakePublishResult`` with this client's rc."""
        outcome = _FakePublishResult(rc=self.rc)
        record = {
            "topic": topic,
            "payload": payload,
            "qos": qos,
            "retain": retain,
            "result": outcome,
        }
        self.calls.append(record)
        return outcome


class PowerIntentSchedulerTests(unittest.TestCase):
    """Checks on published power-intent payloads, topics and metrics."""

    def test_transition_then_heartbeat_reuses_intent_id(self):
        """Two polls inside the same event window must share one intent id."""
        mqtt = _FakeMqttClient(rc=0)
        intent_cache = {}
        counters = _zeroed_metrics()
        schedule = [_half_hour_event(101, 12)]

        first_tick = datetime(2026, 3, 31, 10, 5, 0, tzinfo=timezone.utc)
        _run_poll(mqtt, schedule, first_tick, intent_cache, counters)
        first_payload = json.loads(mqtt.calls[0]["payload"])
        first_intent_id = first_payload["intent_id"]

        # Second poll happens one poll interval later, still inside the event.
        second_tick = first_tick + timedelta(seconds=15)
        _run_poll(mqtt, schedule, second_tick, intent_cache, counters)

        self.assertEqual(len(mqtt.calls), 2)
        second_payload = json.loads(mqtt.calls[1]["payload"])

        self.assertEqual(first_payload["desired_state"], "on")
        self.assertEqual(second_payload["desired_state"], "on")
        self.assertEqual(first_intent_id, second_payload["intent_id"])

        initial_call = mqtt.calls[0]
        self.assertEqual(initial_call["topic"], "infoscreen/groups/12/power/intent")
        self.assertEqual(initial_call["qos"], 1)
        self.assertTrue(initial_call["retain"])

        self.assertEqual(counters["intent_transitions_total"], 1)
        self.assertEqual(counters["heartbeat_republish_total"], 1)
        self.assertEqual(counters["publish_success_total"], 2)
        self.assertEqual(counters["publish_error_total"], 0)

    def test_state_change_creates_new_intent_id(self):
        """A poll after the event's end must publish "off" under a new intent id."""
        mqtt = _FakeMqttClient(rc=0)
        intent_cache = {}
        counters = _zeroed_metrics()

        # 10:05 lies inside the 10:00-10:30 event window.
        during_event = datetime(2026, 3, 31, 10, 5, 0, tzinfo=timezone.utc)
        _run_poll(mqtt, [_half_hour_event(88, 3)], during_event, intent_cache, counters)
        first_payload = json.loads(mqtt.calls[0]["payload"])

        # Same event data, but 10:35 is past its end time.
        after_event = datetime(2026, 3, 31, 10, 35, 0, tzinfo=timezone.utc)
        _run_poll(mqtt, [_half_hour_event(88, 3)], after_event, intent_cache, counters)
        second_payload = json.loads(mqtt.calls[1]["payload"])

        self.assertNotEqual(first_payload["intent_id"], second_payload["intent_id"])
        self.assertEqual(second_payload["desired_state"], "off")
        self.assertEqual(counters["intent_transitions_total"], 2)

    def test_republish_cached_power_intents(self):
        """A cached intent must be republished once, retained, at QoS 1."""
        mqtt = _FakeMqttClient(rc=0)
        counters = _zeroed_metrics()
        cached_entry = {
            "fingerprint": "abc",
            "intent_id": "intent-1",
            "payload": '{"group_id":5,"desired_state":"on"}',
        }
        cache = {5: cached_entry}

        _republish_cached_power_intents(mqtt, cache, counters)

        self.assertEqual(len(mqtt.calls), 1)
        only_call = mqtt.calls[0]
        self.assertEqual(only_call["topic"], "infoscreen/groups/5/power/intent")
        self.assertEqual(only_call["qos"], 1)
        self.assertTrue(only_call["retain"])
        self.assertEqual(counters["retained_republish_total"], 1)


if __name__ == "__main__":
    unittest.main()