Initial import: clean snapshot from /home/olafn/infoscreen-dev (2025-10-25)
This commit is contained in:
741
src/simclient.py
Normal file
741
src/simclient.py
Normal file
@@ -0,0 +1,741 @@
|
||||
# simclient/simclient.py
|
||||
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import time
|
||||
import uuid
|
||||
import json
|
||||
import socket
|
||||
import hashlib
|
||||
import paho.mqtt.client as mqtt
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
import platform
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
import base64
|
||||
from datetime import datetime
|
||||
import threading
|
||||
from urllib.parse import urlsplit, urlunsplit, unquote
|
||||
|
||||
# Load environment configuration - support both container and native development.
# The first existing .env file wins; later candidates are ignored.
env_paths = [
    "/workspace/simclient/.env",  # Container path
    os.path.join(os.path.dirname(__file__), ".env"),  # Same directory
    os.path.join(os.path.expanduser("~"), "infoscreen-dev", ".env"),  # Development path
]

env_path = next((p for p in env_paths if os.path.exists(p)), None)
if env_path is not None:
    load_dotenv(env_path)
||||
|
||||
def _env_int(name, default):
|
||||
"""Parse an int from environment variable, tolerating inline comments.
|
||||
Examples:
|
||||
- "10 # seconds" -> 10
|
||||
- " 300ms" -> 300
|
||||
- invalid or empty -> default
|
||||
"""
|
||||
raw = os.getenv(name)
|
||||
if raw is None or str(raw).strip() == "":
|
||||
return default
|
||||
try:
|
||||
# Remove inline comments
|
||||
sanitized = str(raw).split('#', 1)[0].strip()
|
||||
# Extract first integer occurrence
|
||||
m = re.search(r"-?\d+", sanitized)
|
||||
if m:
|
||||
return int(m.group(0))
|
||||
except Exception:
|
||||
pass
|
||||
return default
|
||||
|
||||
|
||||
def _env_bool(name, default=False):
|
||||
raw = os.getenv(name)
|
||||
if raw is None:
|
||||
return default
|
||||
return str(raw).strip().lower() in ("1", "true", "yes", "on")
|
||||
|
||||
def _env_host(name, default):
|
||||
"""Parse a hostname/IP from env, stripping inline comments and whitespace.
|
||||
Example: "192.168.1.10 # comment" -> "192.168.1.10"
|
||||
"""
|
||||
raw = os.getenv(name)
|
||||
if raw is None:
|
||||
return default
|
||||
# Remove inline comments and extra spaces
|
||||
sanitized = str(raw).split('#', 1)[0].strip()
|
||||
# If any whitespace remains, take the first token as host
|
||||
if not sanitized:
|
||||
return default
|
||||
return sanitized.split()[0]
|
||||
|
||||
|
||||
def _env_str_clean(name, default=""):
|
||||
"""Parse a generic string from env, removing inline comments and trimming.
|
||||
Returns the first whitespace-delimited token to avoid accidental comment tails.
|
||||
"""
|
||||
raw = os.getenv(name)
|
||||
if raw is None:
|
||||
return default
|
||||
sanitized = str(raw).split('#', 1)[0].strip()
|
||||
if not sanitized:
|
||||
return default
|
||||
return sanitized.split()[0]
|
||||
|
||||
|
||||
# Configuration from the environment.
ENV = os.getenv("ENV", "development")
# Development defaults run faster cycles (5s/30s) than production (60s/300s).
HEARTBEAT_INTERVAL = _env_int("HEARTBEAT_INTERVAL", 5 if ENV == "development" else 60)
SCREENSHOT_INTERVAL = _env_int("SCREENSHOT_INTERVAL", 30 if ENV == "development" else 300)
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
# Default to localhost in development, 'mqtt' (Docker compose service) otherwise
MQTT_BROKER = _env_host("MQTT_BROKER", "localhost" if ENV == "development" else "mqtt")
MQTT_PORT = _env_int("MQTT_PORT", 1883)
DEBUG_MODE = _env_bool("DEBUG_MODE", ENV == "development")
# Optional comma-separated list of additional broker hosts tried by main()
# when the primary broker is unreachable.
MQTT_BROKER_FALLBACKS = []
_fallbacks_raw = os.getenv("MQTT_BROKER_FALLBACKS", "")
if _fallbacks_raw:
    for item in _fallbacks_raw.split(","):
        # Strip inline comments per entry.
        host = item.split('#', 1)[0].strip()
        if host:
            # Only take the first whitespace-delimited token
            MQTT_BROKER_FALLBACKS.append(host.split()[0])

