diff --git a/datacollector.py b/datacollector.py index 9882d99..41c3ac1 100644 --- a/datacollector.py +++ b/datacollector.py @@ -7,12 +7,15 @@ import logging import struct import subprocess import os +import sys import threading +from typing import Optional from dotenv import load_dotenv from logging.handlers import RotatingFileHandler from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from sqlalchemy import create_engine, exc +from sqlalchemy import create_engine, exc, text +from sqlalchemy.engine import URL # from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime, timezone @@ -25,7 +28,7 @@ load_dotenv() LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logger = logging.getLogger(__name__) logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) -handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCount=1) +handler = RotatingFileHandler('datacollector.log', maxBytes=5242880, backupCount=10) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) @@ -65,6 +68,12 @@ ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252', allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31'] debug = False +# Safe defaults for DB-derived caches +sensor_ids = [] +sensor_names = [] +sensor_by_name_room = {} +pool_sensors_cache = {} + # Track sensor failure states to avoid repeated logging sensor_failure_logged = {} @@ -155,8 +164,16 @@ sqlite_conn.commit() logger.info(f"SQLite database initialized at: {sqlite_db_path}") -# Create a connection to the database -sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata') +# Database connection (hardcoded for private intranet) +DB_HOST = "192.168.43.102" +DB_PORT = 3306 +sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata', + connect_args={"connection_timeout": 5}) + +# DB availability tracking for resilient mode +db_available = False +last_db_check = 0.0 +DB_RETRY_SECONDS = 30 # Retry DB connection every 30 seconds if down # Create a configured "Session" class Session = sessionmaker(bind=sql_engine) @@ -176,6 +193,84 @@ def parse_radio_frame(byte_data): if not byte_data: return None + +PAYLOAD_SIZE = 15 # bytes in pool payload +MAGIC1 = 0x42 +MAGIC2 = 0x99 + + +def crc8_xor(data: bytes) -> int: + """Simple XOR checksum used by the pool payload.""" + c = 0 + for b in data: + c ^= b + return c + + +def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None): + """Scan a byte stream for a plausible pool payload. + + Slides a 15-byte window, validates with CRC, version/nodeId, and range checks, + and scores candidates. Returns the best decoded dict or None. + """ + # Drop leading preamble (0xAA) if present + while candidate_bytes.startswith(b"\xaa"): + candidate_bytes = candidate_bytes[1:] + + best = None + best_score = -1 + + for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1): + chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE] + try: + magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack( + ' best_score: + best_score = score + best = { + "offset": offset, + "magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2, + "version": version, + "nodeId": nodeId, + "sequence": seq, + "t_ds_c": t_ds10 / 10.0, + "t_bme_c": t_bme10 / 10.0, + "humidity": hum10 / 10.0, + "pressure_hpa": pres1 / 10.0, + "crc_valid": True, + } + + return best + try: # Find sync 0x2D followed by networkId (second byte) sync_index = byte_data.find(b"\x2d") @@ -220,6 +315,9 @@ def parse_radio_frame(byte_data): def refresh_sensor_cache(): """Refresh the sensor cache from database""" global sensor_ids, sensor_names, sensor_by_name_room, pool_sensors_cache + if not ensure_db_connection(): + return [] + sensors = session.query(Sensor).all() sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors] sensor_names = list(set([sensor.mqtt_name for sensor in sensors])) # Unique model names @@ -249,6 +347,9 @@ def build_sensor_lists_from_db(): Call after refresh_sensor_cache() and whenever sensors are added/removed. """ global KNOWN_DEVICES, allowed_sensors_for_time, ignored_sensors_for_time + if not ensure_db_connection(): + return + sensors = session.query(Sensor).all() # Build KNOWN_DEVICES - unique model names @@ -297,23 +398,14 @@ def get_sensor_keys(sensor_type): logger.warning(f"Unknown sensor type '{sensor_type}' - defaulting to temperature only") return ['temperature_C'] -sensors = refresh_sensor_cache() -sensor_by_name_room = {} -pool_sensors_cache = {} - -# Build sensor lists from database at startup -build_sensor_lists_from_db() +sensors = [] warte = '' # Funktion zum Überprüfen der Remote-Server-Verbindung def is_remote_server_available(): - try: - response = requests.get('http://192.168.43.102') - return response.status_code == 200 - except requests.ConnectionError: - return False - # return True + # If DB is up, we consider the remote server available for writes. + return db_available # Funktion zum Speichern der JSON-Daten in SQLite def save_json_locally(json_dict): @@ -326,6 +418,31 @@ def save_json_locally(json_dict): logger.error(f"Error saving to SQLite: {e}") +def ensure_db_connection(force: bool = False) -> bool: + """Try to establish DB connectivity with throttling. Returns True if DB is reachable.""" + global db_available, last_db_check, session + now = time.time() + if not force and (now - last_db_check) < DB_RETRY_SECONDS: + return db_available + + last_db_check = now + try: + with sql_engine.connect() as conn: + conn.execute(text('SELECT 1')) + if not db_available: + logger.info(f"Database reachable again at {DB_HOST}:{DB_PORT}") + db_available = True + # Recreate session to ensure fresh connections + session = Session() + except Exception as e: + if db_available: + logger.warning(f"Lost database connectivity: {e}") + else: + logger.info(f"Database still unreachable: {e}") + db_available = False + return db_available + + # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, reason_code, properties): # Subscribing in on_connect() means that if we lose the connection and @@ -340,149 +457,123 @@ def on_message(client, userdata, msg): model = d['model'] if model in sensor_names: if model == 'pool': - test_value = d['rows'][0]['data'] - if not test_value.startswith('aaaaaa'): + # Extract hex data from rows array + if not d.get('rows') or len(d['rows']) == 0: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Pool message missing rows: {d}") return - else: - # Decode the hex-encoded binary payload and scan for a plausible struct - hex_data = test_value[6:] # Remove 'aaaaaa' prefix - try: - byte_data = bytes.fromhex(hex_data) - except ValueError: - if LOG_MALFORMED_HEX: - malformed_hex_logger.info(f"Invalid hex: {hex_data}") - print(f"Invalid hex data: {hex_data}") - print(d) - warte = '' - return - # Attempt to parse the radio frame first (preamble/sync/header/data/crc) - frame = parse_radio_frame(byte_data) - if frame and frame.get('data'): - print( - f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, " - f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}" - ) - candidate_bytes = frame['data'] - else: - # Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream - if LOG_MALFORMED_HEX and not frame: - malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}") - tmp = byte_data - while tmp.startswith(b"\xaa"): - tmp = tmp[1:] - candidate_bytes = tmp + hex_data = d['rows'][0].get('data') + if not hex_data: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Pool message missing data: {d}") + return - print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") - print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}") + # Strip optional 'aaaaaa' prefix if present (old format) + if hex_data.startswith('aaaaaa'): + hex_data = hex_data[6:] - # Decode payload struct with magic bytes and CRC - # struct: magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc - PAYLOAD_SIZE = 15 - MAGIC1 = 0x42 - MAGIC2 = 0x99 - - def calculate_crc8(data): - """Simple XOR checksum""" - c = 0 - for byte in data: - c ^= byte - return c - - # Scan for magic bytes within candidate payload - magic_offset = -1 - for i in range(len(candidate_bytes) - 1): - if candidate_bytes[i] == MAGIC1 and candidate_bytes[i+1] == MAGIC2: - magic_offset = i - break - - if magic_offset == -1: - if LOG_MALFORMED_HEX: - malformed_hex_logger.info(f"Magic bytes not found: {candidate_bytes.hex()}") - print(f"Magic bytes {MAGIC1:02x} {MAGIC2:02x} not found in payload") - elif len(candidate_bytes) - magic_offset < PAYLOAD_SIZE: - print(f"Payload too short after magic bytes: {len(candidate_bytes) - magic_offset} bytes (need {PAYLOAD_SIZE})") - else: - try: - # Extract payload starting from magic bytes - payload_data = candidate_bytes[magic_offset:magic_offset + PAYLOAD_SIZE] - - # Unpack: BBBBHhhhHB = magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc - magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(' id=11 for BME - 'battery_ok': 1, - 'temperature_C': parsed_struct['t_bme_c'], - 'humidity': parsed_struct['humidity'], - 'pressure_rel': parsed_struct['pressure_hpa'], - 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' - } - - # Message 2: DS18B20 sensor (temp only) - ds_msg = { - 'time': original_time, - 'model': 'pool', - 'id': nodeId * 10 + 2, # e.g., nodeId=1 -> id=12 for DS - 'battery_ok': 1, - 'temperature_C': parsed_struct['t_ds_c'], - 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' - } - - # Process both messages through the existing logic - for msg_data in [bme_msg, ds_msg]: - print(f"Received message from {msg_data['model']}: \n {msg_data}") - sensor_id = msg_data['id'] - sensor_key = f"{msg_data['model']}_{sensor_id}" - - if sensor_key not in seen_messages.keys(): - seen_messages[sensor_key] = [msg_data] - else: - seen_messages[sensor_key].append(msg_data) - - except struct.error as e: - print(f"Struct unpack error: {e}") - + try: + byte_data = bytes.fromhex(hex_data) + except ValueError: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Invalid hex: {hex_data}") + print(f"Invalid hex data: {hex_data}") + print(d) warte = '' return - print(f"Received message from {model}: \n {d}") - id = d['id'] - if model == 'Bresser-6in1': - if d['flags'] == 0: - if 'rain_mm' in d.keys(): - del d['rain_mm'] - sensor_key = f'{model}_{id}' - if sensor_key not in seen_messages.keys(): - seen_messages[sensor_key] = [d] + + # Attempt to parse the radio frame first (preamble/sync/header/data/crc) + frame = parse_radio_frame(byte_data) + if frame and frame.get('data'): + print( + f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, " + f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}" + ) + candidate_bytes = frame['data'] + else: + # Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream + if LOG_MALFORMED_HEX and not frame: + malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}") + tmp = byte_data + while tmp.startswith(b"\xaa"): + tmp = tmp[1:] + candidate_bytes = tmp + + print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") + print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}") + + # Decode payload by sliding-window detection (magic bytes may be missing) + expected_seq = None + if new_data_queue: + try: + expected_seq = (new_data_queue[-1].get("sequence") or 0) + 1 + except Exception: + expected_seq = None + + decoded = decode_pool_payload(candidate_bytes, expected_seq=expected_seq) + + if not decoded: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"No valid payload found: {candidate_bytes.hex()}") + print("No valid pool payload found in candidate bytes") + warte = '' + return + + print( + f"Decoded payload at offset {decoded['offset']}: seq={decoded['sequence']}, " + f"t_ds={decoded['t_ds_c']}C, t_bme={decoded['t_bme_c']}C, " + f"hum={decoded['humidity']}%, pres={decoded['pressure_hpa']}hPa, " + f"magic_ok={decoded['magic_ok']}" + ) + + original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')) + + bme_msg = { + 'time': original_time, + 'model': 'pool', + 'id': decoded['nodeId'] * 10 + 1, + 'battery_ok': 1, + 'temperature_C': decoded['t_bme_c'], + 'humidity': decoded['humidity'], + 'pressure_rel': decoded['pressure_hpa'], + 'mic': 'CRC' + } + + ds_msg = { + 'time': original_time, + 'model': 'pool', + 'id': decoded['nodeId'] * 10 + 2, + 'battery_ok': 1, + 'temperature_C': decoded['t_ds_c'], + 'mic': 'CRC' + } + + for msg_data in [bme_msg, ds_msg]: + print(f"Received message from {msg_data['model']}: \n {msg_data}") + sensor_id = msg_data['id'] + sensor_key = f"{msg_data['model']}_{sensor_id}" + + if sensor_key not in seen_messages.keys(): + seen_messages[sensor_key] = [msg_data] + else: + seen_messages[sensor_key].append(msg_data) + + warte = '' + return else: - seen_messages[sensor_key].append(d) + # Process non-pool sensors + print(f"Received message from {model}: \n {d}") + id = d['id'] + if model == 'Bresser-6in1': + if d['flags'] == 0: + if 'rain_mm' in d.keys(): + del d['rain_mm'] + sensor_key = f'{model}_{id}' + if sensor_key not in seen_messages.keys(): + seen_messages[sensor_key] = [d] + else: + seen_messages[sensor_key].append(d) # Define a function to update the data in the database @@ -499,12 +590,12 @@ def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, batter "gust": gust, "rain_mm": rain_mm } - if is_remote_server_available(): + if ensure_db_connection(): new_data_queue.append(values) sync_data() # Data sent - no logging needed for normal operation else: - logger.warning(f"{utc_time}: Remote server unavailable - storing locally for {mqtt_id}") + logger.warning(f"{utc_time}: Database unavailable - storing locally for {mqtt_id}") save_json_locally(values) @@ -670,10 +761,38 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores # Get the sensor object from the database (with auto-update for battery changes) + if not ensure_db_connection(): + # DB unavailable: stash the data to SQLite and return + save_json_locally({ + 'utc_time': utc_time, + 'mqtt_id': mqtt_name_id, + 'temperature_c': temperature_c, + 'humidity': humidity, + 'pressure_rel': pressure_rel, + 'battery': battery, + 'average_speed': average_speed, + 'direction': direction, + 'gust': gust, + 'rain_mm': rain_mm + }) + return + sensor = get_or_update_sensor(mqtt_name, mqtt_id) if not sensor: logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database") + save_json_locally({ + 'utc_time': utc_time, + 'mqtt_id': mqtt_name_id, + 'temperature_c': temperature_c, + 'humidity': humidity, + 'pressure_rel': pressure_rel, + 'battery': battery, + 'average_speed': average_speed, + 'direction': direction, + 'gust': gust, + 'rain_mm': rain_mm + }) return position = sensor.position @@ -750,8 +869,39 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b else: logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}") - # Commit the changes - session.commit() + try: + # Commit the changes + session.commit() + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError on commit: {e}") + session.rollback() + save_json_locally({ + 'utc_time': utc_time, + 'mqtt_id': mqtt_name_id, + 'temperature_c': temperature_c, + 'humidity': humidity, + 'pressure_rel': pressure_rel, + 'battery': battery, + 'average_speed': average_speed, + 'direction': direction, + 'gust': gust, + 'rain_mm': rain_mm + }) + except Exception as e: + logger.error(f"Error on commit: {e}") + session.rollback() + save_json_locally({ + 'utc_time': utc_time, + 'mqtt_id': mqtt_name_id, + 'temperature_c': temperature_c, + 'humidity': humidity, + 'pressure_rel': pressure_rel, + 'battery': battery, + 'average_speed': average_speed, + 'direction': direction, + 'gust': gust, + 'rain_mm': rain_mm + }) mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) @@ -851,6 +1001,9 @@ def process_mqtt_messages(seen_messages): """Process all received MQTT messages dynamically based on database configuration. No hardcoded sensor checks - all sensors are processed based on what's in the database. """ + if not ensure_db_connection(): + # DB not available; skip processing but keep messages cached for later. + return for sensor, data in seen_messages.items(): if data: # Try to get 'time' from first message, or use None as fallback @@ -911,55 +1064,56 @@ def get_and_delete_json_data(): # Funktion zum Synchronisieren der Daten def sync_data(): - if is_remote_server_available(): - local_data_written = False - - # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) - local_data = get_and_delete_json_data() - for data in local_data: - try: - if isinstance(data, dict) and 'utc_time' in data: - # Einzelner Sensor-Eintrag - if " UTC" in str(data.get('utc_time', '')): - data['utc_time'] = data['utc_time'].replace(" UTC", "") - - store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), - data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), - data.get('average_speed'), data.get('direction'), - data.get('gust'), data.get('rain_mm')) - if not local_data_written: - utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") - logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") - local_data_written = True - except exc.SQLAlchemyError as e: - logger.error(f"SQLAlchemyError syncing local data: {e}") - session.rollback() - # Rette den Datensatz zurück in SQLite - save_json_locally(data) - except Exception as e: - logger.error(f"Error syncing local data: {e}") - save_json_locally(data) - - # Danach neue Daten aus der Warteschlange synchronisieren - while new_data_queue: - data = new_data_queue.pop(0) - try: - if isinstance(data, dict) and 'mqtt_id' in data: - store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], - data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], - data['gust'], data['rain_mm']) - except exc.SQLAlchemyError as e: - logger.error(f"SQLAlchemyError: {e}") - session.rollback() - save_json_locally(data) - except Exception as e: - logger.error(f"Error writing data: {e}") - save_json_locally(data) - else: + if not ensure_db_connection(force=True): # MariaDB nicht verfügbar - speichere in SQLite while new_data_queue: data = new_data_queue.pop(0) save_json_locally(data) + return + + local_data_written = False + + # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) + local_data = get_and_delete_json_data() + for data in local_data: + try: + if isinstance(data, dict) and 'utc_time' in data: + # Einzelner Sensor-Eintrag + if " UTC" in str(data.get('utc_time', '')): + data['utc_time'] = data['utc_time'].replace(" UTC", "") + + store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), + data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), + data.get('average_speed'), data.get('direction'), + data.get('gust'), data.get('rain_mm')) + if not local_data_written: + utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") + logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") + local_data_written = True + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError syncing local data: {e}") + session.rollback() + # Rette den Datensatz zurück in SQLite + save_json_locally(data) + except Exception as e: + logger.error(f"Error syncing local data: {e}") + save_json_locally(data) + + # Danach neue Daten aus der Warteschlange synchronisieren + while new_data_queue: + data = new_data_queue.pop(0) + try: + if isinstance(data, dict) and 'mqtt_id' in data: + store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], + data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], + data['gust'], data['rain_mm']) + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError: {e}") + session.rollback() + save_json_locally(data) + except Exception as e: + logger.error(f"Error writing data: {e}") + save_json_locally(data) def update_last_transmission_time(sensor, time_value): """Update last transmission time for a sensor. Uses provided time or falls back to current time if not available.""" @@ -1061,20 +1215,35 @@ def restart_pi(): if __name__ == '__main__': count = 0 - + # Start environment file watcher for runtime config changes start_env_watcher() + # Initial DB check (non-fatal). If unreachable, we will retry in loop. + ensure_db_connection(force=True) + if db_available: + try: + sensors = refresh_sensor_cache() + build_sensor_lists_from_db() + except Exception as e: + logger.warning(f"Failed to load sensor configuration after DB connection: {e}") + else: + logger.warning(f"Starting without database; will cache data locally and retry every {DB_RETRY_SECONDS}s") + print('start data collection') try: while True: + # Periodically retry DB connection if currently down + if not db_available: + if ensure_db_connection(force=True): + try: + sensors = refresh_sensor_cache() + build_sensor_lists_from_db() + sync_data() # flush cached data once DB is back + except Exception as e: + logger.error(f"DB reconnected but failed to refresh caches/sync: {e}") + check_last_transmission_time() - # if count >= 600: - # mqttc.loop_stop() - # break - - # count += 1 - # print(count) time.sleep(60) utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") @@ -1093,14 +1262,12 @@ if __name__ == '__main__': if status_lines: logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines)) - # logger.info(f"{utc_time}: last seen at: {', '.join(f'{k}: {v.strftime('%H:%M:%S')} ({(datetime.now(timezone.utc) - v).total_seconds():.0f} Sekunden ago)' if isinstance(v, datetime) else f'{k}: {v}' for k, v in last_transmission_times.items())}") process_mqtt_messages(seen_messages) - if is_remote_server_available(): + if ensure_db_connection(): new_data_queue.append(seen_messages) sync_data() - # Data synced - no logging needed for normal operation else: - logger.warning(f"{utc_time}: Remote server unavailable - storing batch locally") + logger.warning(f"{utc_time}: Database unavailable - storing batch locally") save_json_locally(seen_messages) except KeyboardInterrupt: logger.info("Shutting down gracefully...") diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index be633a9..5207264 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -17,3 +17,10 @@ services: - SSH_KEY_PATH=/workspace/.ssh/id_rsa - STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300} - RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600} + # DB override for dev testing; defaults to an invalid host to simulate DB-down + - DB_HOST=${DB_HOST:-invalid-host} + - DB_PORT=${DB_PORT:-3306} + - DB_USER=${DB_USER:-weatherdata} + - DB_PASSWORD=${DB_PASSWORD:-cfCU$swM!HfK82%*} + - DB_NAME=${DB_NAME:-weatherdata} + - DB_CONNECT_TIMEOUT=${DB_CONNECT_TIMEOUT:-5} diff --git a/test_pool_decode.py b/test_pool_decode.py new file mode 100644 index 0000000..546316d --- /dev/null +++ b/test_pool_decode.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +"""Test pool payload decoding with the new MQTT message format""" + +import json +import struct +from typing import Optional + +PAYLOAD_SIZE = 15 +MAGIC1 = 0x42 +MAGIC2 = 0x99 + + +def crc8_xor(data: bytes) -> int: + """Simple XOR checksum used by the pool payload.""" + c = 0 + for b in data: + c ^= b + return c + + +def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None): + """Scan a byte stream for a plausible pool payload. + + Slides a 15-byte window, validates with CRC, version/nodeId, and range checks, + and scores candidates. Returns the best decoded dict or None. + """ + # Drop leading preamble (0xAA) if present + while candidate_bytes.startswith(b"\xaa"): + candidate_bytes = candidate_bytes[1:] + + best = None + best_score = -1 + + for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1): + chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE] + try: + magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack( + ' best_score: + best_score = score + best = { + "offset": offset, + "magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2, + "version": version, + "nodeId": nodeId, + "sequence": seq, + "t_ds_c": t_ds10 / 10.0, + "t_bme_c": t_bme10 / 10.0, + "humidity": hum10 / 10.0, + "pressure_hpa": pres1 / 10.0, + "crc_valid": True, + } + + return best + + +# Test with the actual MQTT message +mqtt_message = { + "time": "2025-12-27T13:26:47", + "model": "pool", + "count": 1, + "num_rows": 1, + "rows": [ + { + "len": 143, + "data": "429901013400a801f6002b0294272a000000" + } + ], + "codes": ["{143}429901013400a801f6002b0294272a000000"] +} + +print("Testing pool payload decode with new MQTT format:") +print(f"MQTT message: {json.dumps(mqtt_message, indent=2)}") +print() + +hex_data = mqtt_message['rows'][0]['data'] +print(f"Hex data from rows[0]['data']: {hex_data}") +print(f"Hex data length: {len(hex_data)} chars ({len(hex_data)//2} bytes)") + +# Strip 'aaaaaa' prefix if present +if hex_data.startswith('aaaaaa'): + hex_data = hex_data[6:] + print(f"Stripped 'aaaaaa' prefix, remaining: {hex_data}") + +byte_data = bytes.fromhex(hex_data) +print(f"Byte data: {byte_data.hex()}") +print() + +# Decode with sliding window +decoded = decode_pool_payload(byte_data) + +if decoded: + print("✓ Payload decoded successfully!") + print(json.dumps(decoded, indent=2)) + + print() + print("Generated sensor messages:") + + bme_msg = { + 'time': mqtt_message['time'], + 'model': 'pool', + 'id': decoded['nodeId'] * 10 + 1, + 'battery_ok': 1, + 'temperature_C': decoded['t_bme_c'], + 'humidity': decoded['humidity'], + 'pressure_rel': decoded['pressure_hpa'], + 'mic': 'CRC' + } + + ds_msg = { + 'time': mqtt_message['time'], + 'model': 'pool', + 'id': decoded['nodeId'] * 10 + 2, + 'battery_ok': 1, + 'temperature_C': decoded['t_ds_c'], + 'mic': 'CRC' + } + + print("BME280 message:") + print(json.dumps(bme_msg, indent=2)) + print() + print("DS18B20 message:") + print(json.dumps(ds_msg, indent=2)) +else: + print("✗ Failed to decode payload!")