diff --git a/listener/listener.py b/listener/listener.py index 904f8b1..ed7f6b3 100644 --- a/listener/listener.py +++ b/listener/listener.py @@ -1,19 +1,26 @@ + import os import json import logging import paho.mqtt.client as mqtt - -from sqlalchemy import create_engine +from sqlalchemy import create_engine, func from sqlalchemy.orm import sessionmaker from models.models import Client +from dotenv import load_dotenv + +load_dotenv("/workspace/.env") + +# ENV-abhängige Konfiguration +ENV = os.getenv("ENV", "development") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO" if ENV == "production" else "DEBUG") +DB_URL = os.environ.get( + "DB_CONN", "mysql+pymysql://user:password@db/infoscreen") # Logging -logging.basicConfig(level=logging.INFO, +logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), format='%(asctime)s [%(levelname)s] %(message)s') -# DB-Konfiguration (Beispiel: MariaDB/MySQL, anpassen!) -DB_URL = os.environ.get( - "DB_URL", "mysql+pymysql://user:password@db/infoscreen") +# DB-Konfiguration engine = create_engine(DB_URL) Session = sessionmaker(bind=engine) @@ -22,16 +29,31 @@ Session = sessionmaker(bind=engine) def on_message(client, userdata, msg): + topic = msg.topic try: + # Heartbeat-Handling + if topic.startswith("infoscreen/") and topic.endswith("/heartbeat"): + uuid = topic.split("/")[1] + session = Session() + client_obj = session.query(Client).filter_by(uuid=uuid).first() + if client_obj: + from sqlalchemy import func + client_obj.last_alive = func.current_timestamp() + session.commit() + logging.info( + f"Heartbeat von {uuid} empfangen, last_alive aktualisiert.") + session.close() + return + + # Discovery-Handling payload = json.loads(msg.payload.decode()) logging.info(f"Discovery empfangen: {payload}") session = Session() - # Prüfen, ob Client schon existiert existing = session.query(Client).filter_by( - client_id=payload["client_id"]).first() + uuid=payload["uuid"]).first() if not existing: new_client = Client( - client_id=payload.get("client_id"), + uuid=payload.get("uuid"), hardware_token=payload.get("hardware_token"), ip=payload.get("ip"), type=payload.get("type"), @@ -43,10 +65,14 @@ def on_message(client, userdata, msg): ) session.add(new_client) session.commit() - logging.info(f"Neuer Client registriert: {payload['client_id']}") + logging.info(f"Neuer Client registriert: {payload['uuid']}") else: - logging.info(f"Client bereits bekannt: {payload['client_id']}") + logging.info(f"Client bereits bekannt: {payload['uuid']}") session.close() + # Discovery-ACK senden + ack_topic = f"infoscreen/{payload['uuid']}/discovery_ack" + client.publish(ack_topic, json.dumps({"status": "ok"})) + logging.info(f"Discovery-ACK gesendet an {ack_topic}") except Exception as e: logging.error(f"Fehler bei Verarbeitung: {e}") @@ -56,7 +82,9 @@ def main(): mqtt_client.on_message = on_message mqtt_client.connect("mqtt", 1883) mqtt_client.subscribe("infoscreen/discovery") - logging.info("Listener gestartet und abonniert auf infoscreen/discovery") + mqtt_client.subscribe("infoscreen/+/heartbeat") + logging.info( + "Listener gestartet und abonniert auf infoscreen/discovery und infoscreen/+/heartbeat") mqtt_client.loop_forever() diff --git a/listener/requirements.txt b/listener/requirements.txt index 1d23772..7b769ef 100644 --- a/listener/requirements.txt +++ b/listener/requirements.txt @@ -1,3 +1,4 @@ paho-mqtt>=2.0 SQLAlchemy>=2.0 pymysql +python-dotenv diff --git a/models/models.py b/models/models.py index 9b84aab..98dc611 100644 --- a/models/models.py +++ b/models/models.py @@ -38,9 +38,14 @@ class ClientGroup(Base): class Client(Base): __tablename__ = 'clients' uuid = Column(String(36), primary_key=True, nullable=False) - hardware_hash = Column(String(64), nullable=False, index=True) - location = Column(String(100), nullable=True) - ip_address = Column(String(45), nullable=True, index=True) + hardware_token = Column(String(64), nullable=True) + ip = Column(String(45), nullable=True) + type = Column(String(50), nullable=True) + hostname = Column(String(100), nullable=True) + os_version = Column(String(100), nullable=True) + software_version = Column(String(100), nullable=True) + macs = Column(String(255), nullable=True) + model = Column(String(100), nullable=True) registration_time = Column(TIMESTAMP( timezone=True), server_default=func.current_timestamp(), nullable=False) last_alive = Column(TIMESTAMP(timezone=True), server_default=func.current_timestamp( diff --git a/server/alembic/versions/207f5b190f93_update_clients_table_for_new_fields.py b/server/alembic/versions/207f5b190f93_update_clients_table_for_new_fields.py new file mode 100644 index 0000000..e5e58da --- /dev/null +++ b/server/alembic/versions/207f5b190f93_update_clients_table_for_new_fields.py @@ -0,0 +1,56 @@ +"""Update clients table for new fields + +Revision ID: 207f5b190f93 +Revises: 3d15c3cac7b6 +Create Date: 2025-07-15 14:12:42.427274 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision: str = '207f5b190f93' +down_revision: Union[str, None] = '3d15c3cac7b6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('clients', sa.Column('hardware_token', sa.String(length=64), nullable=True)) + op.add_column('clients', sa.Column('ip', sa.String(length=45), nullable=True)) + op.add_column('clients', sa.Column('type', sa.String(length=50), nullable=True)) + op.add_column('clients', sa.Column('hostname', sa.String(length=100), nullable=True)) + op.add_column('clients', sa.Column('os_version', sa.String(length=100), nullable=True)) + op.add_column('clients', sa.Column('software_version', sa.String(length=100), nullable=True)) + op.add_column('clients', sa.Column('macs', sa.String(length=255), nullable=True)) + op.add_column('clients', sa.Column('model', sa.String(length=100), nullable=True)) + op.drop_index(op.f('ix_clients_hardware_hash'), table_name='clients') + op.drop_index(op.f('ix_clients_ip_address'), table_name='clients') + op.drop_column('clients', 'location') + op.drop_column('clients', 'hardware_hash') + op.drop_column('clients', 'ip_address') + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('clients', sa.Column('ip_address', mysql.VARCHAR(length=45), nullable=True)) + op.add_column('clients', sa.Column('hardware_hash', mysql.VARCHAR(length=64), nullable=False)) + op.add_column('clients', sa.Column('location', mysql.VARCHAR(length=100), nullable=True)) + op.create_index(op.f('ix_clients_ip_address'), 'clients', ['ip_address'], unique=False) + op.create_index(op.f('ix_clients_hardware_hash'), 'clients', ['hardware_hash'], unique=False) + op.drop_column('clients', 'model') + op.drop_column('clients', 'macs') + op.drop_column('clients', 'software_version') + op.drop_column('clients', 'os_version') + op.drop_column('clients', 'hostname') + op.drop_column('clients', 'type') + op.drop_column('clients', 'ip') + op.drop_column('clients', 'hardware_token') + # ### end Alembic commands ### diff --git a/simclient/simclient.py b/simclient/simclient.py index 037a6e2..a2e16d2 100644 --- a/simclient/simclient.py +++ b/simclient/simclient.py @@ -1,4 +1,5 @@ # simclient/simclient.py + import time import uuid import json @@ -9,23 +10,43 @@ import os import re import platform import logging -DEBUG_MODE = True # Auf False setzen für Produktion +from dotenv import load_dotenv + +# ENV laden +load_dotenv("/workspace/simclient/.env") + +# Konfiguration aus ENV +ENV = os.getenv("ENV", "development") +HEARTBEAT_INTERVAL = int( + os.getenv("HEARTBEAT_INTERVAL", 5 if ENV == "development" else 60)) +LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO") +MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt") +MQTT_PORT = int(os.getenv("MQTT_PORT", 1883)) +DEBUG_MODE = os.getenv("DEBUG_MODE", "1" if ENV == + "development" else "0") in ("1", "true", "True") # Logging-Konfiguration -LOG_DIR = os.path.dirname(os.path.abspath(__file__)) -LOG_PATH = os.path.join(LOG_DIR, "simclient.log") +LOG_PATH = "/tmp/simclient.log" log_handlers = [logging.FileHandler(LOG_PATH, encoding="utf-8")] if DEBUG_MODE: log_handlers.append(logging.StreamHandler()) logging.basicConfig( - level=logging.DEBUG if DEBUG_MODE else logging.INFO, + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), format="%(asctime)s [%(levelname)s] %(message)s", handlers=log_handlers ) +discovered = False + + def on_message(client, userdata, msg): + global discovered logging.info(f"Empfangen: {msg.topic} {msg.payload.decode()}") + # ACK-Quittung empfangen? + if msg.topic.endswith("/discovery_ack"): + discovered = True + logging.info("Discovery-ACK empfangen. Starte Heartbeat.") def get_mac_addresses(): @@ -107,7 +128,7 @@ SOFTWARE_VERSION = "1.0.0" # Optional: Anpassen bei neuen Releases def send_discovery(client, client_id, hardware_token, ip_addr): macs = get_mac_addresses() discovery_msg = { - "client_id": client_id, + "uuid": client_id, "hardware_token": hardware_token, "ip": ip_addr, "type": "infoscreen", @@ -122,20 +143,30 @@ def send_discovery(client, client_id, hardware_token, ip_addr): def main(): + global discovered client_id = str(uuid.uuid4()) hardware_token = get_hardware_token() ip_addr = get_ip() client = mqtt.Client(protocol=mqtt.MQTTv311, callback_api_version=2) client.on_message = on_message - client.connect("mqtt", 1883) + client.connect(MQTT_BROKER, MQTT_PORT) + # Discovery-ACK-Topic abonnieren + ack_topic = f"infoscreen/{client_id}/discovery_ack" + client.subscribe(ack_topic) client.subscribe(f"infoscreen/{client_id}/config") - send_discovery(client, client_id, hardware_token, ip_addr) + + # Discovery-Phase: Sende Discovery bis ACK empfangen + while not discovered: + send_discovery(client, client_id, hardware_token, ip_addr) + client.loop(timeout=1.0) + time.sleep(HEARTBEAT_INTERVAL) + + # Heartbeat-Phase while True: - # Heartbeat senden client.publish(f"infoscreen/{client_id}/heartbeat", "alive") logging.debug("Heartbeat gesendet.") client.loop(timeout=1.0) - time.sleep(5) + time.sleep(HEARTBEAT_INTERVAL) if __name__ == "__main__":