Files
infoscreen-dev/src/simclient.py
RobbStarkAustria 0cd0d95612 feat: remote commands, systemd units, process observability, broker auth split
- Command intake (reboot/shutdown) on infoscreen/{uuid}/commands with ack lifecycle
- MQTT_USER/MQTT_PASSWORD_BROKER split from identity vars; configure_mqtt_security() updated
- infoscreen-simclient.service: Type=notify, WatchdogSec=60, Restart=on-failure
- infoscreen-notify-failure@.service + script: retained MQTT alert when systemd gives up (Gap 3)
- _sd_notify() watchdog keepalive in simclient main loop (Gap 1)
- broker_connection block in health payload: reconnect_count, last_disconnect_at (Gap 2)
- COMMAND_MOCK_REBOOT_IMMEDIATE_COMPLETE canary flag with safety guard
- SERVER_TEAM_ACTIONS.md: server-side integration action items
- Docs: README, CHANGELOG, src/README, copilot-instructions updated
- 43 tests passing
2026-04-05 08:36:50 +02:00

2075 lines
78 KiB
Python

# simclient/simclient.py
from logging.handlers import RotatingFileHandler
import time
import uuid
import json
import socket
import hashlib
import paho.mqtt.client as mqtt
import ssl
import os
import shutil
import re
import platform
import logging
import subprocess
from dotenv import load_dotenv
import requests
import base64
from datetime import datetime, timezone, timedelta
import threading
from urllib.parse import urlsplit, urlunsplit, unquote
def _sd_notify(msg: str) -> None:
"""Send a sd_notify message to systemd via NOTIFY_SOCKET.
Uses raw socket so no extra package (systemd-python) is required.
Safe to call when not running under systemd (NOTIFY_SOCKET unset).
"""
sock_path = os.environ.get("NOTIFY_SOCKET")
if not sock_path:
return
try:
addr = sock_path.lstrip("@") # abstract namespace sockets start with '@'
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
s.connect(addr)
s.sendall(msg.encode())
except Exception:
pass # never crash the app over a watchdog notification
# Load environment config: the first .env found wins.  Supports both the
# container layout and native development checkouts.
env_paths = [
    "/workspace/simclient/.env",  # Container path
    os.path.join(os.path.dirname(__file__), ".env"),  # Same directory
    os.path.join(os.path.expanduser("~"), "infoscreen-dev", ".env"),  # Development path
]
for env_path in env_paths:
    if not os.path.exists(env_path):
        continue
    load_dotenv(env_path)
    break
def _env_int(name, default):
"""Parse an int from environment variable, tolerating inline comments.
Examples:
- "10 # seconds" -> 10
- " 300ms" -> 300
- invalid or empty -> default
"""
raw = os.getenv(name)
if raw is None or str(raw).strip() == "":
return default
try:
# Remove inline comments
sanitized = str(raw).split('#', 1)[0].strip()
# Extract first integer occurrence
m = re.search(r"-?\d+", sanitized)
if m:
return int(m.group(0))
except Exception:
pass
return default
def _env_bool(name, default=False):
raw = os.getenv(name)
if raw is None:
return default
return str(raw).strip().lower() in ("1", "true", "yes", "on")
def _env_host(name, default):
"""Parse a hostname/IP from env, stripping inline comments and whitespace.
Example: "192.168.1.10 # comment" -> "192.168.1.10"
"""
raw = os.getenv(name)
if raw is None:
return default
# Remove inline comments and extra spaces
sanitized = str(raw).split('#', 1)[0].strip()
# If any whitespace remains, take the first token as host
if not sanitized:
return default
return sanitized.split()[0]
def _env_str_clean(name, default=""):
"""Parse a generic string from env, removing inline comments and trimming.
Returns the first whitespace-delimited token to avoid accidental comment tails.
"""
raw = os.getenv(name)
if raw is None:
return default
sanitized = str(raw).split('#', 1)[0].strip()
if not sanitized:
return default
return sanitized.split()[0]
# Configuration from ENV
ENV = os.getenv("ENV", "development")
# Development defaults are deliberately short/verbose; production defaults
# are slower and quieter.
HEARTBEAT_INTERVAL = _env_int("HEARTBEAT_INTERVAL", 5 if ENV == "development" else 60)
SCREENSHOT_INTERVAL = _env_int("SCREENSHOT_INTERVAL", 30 if ENV == "development" else 300)
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
# Default to localhost in development, 'mqtt' (Docker compose service) otherwise
MQTT_BROKER = _env_host("MQTT_BROKER", "localhost" if ENV == "development" else "mqtt")
MQTT_PORT = _env_int("MQTT_PORT", 1883)
DEBUG_MODE = _env_bool("DEBUG_MODE", ENV == "development")
# Broker-scoped credentials (preferred) ...
MQTT_USER = _env_str_clean("MQTT_USER", "")
MQTT_PASSWORD_BROKER = _env_str_clean("MQTT_PASSWORD_BROKER", "")
# ... and legacy identity credentials used as fallback
# (see configure_mqtt_security).
MQTT_USERNAME = _env_str_clean("MQTT_USERNAME", "")
MQTT_PASSWORD = _env_str_clean("MQTT_PASSWORD", "")
# TLS material; TLS is auto-enabled when any cert/key path is configured.
MQTT_TLS_CA_CERT = _env_str_clean("MQTT_TLS_CA_CERT", "")
MQTT_TLS_CERT = _env_str_clean("MQTT_TLS_CERT", "")
MQTT_TLS_KEY = _env_str_clean("MQTT_TLS_KEY", "")
MQTT_TLS_INSECURE = _env_bool("MQTT_TLS_INSECURE", False)
MQTT_TLS_ENABLED = _env_bool(
    "MQTT_TLS_ENABLED",
    bool(MQTT_TLS_CA_CERT or MQTT_TLS_CERT or MQTT_TLS_KEY),
)
# Optional comma-separated fallback broker hosts; inline '#' comments and
# extra whitespace per entry are tolerated.
MQTT_BROKER_FALLBACKS = []
_fallbacks_raw = os.getenv("MQTT_BROKER_FALLBACKS", "")
if _fallbacks_raw:
    MQTT_BROKER_FALLBACKS = [
        entry.split('#', 1)[0].split()[0]
        for entry in _fallbacks_raw.split(",")
        if entry.split('#', 1)[0].strip()
    ]
# File server/API configuration
# Defaults: use same host as MQTT broker, port 8000, http scheme
FILE_SERVER_BASE_URL = _env_str_clean("FILE_SERVER_BASE_URL", "")
_scheme_raw = _env_str_clean("FILE_SERVER_SCHEME", "http").lower()
# Only http/https are accepted; anything else falls back to http.
FILE_SERVER_SCHEME = _scheme_raw if _scheme_raw in ("http", "https") else "http"
FILE_SERVER_HOST = _env_host("FILE_SERVER_HOST", MQTT_BROKER)
FILE_SERVER_PORT = _env_int("FILE_SERVER_PORT", 8000)
# Logging configuration: rotating file log, plus console echo in debug mode.
LOG_PATH = os.path.join(os.path.dirname(__file__), "simclient.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
log_handlers = []
# 2 MiB per file, 5 backups.
log_handlers.append(RotatingFileHandler(
    LOG_PATH, maxBytes=2*1024*1024, backupCount=5, encoding="utf-8"))
if DEBUG_MODE:
    log_handlers.append(logging.StreamHandler())
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s.%(msecs)03dZ [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    handlers=log_handlers
)
# Force all logging timestamps to UTC (affects %(asctime)s in all formatters).
logging.Formatter.converter = time.gmtime
# Setup monitoring logger (separate for health/crash events, local rotation only)
MONITORING_LOG_PATH = os.path.join(os.path.dirname(__file__), "..", "logs", "monitoring.log")
os.makedirs(os.path.dirname(MONITORING_LOG_PATH), exist_ok=True)
monitoring_logger = logging.getLogger("monitoring")
monitoring_logger.setLevel(getattr(logging, LOG_LEVEL.upper(), logging.INFO))
# 5 MiB per file, 5 backups; same UTC-style timestamp format as the main log.
monitoring_handler = RotatingFileHandler(MONITORING_LOG_PATH, maxBytes=5*1024*1024, backupCount=5)
monitoring_handler.setFormatter(logging.Formatter("%(asctime)s.%(msecs)03dZ [%(levelname)s] %(message)s", "%Y-%m-%dT%H:%M:%S"))
monitoring_logger.addHandler(monitoring_handler)
monitoring_logger.propagate = False  # Don't duplicate to main logger
logging.info(f"Monitoring logger initialized: {MONITORING_LOG_PATH}")
# Health state file (written by display_manager, read by simclient)
HEALTH_STATE_FILE = os.path.join(os.path.dirname(__file__), "current_process_health.json")
CLIENT_SETTINGS_FILE = os.path.join(os.path.dirname(__file__), "config", "client_settings.json")
# Screenshot IPC (written by display_manager, polled by simclient)
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), "screenshots")
SCREENSHOT_META_FILE = os.path.join(SCREENSHOT_DIR, "meta.json")
POWER_CONTROL_MODE = os.getenv("POWER_CONTROL_MODE", "local").strip().lower()
POWER_INTENT_STATE_FILE = os.path.join(os.path.dirname(__file__), "power_intent_state.json")
POWER_STATE_FILE = os.path.join(os.path.dirname(__file__), "power_state.json")
# Remote-command persistence: dedupe map + last command state.
COMMAND_STATE_DIR = os.path.join(os.path.dirname(__file__), "config")
PROCESSED_COMMANDS_FILE = os.path.join(COMMAND_STATE_DIR, "processed_commands.json")
LAST_COMMAND_STATE_FILE = os.path.join(COMMAND_STATE_DIR, "last_command_state.json")
# External helper script path — presumably invoked for privileged power
# actions; the executor is not visible in this part of the file.
COMMAND_HELPER_PATH = os.getenv("COMMAND_HELPER_PATH", "/usr/local/bin/infoscreen-cmd-helper.sh")
COMMAND_EXEC_TIMEOUT_SEC = _env_int("COMMAND_EXEC_TIMEOUT_SEC", 15)
COMMAND_DEDUPE_TTL_HOURS = _env_int("COMMAND_DEDUPE_TTL_HOURS", 24)
COMMAND_DEDUPE_MAX_ENTRIES = _env_int("COMMAND_DEDUPE_MAX_ENTRIES", 5000)
# Canary flag: only honored when the helper is the mock script
# (see command_mock_reboot_immediate_complete_enabled).
COMMAND_MOCK_REBOOT_IMMEDIATE_COMPLETE = _env_bool("COMMAND_MOCK_REBOOT_IMMEDIATE_COMPLETE", False)
# Fallback command_id when a payload carries none (nil UUID).
NIL_COMMAND_ID = "00000000-0000-0000-0000-000000000000"
# Supported remote command actions and the ack lifecycle statuses.
COMMAND_ACTIONS = ("reboot_host", "shutdown_host")
ACK_STATUSES = ("accepted", "execution_started", "completed", "failed")
# Error codes permitted in "failed" acks.
COMMAND_ERROR_CODES = {
    "invalid_schema",
    "missing_field",
    "stale_command",
    "duplicate_command",
    "permission_denied_local",
    "execution_timeout",
    "execution_failed",
    "broker_unavailable",
    "internal_error",
}
def command_requires_recovery_completion(action):
    """Return True when *action* needs a post-recovery 'completed' ack (reboot only)."""
    if action == "reboot_host":
        return True
    return False