# File server/API configuration
# Defaults: use same host as MQTT broker, port 8000, http scheme
FILE_SERVER_BASE_URL = _env_str_clean("FILE_SERVER_BASE_URL", "")
_scheme_raw = _env_str_clean("FILE_SERVER_SCHEME", "http").lower()
# Any scheme other than http/https silently falls back to http.
FILE_SERVER_SCHEME = _scheme_raw if _scheme_raw in ("http", "https") else "http"
FILE_SERVER_HOST = _env_host("FILE_SERVER_HOST", MQTT_BROKER)
FILE_SERVER_PORT = _env_int("FILE_SERVER_PORT", 8000)
|
||||
|
||||
# Logging configuration: rotating file log next to this module, plus a
# console echo when DEBUG_MODE is enabled.
LOG_PATH = os.path.join(os.path.dirname(__file__), "simclient.log")
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
log_handlers = []
# 2 MiB per file, keeping 5 rotated backups.
log_handlers.append(RotatingFileHandler(
    LOG_PATH, maxBytes=2*1024*1024, backupCount=5, encoding="utf-8"))
if DEBUG_MODE:
    log_handlers.append(logging.StreamHandler())
logging.basicConfig(
    # Fall back to INFO if LOG_LEVEL is not a valid level name.
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=log_handlers
)
|
||||
|
||||
|
||||
# Set to True by on_message() when the server's discovery ACK arrives;
# gates the discovery loop in main().
discovered = False
|
||||
|
||||
|
||||
def save_event_to_json(event_data):
    """Persist an event message to current_event.json.

    All fields of the incoming payload are written verbatim, including
    scheduler-specific fields such as:
    - page_progress: current page/slide progress tracking
    - auto_progress: auto-progression state
    - and any other fields sent by the scheduler

    Fix: the write is now atomic (temp file + os.replace) so a crash
    mid-write can never leave a truncated JSON file; this matches the
    replace pattern already used by delete_event_file().
    """
    try:
        json_path = os.path.join(os.path.dirname(__file__), "current_event.json")
        tmp_path = json_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(event_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, json_path)
        logging.info(f"Event message saved to {json_path}")

        # Debug-log scheduler progress fields when present.
        if isinstance(event_data, list):
            for idx, event in enumerate(event_data):
                if isinstance(event, dict):
                    if 'page_progress' in event:
                        logging.debug(f"Event {idx}: page_progress = {event['page_progress']}")
                    if 'auto_progress' in event:
                        logging.debug(f"Event {idx}: auto_progress = {event['auto_progress']}")
        elif isinstance(event_data, dict):
            if 'page_progress' in event_data:
                logging.debug(f"Event page_progress = {event_data['page_progress']}")
            if 'auto_progress' in event_data:
                logging.debug(f"Event auto_progress = {event_data['auto_progress']}")

    except Exception as e:
        logging.error(f"Error saving event message: {e}")
|
||||
|
||||
|
||||
def delete_event_file():
    """Remove current_event.json when no event is active, archiving it first.

    Before deletion the current file is copied to last_event.json via an
    atomic temp-file replace so a record of the most recent event always
    survives. All failures are logged, never raised.
    """
    base_dir = os.path.dirname(__file__)
    current = os.path.join(base_dir, "current_event.json")
    try:
        if not os.path.exists(current):
            return
        # Archive step is best-effort: a failed copy must not block deletion.
        try:
            archive = os.path.join(base_dir, "last_event.json")
            staging = archive + ".tmp"
            shutil.copyfile(current, staging)
            os.replace(staging, archive)
            logging.info(f"Copied {current} to {archive} (last event)")
        except Exception as e:
            logging.warning(f"Could not copy current_event.json to last_event.json: {e}")

        os.remove(current)
        logging.info(f"Event file {current} deleted - no active event")
    except Exception as e:
        logging.error(f"Error deleting event file: {e}")
