Files
infoscreen/server/mqtt_multitopic_receiver.py
2025-06-10 22:54:55 +02:00

149 lines
4.9 KiB
Python

import os
import json
import base64
import glob
from datetime import datetime, timezone
# import paho.mqtt.client as mqtt
from paho.mqtt import client as mqtt_client
import pytz
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from models import Client, Base
from helpers.check_folder import ensure_folder_exists
# Konfiguration
MQTT_BROKER = os.getenv("MQTT_BROKER_HOST", "localhost")
MQTT_PORT = int(os.getenv("MQTT_BROKER_PORT", 1883))
MQTT_USER = os.getenv("MQTT_USER")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
MQTT_KEEPALIVE = int(os.getenv("MQTT_KEEPALIVE"))
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
topics = [
("infoscreen/screenshot", 0),
("infoscreen/heartbeat", 0),
# ... weitere Topics hier
]
SAVE_DIR = "received_screenshots"
MAX_PER_CLIENT = 20
# Ordner für empfangene Screenshots anlegen
ensure_folder_exists(SAVE_DIR)
# Datenbank konfigurieren (MariaDB)
# Ersetze user, password, host und datenbankname entsprechend.
DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
engine = create_engine(DB_URL, echo=False)
Session = sessionmaker(bind=engine)
# Falls Tabellen noch nicht existieren
Base.metadata.create_all(engine)
def prune_old_screenshots(client_id: str):
"""Löscht alte Screenshots, wenn mehr als MAX_PER_CLIENT vorhanden sind."""
pattern = os.path.join(SAVE_DIR, f"{client_id}_*.jpg")
files = sorted(glob.glob(pattern), key=os.path.getmtime)
while len(files) > MAX_PER_CLIENT:
oldest = files.pop(0)
try:
os.remove(oldest)
print(f"Altes Bild gelöscht: {oldest}")
except OSError as e:
print(f"Fehler beim Löschen von {oldest}: {e}")
def handle_screenshot(msg):
"""Verarbeitet eingehende Screenshot-Payloads."""
try:
payload = json.loads(msg.payload.decode("utf-8"))
client_id = payload.get("client_id", "unknown")
ts = datetime.fromtimestamp(
payload.get("timestamp", datetime.now().timestamp())
)
b64_str = payload["screenshot"]
img_data = base64.b64decode(b64_str)
# Dateiname mit Client-ID und Zeitstempel
filename = ts.strftime(f"{client_id}_%Y%m%d_%H%M%S.jpg")
filepath = os.path.join(SAVE_DIR, filename)
# Bild speichern
with open(filepath, "wb") as f:
f.write(img_data)
print(f"Bild gespeichert: {filepath}")
# Alte Screenshots beschneiden
prune_old_screenshots(client_id)
except Exception as e:
print("Fehler beim Verarbeiten der Screenshot-Nachricht:", e)
def handle_heartbeat(msg):
"""Verarbeitet Heartbeat und aktualisiert oder legt Clients an."""
session = Session()
try:
payload = json.loads(msg.payload.decode("utf-8"))
uuid = payload.get("client_id")
hardware_hash = payload.get("hardware_hash")
ip_address = payload.get("ip_address")
# Versuche, Client zu finden
client = session.query(Client).filter_by(uuid=uuid).first()
if client:
# Bekannter Client: last_alive und IP aktualisieren
client.ip_address = ip_address
client.last_alive = func.now()
session.commit()
print(f"Heartbeat aktualisiert für Client {uuid}")
else:
# Neuer Client: Location per input abfragen
location = input(f"Neuer Client {uuid} gefunden. Bitte Standort eingeben: ")
# ip_address = msg._sock.getpeername()[0]
new_client = Client(
uuid=uuid,
hardware_hash=hardware_hash,
location=location,
ip_address=ip_address
)
session.add(new_client)
session.commit()
print(f"Neuer Client {uuid} angelegt mit Standort {location}")
except Exception as e:
print("Fehler beim Verarbeiten der Heartbeat-Nachricht:", e)
session.rollback()
finally:
session.close()
# Mapping von Topics auf Handler-Funktionen
handlers = {
"infoscreen/screenshot": handle_screenshot,
"infoscreen/heartbeat": handle_heartbeat,
# ... weitere Zuordnungen hier
}
def on_connect(client, userdata, flags, rc, properties):
print("Verbunden mit Code:", rc)
client.subscribe(topics)
def on_message(client, userdata, msg):
topic = msg.topic
if topic in handlers:
handlers[topic](msg)
else:
print(f"Unbekanntes Topic '{topic}', keine Verarbeitung definiert.")
if __name__ == "__main__":
client = mqtt_client.Client(callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
client.username_pw_set(MQTT_USER, MQTT_PASSWORD) # <<<< AUTHENTIFIZIERUNG
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=MQTT_KEEPALIVE)
client.loop_forever()