def command_mock_reboot_immediate_complete_enabled(action):
    """Return True when the reboot canary flag may short-circuit to 'completed'.

    Safety guard: the flag is only honored for reboot_host and only when the
    configured command helper is the mock helper script.
    """
    if action != "reboot_host":
        return False
    if not COMMAND_MOCK_REBOOT_IMMEDIATE_COMPLETE:
        return False
    helper_name = os.path.basename((COMMAND_HELPER_PATH or "").strip())
    if helper_name == "mock-command-helper.sh":
        return True
    logging.warning(
        "Ignoring COMMAND_MOCK_REBOOT_IMMEDIATE_COMPLETE because helper is not mock: %s",
        COMMAND_HELPER_PATH,
    )
    return False
discovered = False
def save_event_to_json(event_data):
    """Persist an event message to current_event.json.

    Preserves ALL fields from the incoming event data, including
    scheduler-specific fields such as:
    - page_progress: current page/slide progress tracking
    - auto_progress: auto-progression state
    - any other fields sent by the scheduler
    """
    try:
        json_path = os.path.join(os.path.dirname(__file__), "current_event.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(event_data, f, ensure_ascii=False, indent=2)
        logging.info(f"Event message saved to {json_path}")

        def _debug_progress(event, prefix):
            # Surface scheduler progress fields at debug level when present.
            if 'page_progress' in event:
                logging.debug(f"{prefix}page_progress = {event['page_progress']}")
            if 'auto_progress' in event:
                logging.debug(f"{prefix}auto_progress = {event['auto_progress']}")

        if isinstance(event_data, list):
            for idx, event in enumerate(event_data):
                if isinstance(event, dict):
                    _debug_progress(event, f"Event {idx}: ")
        elif isinstance(event_data, dict):
            _debug_progress(event_data, "Event ")
    except Exception as e:
        logging.error(f"Error saving event message: {e}")
def delete_event_file():
    """Delete current_event.json when no event is active.

    The previous event is preserved first as last_event.json (copy to a
    temp file, then atomic rename) so the last shown event stays inspectable.
    Any failure is logged, never raised.
    """
    try:
        json_path = os.path.join(os.path.dirname(__file__), "current_event.json")
        if not os.path.exists(json_path):
            return
        try:
            last_path = os.path.join(os.path.dirname(__file__), "last_event.json")
            tmp_path = last_path + ".tmp"
            shutil.copyfile(json_path, tmp_path)
            os.replace(tmp_path, last_path)
            logging.info(f"Copied {json_path} to {last_path} (last event)")
        except Exception as e:
            logging.warning(f"Could not copy current_event.json to last_event.json: {e}")
        os.remove(json_path)
        logging.info(f"Event file {json_path} deleted - no active event")
    except Exception as e:
        logging.error(f"Error deleting event file: {e}")
def is_empty_event(event_data):
    """Return True when an event message means "no event is active"."""
    # Missing or falsy payloads (None, {}, [], "", 0) count as empty.
    if not event_data:
        return True
    if isinstance(event_data, dict):
        # Event explicitly null / "null".
        if event_data.get("event") is None or event_data.get("event") == "null":
            return True
        # Event explicitly marked as empty/none.
        if str(event_data.get("event", "")).lower() in ("empty", "none", ""):
            return True
        # Status indicates no active event.
        if str(event_data.get("status", "")).lower() in ("inactive", "none", "empty", "cleared"):
            return True
    # String-based sentinel values.
    if isinstance(event_data, str) and event_data.lower() in ("null", "none", "empty", ""):
        return True
    return False
def _parse_utc_iso(value: str):
"""Parse ISO8601 timestamp with optional trailing Z into UTC-aware datetime."""
if not isinstance(value, str) or not value.strip():
raise ValueError("timestamp must be a non-empty string")
normalized = value.strip()
if normalized.endswith('Z'):
normalized = normalized[:-1] + '+00:00'
dt = datetime.fromisoformat(normalized)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def validate_power_intent_payload(payload, expected_group_id=None):
    """Validate frozen TV power intent contract v1 payload.

    Args:
        payload: Decoded JSON payload (must be a dict).
        expected_group_id: When given, the payload's group_id must match it.

    Returns tuple: (is_valid, result_dict, error_message)
        (True, normalized_payload, None) on success;
        (False, None, reason) on any validation failure.
    """
    if not isinstance(payload, dict):
        return False, None, "payload must be an object"
    # Contract v1: every field must be present; the window fields may still
    # be null (checked further below).
    required_fields = (
        "schema_version", "intent_id", "group_id", "desired_state", "reason",
        "issued_at", "expires_at", "poll_interval_sec", "active_event_ids",
        "event_window_start", "event_window_end"
    )
    for field in required_fields:
        if field not in payload:
            return False, None, f"missing required field: {field}"
    if payload.get("schema_version") != "1.0":
        return False, None, f"unsupported schema_version: {payload.get('schema_version')}"
    desired_state = payload.get("desired_state")
    if desired_state not in ("on", "off"):
        return False, None, f"invalid desired_state: {desired_state}"
    reason = payload.get("reason")
    if reason not in ("active_event", "no_active_event"):
        return False, None, f"invalid reason: {reason}"
    intent_id = payload.get("intent_id")
    if not isinstance(intent_id, str) or not intent_id.strip():
        return False, None, "intent_id must be a non-empty string"
    try:
        group_id = int(payload.get("group_id"))
    except Exception:
        return False, None, f"invalid group_id: {payload.get('group_id')}"
    if expected_group_id is not None:
        try:
            expected_group_id_int = int(expected_group_id)
        except Exception:
            # Unparseable expectation -> skip the comparison rather than fail.
            expected_group_id_int = None
        if expected_group_id_int is not None and expected_group_id_int != group_id:
            return False, None, f"group_id mismatch: payload={group_id} expected={expected_group_id_int}"
    try:
        issued_at = _parse_utc_iso(payload.get("issued_at"))
        expires_at = _parse_utc_iso(payload.get("expires_at"))
    except Exception as e:
        return False, None, f"invalid timestamp: {e}"
    if expires_at <= issued_at:
        return False, None, "expires_at must be later than issued_at"
    # Reject intents that are already past their expiry.
    if datetime.now(timezone.utc) > expires_at:
        return False, None, "intent expired"
    try:
        poll_interval_sec = int(payload.get("poll_interval_sec"))
    except Exception:
        return False, None, f"invalid poll_interval_sec: {payload.get('poll_interval_sec')}"
    if poll_interval_sec <= 0:
        return False, None, "poll_interval_sec must be > 0"
    active_event_ids = payload.get("active_event_ids")
    if not isinstance(active_event_ids, list):
        return False, None, "active_event_ids must be a list"
    normalized_event_ids = []
    for item in active_event_ids:
        try:
            normalized_event_ids.append(int(item))
        except Exception:
            return False, None, f"invalid active_event_id value: {item}"
    # Optional window bounds: null is allowed, otherwise must parse as ISO8601.
    for field in ("event_window_start", "event_window_end"):
        value = payload.get(field)
        if value is not None:
            try:
                _parse_utc_iso(value)
            except Exception as e:
                return False, None, f"invalid {field}: {e}"
    # Timestamps are echoed back verbatim; ids/intervals are normalized to int.
    normalized = {
        "schema_version": "1.0",
        "intent_id": intent_id.strip(),
        "group_id": group_id,
        "desired_state": desired_state,
        "reason": reason,
        "issued_at": payload.get("issued_at"),
        "expires_at": payload.get("expires_at"),
        "poll_interval_sec": poll_interval_sec,
        "active_event_ids": normalized_event_ids,
        "event_window_start": payload.get("event_window_start"),
        "event_window_end": payload.get("event_window_end"),
    }
    return True, normalized, None
def write_power_intent_state(data):
    """Atomically write power intent state for display_manager consumption.

    Failures are logged, never raised -- this write is best effort.
    """
    try:
        tmp_path = POWER_INTENT_STATE_FILE + ".tmp"
        # Write to a temp file and rename so readers never see partial JSON.
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
        os.replace(tmp_path, POWER_INTENT_STATE_FILE)
    except Exception as e:
        logging.error(f"Error writing power intent state: {e}")
def _atomic_write_json(path, data):
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp_path = path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_path, path)
def _read_json_or_default(path, default):
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return default
def _extract_command_id(payload):
    """Best-effort command_id extraction; falls back to the nil UUID."""
    if not isinstance(payload, dict):
        return NIL_COMMAND_ID
    value = payload.get("command_id")
    if isinstance(value, str) and value.strip():
        return value.strip()
    return NIL_COMMAND_ID