|
||||
|
||||
|
||||
def is_empty_event(event_data):
    """Return True when the message means "no event is currently active".

    Recognized "empty" shapes:
    - None or any falsy payload (empty dict/list/string)
    - a dict whose "event" is None/"null"/"empty"/"none"/""
    - a dict whose "status" is "inactive"/"none"/"empty"/"cleared"
    - the bare strings "null", "none", "empty" or "" (case-insensitive)
    """
    # Covers None plus every falsy container/string in one check.
    if not event_data:
        return True

    if isinstance(event_data, dict):
        # Event field explicitly nulled out.
        if event_data.get("event") is None or event_data.get("event") == "null":
            return True
        # Event field marked as empty/none.
        if str(event_data.get("event", "")).lower() in ["empty", "none", ""]:
            return True
        # Status field signals that no event is active.
        if str(event_data.get("status", "")).lower() in ["inactive", "none", "empty", "cleared"]:
            return True

    # String payloads spelling out "nothing".
    if isinstance(event_data, str) and event_data.lower() in ["null", "none", "empty", ""]:
        return True

    return False
|
||||
|
||||
|
||||
def on_message(client, userdata, msg, properties=None):
    """MQTT message dispatcher for all subscribed topics.

    Handles two topic families:
    - ``infoscreen/events/<group_id>``: persist the event payload to
      current_event.json (or delete it for "no event" payloads) and
      download any presentation files referenced in the payload.
    - ``*/discovery_ack``: set the module-level ``discovered`` flag so
      main() can leave its discovery loop.
    """
    global discovered
    logging.info(f"Received: {msg.topic} {msg.payload.decode()}")
    if msg.topic.startswith("infoscreen/events/"):
        event_payload = msg.payload.decode()
        logging.info(f"Event message from scheduler received: {event_payload}")

        try:
            event_data = json.loads(event_payload)

            if is_empty_event(event_data):
                logging.info("No active event - deleting event file")
                delete_event_file()
            else:
                save_event_to_json(event_data)

                # Check if event_data is a list or a dictionary
                # and fetch every referenced presentation file.
                if isinstance(event_data, list):
                    for event in event_data:
                        presentation_files = event.get("presentation", {}).get("files", [])
                        for file in presentation_files:
                            file_url = file.get("url")
                            if file_url:
                                download_presentation_file(file_url)
                elif isinstance(event_data, dict):
                    presentation_files = event_data.get("presentation", {}).get("files", [])
                    for file in presentation_files:
                        file_url = file.get("url")
                        if file_url:
                            download_presentation_file(file_url)

        except json.JSONDecodeError as e:
            logging.error(f"Invalid JSON in event message: {e}")
            # Non-JSON "nothing" payloads still clear the event file;
            # anything else is preserved verbatim for later inspection.
            if event_payload.strip().lower() in ["null", "none", "empty", ""]:
                logging.info("Empty event message received - deleting event file")
                delete_event_file()
            else:
                event_data = {"raw_message": event_payload, "error": "Invalid JSON format"}
                save_event_to_json(event_data)

    if msg.topic.endswith("/discovery_ack"):
        discovered = True
        logging.info("Discovery ACK received. Starting heartbeat.")
|
||||
|
||||
|
||||
def get_mac_addresses():
    """Collect this host's MAC addresses from /sys/class/net.

    The all-zero placeholder address is skipped. Returns a sorted list;
    empty when the sysfs tree is unavailable (e.g. non-Linux platforms).
    """
    found = set()
    try:
        for _root, interfaces, _files in os.walk('/sys/class/net/'):
            for name in interfaces:
                try:
                    with open(f'/sys/class/net/{name}/address') as fh:
                        address = fh.read().strip()
                    if address and address != '00:00:00:00:00:00':
                        found.add(address)
                except Exception:
                    continue
            # Only the top level of the walk is needed.
            break
    except Exception:
        pass
    return sorted(found)
|
||||
|
||||
|
||||
def get_board_serial():
    """Return the board serial number, or "unknown" when undetectable.

    Tries the Raspberry Pi location first (the "Serial" line in
    /proc/cpuinfo), then the generic DMI file
    /sys/class/dmi/id/product_serial.
    """
    serial = None
    try:
        with open('/proc/cpuinfo') as cpuinfo:
            for line in cpuinfo:
                if line.lower().startswith('serial'):
                    serial = line.split(':')[1].strip()
                    break
    except Exception:
        pass
    if not serial:
        # Fall back to DMI (typical on x86 machines).
        try:
            with open('/sys/class/dmi/id/product_serial') as dmi:
                serial = dmi.read().strip()
        except Exception:
            pass
    return serial or "unknown"
