functional system simclient<-> listener<->server
This commit is contained in:
@@ -1,19 +1,26 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
from sqlalchemy import create_engine, func
|
||||||
from sqlalchemy import create_engine
|
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
from models.models import Client
|
from models.models import Client
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv("/workspace/.env")
|
||||||
|
|
||||||
|
# ENV-abhängige Konfiguration
|
||||||
|
ENV = os.getenv("ENV", "development")
|
||||||
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO" if ENV == "production" else "DEBUG")
|
||||||
|
DB_URL = os.environ.get(
|
||||||
|
"DB_CONN", "mysql+pymysql://user:password@db/infoscreen")
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
logging.basicConfig(level=logging.INFO,
|
logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
|
||||||
format='%(asctime)s [%(levelname)s] %(message)s')
|
format='%(asctime)s [%(levelname)s] %(message)s')
|
||||||
|
|
||||||
# DB-Konfiguration (Beispiel: MariaDB/MySQL, anpassen!)
|
# DB-Konfiguration
|
||||||
DB_URL = os.environ.get(
|
|
||||||
"DB_URL", "mysql+pymysql://user:password@db/infoscreen")
|
|
||||||
engine = create_engine(DB_URL)
|
engine = create_engine(DB_URL)
|
||||||
Session = sessionmaker(bind=engine)
|
Session = sessionmaker(bind=engine)
|
||||||
|
|
||||||
@@ -22,16 +29,31 @@ Session = sessionmaker(bind=engine)
|
|||||||
|
|
||||||
|
|
||||||
def on_message(client, userdata, msg):
|
def on_message(client, userdata, msg):
|
||||||
|
topic = msg.topic
|
||||||
try:
|
try:
|
||||||
|
# Heartbeat-Handling
|
||||||
|
if topic.startswith("infoscreen/") and topic.endswith("/heartbeat"):
|
||||||
|
uuid = topic.split("/")[1]
|
||||||
|
session = Session()
|
||||||
|
client_obj = session.query(Client).filter_by(uuid=uuid).first()
|
||||||
|
if client_obj:
|
||||||
|
from sqlalchemy import func
|
||||||
|
client_obj.last_alive = func.current_timestamp()
|
||||||
|
session.commit()
|
||||||
|
logging.info(
|
||||||
|
f"Heartbeat von {uuid} empfangen, last_alive aktualisiert.")
|
||||||
|
session.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Discovery-Handling
|
||||||
payload = json.loads(msg.payload.decode())
|
payload = json.loads(msg.payload.decode())
|
||||||
logging.info(f"Discovery empfangen: {payload}")
|
logging.info(f"Discovery empfangen: {payload}")
|
||||||
session = Session()
|
session = Session()
|
||||||
# Prüfen, ob Client schon existiert
|
|
||||||
existing = session.query(Client).filter_by(
|
existing = session.query(Client).filter_by(
|
||||||
client_id=payload["client_id"]).first()
|
uuid=payload["uuid"]).first()
|
||||||
if not existing:
|
if not existing:
|
||||||
new_client = Client(
|
new_client = Client(
|
||||||
client_id=payload.get("client_id"),
|
uuid=payload.get("uuid"),
|
||||||
hardware_token=payload.get("hardware_token"),
|
hardware_token=payload.get("hardware_token"),
|
||||||
ip=payload.get("ip"),
|
ip=payload.get("ip"),
|
||||||
type=payload.get("type"),
|
type=payload.get("type"),
|
||||||
@@ -43,10 +65,14 @@ def on_message(client, userdata, msg):
|
|||||||
)
|
)
|
||||||
session.add(new_client)
|
session.add(new_client)
|
||||||
session.commit()
|
session.commit()
|
||||||
logging.info(f"Neuer Client registriert: {payload['client_id']}")
|
logging.info(f"Neuer Client registriert: {payload['uuid']}")
|
||||||
else:
|
else:
|
||||||
logging.info(f"Client bereits bekannt: {payload['client_id']}")
|
logging.info(f"Client bereits bekannt: {payload['uuid']}")
|
||||||
session.close()
|
session.close()
|
||||||
|
# Discovery-ACK senden
|
||||||
|
ack_topic = f"infoscreen/{payload['uuid']}/discovery_ack"
|
||||||
|
client.publish(ack_topic, json.dumps({"status": "ok"}))
|
||||||
|
logging.info(f"Discovery-ACK gesendet an {ack_topic}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Fehler bei Verarbeitung: {e}")
|
logging.error(f"Fehler bei Verarbeitung: {e}")
|
||||||
|
|
||||||
@@ -56,7 +82,9 @@ def main():
|
|||||||
mqtt_client.on_message = on_message
|
mqtt_client.on_message = on_message
|
||||||
mqtt_client.connect("mqtt", 1883)
|
mqtt_client.connect("mqtt", 1883)
|
||||||
mqtt_client.subscribe("infoscreen/discovery")
|
mqtt_client.subscribe("infoscreen/discovery")
|
||||||
logging.info("Listener gestartet und abonniert auf infoscreen/discovery")
|
mqtt_client.subscribe("infoscreen/+/heartbeat")
|
||||||
|
logging.info(
|
||||||
|
"Listener gestartet und abonniert auf infoscreen/discovery und infoscreen/+/heartbeat")
|
||||||
mqtt_client.loop_forever()
|
mqtt_client.loop_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
paho-mqtt>=2.0
|
paho-mqtt>=2.0
|
||||||
SQLAlchemy>=2.0
|
SQLAlchemy>=2.0
|
||||||
pymysql
|
pymysql
|
||||||
|
python-dotenv
|
||||||
|
|||||||
@@ -38,9 +38,14 @@ class ClientGroup(Base):
|
|||||||
class Client(Base):
|
class Client(Base):
|
||||||
__tablename__ = 'clients'
|
__tablename__ = 'clients'
|
||||||
uuid = Column(String(36), primary_key=True, nullable=False)
|
uuid = Column(String(36), primary_key=True, nullable=False)
|
||||||
hardware_hash = Column(String(64), nullable=False, index=True)
|
hardware_token = Column(String(64), nullable=True)
|
||||||
location = Column(String(100), nullable=True)
|
ip = Column(String(45), nullable=True)
|
||||||
ip_address = Column(String(45), nullable=True, index=True)
|
type = Column(String(50), nullable=True)
|
||||||
|
hostname = Column(String(100), nullable=True)
|
||||||
|
os_version = Column(String(100), nullable=True)
|
||||||
|
software_version = Column(String(100), nullable=True)
|
||||||
|
macs = Column(String(255), nullable=True)
|
||||||
|
model = Column(String(100), nullable=True)
|
||||||
registration_time = Column(TIMESTAMP(
|
registration_time = Column(TIMESTAMP(
|
||||||
timezone=True), server_default=func.current_timestamp(), nullable=False)
|
timezone=True), server_default=func.current_timestamp(), nullable=False)
|
||||||
last_alive = Column(TIMESTAMP(timezone=True), server_default=func.current_timestamp(
|
last_alive = Column(TIMESTAMP(timezone=True), server_default=func.current_timestamp(
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
"""Update clients table for new fields
|
||||||
|
|
||||||
|
Revision ID: 207f5b190f93
|
||||||
|
Revises: 3d15c3cac7b6
|
||||||
|
Create Date: 2025-07-15 14:12:42.427274
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from sqlalchemy.dialects import mysql
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = '207f5b190f93'
|
||||||
|
down_revision: Union[str, None] = '3d15c3cac7b6'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
"""Upgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column('clients', sa.Column('hardware_token', sa.String(length=64), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('ip', sa.String(length=45), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('type', sa.String(length=50), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('hostname', sa.String(length=100), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('os_version', sa.String(length=100), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('software_version', sa.String(length=100), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('macs', sa.String(length=255), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('model', sa.String(length=100), nullable=True))
|
||||||
|
op.drop_index(op.f('ix_clients_hardware_hash'), table_name='clients')
|
||||||
|
op.drop_index(op.f('ix_clients_ip_address'), table_name='clients')
|
||||||
|
op.drop_column('clients', 'location')
|
||||||
|
op.drop_column('clients', 'hardware_hash')
|
||||||
|
op.drop_column('clients', 'ip_address')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
"""Downgrade schema."""
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column('clients', sa.Column('ip_address', mysql.VARCHAR(length=45), nullable=True))
|
||||||
|
op.add_column('clients', sa.Column('hardware_hash', mysql.VARCHAR(length=64), nullable=False))
|
||||||
|
op.add_column('clients', sa.Column('location', mysql.VARCHAR(length=100), nullable=True))
|
||||||
|
op.create_index(op.f('ix_clients_ip_address'), 'clients', ['ip_address'], unique=False)
|
||||||
|
op.create_index(op.f('ix_clients_hardware_hash'), 'clients', ['hardware_hash'], unique=False)
|
||||||
|
op.drop_column('clients', 'model')
|
||||||
|
op.drop_column('clients', 'macs')
|
||||||
|
op.drop_column('clients', 'software_version')
|
||||||
|
op.drop_column('clients', 'os_version')
|
||||||
|
op.drop_column('clients', 'hostname')
|
||||||
|
op.drop_column('clients', 'type')
|
||||||
|
op.drop_column('clients', 'ip')
|
||||||
|
op.drop_column('clients', 'hardware_token')
|
||||||
|
# ### end Alembic commands ###
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
# simclient/simclient.py
|
# simclient/simclient.py
|
||||||
|
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
import json
|
import json
|
||||||
@@ -9,23 +10,43 @@ import os
|
|||||||
import re
|
import re
|
||||||
import platform
|
import platform
|
||||||
import logging
|
import logging
|
||||||
DEBUG_MODE = True # Auf False setzen für Produktion
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
# ENV laden
|
||||||
|
load_dotenv("/workspace/simclient/.env")
|
||||||
|
|
||||||
|
# Konfiguration aus ENV
|
||||||
|
ENV = os.getenv("ENV", "development")
|
||||||
|
HEARTBEAT_INTERVAL = int(
|
||||||
|
os.getenv("HEARTBEAT_INTERVAL", 5 if ENV == "development" else 60))
|
||||||
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG" if ENV == "development" else "INFO")
|
||||||
|
MQTT_BROKER = os.getenv("MQTT_BROKER", "mqtt")
|
||||||
|
MQTT_PORT = int(os.getenv("MQTT_PORT", 1883))
|
||||||
|
DEBUG_MODE = os.getenv("DEBUG_MODE", "1" if ENV ==
|
||||||
|
"development" else "0") in ("1", "true", "True")
|
||||||
|
|
||||||
# Logging-Konfiguration
|
# Logging-Konfiguration
|
||||||
LOG_DIR = os.path.dirname(os.path.abspath(__file__))
|
LOG_PATH = "/tmp/simclient.log"
|
||||||
LOG_PATH = os.path.join(LOG_DIR, "simclient.log")
|
|
||||||
log_handlers = [logging.FileHandler(LOG_PATH, encoding="utf-8")]
|
log_handlers = [logging.FileHandler(LOG_PATH, encoding="utf-8")]
|
||||||
if DEBUG_MODE:
|
if DEBUG_MODE:
|
||||||
log_handlers.append(logging.StreamHandler())
|
log_handlers.append(logging.StreamHandler())
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=logging.DEBUG if DEBUG_MODE else logging.INFO,
|
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
|
||||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||||
handlers=log_handlers
|
handlers=log_handlers
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
discovered = False
|
||||||
|
|
||||||
|
|
||||||
def on_message(client, userdata, msg):
|
def on_message(client, userdata, msg):
|
||||||
|
global discovered
|
||||||
logging.info(f"Empfangen: {msg.topic} {msg.payload.decode()}")
|
logging.info(f"Empfangen: {msg.topic} {msg.payload.decode()}")
|
||||||
|
# ACK-Quittung empfangen?
|
||||||
|
if msg.topic.endswith("/discovery_ack"):
|
||||||
|
discovered = True
|
||||||
|
logging.info("Discovery-ACK empfangen. Starte Heartbeat.")
|
||||||
|
|
||||||
|
|
||||||
def get_mac_addresses():
|
def get_mac_addresses():
|
||||||
@@ -107,7 +128,7 @@ SOFTWARE_VERSION = "1.0.0" # Optional: Anpassen bei neuen Releases
|
|||||||
def send_discovery(client, client_id, hardware_token, ip_addr):
|
def send_discovery(client, client_id, hardware_token, ip_addr):
|
||||||
macs = get_mac_addresses()
|
macs = get_mac_addresses()
|
||||||
discovery_msg = {
|
discovery_msg = {
|
||||||
"client_id": client_id,
|
"uuid": client_id,
|
||||||
"hardware_token": hardware_token,
|
"hardware_token": hardware_token,
|
||||||
"ip": ip_addr,
|
"ip": ip_addr,
|
||||||
"type": "infoscreen",
|
"type": "infoscreen",
|
||||||
@@ -122,20 +143,30 @@ def send_discovery(client, client_id, hardware_token, ip_addr):
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
global discovered
|
||||||
client_id = str(uuid.uuid4())
|
client_id = str(uuid.uuid4())
|
||||||
hardware_token = get_hardware_token()
|
hardware_token = get_hardware_token()
|
||||||
ip_addr = get_ip()
|
ip_addr = get_ip()
|
||||||
client = mqtt.Client(protocol=mqtt.MQTTv311, callback_api_version=2)
|
client = mqtt.Client(protocol=mqtt.MQTTv311, callback_api_version=2)
|
||||||
client.on_message = on_message
|
client.on_message = on_message
|
||||||
client.connect("mqtt", 1883)
|
client.connect(MQTT_BROKER, MQTT_PORT)
|
||||||
|
# Discovery-ACK-Topic abonnieren
|
||||||
|
ack_topic = f"infoscreen/{client_id}/discovery_ack"
|
||||||
|
client.subscribe(ack_topic)
|
||||||
client.subscribe(f"infoscreen/{client_id}/config")
|
client.subscribe(f"infoscreen/{client_id}/config")
|
||||||
send_discovery(client, client_id, hardware_token, ip_addr)
|
|
||||||
|
# Discovery-Phase: Sende Discovery bis ACK empfangen
|
||||||
|
while not discovered:
|
||||||
|
send_discovery(client, client_id, hardware_token, ip_addr)
|
||||||
|
client.loop(timeout=1.0)
|
||||||
|
time.sleep(HEARTBEAT_INTERVAL)
|
||||||
|
|
||||||
|
# Heartbeat-Phase
|
||||||
while True:
|
while True:
|
||||||
# Heartbeat senden
|
|
||||||
client.publish(f"infoscreen/{client_id}/heartbeat", "alive")
|
client.publish(f"infoscreen/{client_id}/heartbeat", "alive")
|
||||||
logging.debug("Heartbeat gesendet.")
|
logging.debug("Heartbeat gesendet.")
|
||||||
client.loop(timeout=1.0)
|
client.loop(timeout=1.0)
|
||||||
time.sleep(5)
|
time.sleep(HEARTBEAT_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user