def _as_uuid_str(value):
if not isinstance(value, str) or not value.strip():
raise ValueError("must be a non-empty string")
return str(uuid.UUID(value.strip()))
def _prune_processed_commands(commands):
    """Drop dedupe entries that are malformed, expired, or over the cap.

    Keeps only dict entries with a parseable processed_at inside the
    COMMAND_DEDUPE_TTL_HOURS window, then caps the survivors to the newest
    COMMAND_DEDUPE_MAX_ENTRIES entries.
    """
    if not isinstance(commands, dict):
        return {}
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max(1, COMMAND_DEDUPE_TTL_HOURS))
    kept = {}
    timeline = []  # (processed_dt, command_id) pairs for newest-first capping
    for command_id, entry in commands.items():
        if not isinstance(entry, dict):
            continue
        raw_ts = entry.get("processed_at")
        if not raw_ts:
            continue
        try:
            processed_dt = _parse_utc_iso(raw_ts)
        except Exception:
            continue
        if processed_dt >= cutoff:
            kept[command_id] = entry
            timeline.append((processed_dt, command_id))
    cap = max(1, COMMAND_DEDUPE_MAX_ENTRIES)
    if len(timeline) > cap:
        timeline.sort(reverse=True)
        survivors = {cid for _, cid in timeline[:cap]}
        kept = {cid: entry for cid, entry in kept.items() if cid in survivors}
    return kept
def load_processed_commands():
    """Load, prune, and re-persist the processed-command dedupe map."""
    raw_state = _read_json_or_default(PROCESSED_COMMANDS_FILE, {"commands": {}})
    raw_commands = raw_state.get("commands") if isinstance(raw_state, dict) else {}
    pruned = _prune_processed_commands(raw_commands)
    _atomic_write_json(PROCESSED_COMMANDS_FILE, {"commands": pruned})
    return pruned
def persist_processed_commands(commands):
    """Prune and atomically persist the processed-command dedupe map."""
    pruned = _prune_processed_commands(commands)
    _atomic_write_json(PROCESSED_COMMANDS_FILE, {"commands": pruned})
def load_last_command_state():
    """Return the persisted last-command state dict ({} when missing/invalid)."""
    state = _read_json_or_default(LAST_COMMAND_STATE_FILE, {})
    return state if isinstance(state, dict) else {}
def write_last_command_state(data):
    """Atomically persist the last-command state."""
    _atomic_write_json(LAST_COMMAND_STATE_FILE, data)
def validate_command_payload(payload, expected_client_uuid):
    """Validate a remote command payload (contract schema_version 1.0).

    Strict validation: exact field set (closed schema), matching client UUID,
    known action, sane timestamps, and bounded requested_by/reason values.

    Args:
        payload: Decoded JSON payload (must be a dict).
        expected_client_uuid: This client's UUID; the payload must target it.

    Returns:
        (True, normalized_payload, None, None) on success, or
        (False, None, error_code, error_message) on failure.
    """
    if not isinstance(payload, dict):
        return False, None, "invalid_schema", "payload must be a JSON object"
    required = {
        "schema_version",
        "command_id",
        "client_uuid",
        "action",
        "issued_at",
        "expires_at",
        "requested_by",
        "reason",
    }
    payload_keys = set(payload.keys())
    missing = sorted(required - payload_keys)
    if missing:
        return False, None, "missing_field", f"missing required fields: {', '.join(missing)}"
    # Unknown fields are rejected outright (closed schema).
    extras = sorted(payload_keys - required)
    if extras:
        return False, None, "invalid_schema", f"unexpected fields: {', '.join(extras)}"
    if payload.get("schema_version") != "1.0":
        return False, None, "invalid_schema", "schema_version must be 1.0"
    try:
        command_id = _as_uuid_str(payload.get("command_id"))
    except Exception:
        return False, None, "invalid_schema", "command_id must be a valid UUID"
    try:
        client_uuid = _as_uuid_str(payload.get("client_uuid"))
    except Exception:
        return False, None, "invalid_schema", "client_uuid must be a valid UUID"
    try:
        expected_uuid = _as_uuid_str(expected_client_uuid)
    except Exception:
        # Local UUID not in canonical form: fall back to a plain string compare.
        expected_uuid = str(expected_client_uuid).strip()
    if client_uuid != expected_uuid:
        return False, None, "invalid_schema", "client_uuid does not match this client"
    action = payload.get("action")
    if action not in COMMAND_ACTIONS:
        return False, None, "invalid_schema", f"action must be one of {COMMAND_ACTIONS}"
    try:
        issued_at = _parse_utc_iso(payload.get("issued_at"))
        expires_at = _parse_utc_iso(payload.get("expires_at"))
    except Exception as e:
        return False, None, "invalid_schema", f"invalid timestamp: {e}"
    if expires_at <= issued_at:
        return False, None, "invalid_schema", "expires_at must be later than issued_at"
    if datetime.now(timezone.utc) > expires_at:
        return False, None, "stale_command", "command expired"
    requested_by = payload.get("requested_by")
    if requested_by is not None:
        # bool is a subclass of int in Python, so JSON true would otherwise
        # slip through as a "valid" requester id -- reject it explicitly.
        if isinstance(requested_by, bool) or not isinstance(requested_by, int) or requested_by < 1:
            return False, None, "invalid_schema", "requested_by must be integer >= 1 or null"
    reason = payload.get("reason")
    if reason is not None:
        if not isinstance(reason, str):
            return False, None, "invalid_schema", "reason must be string or null"
        if len(reason) > 2000:
            return False, None, "invalid_schema", "reason exceeds max length 2000"
    # Timestamps are re-serialized in canonical UTC "Z" form.
    normalized = {
        "schema_version": "1.0",
        "command_id": command_id,
        "client_uuid": client_uuid,
        "action": action,
        "issued_at": issued_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "expires_at": expires_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "requested_by": requested_by,
        "reason": reason,
    }
    return True, normalized, None, None
def publish_command_ack(
    client,
    client_uuid,
    command_id,
    status,
    error_code=None,
    error_message=None,
    expires_at=None,
):
    """Publish a command acknowledgement on both ack topic variants.

    Args:
        client: Connected paho MQTT client.
        client_uuid: This client's UUID (topic segment).
        command_id: UUID of the command being acknowledged.
        status: One of ACK_STATUSES.
        error_code: Only kept for status "failed"; defaulted to
            "internal_error" when missing/blank.
        error_message: Only kept for status "failed"; defaulted when
            missing/blank.
        expires_at: Optional ISO8601 expiry; retries stop once it passes.

    Returns:
        True when the ack was accepted for publish on all topics; False when
        retrying was abandoned at expiry.

    Raises:
        ValueError: If *status* is not a known ack status.

    NOTE(review): when expires_at is None and publish keeps failing, the
    retry loop below never terminates and blocks the calling thread (max 5s
    backoff per attempt) -- confirm callers always pass expires_at or accept
    this behavior.
    """
    if status not in ACK_STATUSES:
        raise ValueError(f"invalid ack status: {status}")
    if status == "failed":
        # Failed acks must always carry diagnostics.
        if not isinstance(error_code, str) or not error_code.strip():
            error_code = "internal_error"
        if not isinstance(error_message, str) or not error_message.strip():
            error_message = "failed without diagnostic message"
    else:
        # Non-failure acks never carry error details.
        error_code = None
        error_message = None
    # Cap diagnostic sizes so the ack payload stays bounded.
    if isinstance(error_code, str):
        error_code = error_code[:128]
    if isinstance(error_message, str):
        error_message = error_message[:4000]
    ack_payload = {
        "command_id": command_id,
        "status": status,
        "error_code": error_code,
        "error_message": error_message,
    }
    encoded = json.dumps(ack_payload)
    # Publish to both plural and singular topic forms for compatibility.
    ack_topics = [
        f"infoscreen/{client_uuid}/commands/ack",
        f"infoscreen/{client_uuid}/command/ack",
    ]
    retry_schedule = [0.5, 1, 2, 4, 5]  # backoff in seconds, capped at 5s
    attempt = 0
    while True:
        all_ok = True
        for topic in ack_topics:
            result = client.publish(topic, encoded, qos=1, retain=False)
            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                all_ok = False
                logging.warning(
                    "Command ack publish failed: topic=%s status=%s rc=%s",
                    topic,
                    status,
                    result.rc,
                )
        if all_ok:
            logging.info("Command ack published: command_id=%s status=%s", command_id, status)
            return True
        # Give up once the command itself has expired.
        if expires_at:
            try:
                if datetime.now(timezone.utc) >= _parse_utc_iso(expires_at):
                    logging.warning("Command ack retry stopped at expiry: command_id=%s", command_id)
                    return False
            except Exception:
                pass
        delay = retry_schedule[min(attempt, len(retry_schedule) - 1)]
        attempt += 1
        time.sleep(delay)
