Improve MQTT resilience, clarify behavior, and minor UX
- Switch to Paho v2 callbacks; add loop_start() and reconnect_delay_set() for auto-reconnect.
- Rework on_connect/on_disconnect to v2 signatures; handle session_present and reconnection flows.
- Gate heartbeats with client.is_connected() and add a short retry on rc=4 (NO_CONN).
- Re-send discovery after reconnect; ensure re-subscription to all topics.
- Add a startup terminal message with an ISO timestamp in simclient.py.
- Docs: update README and Copilot instructions with reconnection/heartbeat guidance and benign rc=4 notes.
This commit is contained in:
154
src/simclient.py
154
src/simclient.py
@@ -552,6 +552,7 @@ def screenshot_service_thread(client, client_id):
|
||||
|
||||
def main():
|
||||
global discovered
|
||||
print(f"[{datetime.now().isoformat()}] simclient.py: program started")
|
||||
logging.info("Client starting - deleting old event file if present")
|
||||
delete_event_file()
|
||||
|
||||
@@ -575,6 +576,18 @@ def main():
|
||||
client = mqtt.Client(**client_kwargs)
|
||||
client.on_message = on_message
|
||||
|
||||
# Enable automatic reconnection
|
||||
client.reconnect_delay_set(min_delay=1, max_delay=120)
|
||||
|
||||
# Connection state tracking
|
||||
connection_state = {"connected": False, "last_disconnect": None}
|
||||
|
||||
# Optional: Enable MQTT debug logging in DEBUG_MODE
|
||||
if DEBUG_MODE:
|
||||
def on_log(client, userdata, level, buf):
    """Forward a Paho client log line to the debug log.

    Only ``buf`` (the message text) is used. ``client``, ``userdata`` and
    ``level`` are accepted to match the callback signature and are ignored,
    so every message is emitted at DEBUG level regardless of ``level``.
    """
    message = f"MQTT: {buf}"
    logging.debug(message)
|
||||
client.on_log = on_log
|
||||
|
||||
# Define subscribe_event_topic BEFORE on_connect so it can be called from the callback
|
||||
def subscribe_event_topic(new_group_id):
|
||||
nonlocal event_topic, current_group_id
|
||||
@@ -613,7 +626,26 @@ def main():
|
||||
# on_connect callback: Subscribe to all topics after connection is established
|
||||
def on_connect(client, userdata, flags, rc, properties=None):
|
||||
if rc == 0:
|
||||
logging.info("MQTT connected successfully - subscribing to topics...")
|
||||
connection_state["connected"] = True
|
||||
connection_state["last_disconnect"] = None
|
||||
|
||||
# Check if this is a reconnection
|
||||
# paho-mqtt v2 provides ConnectFlags with attribute 'session_present'
|
||||
# Older versions may provide dict-like flags; default to False.
|
||||
is_reconnect = False
|
||||
try:
|
||||
if hasattr(flags, "session_present"):
|
||||
is_reconnect = bool(getattr(flags, "session_present"))
|
||||
elif isinstance(flags, dict):
|
||||
is_reconnect = bool(flags.get("session present", False))
|
||||
except Exception:
|
||||
is_reconnect = False
|
||||
|
||||
if is_reconnect:
|
||||
logging.info("MQTT reconnected successfully - resubscribing to all topics...")
|
||||
else:
|
||||
logging.info("MQTT connected successfully - subscribing to topics...")
|
||||
|
||||
# Discovery-ACK-Topic abonnieren
|
||||
ack_topic = f"infoscreen/{client_id}/discovery_ack"
|
||||
client.subscribe(ack_topic)
|
||||
@@ -632,10 +664,28 @@ def main():
|
||||
if current_group_id:
|
||||
logging.info(f"Subscribing to event topic for saved group_id: {current_group_id}")
|
||||
subscribe_event_topic(current_group_id)
|
||||
|
||||
# Send discovery message after reconnection to re-register with server
|
||||
if is_reconnect:
|
||||
logging.info("Sending discovery after reconnection to re-register with server")
|
||||
send_discovery(client, client_id, hardware_token, ip_addr)
|
||||
else:
|
||||
connection_state["connected"] = False
|
||||
logging.error(f"MQTT connection failed with code: {rc}")
|
||||
|
||||
# on_disconnect callback (Paho v2 signature)
|
||||
def on_disconnect(client, userdata, disconnect_flags, rc, properties=None):
    """Record the loss of the MQTT connection and log why it happened.

    Marks the shared ``connection_state`` as disconnected and stamps the
    time of the disconnect, then logs either a clean shutdown (``rc == 0``)
    or an unexpected drop together with a note that reconnection will be
    attempted. ``client``, ``userdata``, ``disconnect_flags`` and
    ``properties`` are accepted for the callback signature and are unused.
    """
    # Update the shared state first so readers see the disconnect
    # before any log output is produced.
    connection_state["connected"] = False
    connection_state["last_disconnect"] = time.time()

    # Guard clause: a zero code is a clean disconnect.
    if rc == 0:
        logging.info("MQTT disconnected cleanly")
        return

    logging.warning(f"MQTT disconnected unexpectedly with code: {rc}")
    logging.info("Automatic reconnection will be attempted...")
