- Add GET /api/clients/crashed endpoint (process_status=crashed or stale heartbeat)
- Add restart_app command action with same lifecycle + lockout as reboot_host
- Scheduler: crash auto-recovery loop (CRASH_RECOVERY_ENABLED flag, lockout, MQTT publish)
- Scheduler: unconditional command expiry sweep per poll cycle (sweep_expired_commands)
- Listener: subscribe to infoscreen/+/service_failed; persist service_failed_at + unit
- Listener: extract broker_connection block from health payload; persist reconnect_count + last_disconnect_at
- DB migration b1c2d3e4f5a6: service_failed_at, service_failed_unit, mqtt_reconnect_count, mqtt_last_disconnect_at on clients
- Add GET /api/clients/service_failed and POST /api/clients/<uuid>/clear_service_failed
- Monitoring overview API: include mqtt_reconnect_count + mqtt_last_disconnect_at per client
- Frontend: orange service-failed alert panel (hidden when empty, auto-refresh, acknowledge action)
- Frontend: MQTT reconnect count + last disconnect in client detail panel
- MQTT auth hardening: listener/scheduler/server use env credentials; broker enforces allow_anonymous false
- Client command lifecycle foundation: ClientCommand model, reboot_host/shutdown_host, full ACK lifecycle
- Docs: TECH-CHANGELOG, DEV-CHANGELOG, MQTT_EVENT_PAYLOAD_GUIDE, copilot-instructions updated
- Add implementation-plans/, RESTART_VALIDATION_CHECKLIST.md, TODO.md
484 lines · 19 KiB · Python
# scheduler/scheduler.py
|
|
|
|
import os
|
|
import logging
|
|
from .db_utils import (
|
|
get_active_events,
|
|
get_system_setting_value,
|
|
compute_group_power_intent_basis,
|
|
build_group_power_intent_body,
|
|
compute_group_power_intent_fingerprint,
|
|
get_crash_recovery_candidates,
|
|
issue_crash_recovery_command,
|
|
finalize_crash_recovery_command,
|
|
sweep_expired_commands,
|
|
)
|
|
import paho.mqtt.client as mqtt
|
|
import json
|
|
import datetime
|
|
import time
|
|
import uuid
|
|
import ssl
|
|
|
|
|
|
# --- MQTT connection settings, resolved once at import time from the environment ---
# Host/port accept legacy variable names (MQTT_BROKER_URL / MQTT_PORT) as fallbacks.
MQTT_BROKER_HOST = os.getenv("MQTT_BROKER_HOST", os.getenv("MQTT_BROKER_URL", "mqtt"))
MQTT_BROKER_PORT = int(os.getenv("MQTT_BROKER_PORT", os.getenv("MQTT_PORT", "1883")))
# Credentials: MQTT_USER takes precedence over MQTT_USERNAME; either may be unset.
MQTT_USERNAME = os.getenv("MQTT_USER") or os.getenv("MQTT_USERNAME")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
# TLS is opt-in; accepted truthy values are 1/true/yes/on (case-insensitive).
MQTT_TLS_ENABLED = os.getenv("MQTT_TLS_ENABLED", "false").strip().lower() in ("1", "true", "yes", "on")
MQTT_TLS_CA_CERT = os.getenv("MQTT_TLS_CA_CERT")
MQTT_TLS_CERTFILE = os.getenv("MQTT_TLS_CERTFILE")
MQTT_TLS_KEYFILE = os.getenv("MQTT_TLS_KEYFILE")
# When set, certificate hostname verification is skipped (tls_insecure_set) —
# intended for test setups only, never production.
MQTT_TLS_INSECURE = os.getenv("MQTT_TLS_INSECURE", "false").strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def _to_utc_z(dt: datetime.datetime) -> str:
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=datetime.timezone.utc)
|
|
else:
|
|
dt = dt.astimezone(datetime.timezone.utc)
|
|
return dt.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def _republish_cached_power_intents(client, last_power_intents, power_intent_metrics):
|
|
if not last_power_intents:
|
|
return
|
|
|
|
logging.info(
|
|
"MQTT reconnect power-intent republish count=%s",
|
|
len(last_power_intents),
|
|
)
|
|
for gid, cached in last_power_intents.items():
|
|
topic = f"infoscreen/groups/{gid}/power/intent"
|
|
client.publish(topic, cached["payload"], qos=1, retain=True)
|
|
power_intent_metrics["retained_republish_total"] += 1
|
|
|
|
|
|
def _publish_group_power_intents(
    client,
    events,
    now,
    poll_interval,
    heartbeat_enabled,
    expiry_multiplier,
    min_expiry_seconds,
    last_power_intents,
    power_intent_metrics,
):
    """Publish retained power-intent messages for every candidate group.

    A group is a candidate when it appears in *events* or already has a
    cached intent in *last_power_intents*.  An intent is published either
    on a state transition (fingerprint changed / first publish) or — when
    *heartbeat_enabled* — as a heartbeat republish of the unchanged intent.

    Side effects: publishes to MQTT (qos=1, retained), mutates
    *last_power_intents* and *power_intent_metrics* in place.
    """
    # Intent validity window: a multiple of the poll interval, but never
    # below the configured floor.
    expiry_seconds = max(
        expiry_multiplier * poll_interval,
        min_expiry_seconds,
    )

    # Collect candidate group ids from the event list ...
    candidate_group_ids = set()
    for event in events:
        group_id = event.get("group_id")
        if group_id is None:
            continue
        try:
            candidate_group_ids.add(int(group_id))
        except (TypeError, ValueError):
            continue
    # ... plus every group with a cached intent, so previously published
    # intents keep being refreshed even when the group has no current events.
    candidate_group_ids.update(last_power_intents.keys())

    for gid in sorted(candidate_group_ids):
        # Guard: validate group_id is a valid positive integer
        if not isinstance(gid, int) or gid <= 0:
            logging.error(
                "event=power_intent_publish_error reason=invalid_group_id group_id=%s",
                gid,
            )
            continue

        intent_basis = compute_group_power_intent_basis(
            events=events,
            group_id=gid,
            now_utc=now,
            adjacency_seconds=0,
        )
        intent_body = build_group_power_intent_body(
            intent_basis=intent_basis,
            poll_interval_sec=poll_interval,
        )
        fingerprint = compute_group_power_intent_fingerprint(intent_body)
        previous = last_power_intents.get(gid)
        # Transition: first publish for this group, or the intent changed.
        is_transition_publish = previous is None or previous["fingerprint"] != fingerprint
        is_heartbeat_publish = bool(heartbeat_enabled and not is_transition_publish)

        if not is_transition_publish and not is_heartbeat_publish:
            continue

        # Heartbeats re-use the previous intent_id; transitions mint a new one.
        intent_id = previous["intent_id"] if previous and not is_transition_publish else str(uuid.uuid4())

        # Guard: validate intent_id is not empty
        if not intent_id or not isinstance(intent_id, str) or len(intent_id.strip()) == 0:
            logging.error(
                "event=power_intent_publish_error group_id=%s reason=invalid_intent_id",
                gid,
            )
            continue

        issued_at = now
        expires_at = issued_at + datetime.timedelta(seconds=expiry_seconds)

        # Guard: validate expiry window is positive and issued_at has valid timezone
        if expires_at <= issued_at:
            logging.error(
                "event=power_intent_publish_error group_id=%s reason=invalid_expiry issued_at=%s expires_at=%s",
                gid,
                _to_utc_z(issued_at),
                _to_utc_z(expires_at),
            )
            continue

        issued_at_str = _to_utc_z(issued_at)
        expires_at_str = _to_utc_z(expires_at)

        # Guard: ensure Z suffix on timestamps (format validation)
        if not issued_at_str.endswith("Z") or not expires_at_str.endswith("Z"):
            logging.error(
                "event=power_intent_publish_error group_id=%s reason=invalid_timestamp_format issued_at=%s expires_at=%s",
                gid,
                issued_at_str,
                expires_at_str,
            )
            continue

        payload_dict = {
            **intent_body,
            "intent_id": intent_id,
            "issued_at": issued_at_str,
            "expires_at": expires_at_str,
        }

        # Guard: ensure payload serialization succeeds before publishing
        try:
            payload = json.dumps(payload_dict, sort_keys=True, separators=(",", ":"))
        except (TypeError, ValueError) as e:
            logging.error(
                "event=power_intent_publish_error group_id=%s reason=payload_serialization_error error=%s",
                gid,
                str(e),
            )
            continue

        topic = f"infoscreen/groups/{gid}/power/intent"

        result = client.publish(topic, payload, qos=1, retain=True)
        result.wait_for_publish(timeout=5.0)
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            power_intent_metrics["publish_error_total"] += 1
            logging.error(
                "event=power_intent_publish_error group_id=%s desired_state=%s intent_id=%s "
                "transition_publish=%s heartbeat_publish=%s topic=%s qos=1 retained=true rc=%s",
                gid,
                payload_dict.get("desired_state"),
                intent_id,
                is_transition_publish,
                is_heartbeat_publish,
                topic,
                result.rc,
            )
            continue

        # Cache only after a successful publish, so failed publishes are
        # retried as transitions on the next cycle.
        last_power_intents[gid] = {
            "fingerprint": fingerprint,
            "intent_id": intent_id,
            "payload": payload,
        }
        if is_transition_publish:
            power_intent_metrics["intent_transitions_total"] += 1
        if is_heartbeat_publish:
            power_intent_metrics["heartbeat_republish_total"] += 1
        power_intent_metrics["publish_success_total"] += 1
        logging.info(
            "event=power_intent_publish group_id=%s desired_state=%s reason=%s intent_id=%s "
            "issued_at=%s expires_at=%s transition_publish=%s heartbeat_publish=%s "
            "topic=%s qos=1 retained=true",
            gid,
            payload_dict.get("desired_state"),
            payload_dict.get("reason"),
            intent_id,
            issued_at_str,
            expires_at_str,
            is_transition_publish,
            is_heartbeat_publish,
            topic,
        )