def on_message(client, userdata, msg, properties=None):
    """MQTT message callback: handles scheduler events and discovery acks.

    - infoscreen/events/...: persist or clear the current event file and
      prefetch referenced presentation files.
    - .../discovery_ack: mark this client as discovered (heartbeat starts).
    """
    global discovered
    logging.info(f"Received: {msg.topic} {msg.payload.decode()}")
    if msg.topic.startswith("infoscreen/events/"):
        event_payload = msg.payload.decode()
        logging.info(f"Event message from scheduler received: {event_payload}")
        try:
            event_data = json.loads(event_payload)
            if is_empty_event(event_data):
                logging.info("No active event - deleting event file")
                delete_event_file()
            else:
                save_event_to_json(event_data)
                # The scheduler may send either a list of events or a single
                # event object; prefetch presentation files in both cases.
                if isinstance(event_data, list):
                    for event in event_data:
                        presentation_files = event.get("presentation", {}).get("files", [])
                        for file in presentation_files:
                            file_url = file.get("url")
                            if file_url:
                                download_presentation_file(file_url)
                elif isinstance(event_data, dict):
                    presentation_files = event_data.get("presentation", {}).get("files", [])
                    for file in presentation_files:
                        file_url = file.get("url")
                        if file_url:
                            download_presentation_file(file_url)
        except json.JSONDecodeError as e:
            logging.error(f"Invalid JSON in event message: {e}")
            # Non-JSON "empty" sentinels still clear the event; anything else
            # is preserved raw for debugging.
            if event_payload.strip().lower() in ["null", "none", "empty", ""]:
                logging.info("Empty event message received - deleting event file")
                delete_event_file()
            else:
                event_data = {"raw_message": event_payload, "error": "Invalid JSON format"}
                save_event_to_json(event_data)
    if msg.topic.endswith("/discovery_ack"):
        discovered = True
        logging.info("Discovery ACK received. Starting heartbeat.")
def get_mac_addresses():
    """Collect MAC addresses of local network interfaces.

    Returns a sorted list of unique, non-zero MAC strings; empty on failure.
    """
    macs = set()
    try:
        # /sys/class/net has one entry per interface; only the first
        # os.walk() level is needed, hence the break after one iteration.
        for root, dirs, files in os.walk('/sys/class/net/'):
            for iface in dirs:
                try:
                    with open(f'/sys/class/net/{iface}/address') as f:
                        mac = f.read().strip()
                except Exception:
                    continue
                if mac and mac != '00:00:00:00:00:00':
                    macs.add(mac)
            break
    except Exception:
        pass
    return sorted(macs)
def get_board_serial():
    """Read a board serial number.

    Raspberry Pi: /proc/cpuinfo; other machines: DMI product_serial.
    Returns "unknown" when neither source yields a value.
    """
    def _cpuinfo_serial():
        with open('/proc/cpuinfo') as f:
            for line in f:
                if line.lower().startswith('serial'):
                    return line.split(':')[1].strip()
        return None

    def _dmi_serial():
        with open('/sys/class/dmi/id/product_serial') as f:
            return f.read().strip()

    serial = None
    try:
        serial = _cpuinfo_serial()
    except Exception:
        pass
    if not serial:
        try:
            serial = _dmi_serial()
        except Exception:
            pass
    return serial or "unknown"