|
||||
|
||||
client.on_connect = on_connect
|
||||
client.on_disconnect = on_disconnect
|
||||
|
||||
# Robust MQTT connect with fallbacks and retries
|
||||
broker_candidates = [MQTT_BROKER]
|
||||
@@ -669,13 +719,22 @@ def main():
|
||||
logging.error(f"MQTT connection failed after multiple attempts: {last_error}")
|
||||
raise last_error
|
||||
|
||||
# Start the network loop early to begin connection process
|
||||
client.loop_start()
|
||||
logging.info("MQTT network loop started - establishing connection...")
|
||||
|
||||
# Wait for connection to complete and on_connect callback to fire
|
||||
# This ensures subscriptions are set up before we start discovery
|
||||
logging.info("Waiting for on_connect callback and subscription setup...")
|
||||
for _ in range(10): # Wait up to ~1 second
|
||||
client.loop(timeout=0.1)
|
||||
time.sleep(0.1)
|
||||
logging.info("Subscription setup complete, starting discovery phase")
|
||||
logging.info("Waiting for initial connection and subscription setup...")
|
||||
connection_timeout = 30 # seconds
|
||||
start_wait = time.time()
|
||||
while not connection_state["connected"] and (time.time() - start_wait) < connection_timeout:
|
||||
time.sleep(0.5)
|
||||
|
||||
if not connection_state["connected"]:
|
||||
logging.error(f"Failed to establish initial MQTT connection within {connection_timeout}s")
|
||||
raise Exception("MQTT connection timeout")
|
||||
|
||||
logging.info("Initial connection established, subscription setup complete")
|
||||
|
||||
# group_id message callback
|
||||
group_id_topic = f"infoscreen/{client_id}/group_id"
|
||||
@@ -705,16 +764,27 @@ def main():
|
||||
logging.info(f"Current group_id at start: {current_group_id if current_group_id else 'none'}")
|
||||
|
||||
# Discovery-Phase: Sende Discovery bis ACK empfangen
|
||||
while not discovered:
|
||||
send_discovery(client, client_id, hardware_token, ip_addr)
|
||||
# Check for messages and discovered flag more frequently
|
||||
for _ in range(int(HEARTBEAT_INTERVAL)):
|
||||
client.loop(timeout=1.0)
|
||||
if discovered:
|
||||
break
|
||||
time.sleep(1)
|
||||
# The loop is already started, just wait and send discovery messages
|
||||
discovery_attempts = 0
|
||||
max_discovery_attempts = 20
|
||||
while not discovered and discovery_attempts < max_discovery_attempts:
|
||||
if connection_state["connected"]:
|
||||
send_discovery(client, client_id, hardware_token, ip_addr)
|
||||
discovery_attempts += 1
|
||||
# Wait for ACK, checking every second
|
||||
for _ in range(int(HEARTBEAT_INTERVAL)):
|
||||
if discovered:
|
||||
break
|
||||
time.sleep(1)
|
||||
else:
|
||||
logging.info("Waiting for MQTT connection before sending discovery...")
|
||||
time.sleep(2)
|
||||
|
||||
if discovered:
|
||||
break
|
||||
|
||||
if not discovered:
|
||||
logging.warning(f"Discovery ACK not received after {max_discovery_attempts} attempts - continuing anyway")
|
||||
|
||||
# Start screenshot service in background thread
|
||||
screenshot_thread = threading.Thread(
|
||||
@@ -725,16 +795,54 @@ def main():
|
||||
screenshot_thread.start()
|
||||
logging.info("Screenshot service thread started")
|
||||
|
||||
# Heartbeat-Loop
|
||||
# Heartbeat-Loop with connection state monitoring
|
||||
last_heartbeat = 0
|
||||
logging.info("Entering heartbeat loop (network loop already running in background thread)")
|
||||
|
||||
while True:
|
||||
current_time = time.time()
|
||||
if current_time - last_heartbeat >= HEARTBEAT_INTERVAL:
|
||||
client.publish(f"infoscreen/{client_id}/heartbeat", "alive")
|
||||
logging.info("Heartbeat sent.")
|
||||
last_heartbeat = current_time
|
||||
client.loop(timeout=5.0)
|
||||
time.sleep(5)
|
||||
try:
|
||||
current_time = time.time()
|
||||
|
||||
# Check connection state and log warnings if disconnected
|
||||
if not connection_state["connected"]:
|
||||
if connection_state["last_disconnect"]:
|
||||
disconnect_duration = current_time - connection_state["last_disconnect"]
|
||||
logging.warning(f"MQTT disconnected for {disconnect_duration:.1f}s - waiting for reconnection...")
|
||||
else:
|
||||
logging.warning("MQTT not connected - waiting for connection...")
|
||||
|
||||
# Send heartbeat only when connected
|
||||
if current_time - last_heartbeat >= HEARTBEAT_INTERVAL:
|
||||
if client.is_connected():
|
||||
result = client.publish(f"infoscreen/{client_id}/heartbeat", "alive", qos=0)
|
||||
if result.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
logging.info("Heartbeat sent.")
|
||||
elif result.rc == mqtt.MQTT_ERR_NO_CONN:
|
||||
logging.debug("Heartbeat publish returned NO_CONN; retrying in 2s...")
|
||||
time.sleep(2)
|
||||
if client.is_connected():
|
||||
retry = client.publish(f"infoscreen/{client_id}/heartbeat", "alive", qos=0)
|
||||
if retry.rc == mqtt.MQTT_ERR_SUCCESS:
|
||||
logging.info("Heartbeat sent after retry.")
|
||||
else:
|
||||
logging.warning(f"Heartbeat publish failed after retry with code: {retry.rc}")
|
||||
else:
|
||||
logging.warning("Skipping heartbeat retry - MQTT still not connected")
|
||||
else:
|
||||
logging.warning(f"Heartbeat publish failed with code: {result.rc}")
|
||||
else:
|
||||
logging.debug("Skipping heartbeat - MQTT not connected (is_connected=False)")
|
||||
last_heartbeat = current_time
|
||||
|
||||
time.sleep(5)
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Shutting down gracefully...")
|
||||
client.loop_stop()
|
||||
client.disconnect()
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"Error in main loop: {e}")
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user