# dashboard/utils/mqtt_client.py import os import threading import time from dotenv import load_dotenv import paho.mqtt.client as mqtt import random # 1. Laden der Umgebungsvariablen aus .env load_dotenv(dotenv_path=os.path.join(os.path.dirname(__file__), "..", ".env")) # 2. Lese MQTT‐Einstellungen MQTT_BROKER_HOST = os.getenv("MQTT_BROKER_HOST", "localhost") MQTT_BROKER_PORT = int(os.getenv("MQTT_BROKER_PORT", "1883")) MQTT_USERNAME = os.getenv("MQTT_USERNAME", None) MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", None) MQTT_KEEPALIVE = int(os.getenv("MQTT_KEEPALIVE", "60")) base_id = os.getenv("MQTT_CLIENT_ID", "dash") unique_part = f"{os.getpid()}_{random.randint(1000,9999)}" MQTT_CLIENT_ID = f"{base_id}-{unique_part}" # 3. Erstelle eine globale Client‐Instanz client = mqtt.Client(client_id=MQTT_CLIENT_ID) # Falls Nutzer/Passwort gesetzt sind, authentifizieren if MQTT_USERNAME and MQTT_PASSWORD: client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) # 4. Callback‐Stubs (kannst du bei Bedarf anpassen) def _on_connect(client, userdata, flags, rc): if rc == 0: print(f"[mqtt_client.py] Erfolgreich mit MQTT‐Broker verbunden (Code {rc})") else: print(f"[mqtt_client.py] Verbindungsfehler, rc={rc}") def _on_disconnect(client, userdata, rc): print(f"[mqtt_client.py] Verbindung getrennt (rc={rc}). Versuche, neu zu verbinden …") def _on_message(client, userdata, msg): """ Diese Callback‐Funktion wird aufgerufen, sobald eine Nachricht auf einem Topic ankommt, auf das wir subscribed haben. Du kannst hier eine Queue füllen oder direkt eine Datenbank‐Funktion aufrufen. """ topic = msg.topic payload = msg.payload.decode("utf-8", errors="ignore") print(f"[mqtt_client.py] Nachricht eingegangen – Topic: {topic}, Payload: {payload}") # Beispiel: Wenn du Live‐Statusdaten in die Datenbank schreibst, # könntest du hier utils/db.execute_non_query(...) aufrufen. # 5. Setze die Callbacks client.on_connect = _on_connect client.on_disconnect = _on_disconnect client.on_message = _on_message def start_loop(): """ Startet die Endlos‐Schleife, in der der Client auf eingehende MQTT‐Nachrichten hört und automatisch reconnectet. Muss idealerweise in einem eigenen Thread laufen, damit Dash‐Callbacks nicht blockieren. """ try: client.connect(MQTT_BROKER_HOST, MQTT_BROKER_PORT, keepalive=MQTT_KEEPALIVE) client.loop_start() except Exception as e: print(f"[mqtt_client.py] Konnte keine Verbindung zum MQTT‐Broker herstellen: {e}") def stop_loop(): """ Stoppt die MQTT‐Loop und trennt die Verbindung. """ try: client.loop_stop() client.disconnect() except Exception as e: print(f"[mqtt_client.py] Fehler beim Stoppen der MQTT‐Schleife: {e}") def publish(topic: str, payload: str, qos: int = 0, retain: bool = False) -> bool: """ Verschickt eine MQTT‐Nachricht: - topic: z. B. "clients/{client_id}/control" - payload: z. B. '{"command":"restart"}' - qos: 0, 1 oder 2 - retain: True/False Rückgabe: True, falls Veröffentlichung bestätigt wurde; sonst False. """ try: result = client.publish(topic, payload, qos=qos, retain=retain) status = result.rc # 0=Erfolg, sonst Fehler if status == mqtt.MQTT_ERR_SUCCESS: return True else: print(f"[mqtt_client.py] Publish-Fehler für Topic {topic}, rc={status}") return False except Exception as e: print(f"[mqtt_client.py] Exception beim Publish: {e}") return False def subscribe(topic: str, qos: int = 0) -> bool: """ Abonniert ein MQTT‐Topic, sodass _on_message gerufen wird, sobald Nachrichten ankommen. Rückgabe: True bei Erfolg, ansonsten False. """ try: result, mid = client.subscribe(topic, qos=qos) if result == mqtt.MQTT_ERR_SUCCESS: return True else: print(f"[mqtt_client.py] Subscribe‐Fehler für Topic {topic}, rc={result}") return False except Exception as e: print(f"[mqtt_client.py] Exception beim Subscribe: {e}") return False