def get_ip():
    """Best-effort discovery of the local (non-loopback) IP address.

    Opens a UDP socket "towards" a public address -- no packet is actually
    sent -- and reads the source address the kernel would use.  Returns
    "unknown" when no route is available.

    Fix: the socket is now managed by a context manager, so it is closed
    even when connect() raises (the original leaked the fd on that path).
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "unknown"
def get_hardware_token():
    """Derive a stable, privacy-preserving hardware identity token.

    Combines the board serial with all MAC addresses and SHA-256 hashes the
    result so raw identifiers never leave the device.
    """
    serial = get_board_serial()
    macs = get_mac_addresses()
    fingerprint = f"{serial}_{'_'.join(macs)}"
    return hashlib.sha256(fingerprint.encode()).hexdigest()
def get_model():
    """Read the hardware model (device-tree on Pi, DMI product name on PCs).

    Returns "unknown" when neither source exists or reading fails.
    """
    candidates = ('/proc/device-tree/model', '/sys/class/dmi/id/product_name')
    try:
        for path in candidates:
            if os.path.exists(path):
                with open(path) as f:
                    return f.read().strip()
    except Exception:
        pass
    return "unknown"
SOFTWARE_VERSION = "1.0.0" # Optional: Anpassen bei neuen Releases
def _detect_watchdog_enabled():
env_flag = os.getenv("WATCHDOG_ENABLED", "").strip().lower()
if env_flag in ("1", "true", "yes", "on"):
return True
if os.path.exists("/dev/watchdog"):
return True
return False
def _detect_boot_source():
try:
with open("/proc/cmdline", "r", encoding="utf-8") as f:
cmdline = f.read().strip()
for token in cmdline.split():
if token.startswith("root="):
return token.split("=", 1)[1]
except Exception:
pass
return "unknown"
def configure_mqtt_security(client):
    """Apply username/password and optional TLS settings to an MQTT client.

    Broker-scoped credentials (MQTT_USER/MQTT_PASSWORD_BROKER) take priority
    over the legacy identity variables (MQTT_USERNAME/MQTT_PASSWORD).

    Returns:
        dict describing what was configured:
        {"username": bool, "tls": bool, "tls_insecure": bool}
    """
    username = MQTT_USER or MQTT_USERNAME
    password = MQTT_PASSWORD_BROKER if MQTT_USER else MQTT_PASSWORD
    configured = {
        "username": bool(username),
        "tls": False,
        "tls_insecure": False,
    }
    if username:
        client.username_pw_set(username, password or None)
        configured["username"] = True
        logging.info("Configured MQTT username/password authentication")
    if not MQTT_TLS_ENABLED:
        return configured
    client.tls_set(
        ca_certs=MQTT_TLS_CA_CERT or None,
        certfile=MQTT_TLS_CERT or None,
        keyfile=MQTT_TLS_KEY or None,
        tls_version=ssl.PROTOCOL_TLS_CLIENT,
    )
    configured["tls"] = True
    if MQTT_TLS_INSECURE:
        client.tls_insecure_set(True)
        configured["tls_insecure"] = True
        logging.warning("MQTT TLS hostname verification disabled via MQTT_TLS_INSECURE")
    else:
        client.tls_insecure_set(False)
    logging.info(
        "Configured MQTT TLS: ca=%s client_cert=%s client_key=%s",
        bool(MQTT_TLS_CA_CERT),
        bool(MQTT_TLS_CERT),
        bool(MQTT_TLS_KEY),
    )
    return configured
def send_discovery(client, client_id, hardware_token, ip_addr):
    """Publish a discovery announcement on infoscreen/discovery.

    The payload carries identity (uuid, hardware token, MACs), host facts
    (hostname, OS, model, software version) and the capabilities block the
    server uses for remote command gating.
    """
    discovery_msg = {
        "uuid": client_id,
        "hardware_token": hardware_token,
        "ip": ip_addr,
        "type": "infoscreen",
        "hostname": socket.gethostname(),
        "os_version": platform.platform(),
        "software_version": SOFTWARE_VERSION,
        "macs": get_mac_addresses(),
        "model": get_model(),
        "capabilities": {
            "recovery_class": "software_only",
            "watchdog_enabled": _detect_watchdog_enabled(),
            "boot_source": _detect_boot_source(),
            "command_schema_version": "1.0",
        },
    }
    client.publish("infoscreen/discovery", json.dumps(discovery_msg))
    logging.info(f"Discovery message sent: {discovery_msg}")
def get_persistent_uuid(uuid_path=None):
    """Return this client's stable UUID, creating and persisting one if needed.

    The UUID lives at config/client_uuid.txt next to this module unless an
    explicit path is given.
    """
    if uuid_path is None:
        uuid_path = os.path.join(os.path.dirname(__file__), "config", "client_uuid.txt")
    # Reuse an existing identity when present.
    if os.path.exists(uuid_path):
        with open(uuid_path, "r") as f:
            return f.read().strip()
    # First run: generate and persist a fresh UUID.
    generated = str(uuid.uuid4())
    os.makedirs(os.path.dirname(uuid_path), exist_ok=True)
    with open(uuid_path, "w") as f:
        f.write(generated)
    return generated
def load_last_group_id(path):
    """Return the persisted group id string, or None when unreadable."""
    try:
        with open(path, 'r') as f:
            content = f.read()
    except Exception:
        return None
    return content.strip()
def save_last_group_id(path, group_id):
    """Persist *group_id* (stringified) to *path*, creating parent dirs.

    Best effort: failures are logged, not raised.
    Fix: a bare filename (no directory component) previously failed silently
    because os.makedirs("") raises; the call is now guarded.
    """
    try:
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(path, 'w') as f:
            f.write(str(group_id))
    except Exception as e:
        logging.error(f"Error saving group_id: {e}")
def download_presentation_file(url):
    """Download a presentation file into ./presentation, skipping existing files.

    The URL is first normalized via resolve_file_url() so placeholder hosts
    map onto the configured file server.  Errors are logged, never raised.
    """
    try:
        resolved_url = resolve_file_url(url)
        presentation_dir = os.path.join(os.path.dirname(__file__), "presentation")
        os.makedirs(presentation_dir, exist_ok=True)
        # Derive the local filename from the (possibly percent-encoded) URL path.
        filename = unquote(urlsplit(resolved_url).path.split("/")[-1]) or "downloaded_file"
        target_path = os.path.join(presentation_dir, filename)
        if os.path.exists(target_path):
            logging.info(f"File already exists: {target_path}")
            return
        logging.info(f"Downloading file from: {resolved_url}")
        response = requests.get(resolved_url, timeout=20)
        response.raise_for_status()  # Raise an error for bad responses
        with open(target_path, "wb") as f:
            f.write(response.content)
        logging.info(f"File downloaded successfully: {target_path}")
    except Exception as e:
        logging.error(f"Error downloading file: {e}")
def resolve_file_url(original_url: str) -> str:
    """Resolve/normalize a file URL to point at the configured file server.

    Rules:
    - If FILE_SERVER_BASE_URL is set, its scheme/host/port override defaults.
    - Otherwise FILE_SERVER_HOST (defaults to MQTT_BROKER) and
      FILE_SERVER_PORT (8000) are used.
    - Only rewrite when the incoming URL has no host or the placeholder
      host 'server'.
    - Path and query are preserved.
    """
    try:
        parts = urlsplit(original_url)
        scheme, host, port = FILE_SERVER_SCHEME, FILE_SERVER_HOST, FILE_SERVER_PORT
        if FILE_SERVER_BASE_URL:
            base = urlsplit(FILE_SERVER_BASE_URL)
            # Partial base definitions are allowed; only override what is set.
            scheme = base.scheme or scheme
            host = base.hostname or host
            port = base.port or port
        incoming_host = parts.hostname
        if incoming_host is not None and incoming_host.lower() != "server":
            # Already a proper absolute URL pointing elsewhere -- keep it.
            return original_url
        # Always include the port for an explicit netloc.
        netloc = f"{host}:{port}" if port else host
        return urlunsplit((scheme, netloc, parts.path or "/", parts.query, parts.fragment))
    except Exception as e:
        logging.warning(f"Could not resolve URL, using original: {original_url} (error: {e})")
        return original_url
def get_latest_screenshot():
    """Get the latest screenshot from the host OS shared folder.

    Returns a dict with filename, base64-encoded data, UTC mtime timestamp
    and byte size, or None when no screenshot is available or reading fails.
    """
    try:
        base_dir = os.path.join(os.path.dirname(__file__), "screenshots")
        if not os.path.exists(base_dir):
            return None

        def _encode(path, name):
            # Read the image and package it with file metadata for transport.
            with open(path, "rb") as fh:
                encoded = base64.b64encode(fh.read()).decode('utf-8')
            stats = os.stat(path)
            info = {
                "filename": name,
                "data": encoded,
                "timestamp": datetime.fromtimestamp(stats.st_mtime, tz=timezone.utc).isoformat(),
                "size": stats.st_size,
            }
            return info, stats.st_size

        # Prefer 'latest.jpg' symlink/copy if present (written by display_manager)
        preferred = os.path.join(base_dir, "latest.jpg")
        if os.path.exists(preferred):
            try:
                result, size = _encode(preferred, os.path.basename(preferred))
                logging.debug(f"Using preferred latest.jpg for screenshot ({size} bytes)")
                return result
            except Exception as e:
                logging.debug(f"Could not read latest.jpg, falling back to newest file: {e}")

        # Fall back to the most recently modified screenshot file.
        # Exclude 'latest.jpg' (it's just a pointer) and any broken symlinks.
        candidates = []
        for name in os.listdir(base_dir):
            if not name.lower().endswith(('.png', '.jpg', '.jpeg')):
                continue
            if name == 'latest.jpg':
                continue
            if not os.path.exists(os.path.join(base_dir, name)):
                continue
            candidates.append(name)
        if not candidates:
            return None
        newest = max(candidates, key=lambda n: os.path.getmtime(os.path.join(base_dir, n)))
        result, size = _encode(os.path.join(base_dir, newest), newest)
        logging.debug(f"Selected latest screenshot: {newest} ({size} bytes)")
        return result
    except Exception as e:
        logging.error(f"Error reading screenshot: {e}")
        return None
def read_health_state():
    """Read the health state file written by display_manager.

    Returns the parsed JSON content, or None when the file is absent
    or cannot be read/parsed (logged at debug level only).
    """
    try:
        if os.path.exists(HEALTH_STATE_FILE):
            with open(HEALTH_STATE_FILE, 'r') as state_file:
                return json.load(state_file)
        return None
    except Exception as e:
        logging.debug(f"Could not read health state file: {e}")
        return None
def read_power_state():
    """Read last power action state produced by display_manager.

    Returns the parsed JSON content, or None when the file is absent
    or unreadable (failures are logged at debug level only).
    """
    try:
        if os.path.exists(POWER_STATE_FILE):
            with open(POWER_STATE_FILE, 'r', encoding='utf-8') as state_file:
                return json.load(state_file)
        return None
    except Exception as exc:
        logging.debug(f"Could not read power state file: {exc}")
        return None
def publish_power_state_message(client, client_id, power_state: dict):
    """Publish power action telemetry to MQTT (best effort).

    Non-dict payloads are silently ignored; any publish/serialization
    error is swallowed and logged at debug level.
    """
    try:
        if not isinstance(power_state, dict):
            return
        # Merge in our identity; keep an existing reported_at if the
        # display manager already stamped one.
        message = {**power_state, "client_id": client_id}
        message.setdefault("reported_at", datetime.now(timezone.utc).isoformat())
        result = client.publish(
            f"infoscreen/{client_id}/power/state", json.dumps(message), qos=0
        )
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            power_info = message.get("power", {})
            logging.info(
                "Power state published: state=%s source=%s result=%s",
                power_info.get("applied_state"),
                power_info.get("source"),
                power_info.get("result"),
            )
    except Exception as e:
        logging.debug(f"Could not publish power state: {e}")
def power_state_service_thread(client, client_id):
    """Background publisher for power action state changes.

    Polls POWER_STATE_FILE once per second and publishes whenever the
    file's mtime advances beyond the last observed value. Never returns.
    """
    logging.info("Power state service started")
    previous_mtime = None
    while True:
        try:
            time.sleep(1)
            if not os.path.exists(POWER_STATE_FILE):
                continue
            current_mtime = os.path.getmtime(POWER_STATE_FILE)
            # Skip until the file is strictly newer than what we published.
            if previous_mtime is not None and current_mtime <= previous_mtime:
                continue
            previous_mtime = current_mtime
            latest = read_power_state()
            if latest:
                publish_power_state_message(client, client_id, latest)
        except Exception as e:
            logging.debug(f"Power state service error: {e}")
            time.sleep(2)  # brief back-off before the next poll
def save_client_settings(settings_data):
    """Persist dashboard-managed client settings for the display manager.

    Writes atomically (tmp file + os.replace) so the display manager can
    never observe a partially written/corrupt JSON file if this process
    crashes mid-write — same pattern as the meta.json handling elsewhere
    in this module. Errors are logged, never raised.

    Args:
        settings_data: JSON-serializable settings object (normally a dict).
    """
    try:
        target_dir = os.path.dirname(CLIENT_SETTINGS_FILE)
        # Guard: os.makedirs("") raises when the path has no directory part.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        tmp_path = CLIENT_SETTINGS_FILE + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(settings_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, CLIENT_SETTINGS_FILE)  # atomic swap into place
        logging.info(f"Client settings saved to {CLIENT_SETTINGS_FILE}")
    except Exception as e:
        logging.error(f"Error saving client settings: {e}")
def delete_client_settings():
    """Delete persisted client settings so defaults apply again.

    A missing file is not an error; removal failures are logged, never raised.
    """
    try:
        if os.path.exists(CLIENT_SETTINGS_FILE):
            os.remove(CLIENT_SETTINGS_FILE)
            logging.info(f"Client settings deleted: {CLIENT_SETTINGS_FILE}")
    except Exception as e:
        logging.error(f"Error deleting client settings: {e}")
def publish_health_message(client, client_id, connection_state=None):
    """Publish health status to server via MQTT.

    Publishes nothing when no health state file is present (no active
    process). When connection_state is supplied, a broker_connection
    block with reachability/reconnect telemetry is attached.
    """
    try:
        health = read_health_state()
        if not health:
            return  # No active process
        message = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "expected_state": {"event_id": health.get("event_id")},
            "actual_state": {
                "process": health.get("current_process"),
                "pid": health.get("process_pid"),
                "status": health.get("process_status"),
            },
        }
        if connection_state is not None:
            disconnect_ts = connection_state.get("last_disconnect")
            disconnect_iso = None
            if disconnect_ts:
                disconnect_iso = datetime.fromtimestamp(
                    disconnect_ts, tz=timezone.utc
                ).strftime("%Y-%m-%dT%H:%M:%SZ")
            message["broker_connection"] = {
                "broker_reachable": bool(connection_state.get("connected")),
                "reconnect_count": connection_state.get("reconnect_count", 0),
                "last_disconnect_at": disconnect_iso,
            }
        result = client.publish(
            f"infoscreen/{client_id}/health", json.dumps(message), qos=1
        )
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            logging.debug(f"Health message published: {health.get('current_process')} status={health.get('process_status')}")
        else:
            logging.debug(f"Health publish failed with code: {result.rc}")
    except Exception as e:
        logging.debug(f"Error publishing health: {e}")
def publish_log_message(client, client_id, level: str, message: str, context: dict = None):
    """Publish log message to server via MQTT (only if level is ERROR or WARN, unless DEBUG_MODE)"""
    try:
        # Filter logs: only send ERROR/WARN to server, keep INFO/DEBUG local-only unless DEBUG_MODE
        normalized = level.upper()
        if normalized == "DEBUG":
            return  # DEBUG logs always local-only
        if normalized == "INFO" and not DEBUG_MODE:
            return  # Keep INFO logs local only in production
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": message,
            "context": context or {},
        }
        result = client.publish(
            f"infoscreen/{client_id}/logs/{level.lower()}", json.dumps(payload), qos=1
        )
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            # Mirror the server-bound message into the local monitoring log.
            monitoring_logger.log(getattr(logging, normalized, logging.INFO), f"[MQTT] {message}")
        else:
            logging.debug(f"Log publish failed ({level}) with code: {result.rc}")
    except Exception as e:
        logging.debug(f"Error publishing log: {e}")
def _read_and_clear_meta():
    """Read screenshots/meta.json and atomically clear the send_immediately flag.

    Returns the parsed dict (with the *original* send_immediately value) if the
    file exists and is valid JSON, else None. The flag is cleared on disk before
    returning so a crash between read and publish does not re-send on the next tick.

    Returns:
        dict | None: the meta content as it was on disk, or None when the
        file is missing/unreadable (failure is logged at debug level only).
    """
    try:
        if not os.path.exists(SCREENSHOT_META_FILE):
            return None
        with open(SCREENSHOT_META_FILE, 'r', encoding='utf-8') as f:
            meta = json.load(f)
        if meta.get('send_immediately'):
            # Write cleared copy atomically so the flag is gone before we return
            cleared = dict(meta)
            cleared['send_immediately'] = False
            tmp_path = SCREENSHOT_META_FILE + '.tmp'
            # tmp file + os.replace: readers never observe a half-written meta.json
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(cleared, f)
            os.replace(tmp_path, SCREENSHOT_META_FILE)
        return meta  # original dict; send_immediately is True if it was set
    except Exception as e:
        # Best-effort: a malformed or locked meta file is treated as absent.
        logging.debug(f"Could not read screenshot meta: {e}")
        return None
def _build_dashboard_payload(client_id: str, screenshot_info: dict, health: dict, capture_type: str, trigger_meta: dict = None) -> dict:
    """Build the dashboard payload in one canonical place.

    Keeping payload assembly centralized avoids schema drift across call sites.

    Args:
        client_id: persistent UUID of this client.
        screenshot_info: dict from get_latest_screenshot() or None.
        health: dict from read_health_state() or None.
        capture_type: "periodic" or a triggered capture type from meta.json.
        trigger_meta: the meta.json dict when this send was triggered.
    """
    now_iso = datetime.now(timezone.utc).isoformat()

    # Screenshot age in seconds (one decimal); None when unknown/unparseable.
    age_seconds = None
    if screenshot_info:
        try:
            captured = datetime.fromisoformat(screenshot_info["timestamp"])
            age_seconds = round((datetime.now(timezone.utc) - captured).total_seconds(), 1)
        except Exception:
            pass

    trigger = trigger_meta or {}
    triggered_flag = bool(trigger_meta and trigger_meta.get("send_immediately"))
    capture_block = {
        "type": capture_type,
        "captured_at": trigger.get("captured_at") or (screenshot_info or {}).get("timestamp"),
        "age_s": age_seconds,
        "triggered": triggered_flag,
        "send_immediately": triggered_flag,
    }

    health_block = None
    if health:
        health_block = {
            "event_id": health.get("event_id"),
            "event_type": health.get("event_type"),
            "current_process": health.get("current_process"),
            "process_pid": health.get("process_pid"),
            "process_status": health.get("process_status"),
            "restart_count": health.get("restart_count", 0),
        }

    return {
        "message": {
            "client_id": client_id,
            "status": "alive",
        },
        "content": {
            "screenshot": screenshot_info,
        },
        "runtime": {
            "system_info": {
                "hostname": socket.gethostname(),
                "ip": get_ip(),
                # NOTE(review): time.time() is wall-clock epoch seconds, not
                # host uptime — field name looks misleading; confirm whether
                # the server expects epoch here before renaming/changing.
                "uptime": time.time(),
            },
            "process_health": health_block,
        },
        "metadata": {
            "schema_version": "2.0",
            "producer": "simclient",
            "published_at": now_iso,
            "capture": capture_block,
            "transport": {
                "topic": f"infoscreen/{client_id}/dashboard",
                "qos": 0,
                "publisher": "simclient",
            },
        },
    }
def send_screenshot_heartbeat(client, client_id, capture_type: str = "periodic", trigger_meta: dict = None):
    """Send heartbeat with screenshot to server for dashboard monitoring.

    Best effort: gathers the newest screenshot plus the health state,
    assembles the canonical dashboard payload and publishes it at QoS 0.
    All failures are caught and logged; nothing is raised to the caller.
    """
    try:
        shot = get_latest_screenshot()
        # Also read health state and include in heartbeat
        health = read_health_state()
        dashboard_data = _build_dashboard_payload(
            client_id=client_id,
            screenshot_info=shot,
            health=health,
            capture_type=capture_type,
            trigger_meta=trigger_meta,
        )
        # Send to dashboard monitoring topic
        result = client.publish(
            f"infoscreen/{client_id}/dashboard", json.dumps(dashboard_data), qos=0
        )
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            age_value = dashboard_data['metadata']['capture']['age_s']
            age_str = f", age={age_value}s" if age_value is not None else ""
            if shot:
                logging.info(
                    f"Dashboard published: schema=2.0 type={capture_type}"
                    f" screenshot={shot['filename']} ({shot['size']} bytes){age_str}"
                )
            else:
                logging.info(f"Dashboard published: schema=2.0 type={capture_type} (no screenshot)")
        elif result.rc == mqtt.MQTT_ERR_NO_CONN:
            logging.warning("Dashboard heartbeat publish returned NO_CONN; will retry on next interval")
        else:
            logging.warning(f"Dashboard heartbeat publish failed with code: {result.rc}")
    except Exception as e:
        logging.error(f"Error sending screenshot heartbeat: {e}")
def screenshot_service_thread(client, client_id):
    """Background thread for screenshot monitoring and transmission.

    Runs on a 1-second tick. A heartbeat is sent when either:
    - display_manager set send_immediately=True in screenshots/meta.json
      (event_start / event_stop triggered captures); fired within <=1 second, OR
    - the periodic SCREENSHOT_INTERVAL has elapsed since the last send.

    The interval timer resets on every send, so a triggered send pushes out the
    next periodic heartbeat rather than causing a double-send shortly after.
    Never returns; any per-tick failure is logged and retried after 60s.
    """
    logging.info(f"Screenshot service started with {SCREENSHOT_INTERVAL}s periodic interval")
    last_sent = 0.0
    last_meta_type = None
    while True:
        try:
            time.sleep(1)
            now = time.time()
            meta = _read_and_clear_meta()
            triggered = bool(meta and meta.get('send_immediately'))
            interval_due = (now - last_sent) >= SCREENSHOT_INTERVAL
            if meta:
                current_type = meta.get('type', 'unknown')
                if current_type != last_meta_type:
                    logging.debug(f"Meta.json detected: type={current_type}, send_immediately={meta.get('send_immediately')}, file={meta.get('file')}")
                    last_meta_type = current_type
            if triggered or interval_due:
                # BUGFIX: use .get() so a meta.json without a 'type' key cannot
                # raise KeyError — that exception would be swallowed by the broad
                # handler below and stall the loop for 60s. Mirrors the .get()
                # already used for the debug log above. (triggered implies meta
                # is truthy, so the extra 'and meta' guard was redundant.)
                capture_type = meta.get('type', 'unknown') if triggered else "periodic"
                if triggered:
                    logging.info(f"Sending triggered screenshot: type={capture_type}")
                send_screenshot_heartbeat(client, client_id, capture_type, trigger_meta=meta)
                last_sent = now
        except Exception as e:
            logging.error(f"Screenshot service error: {e}")
            time.sleep(60)  # Wait a minute before retrying
def main():
global discovered
print(f"[{datetime.now(timezone.utc).isoformat()}] simclient.py: program started")
logging.info("Client starting - deleting old event file if present")
delete_event_file()
client_id = get_persistent_uuid()
hardware_token = get_hardware_token()
ip_addr = get_ip()
# Persistenz für group_id (needed in on_connect)
group_id_path = os.path.join(os.path.dirname(__file__), "config", "last_group_id.txt")
current_group_id = load_last_group_id(group_id_path)
event_topic = None
power_intent_topic = None
last_power_intent_id = None
last_power_issued_at = None
command_topic = f"infoscreen/{client_id}/commands"
command_topic_alias = f"infoscreen/{client_id}/command"
processed_commands = load_processed_commands()
pending_recovery_command = load_last_command_state()
# paho-mqtt v2: opt into latest callback API to avoid deprecation warnings.
client_kwargs = {"protocol": mqtt.MQTTv311}
try:
# Use enum when available (paho-mqtt >= 2.0)
if hasattr(mqtt, "CallbackAPIVersion"):
client_kwargs["callback_api_version"] = mqtt.CallbackAPIVersion.VERSION2
except Exception:
pass
client = mqtt.Client(**client_kwargs)
client.on_message = on_message
configure_mqtt_security(client)
# Enable automatic reconnection
client.reconnect_delay_set(min_delay=1, max_delay=120)
# Connection state tracking
connection_state = {
"connected": False,
"last_disconnect": None,
"reconnect_count": 0,
"connect_count": 0,
}
# Optional: Enable MQTT debug logging in DEBUG_MODE
if DEBUG_MODE:
def on_log(client, userdata, level, buf):
logging.debug(f"MQTT: {buf}")
client.on_log = on_log
# Define subscribe_event_topic BEFORE on_connect so it can be called from the callback
def subscribe_event_topic(new_group_id):
nonlocal event_topic, current_group_id
# Check if group actually changed to handle cleanup
group_changed = new_group_id != current_group_id
if group_changed:
if current_group_id is not None:
logging.info(f"Group change from {current_group_id} to {new_group_id} - deleting old event file")
delete_event_file()
if event_topic:
client.unsubscribe(event_topic)
logging.info(f"Unsubscribed from event topic: {event_topic}")
# Always ensure the event topic is subscribed
new_event_topic = f"infoscreen/events/{new_group_id}"
# Only subscribe if we don't already have this topic subscribed
if event_topic != new_event_topic:
if event_topic:
client.unsubscribe(event_topic)
logging.info(f"Unsubscribed from event topic: {event_topic}")
event_topic = new_event_topic
client.subscribe(event_topic)
logging.info(f"Subscribing to event topic: {event_topic} for group_id: {new_group_id}")
else:
logging.info(f"Event topic already subscribed: {event_topic}")
# Update current group_id and save it
if group_changed:
current_group_id = new_group_id
save_last_group_id(group_id_path, new_group_id)
def subscribe_power_intent_topic(new_group_id):
nonlocal power_intent_topic
if POWER_CONTROL_MODE not in ("hybrid", "mqtt"):
return
new_topic = f"infoscreen/groups/{new_group_id}/power/intent"
if power_intent_topic == new_topic:
logging.info(f"Power intent topic already subscribed: {power_intent_topic}")
return
if power_intent_topic:
client.unsubscribe(power_intent_topic)
logging.info(f"Unsubscribed from power intent topic: {power_intent_topic}")
power_intent_topic = new_topic
client.subscribe(power_intent_topic, qos=1)
logging.info(f"Subscribed to power intent topic: {power_intent_topic}")
# on_connect callback: Subscribe to all topics after connection is established
def on_connect(client, userdata, flags, rc, properties=None):
if rc == 0:
connection_state["connected"] = True
connection_state["last_disconnect"] = None
connection_state["connect_count"] = connection_state.get("connect_count", 0) + 1
if connection_state["connect_count"] > 1:
connection_state["reconnect_count"] = connection_state.get("reconnect_count", 0) + 1
# Check if this is a reconnection
# paho-mqtt v2 provides ConnectFlags with attribute 'session_present'
# Older versions may provide dict-like flags; default to False.
is_reconnect = False
try:
if hasattr(flags, "session_present"):
is_reconnect = bool(getattr(flags, "session_present"))
elif isinstance(flags, dict):
is_reconnect = bool(flags.get("session present", False))
except Exception:
is_reconnect = False
if is_reconnect:
logging.info("MQTT reconnected successfully - resubscribing to all topics...")
else:
logging.info("MQTT connected successfully - subscribing to topics...")
# Discovery-ACK-Topic abonnieren
ack_topic = f"infoscreen/{client_id}/discovery_ack"
client.subscribe(ack_topic)
logging.info(f"Subscribed to: {ack_topic}")
# Config topic
client.subscribe(f"infoscreen/{client_id}/config")
logging.info(f"Subscribed to: infoscreen/{client_id}/config")
# group_id Topic abonnieren (retained)
group_id_topic = f"infoscreen/{client_id}/group_id"
client.subscribe(group_id_topic)
logging.info(f"Subscribed to: {group_id_topic}")
# Command topics (canonical + transitional)
client.subscribe(command_topic, qos=1)
client.subscribe(command_topic_alias, qos=1)
logging.info(f"Subscribed to command topics: {command_topic}, {command_topic_alias}")
# Wenn beim Start eine group_id vorhanden ist, sofort Event-Topic abonnieren
# Reset event_topic so subscribe_event_topic always re-registers with the broker
# (broker loses all subscriptions on reconnect, even if our local state still has it)
if current_group_id:
logging.info(f"Subscribing to event topic for saved group_id: {current_group_id}")
nonlocal event_topic
event_topic = None # force re-subscribe regardless of previous state
subscribe_event_topic(current_group_id)
nonlocal power_intent_topic
power_intent_topic = None
subscribe_power_intent_topic(current_group_id)
# Send discovery message after reconnection to re-register with server
if is_reconnect:
logging.info("Sending discovery after reconnection to re-register with server")
send_discovery(client, client_id, hardware_token, ip_addr)
else:
connection_state["connected"] = False
logging.error(f"MQTT connection failed with code: {rc}")
# on_disconnect callback (Paho v2 signature)
def on_disconnect(client, userdata, disconnect_flags, rc, properties=None):
connection_state["connected"] = False
connection_state["last_disconnect"] = time.time()
if rc == 0:
logging.info("MQTT disconnected cleanly")
else:
logging.warning(f"MQTT disconnected unexpectedly with code: {rc}")
logging.info("Automatic reconnection will be attempted...")
client.on_connect = on_connect
client.on_disconnect = on_disconnect
# Robust MQTT connect with fallbacks and retries
broker_candidates = [MQTT_BROKER]
# Add environment-provided fallbacks
broker_candidates.extend([b for b in MQTT_BROKER_FALLBACKS if b not in broker_candidates])
# Add common local fallbacks
for alt in ("127.0.0.1", "localhost", "mqtt"):
if alt not in broker_candidates:
broker_candidates.append(alt)
connect_ok = False
last_error = None
for attempt in range(1, 6): # up to 5 attempts
for host in broker_candidates:
try:
logging.info(f"Connecting to MQTT broker {host}:{MQTT_PORT} (attempt {attempt}/5)...")
client.connect(host, MQTT_PORT)
connect_ok = True
MQTT_HOST_USED = host # noqa: N816 local doc variable
break
except Exception as e:
last_error = e
logging.warning(f"MQTT connection to {host}:{MQTT_PORT} failed: {e}")
if connect_ok:
break
backoff = min(5 * attempt, 20)
logging.info(f"Retrying connection in {backoff}s...")
time.sleep(backoff)
if not connect_ok:
logging.error(f"MQTT connection failed after multiple attempts: {last_error}")
raise last_error
# Start the network loop early to begin connection process
client.loop_start()
logging.info("MQTT network loop started - establishing connection...")
# Wait for connection to complete and on_connect callback to fire
logging.info("Waiting for initial connection and subscription setup...")
connection_timeout = 30 # seconds
start_wait = time.time()
while not connection_state["connected"] and (time.time() - start_wait) < connection_timeout:
time.sleep(0.5)
if not connection_state["connected"]:
logging.error(f"Failed to establish initial MQTT connection within {connection_timeout}s")
raise Exception("MQTT connection timeout")
logging.info("Initial connection established, subscription setup complete")
# group_id message callback
group_id_topic = f"infoscreen/{client_id}/group_id"
def on_group_id_message(client, userdata, msg, properties=None):
payload = msg.payload.decode().strip()
new_group_id = None
# Versuche, group_id aus JSON zu extrahieren, sonst als String verwenden
try:
data = json.loads(payload)
if isinstance(data, dict) and "group_id" in data:
new_group_id = str(data["group_id"])
else:
new_group_id = str(data)
except Exception:
new_group_id = payload
new_group_id = new_group_id.strip()
if new_group_id:
if new_group_id != current_group_id:
logging.info(f"New group_id received: {new_group_id}")
else:
logging.info(f"group_id unchanged: {new_group_id}, ensuring event topic is subscribed")
# Always call subscribe_event_topic to ensure subscription
subscribe_event_topic(new_group_id)
subscribe_power_intent_topic(new_group_id)
else:
logging.warning("Empty group_id received!")
client.message_callback_add(group_id_topic, on_group_id_message)
logging.info(f"Current group_id at start: {current_group_id if current_group_id else 'none'}")
def mark_command_processed(command_id, status, error_code=None):
processed_commands[command_id] = {
"status": status,
"error_code": error_code,
"processed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
}
persist_processed_commands(processed_commands)
def execute_command_action(action):
try:
proc = subprocess.run(
["sudo", COMMAND_HELPER_PATH, action],
timeout=max(1, COMMAND_EXEC_TIMEOUT_SEC),
check=False,
capture_output=True,
text=True,
)
except subprocess.TimeoutExpired:
return False, "execution_timeout", f"command timed out after {COMMAND_EXEC_TIMEOUT_SEC}s"
except PermissionError:
return False, "permission_denied_local", "permission denied invoking command helper"
except Exception as e:
return False, "internal_error", f"internal execution error: {e}"
if proc.returncode != 0:
stderr = (proc.stderr or "").strip()
if proc.returncode in (126, 127):
return False, "permission_denied_local", stderr or "command helper unavailable"
return False, "execution_failed", stderr or f"helper exited with code {proc.returncode}"
return True, None, None
def on_command_message(client, userdata, msg, properties=None):
payload_text = msg.payload.decode().strip()
received_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
expires_at = None
if not payload_text:
publish_command_ack(
client,
client_id,
NIL_COMMAND_ID,
"failed",
error_code="invalid_schema",
error_message="empty command payload",
)
return
try:
payload = json.loads(payload_text)
except json.JSONDecodeError as e:
publish_command_ack(
client,
client_id,
NIL_COMMAND_ID,
"failed",
error_code="invalid_schema",
error_message=f"invalid JSON: {e}",
)
return
command_id_hint = _extract_command_id(payload)
if isinstance(payload, dict):
expires_at = payload.get("expires_at")
is_valid, normalized, error_code, error_message = validate_command_payload(payload, client_id)
if not is_valid:
publish_command_ack(
client,
client_id,
command_id_hint,
"failed",
error_code=error_code,
error_message=error_message,
expires_at=expires_at,
)
return
command_id = normalized["command_id"]
expires_at = normalized["expires_at"]
action = normalized["action"]
if command_id in processed_commands:
publish_command_ack(
client,
client_id,
command_id,
"failed",
error_code="duplicate_command",
error_message="command_id already processed",
expires_at=expires_at,
)
return
publish_command_ack(
client,
client_id,
command_id,
"accepted",
expires_at=expires_at,
)
publish_command_ack(
client,
client_id,
command_id,
"execution_started",
expires_at=expires_at,
)
write_last_command_state({
"command_id": command_id,
"action": action,
"ack_status": "execution_started",
"received_at": received_at,
"execution_started_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"expires_at": expires_at,
"source_topic": msg.topic,
})
ok, exec_error_code, exec_error_message = execute_command_action(action)
if ok:
if command_requires_recovery_completion(action):
if command_mock_reboot_immediate_complete_enabled(action):
logging.info(
"Mock reboot immediate completion enabled: command_id=%s action=%s",
command_id,
action,
)
else:
logging.info(
"Command entered recovery completion path: command_id=%s action=%s",
command_id,
action,
)
return
logging.info(
"Command continuing to immediate completion path: command_id=%s action=%s",
command_id,
action,
)
publish_command_ack(
client,
client_id,
command_id,
"completed",
expires_at=expires_at,
)
mark_command_processed(command_id, "completed")
write_last_command_state({
"command_id": command_id,
"action": action,
"ack_status": "completed",
"completed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"expires_at": expires_at,
})
return
publish_command_ack(
client,
client_id,
command_id,
"failed",
error_code=exec_error_code,
error_message=exec_error_message,
expires_at=expires_at,
)
mark_command_processed(command_id, "failed", error_code=exec_error_code)
write_last_command_state({
"command_id": command_id,
"action": action,
"ack_status": "failed",
"error_code": exec_error_code,
"error_message": exec_error_message,
"failed_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
"expires_at": expires_at,
})
client.message_callback_add(command_topic, on_command_message)
client.message_callback_add(command_topic_alias, on_command_message)
def on_power_intent_message(client, userdata, msg, properties=None):
nonlocal last_power_intent_id, last_power_issued_at
payload_text = msg.payload.decode().strip()
received_at = datetime.now(timezone.utc).isoformat()
# A retained null-message clears the topic and arrives as an empty payload.
if not payload_text:
logging.info("Power intent retained message cleared (empty payload)")
write_power_intent_state({
"valid": False,
"mode": POWER_CONTROL_MODE,
"error": "retained_cleared",
"received_at": received_at,
"topic": msg.topic,
})
return
try:
payload = json.loads(payload_text)
except json.JSONDecodeError as e:
logging.warning(f"Invalid power intent JSON: {e}")
write_power_intent_state({
"valid": False,
"mode": POWER_CONTROL_MODE,
"error": f"invalid_json: {e}",
"received_at": received_at,
"topic": msg.topic,
})
return
is_valid, normalized, error = validate_power_intent_payload(payload, expected_group_id=current_group_id)
if not is_valid:
logging.warning(f"Rejected power intent: {error}")
write_power_intent_state({
"valid": False,
"mode": POWER_CONTROL_MODE,
"error": error,
"received_at": received_at,
"topic": msg.topic,
})
return
try:
issued_dt = _parse_utc_iso(normalized["issued_at"])
except Exception:
issued_dt = None
if last_power_issued_at and issued_dt and issued_dt < last_power_issued_at:
logging.warning(
f"Rejected out-of-order power intent {normalized['intent_id']} issued_at={normalized['issued_at']}"
)
write_power_intent_state({
"valid": False,
"mode": POWER_CONTROL_MODE,
"error": "out_of_order_intent",
"received_at": received_at,
"topic": msg.topic,
})
return
duplicate_intent_id = normalized["intent_id"] == last_power_intent_id
if issued_dt:
last_power_issued_at = issued_dt
last_power_intent_id = normalized["intent_id"]
logging.info(
"Power intent accepted: id=%s desired_state=%s reason=%s expires_at=%s duplicate=%s",
normalized["intent_id"],
normalized["desired_state"],
normalized["reason"],
normalized["expires_at"],
duplicate_intent_id,
)
write_power_intent_state({
"valid": True,
"mode": POWER_CONTROL_MODE,
"received_at": received_at,
"topic": msg.topic,
"duplicate_intent_id": duplicate_intent_id,
"payload": normalized,
})
config_topic = f"infoscreen/{client_id}/config"
def on_config_message(client, userdata, msg, properties=None):
payload = msg.payload.decode().strip()
if not payload or payload.lower() in ("null", "none", "empty", "{}"):
logging.info("Empty client config received - deleting persisted client settings")
delete_client_settings()
return
try:
config_data = json.loads(payload)
except json.JSONDecodeError as e:
logging.error(f"Invalid JSON in client config message: {e}")
return
if not isinstance(config_data, dict):
logging.warning("Ignoring non-object client config payload")
return
save_client_settings(config_data)
client.message_callback_add(config_topic, on_config_message)
# Power-intent intake: in "hybrid" or "mqtt" mode, group-level power intents
# arrive on infoscreen/groups/<group_id>/power/intent; in "local" mode the
# feature is disabled entirely.
if POWER_CONTROL_MODE in ("hybrid", "mqtt"):
    if current_group_id:
        subscribe_power_intent_topic(current_group_id)
    else:
        # Subscription happens later, once a group assignment arrives.
        logging.info("Power control mode active but no group_id yet; waiting for group assignment")
    def on_power_intent_dispatch(client, userdata, msg, properties=None):
        # Thin forwarder to the shared intent handler defined above.
        on_power_intent_message(client, userdata, msg, properties)
    # Register a generic callback so topic changes on group switch do not require re-registration.
    client.message_callback_add("infoscreen/groups/+/power/intent", on_power_intent_dispatch)
    logging.info(f"Power control mode active: {POWER_CONTROL_MODE}")
else:
    # Fix: plain string instead of an f-string with no placeholders.
    logging.info("Power control mode is local; MQTT power intents disabled")
# Discovery phase: send discovery messages until an ACK is received.
# NOTE(review): `discovered` is presumably flipped by the discovery-ACK MQTT
# callback on the paho network-loop thread -- confirm against the callback
# registration earlier in main().
# The loop is already started, just wait and send discovery messages
discovery_attempts = 0
max_discovery_attempts = 20  # hard cap so startup can never block forever
while not discovered and discovery_attempts < max_discovery_attempts:
    if connection_state["connected"]:
        send_discovery(client, client_id, hardware_token, ip_addr)
        discovery_attempts += 1
        # Wait for ACK, checking every second (up to HEARTBEAT_INTERVAL seconds
        # per attempt before re-sending)
        for _ in range(int(HEARTBEAT_INTERVAL)):
            if discovered:
                break
            time.sleep(1)
    else:
        # Not connected yet: do not burn an attempt, just wait for the broker.
        logging.info("Waiting for MQTT connection before sending discovery...")
        time.sleep(2)
    if discovered:
        break
if not discovered:
    # Best effort: continue startup anyway so the screen still comes up offline.
    logging.warning(f"Discovery ACK not received after {max_discovery_attempts} attempts - continuing anyway")
# Launch the periodic background services. Both run as daemon threads so they
# never block process shutdown on KeyboardInterrupt.
screenshot_worker = threading.Thread(
    target=screenshot_service_thread, args=(client, client_id), daemon=True
)
screenshot_worker.start()
logging.info("Screenshot service thread started")

power_state_worker = threading.Thread(
    target=power_state_service_thread, args=(client, client_id), daemon=True
)
power_state_worker.start()
logging.info("Power state service thread started")
# Heartbeat loop with connection state monitoring. Runs on the main thread;
# the MQTT network I/O runs in paho's background thread (loop_start earlier),
# so this loop only publishes, logs, and kicks the systemd watchdog.
last_heartbeat = 0
logging.info("Entering heartbeat loop (network loop already running in background thread)")
_sd_notify("READY=1")  # tell systemd the process is fully initialised
while True:
    try:
        current_time = time.time()
        # Check connection state and log warnings if disconnected
        if not connection_state["connected"]:
            if connection_state["last_disconnect"]:
                disconnect_duration = current_time - connection_state["last_disconnect"]
                logging.warning(f"MQTT disconnected for {disconnect_duration:.1f}s - waiting for reconnection...")
            else:
                logging.warning("MQTT not connected - waiting for connection...")
        # Send heartbeat only when connected; QoS 0, so delivery is best-effort.
        if current_time - last_heartbeat >= HEARTBEAT_INTERVAL:
            if client.is_connected():
                result = client.publish(f"infoscreen/{client_id}/heartbeat", "alive", qos=0)
                if result.rc == mqtt.MQTT_ERR_SUCCESS:
                    logging.info("Heartbeat sent.")
                    # Also send health and screenshot heartbeats
                    publish_health_message(client, client_id, connection_state)
                    # Reboot-recovery handshake: if a remote reboot_host command
                    # was acked as "execution_started" before the host went down,
                    # the first successful heartbeat after restart publishes the
                    # terminal "completed" ack and marks the command processed.
                    # NOTE(review): pending_recovery_command is presumably loaded
                    # from persisted command state during startup -- confirm.
                    if (
                        isinstance(pending_recovery_command, dict)
                        and pending_recovery_command.get("ack_status") == "execution_started"
                        and pending_recovery_command.get("action") == "reboot_host"
                        and pending_recovery_command.get("command_id")
                    ):
                        recovered_command_id = pending_recovery_command.get("command_id")
                        recovered_expires = pending_recovery_command.get("expires_at")
                        publish_command_ack(
                            client,
                            client_id,
                            recovered_command_id,
                            "completed",
                            expires_at=recovered_expires,
                        )
                        mark_command_processed(recovered_command_id, "completed")
                        write_last_command_state({
                            "command_id": recovered_command_id,
                            "action": pending_recovery_command.get("action"),
                            "ack_status": "recovered",
                            "recovered_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                            "expires_at": recovered_expires,
                        })
                        # One-shot: never re-publish the recovery ack.
                        pending_recovery_command = None
                elif result.rc == mqtt.MQTT_ERR_NO_CONN:
                    # Race: is_connected() was true but the socket dropped before
                    # publish. Retry exactly once after a short pause.
                    logging.debug("Heartbeat publish returned NO_CONN; retrying in 2s...")
                    time.sleep(2)
                    if client.is_connected():
                        retry = client.publish(f"infoscreen/{client_id}/heartbeat", "alive", qos=0)
                        if retry.rc == mqtt.MQTT_ERR_SUCCESS:
                            logging.info("Heartbeat sent after retry.")
                            publish_health_message(client, client_id, connection_state)
                        else:
                            logging.warning(f"Heartbeat publish failed after retry with code: {retry.rc}")
                    else:
                        logging.warning("Skipping heartbeat retry - MQTT still not connected")
                else:
                    logging.warning(f"Heartbeat publish failed with code: {result.rc}")
            else:
                logging.debug("Skipping heartbeat - MQTT not connected (is_connected=False)")
            # Advance the timer even on failure so a broken broker does not
            # turn this into a tight publish loop.
            last_heartbeat = current_time
        _sd_notify("WATCHDOG=1")  # kick systemd watchdog each loop iteration (~5s; WatchdogSec=60)
        time.sleep(5)
    except KeyboardInterrupt:
        # Graceful shutdown: stop paho's network thread, then disconnect.
        logging.info("Shutting down gracefully...")
        client.loop_stop()
        client.disconnect()
        break
    except Exception as e:
        # Keep the daemon alive on unexpected errors; back off briefly.
        logging.error(f"Error in main loop: {e}")
        time.sleep(5)
# Script entry point (invoked directly or via the systemd service unit).
if __name__ == "__main__":
    main()