160 lines
5.3 KiB
Python
160 lines
5.3 KiB
Python
import sys
|
|
sys.path.append('/workspace')
|
|
import os
|
|
import json
|
|
import base64
|
|
import glob
|
|
from datetime import datetime
|
|
from paho.mqtt import client as mqtt_client
|
|
from sqlalchemy import create_engine, func
|
|
from sqlalchemy.orm import sessionmaker
|
|
from models.models import Client, Base
|
|
from helpers.check_folder import ensure_folder_exists
|
|
import shutil
|
|
|
|
# Basisverzeichnis relativ zum aktuellen Skript
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# Konfiguration
|
|
MQTT_BROKER = os.getenv("MQTT_BROKER_HOST", "localhost")
|
|
MQTT_PORT = int(os.getenv("MQTT_BROKER_PORT", 1883))
|
|
MQTT_USER = os.getenv("MQTT_USER")
|
|
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
|
|
MQTT_KEEPALIVE = int(os.getenv("MQTT_KEEPALIVE"))
|
|
DB_USER = os.getenv("DB_USER")
|
|
DB_PASSWORD = os.getenv("DB_PASSWORD")
|
|
DB_HOST = os.getenv("DB_HOST")
|
|
DB_NAME = os.getenv("DB_NAME")
|
|
|
|
topics = [
|
|
("infoscreen/screenshot", 0),
|
|
("infoscreen/heartbeat", 0),
|
|
# ... weitere Topics hier
|
|
]
|
|
|
|
# Verzeichnisse für Screenshots
|
|
RECEIVED_DIR = os.path.join(BASE_DIR, "received_screenshots")
|
|
LATEST_DIR = os.path.join(BASE_DIR, "screenshots")
|
|
MAX_PER_CLIENT = 20
|
|
|
|
# Ordner für empfangene Screenshots und den neuesten Screenshot anlegen
|
|
ensure_folder_exists(RECEIVED_DIR)
|
|
ensure_folder_exists(LATEST_DIR)
|
|
|
|
# Datenbank konfigurieren (MariaDB)
|
|
DB_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
|
|
engine = create_engine(DB_URL, echo=False)
|
|
Session = sessionmaker(bind=engine)
|
|
Base.metadata.create_all(engine)
|
|
|
|
|
|
def prune_old_screenshots(client_id: str):
|
|
"""Löscht alte Screenshots, wenn mehr als MAX_PER_CLIENT vorhanden sind."""
|
|
pattern = os.path.join(RECEIVED_DIR, f"{client_id}_*.jpg")
|
|
files = sorted(glob.glob(pattern), key=os.path.getmtime)
|
|
while len(files) > MAX_PER_CLIENT:
|
|
oldest = files.pop(0)
|
|
try:
|
|
os.remove(oldest)
|
|
print(f"Altes Bild gelöscht: {oldest}")
|
|
except OSError as e:
|
|
print(f"Fehler beim Löschen von {oldest}: {e}")
|
|
|
|
|
|
def handle_screenshot(msg):
|
|
"""Verarbeitet eingehende Screenshot-Payloads."""
|
|
try:
|
|
payload = json.loads(msg.payload.decode("utf-8"))
|
|
client_id = payload.get("client_id", "unknown")
|
|
ts = datetime.fromtimestamp(
|
|
payload.get("timestamp", datetime.now().timestamp())
|
|
)
|
|
b64_str = payload["screenshot"]
|
|
img_data = base64.b64decode(b64_str)
|
|
|
|
# Dateiname mit Client-ID und Zeitstempel
|
|
filename = ts.strftime(f"{client_id}_%Y%m%d_%H%M%S.jpg")
|
|
received_path = os.path.join(RECEIVED_DIR, filename)
|
|
|
|
# Bild im Verzeichnis "received_screenshots" speichern
|
|
with open(received_path, "wb") as f:
|
|
f.write(img_data)
|
|
print(f"Bild gespeichert: {received_path}")
|
|
|
|
# Kopiere den neuesten Screenshot in das Verzeichnis "screenshots"
|
|
latest_path = os.path.join(LATEST_DIR, f"{client_id}.jpg")
|
|
shutil.copy(received_path, latest_path)
|
|
print(f"Neuester Screenshot aktualisiert: {latest_path}")
|
|
|
|
# Alte Screenshots beschneiden
|
|
prune_old_screenshots(client_id)
|
|
|
|
except Exception as e:
|
|
print("Fehler beim Verarbeiten der Screenshot-Nachricht:", e)
|
|
|
|
|
|
def handle_heartbeat(msg):
|
|
"""Verarbeitet Heartbeat und aktualisiert oder legt Clients an."""
|
|
session = Session()
|
|
try:
|
|
payload = json.loads(msg.payload.decode("utf-8"))
|
|
uuid = payload.get("client_id")
|
|
hardware_hash = payload.get("hardware_hash")
|
|
ip_address = payload.get("ip_address")
|
|
# Versuche, Client zu finden
|
|
client = session.query(Client).filter_by(uuid=uuid).first()
|
|
if client:
|
|
# Bekannter Client: last_alive und IP aktualisieren
|
|
client.ip_address = ip_address
|
|
client.last_alive = func.now()
|
|
session.commit()
|
|
print(f"Heartbeat aktualisiert für Client {uuid}")
|
|
else:
|
|
# Neuer Client: Location per input abfragen
|
|
location = input(f"Neuer Client {uuid} gefunden. Bitte Standort eingeben: ")
|
|
new_client = Client(
|
|
uuid=uuid,
|
|
hardware_hash=hardware_hash,
|
|
location=location,
|
|
ip_address=ip_address
|
|
)
|
|
session.add(new_client)
|
|
session.commit()
|
|
print(f"Neuer Client {uuid} angelegt mit Standort {location}")
|
|
except Exception as e:
|
|
print("Fehler beim Verarbeiten der Heartbeat-Nachricht:", e)
|
|
session.rollback()
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
# Mapping von Topics auf Handler-Funktionen
|
|
handlers = {
|
|
"infoscreen/screenshot": handle_screenshot,
|
|
"infoscreen/heartbeat": handle_heartbeat,
|
|
# ... weitere Zuordnungen hier
|
|
}
|
|
|
|
|
|
def on_connect(client, userdata, flags, rc, properties):
|
|
print("Verbunden mit Code:", rc)
|
|
client.subscribe(topics)
|
|
|
|
|
|
def on_message(client, userdata, msg):
|
|
topic = msg.topic
|
|
if topic in handlers:
|
|
handlers[topic](msg)
|
|
else:
|
|
print(f"Unbekanntes Topic '{topic}', keine Verarbeitung definiert.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
client = mqtt_client.Client(callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
|
|
client.username_pw_set(MQTT_USER, MQTT_PASSWORD) # <<<< AUTHENTIFIZIERUNG
|
|
client.on_connect = on_connect
|
|
client.on_message = on_message
|
|
|
|
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=MQTT_KEEPALIVE)
|
|
client.loop_forever()
|