|
|
|
|
|
|
def _env_bool(name: str, default: bool) -> bool:
|
|
value = os.getenv(name)
|
|
if value is None:
|
|
return default
|
|
return value.strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
# Logging configuration
from logging.handlers import RotatingFileHandler

# Log file lives next to this module; ensure the directory exists.
LOG_PATH = os.path.join(os.path.dirname(__file__), "scheduler.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
log_handlers = [
    RotatingFileHandler(
        LOG_PATH,
        maxBytes=10*1024*1024,  # 10 MB
        backupCount=2,  # 1 current + 2 backups = 3 files total
        encoding="utf-8"
    )
]
# DEBUG_MODE additionally mirrors log output to the console.
if os.getenv("DEBUG_MODE", "0") in ("1", "true", "True"):
    log_handlers.append(logging.StreamHandler())
# Unknown LOG_LEVEL values silently fall back to INFO via getattr default.
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=log_handlers
)
|
|
|
|
|
|
def main():
    """Scheduler entry point: connect to MQTT and run the poll loop forever.

    Each cycle: fetch events for the configured window, publish per-group
    retained event lists (on change or refresh timeout), clear retained
    messages for vanished groups, then optionally publish power intents,
    run crash recovery, and sweep expired commands.
    """
    # Fix for the deprecated API - explicitly set callback_api_version
    client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    client.reconnect_delay_set(min_delay=1, max_delay=30)

    # Auth is only enabled when both credentials are present.
    if MQTT_USERNAME and MQTT_PASSWORD:
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

    if MQTT_TLS_ENABLED:
        client.tls_set(
            ca_certs=MQTT_TLS_CA_CERT,
            certfile=MQTT_TLS_CERTFILE,
            keyfile=MQTT_TLS_KEYFILE,
            cert_reqs=ssl.CERT_REQUIRED,
        )
        # Test-only escape hatch: skip hostname verification.
        if MQTT_TLS_INSECURE:
            client.tls_insecure_set(True)

    POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))
    # 0 = off; e.g. 600 for every 10 minutes
    # initial value from DB or fallback to env
    try:
        db_val = get_system_setting_value("refresh_seconds", None)
        REFRESH_SECONDS = int(db_val) if db_val is not None else int(os.getenv("REFRESH_SECONDS", "0"))
    except Exception:
        REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "0"))

    # TV power intent (PR-1): group-level publishing is feature-flagged and disabled by default.
    POWER_INTENT_PUBLISH_ENABLED = _env_bool("POWER_INTENT_PUBLISH_ENABLED", False)
    POWER_INTENT_HEARTBEAT_ENABLED = _env_bool("POWER_INTENT_HEARTBEAT_ENABLED", True)
    POWER_INTENT_EXPIRY_MULTIPLIER = int(os.getenv("POWER_INTENT_EXPIRY_MULTIPLIER", "3"))
    POWER_INTENT_MIN_EXPIRY_SECONDS = int(os.getenv("POWER_INTENT_MIN_EXPIRY_SECONDS", "90"))
    # Crash auto-recovery is also feature-flagged off by default.
    CRASH_RECOVERY_ENABLED = _env_bool("CRASH_RECOVERY_ENABLED", False)
    CRASH_RECOVERY_GRACE_SECONDS = int(os.getenv("CRASH_RECOVERY_GRACE_SECONDS", "180"))

    logging.info(
        "Scheduler config: poll_interval=%ss refresh_seconds=%s power_intent_enabled=%s "
        "power_intent_heartbeat=%s power_intent_expiry_multiplier=%s power_intent_min_expiry=%ss "
        "crash_recovery_enabled=%s crash_recovery_grace=%ss",
        POLL_INTERVAL,
        REFRESH_SECONDS,
        POWER_INTENT_PUBLISH_ENABLED,
        POWER_INTENT_HEARTBEAT_ENABLED,
        POWER_INTENT_EXPIRY_MULTIPLIER,
        POWER_INTENT_MIN_EXPIRY_SECONDS,
        CRASH_RECOVERY_ENABLED,
        CRASH_RECOVERY_GRACE_SECONDS,
    )
    # Configurable query window in days (default: 7)
    WINDOW_DAYS = int(os.getenv("EVENTS_WINDOW_DAYS", "7"))
    last_payloads = {}  # group_id -> payload
    last_published_at = {}  # group_id -> epoch seconds
    last_power_intents = {}  # group_id -> {fingerprint, intent_id, payload}
    power_intent_metrics = {
        "intent_transitions_total": 0,
        "publish_success_total": 0,
        "publish_error_total": 0,
        "heartbeat_republish_total": 0,
        "retained_republish_total": 0,
    }

    # On (re)connect, re-send all known retained payloads
    def on_connect(client, userdata, flags, reasonCode, properties=None):
        logging.info(
            f"MQTT connected (reasonCode={reasonCode}) - republishing {len(last_payloads)} groups")
        for gid, payload in last_payloads.items():
            topic = f"infoscreen/events/{gid}"
            client.publish(topic, payload, retain=True)

        if POWER_INTENT_PUBLISH_ENABLED:
            _republish_cached_power_intents(client, last_power_intents, power_intent_metrics)

    client.on_connect = on_connect

    client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT)
    client.loop_start()
    logging.info(
        "MQTT connection configured host=%s port=%s tls=%s auth=%s",
        MQTT_BROKER_HOST,
        MQTT_BROKER_PORT,
        MQTT_TLS_ENABLED,
        bool(MQTT_USERNAME and MQTT_PASSWORD),
    )

    while True:
        now = datetime.datetime.now(datetime.timezone.utc)
        # refresh interval can change at runtime (superadmin settings)
        try:
            db_val = get_system_setting_value("refresh_seconds", None)
            REFRESH_SECONDS = int(db_val) if db_val is not None else REFRESH_SECONDS
        except Exception:
            pass
        # Query window: next N days to capture upcoming events and recurring instances
        # Clients need to know what's coming, not just what's active right now
        end_window = now + datetime.timedelta(days=WINDOW_DAYS)
        logging.debug(f"Fetching events window start={now.isoformat()} end={end_window.isoformat()} (days={WINDOW_DAYS})")
        # Fetch all active events (already formatted dictionaries)
        try:
            events = get_active_events(now, end_window)
            logging.debug(f"Fetched {len(events)} events for publishing window")
        except Exception as e:
            logging.exception(f"Error while fetching events: {e}")
            events = []

        # Filter: Only include events active at 'now'
        active_events = []
        for event in events:
            start = event.get("start")
            end = event.get("end")
            # Parse ISO strings to datetime
            try:
                start_dt = datetime.datetime.fromisoformat(start)
                end_dt = datetime.datetime.fromisoformat(end)
                # Make both tz-aware (UTC) if naive
                if start_dt.tzinfo is None:
                    start_dt = start_dt.replace(tzinfo=datetime.timezone.utc)
                if end_dt.tzinfo is None:
                    end_dt = end_dt.replace(tzinfo=datetime.timezone.utc)
            except Exception:
                # Malformed/missing timestamps: skip the event entirely.
                continue
            if start_dt <= now < end_dt:
                active_events.append(event)

        # Group only the active events by group_id
        groups = {}
        for event in active_events:
            gid = event.get("group_id")
            if gid not in groups:
                groups[gid] = []
            groups[gid].append(event)

        if not groups:
            logging.debug("No events grouped for any client group in current window")

        # Per group, publish the event list as a retained message, only on change
        for gid, event_list in groups.items():
            # stable ordering to avoid unnecessary publishes
            event_list.sort(key=lambda e: (e.get("start"), e.get("id")))
            payload = json.dumps(
                event_list, sort_keys=True, separators=(",", ":"))
            topic = f"infoscreen/events/{gid}"

            # Publish when the payload changed, or when the optional
            # REFRESH_SECONDS periodic republish interval has elapsed.
            should_send = (last_payloads.get(gid) != payload)
            if not should_send and REFRESH_SECONDS:
                last_ts = last_published_at.get(gid, 0)
                if time.time() - last_ts >= REFRESH_SECONDS:
                    should_send = True

            if should_send:
                result = client.publish(topic, payload, retain=True)
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    logging.error(
                        f"Fehler beim Publish für Gruppe {gid}: {mqtt.error_string(result.rc)}")
                else:
                    logging.info(f"Events für Gruppe {gid} gesendet (count={len(event_list)})")
                    # Cache only on success so a failed publish is retried
                    # next cycle.
                    last_payloads[gid] = payload
                    last_published_at[gid] = time.time()

        # Remove groups that no longer exist (send an empty retained message)
        inactive_gids = set(last_payloads.keys()) - set(groups.keys())
        for gid in inactive_gids:
            topic = f"infoscreen/events/{gid}"
            result = client.publish(topic, payload="[]", retain=True)
            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                logging.error(
                    f"Fehler beim Entfernen für Gruppe {gid}: {mqtt.error_string(result.rc)}")
            else:
                logging.info(
                    f"Events für Gruppe {gid} entfernt (leere retained Message gesendet)")
                # Drop the cache entries only once the clear actually went out.
                del last_payloads[gid]
                last_published_at.pop(gid, None)

        if POWER_INTENT_PUBLISH_ENABLED:
            _publish_group_power_intents(
                client=client,
                events=events,
                now=now,
                poll_interval=POLL_INTERVAL,
                heartbeat_enabled=POWER_INTENT_HEARTBEAT_ENABLED,
                expiry_multiplier=POWER_INTENT_EXPIRY_MULTIPLIER,
                min_expiry_seconds=POWER_INTENT_MIN_EXPIRY_SECONDS,
                last_power_intents=last_power_intents,
                power_intent_metrics=power_intent_metrics,
            )

            logging.debug(
                "event=power_intent_metrics intent_transitions_total=%s publish_success_total=%s "
                "publish_error_total=%s heartbeat_republish_total=%s retained_republish_total=%s",
                power_intent_metrics["intent_transitions_total"],
                power_intent_metrics["publish_success_total"],
                power_intent_metrics["publish_error_total"],
                power_intent_metrics["heartbeat_republish_total"],
                power_intent_metrics["retained_republish_total"],
            )

        if CRASH_RECOVERY_ENABLED:
            try:
                candidates = get_crash_recovery_candidates(CRASH_RECOVERY_GRACE_SECONDS)
                if candidates:
                    logging.info("event=crash_recovery_scan candidates=%s", len(candidates))
                for candidate in candidates:
                    cuuid = candidate["uuid"]
                    reason = candidate["reason"]
                    try:
                        # Persist the command, then publish to the new topic
                        # and a compatibility topic for older clients.
                        command_id, payload, topic, compat_topic = issue_crash_recovery_command(
                            client_uuid=cuuid,
                            reason=reason,
                        )
                        result = client.publish(topic, json.dumps(payload), qos=1, retain=False)
                        result.wait_for_publish(timeout=5.0)
                        compat_result = client.publish(compat_topic, json.dumps(payload), qos=1, retain=False)
                        compat_result.wait_for_publish(timeout=5.0)
                        # NOTE(review): success is judged on the primary topic
                        # only; compat_result.rc is not checked.
                        success = result.rc == mqtt.MQTT_ERR_SUCCESS
                        error = None if success else mqtt.error_string(result.rc)
                        finalize_crash_recovery_command(command_id, published=success, error=error)
                        if success:
                            logging.info(
                                "event=crash_recovery_command_issued client_uuid=%s reason=%s command_id=%s",
                                cuuid, reason, command_id,
                            )
                        else:
                            logging.error(
                                "event=crash_recovery_publish_failed client_uuid=%s reason=%s command_id=%s error=%s",
                                cuuid, reason, command_id, error,
                            )
                    except Exception as cmd_exc:
                        # Per-candidate failures must not abort the scan.
                        logging.error(
                            "event=crash_recovery_command_error client_uuid=%s reason=%s error=%s",
                            cuuid, reason, cmd_exc,
                        )
            except Exception as scan_exc:
                logging.error("event=crash_recovery_scan_error error=%s", scan_exc)

        # Unconditional per-cycle sweep of expired client commands.
        try:
            expired_count = sweep_expired_commands()
            if expired_count:
                logging.info("event=command_expiry_sweep expired=%s", expired_count)
        except Exception as sweep_exc:
            logging.error("event=command_expiry_sweep_error error=%s", sweep_exc)

        time.sleep(POLL_INTERVAL)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|