|
||||
|
||||
|
||||
def get_ip():
    """Return the local (non-loopback) IP address, or "unknown".

    Connects a UDP socket towards a public address (connect() on a
    datagram socket sends no packets) and reads the source address the
    kernel selected for that route.

    Fix: the original closed the socket only on the success path; the
    ``with`` block guarantees it is closed on errors too.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except Exception:
        return "unknown"
|
||||
|
||||
|
||||
def get_hardware_token():
    """Derive a stable, privacy-preserving hardware identifier.

    Combines the board serial with all MAC addresses and returns the
    SHA-256 hex digest, so raw identifiers never leave the device.
    """
    serial_part = get_board_serial()
    mac_part = "_".join(get_mac_addresses())
    raw = f"{serial_part}_{mac_part}"
    # Hash for privacy before the token is published anywhere.
    return hashlib.sha256(raw.encode()).hexdigest()
|
||||
|
||||
|
||||
def get_model():
    """Return the hardware model string (e.g. Raspberry Pi, PC), or "unknown".

    Reads the device-tree model on ARM boards, falling back to the DMI
    product name on x86 machines.
    """
    try:
        if os.path.exists('/proc/device-tree/model'):
            with open('/proc/device-tree/model') as fh:
                return fh.read().strip()
        if os.path.exists('/sys/class/dmi/id/product_name'):
            with open('/sys/class/dmi/id/product_name') as fh:
                return fh.read().strip()
    except Exception:
        pass
    return "unknown"
|
||||
|
||||
|
||||
# Client software version reported in discovery messages.
SOFTWARE_VERSION = "1.0.0"  # Optional: bump on new releases
|
||||
|
||||
|
||||
def send_discovery(client, client_id, hardware_token, ip_addr):
    """Publish this client's identity on the infoscreen/discovery topic.

    The payload carries everything the server needs to register the
    device: persistent UUID, hashed hardware token, network identity,
    OS/software versions, MAC addresses and the hardware model.
    """
    macs = get_mac_addresses()
    payload = {
        "uuid": client_id,
        "hardware_token": hardware_token,
        "ip": ip_addr,
        "type": "infoscreen",
        "hostname": socket.gethostname(),
        "os_version": platform.platform(),
        "software_version": SOFTWARE_VERSION,
        "macs": macs,
        "model": get_model(),
    }
    client.publish("infoscreen/discovery", json.dumps(payload))
    logging.info(f"Discovery message sent: {payload}")
|
||||
|
||||
|
||||
def get_persistent_uuid(uuid_path=None):
    """Return this client's persistent UUID, generating one on first run.

    The UUID is read from (or written to) *uuid_path*, defaulting to
    config/client_uuid.txt next to this module, so the identity survives
    restarts.

    Fix: ``os.makedirs("")`` raises for bare-filename paths; the parent
    directory is now only created when the path has one.
    """
    if uuid_path is None:
        uuid_path = os.path.join(os.path.dirname(__file__), "config", "client_uuid.txt")
    # Reuse an existing identity if present.
    if os.path.exists(uuid_path):
        with open(uuid_path, "r") as f:
            return f.read().strip()
    # First run: generate a new UUID and persist it.
    new_uuid = str(uuid.uuid4())
    parent = os.path.dirname(uuid_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(uuid_path, "w") as f:
        f.write(new_uuid)
    return new_uuid
|
||||
|
||||
|
||||
|
||||
def load_last_group_id(path):
    """Return the group id persisted at *path*, or None when unreadable."""
    try:
        with open(path, 'r') as fh:
            contents = fh.read()
    except Exception:
        return None
    return contents.strip()
|
||||
|
||||
def save_last_group_id(path, group_id):
    """Persist *group_id* (stringified) to *path*, creating parent dirs.

    Errors are logged rather than raised: losing the cached group id is
    not fatal because the server re-publishes it on the retained topic.

    Fix: ``os.makedirs("")`` raises for bare-filename paths, the error
    was swallowed, and the file was silently never written. The parent
    directory is now only created when the path actually has one.
    """
    try:
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, 'w') as f:
            f.write(str(group_id))
    except Exception as e:
        logging.error(f"Error saving group_id: {e}")
|
||||
|
||||
def download_presentation_file(url):
    """Fetch a presentation file into the local ./presentation directory.

    The URL is first rewritten by resolve_file_url() so placeholder or
    relative hosts point at the configured file server. Files that
    already exist locally are kept (no re-download). All failures are
    logged, never raised.
    """
    try:
        target_url = resolve_file_url(url)

        local_dir = os.path.join(os.path.dirname(__file__), "presentation")
        os.makedirs(local_dir, exist_ok=True)

        # Derive the local filename from the (possibly percent-encoded) URL path.
        basename = unquote(urlsplit(target_url).path.split("/")[-1]) or "downloaded_file"
        destination = os.path.join(local_dir, basename)

        if os.path.exists(destination):
            logging.info(f"File already exists: {destination}")
            return

        logging.info(f"Downloading file from: {target_url}")
        response = requests.get(target_url, timeout=20)
        response.raise_for_status()  # Raise an error for bad responses

        with open(destination, "wb") as fh:
            fh.write(response.content)

        logging.info(f"File downloaded successfully: {destination}")
    except Exception as e:
        logging.error(f"Error downloading file: {e}")
|
||||
|
||||
|
||||
def resolve_file_url(original_url: str) -> str:
    """Rewrite *original_url* to target the configured file server.

    Rules:
    - FILE_SERVER_BASE_URL, when set, overrides scheme/host/port
      (partial definitions allowed - only the parts present are applied).
    - Otherwise FILE_SERVER_HOST (defaults to the MQTT broker host) and
      FILE_SERVER_PORT (8000) are used.
    - The URL is only rewritten when its host is missing or is the
      placeholder 'server'; path, query and fragment are preserved.
    On any parsing error the original URL is returned unchanged.
    """
    try:
        parsed = urlsplit(original_url)

        # Start from the configured defaults...
        scheme = FILE_SERVER_SCHEME
        host = FILE_SERVER_HOST
        port = FILE_SERVER_PORT
        # ...then layer on whatever parts the base URL defines.
        if FILE_SERVER_BASE_URL:
            base = urlsplit(FILE_SERVER_BASE_URL)
            scheme = base.scheme or scheme
            host = base.hostname or host
            port = base.port or port

        incoming_host = parsed.hostname
        if incoming_host is not None and incoming_host.lower() != "server":
            # Already a proper absolute URL pointing elsewhere - keep it.
            return original_url

        # Always spell out the port for an unambiguous netloc.
        netloc = f"{host}:{port}" if port else host
        return urlunsplit((scheme, netloc, parsed.path or "/", parsed.query, parsed.fragment))
    except Exception as e:
        logging.warning(f"Could not resolve URL, using original: {original_url} (error: {e})")
        return original_url
|
||||
|
||||
|
||||
def get_latest_screenshot():
    """Return the newest screenshot from ./screenshots, base64-encoded.

    Returns a dict with "filename", base64 "data", the mtime as an ISO
    "timestamp" and the byte "size" - or None when the directory or any
    image file is missing, or on any read error.
    """
    try:
        shots_dir = os.path.join(os.path.dirname(__file__), "screenshots")
        if not os.path.exists(shots_dir):
            return None

        images = [name for name in os.listdir(shots_dir)
                  if name.lower().endswith(('.png', '.jpg', '.jpeg'))]
        if not images:
            return None

        # Newest image by modification time.
        newest = max(images,
                     key=lambda name: os.path.getmtime(os.path.join(shots_dir, name)))
        full_path = os.path.join(shots_dir, newest)

        with open(full_path, "rb") as fh:
            encoded = base64.b64encode(fh.read()).decode('utf-8')

        stats = os.stat(full_path)
        return {
            "filename": newest,
            "data": encoded,
            "timestamp": datetime.fromtimestamp(stats.st_mtime).isoformat(),
            "size": stats.st_size,
        }

    except Exception as e:
        logging.error(f"Error reading screenshot: {e}")
        return None
|
||||
|
||||
|
||||
def send_screenshot_heartbeat(client, client_id):
    """Publish a heartbeat (plus newest screenshot, if any) for the dashboard.

    Bundles client status, the base64 screenshot record from
    get_latest_screenshot() and basic system info, and publishes it on
    infoscreen/<client_id>/dashboard. Failures are logged, never raised.
    """
    try:
        shot = get_latest_screenshot()

        payload = {
            "timestamp": datetime.now().isoformat(),
            "client_id": client_id,
            "status": "alive",
            "screenshot": shot,
            "system_info": {
                "hostname": socket.gethostname(),
                "ip": get_ip(),
                "uptime": time.time()  # Could be replaced with actual uptime
            }
        }

        # Send to dashboard monitoring topic
        client.publish(f"infoscreen/{client_id}/dashboard", json.dumps(payload))

        if shot:
            logging.info(f"Screenshot heartbeat sent: {shot['filename']} ({shot['size']} bytes)")
        else:
            logging.debug("Heartbeat sent without screenshot")

    except Exception as e:
        logging.error(f"Error sending screenshot heartbeat: {e}")
|
||||
|
||||
|
||||
def screenshot_service_thread(client, client_id):
    """Daemon loop: publish a screenshot heartbeat every SCREENSHOT_INTERVAL s.

    Runs forever; individual failures are logged and retried after a
    60-second cool-down so one transient error never kills the service.
    """
    logging.info(f"Screenshot service started with {SCREENSHOT_INTERVAL}s interval")

    while True:
        try:
            send_screenshot_heartbeat(client, client_id)
        except Exception as e:
            logging.error(f"Screenshot service error: {e}")
            time.sleep(60)  # Wait a minute before retrying
        else:
            time.sleep(SCREENSHOT_INTERVAL)
|
||||
|
||||
|
||||
def main():
    """Client entry point: connect to MQTT, run discovery, heartbeat forever.

    Flow: clear any stale event file, load/derive identity, connect to the
    broker (with fallbacks and retries), subscribe to per-client topics,
    resend discovery until the server ACKs, then start the screenshot
    service thread and publish heartbeats every HEARTBEAT_INTERVAL seconds.
    """
    global discovered
    logging.info("Client starting - deleting old event file if present")
    delete_event_file()

    client_id = get_persistent_uuid()
    hardware_token = get_hardware_token()
    ip_addr = get_ip()

    # Persisted group_id (needed in on_connect to resubscribe immediately)
    group_id_path = os.path.join(os.path.dirname(__file__), "config", "last_group_id.txt")
    current_group_id = load_last_group_id(group_id_path)
    event_topic = None

    # paho-mqtt v2: opt into latest callback API to avoid deprecation warnings.
    client_kwargs = {"protocol": mqtt.MQTTv311}
    try:
        # Use enum when available (paho-mqtt >= 2.0)
        if hasattr(mqtt, "CallbackAPIVersion"):
            client_kwargs["callback_api_version"] = mqtt.CallbackAPIVersion.VERSION2
    except Exception:
        pass
    client = mqtt.Client(**client_kwargs)
    client.on_message = on_message

    # Define subscribe_event_topic BEFORE on_connect so it can be called from the callback
    def subscribe_event_topic(new_group_id):
        # (Re)subscribe to the event topic for new_group_id, cleaning up the
        # previous group's subscription and cached event file on a change.
        nonlocal event_topic, current_group_id

        # Check if group actually changed to handle cleanup
        group_changed = new_group_id != current_group_id

        if group_changed:
            if current_group_id is not None:
                logging.info(f"Group change from {current_group_id} to {new_group_id} - deleting old event file")
                delete_event_file()
            if event_topic:
                client.unsubscribe(event_topic)
                logging.info(f"Unsubscribed from event topic: {event_topic}")

        # Always ensure the event topic is subscribed
        new_event_topic = f"infoscreen/events/{new_group_id}"

        # Only subscribe if we don't already have this topic subscribed
        if event_topic != new_event_topic:
            if event_topic:
                client.unsubscribe(event_topic)
                logging.info(f"Unsubscribed from event topic: {event_topic}")

            event_topic = new_event_topic
            client.subscribe(event_topic)
            logging.info(f"Subscribing to event topic: {event_topic} for group_id: {new_group_id}")
        else:
            logging.info(f"Event topic already subscribed: {event_topic}")

        # Update current group_id and save it
        if group_changed:
            current_group_id = new_group_id
            save_last_group_id(group_id_path, new_group_id)

    # on_connect callback: Subscribe to all topics after connection is established
    def on_connect(client, userdata, flags, rc, properties=None):
        if rc == 0:
            logging.info("MQTT connected successfully - subscribing to topics...")
            # Subscribe to the discovery-ACK topic
            ack_topic = f"infoscreen/{client_id}/discovery_ack"
            client.subscribe(ack_topic)
            logging.info(f"Subscribed to: {ack_topic}")

            # Config topic
            client.subscribe(f"infoscreen/{client_id}/config")
            logging.info(f"Subscribed to: infoscreen/{client_id}/config")

            # Subscribe to the (retained) group_id topic
            group_id_topic = f"infoscreen/{client_id}/group_id"
            client.subscribe(group_id_topic)
            logging.info(f"Subscribed to: {group_id_topic}")

            # If a group_id was persisted from a previous run, subscribe to
            # its event topic immediately
            if current_group_id:
                logging.info(f"Subscribing to event topic for saved group_id: {current_group_id}")
                subscribe_event_topic(current_group_id)
        else:
            logging.error(f"MQTT connection failed with code: {rc}")

    client.on_connect = on_connect

    # Robust MQTT connect with fallbacks and retries
    broker_candidates = [MQTT_BROKER]
    # Add environment-provided fallbacks
    broker_candidates.extend([b for b in MQTT_BROKER_FALLBACKS if b not in broker_candidates])
    # Add common local fallbacks
    for alt in ("127.0.0.1", "localhost", "mqtt"):
        if alt not in broker_candidates:
            broker_candidates.append(alt)

    connect_ok = False
    last_error = None
    for attempt in range(1, 6):  # up to 5 attempts
        for host in broker_candidates:
            try:
                logging.info(f"Connecting to MQTT broker {host}:{MQTT_PORT} (attempt {attempt}/5)...")
                client.connect(host, MQTT_PORT)
                connect_ok = True
                MQTT_HOST_USED = host  # noqa: N816 local doc variable
                break
            except Exception as e:
                last_error = e
                logging.warning(f"MQTT connection to {host}:{MQTT_PORT} failed: {e}")
        if connect_ok:
            break
        # Linear backoff, capped at 20 seconds.
        backoff = min(5 * attempt, 20)
        logging.info(f"Retrying connection in {backoff}s...")
        time.sleep(backoff)

    if not connect_ok:
        logging.error(f"MQTT connection failed after multiple attempts: {last_error}")
        raise last_error

    # Wait for connection to complete and on_connect callback to fire
    # This ensures subscriptions are set up before we start discovery
    logging.info("Waiting for on_connect callback and subscription setup...")
    for _ in range(10):  # Wait up to ~1 second
        client.loop(timeout=0.1)
        time.sleep(0.1)
    logging.info("Subscription setup complete, starting discovery phase")

    # group_id message callback
    group_id_topic = f"infoscreen/{client_id}/group_id"
    def on_group_id_message(client, userdata, msg, properties=None):
        # Accepts either a JSON payload ({"group_id": ...} or a scalar)
        # or a bare string group id.
        payload = msg.payload.decode().strip()
        new_group_id = None
        # Try to extract group_id from JSON, otherwise use the payload as a string
        try:
            data = json.loads(payload)
            if isinstance(data, dict) and "group_id" in data:
                new_group_id = str(data["group_id"])
            else:
                new_group_id = str(data)
        except Exception:
            new_group_id = payload
        new_group_id = new_group_id.strip()
        if new_group_id:
            if new_group_id != current_group_id:
                logging.info(f"New group_id received: {new_group_id}")
            else:
                logging.info(f"group_id unchanged: {new_group_id}, ensuring event topic is subscribed")
            # Always call subscribe_event_topic to ensure subscription
            subscribe_event_topic(new_group_id)
        else:
            logging.warning("Empty group_id received!")
    client.message_callback_add(group_id_topic, on_group_id_message)
    logging.info(f"Current group_id at start: {current_group_id if current_group_id else 'none'}")

    # Discovery phase: resend discovery until an ACK is received
    while not discovered:
        send_discovery(client, client_id, hardware_token, ip_addr)
        # Check for messages and discovered flag more frequently
        for _ in range(int(HEARTBEAT_INTERVAL)):
            client.loop(timeout=1.0)
            if discovered:
                break
            time.sleep(1)
        if discovered:
            break

    # Start screenshot service in background thread
    screenshot_thread = threading.Thread(
        target=screenshot_service_thread,
        args=(client, client_id),
        daemon=True
    )
    screenshot_thread.start()
    logging.info("Screenshot service thread started")

    # Heartbeat loop: publish liveness at most every HEARTBEAT_INTERVAL seconds
    last_heartbeat = 0
    while True:
        current_time = time.time()
        if current_time - last_heartbeat >= HEARTBEAT_INTERVAL:
            client.publish(f"infoscreen/{client_id}/heartbeat", "alive")
            logging.info("Heartbeat sent.")
            last_heartbeat = current_time
        client.loop(timeout=5.0)
        time.sleep(5)
|
||||
|
||||
|
||||
# Script entry point.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user