From 09c003964d177e84aca6bb6b078eef56ada8cb1c Mon Sep 17 00:00:00 2001 From: olaf Date: Sun, 21 Dec 2025 16:04:25 +0000 Subject: [PATCH] handling of changing ids of sensors --- data_tables.py | 4 +- datacollector.py | 280 +++++++++++++++++++++++++++++------------ docker-compose.dev.yml | 2 + requirements.txt | 3 +- 4 files changed, 210 insertions(+), 79 deletions(-) diff --git a/data_tables.py b/data_tables.py index f220ce9..d533c6a 100644 --- a/data_tables.py +++ b/data_tables.py @@ -10,13 +10,15 @@ Base = declarative_base() class Sensor(Base): __tablename__ = 'sensors' id = Column(Integer, primary_key=True) - mqtt_name = Column(String(50), unique=True) + mqtt_name = Column(String(50)) # Removed unique=True to allow multiple pool sensors with same name mqtt_id = Column(String(50)) position = Column(String(50)) room = Column(String(50)) battery = Column(Float) rain_offset = Column(Float, default=0.0) # Cumulative offset for rain sensor resets last_rain_value = Column(Float, default=0.0) # Last reported rain value (for reset detection) + node_id = Column(Integer, nullable=True) # For pool sensors: the nodeId (1, 2, etc.) that generates mqtt_id + sensor_type = Column(String(50), nullable=True) # Sensor type: 'BME280', 'DS18B20', 'Bresser-6in1', etc. # Define the TemperatureInside table class TemperatureInside(Base): diff --git a/datacollector.py b/datacollector.py index bbd2550..9882d99 100644 --- a/datacollector.py +++ b/datacollector.py @@ -1,9 +1,3 @@ - -# import debugpy -# debugpy.listen(('0.0.0.0', 5678)) -# print("Waiting for debugger attach") -# debugpy.wait_for_client() - import json import time import paho.mqtt.client as mqtt @@ -14,6 +8,7 @@ import struct import subprocess import os import threading +from dotenv import load_dotenv from logging.handlers import RotatingFileHandler from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler @@ -23,6 +18,9 @@ from sqlalchemy.orm import sessionmaker from datetime import datetime, timezone from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation +# Load .env file so environment variables from .env are available at startup +load_dotenv() + # Configure logging with environment-based log level LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logger = logging.getLogger(__name__) @@ -31,6 +29,17 @@ handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCou handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) +# Log loaded configuration values at startup (after logger setup) +try: + logger.info( + f"Startup config: LOG_LEVEL={LOG_LEVEL}, " + f"BRESSER_6IN1_TEMP_OFFSET={float(os.getenv('BRESSER_6IN1_TEMP_OFFSET', '0'))}°C, " + f"BATTERY_CHANGE_MIN_SILENCE={os.getenv('BATTERY_CHANGE_MIN_SILENCE', '60')}s, " + f"BATTERY_CHANGE_MAX_SILENCE={os.getenv('BATTERY_CHANGE_MAX_SILENCE', '600')}s" + ) +except Exception: + pass + # Malformed hex logging (controlled via environment). Enable with LOG_MALFORMED_HEX=true|1|yes|on LOG_MALFORMED_HEX = os.getenv("LOG_MALFORMED_HEX", "false").strip().lower() in ("1", "true", "yes", "on") malformed_hex_logger = logging.getLogger('malformed_hex') @@ -67,6 +76,11 @@ last_restart_time = None # Load sensor-specific temperature offsets from environment BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) + +# Load battery change detection timing from environment +BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) # seconds +BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) # seconds + config_lock = threading.Lock() # File watcher for runtime configuration changes @@ -78,13 +92,25 @@ class EnvFileWatcher(FileSystemEventHandler): def reload_config(): """Reload configuration values from .env file""" - global BRESSER_6IN1_TEMP_OFFSET + global BRESSER_6IN1_TEMP_OFFSET, BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE try: + # Re-read .env so changes on disk propagate into os.environ + load_dotenv(override=True) + with config_lock: + # Reload temperature offset old_offset = BRESSER_6IN1_TEMP_OFFSET BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) if old_offset != BRESSER_6IN1_TEMP_OFFSET: logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C") + + # Reload battery change detection timing + old_min = BATTERY_CHANGE_MIN_SILENCE + old_max = BATTERY_CHANGE_MAX_SILENCE + BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) + BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) + if old_min != BATTERY_CHANGE_MIN_SILENCE or old_max != BATTERY_CHANGE_MAX_SILENCE: + logger.info(f"Configuration reloaded: BATTERY_CHANGE timing changed from {old_min}-{old_max}s to {BATTERY_CHANGE_MIN_SILENCE}-{BATTERY_CHANGE_MAX_SILENCE}s") except Exception as e: logger.error(f"Error reloading configuration from .env: {e}") @@ -193,10 +219,10 @@ def parse_radio_frame(byte_data): def refresh_sensor_cache(): """Refresh the sensor cache from database""" - global sensor_ids, sensor_names, sensor_by_name_room + global sensor_ids, sensor_names, sensor_by_name_room, pool_sensors_cache sensors = session.query(Sensor).all() sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors] - sensor_names = [sensor.mqtt_name for sensor in sensors] + sensor_names = list(set([sensor.mqtt_name for sensor in sensors])) # Unique model names # Create a mapping of (mqtt_name_base, room) -> sensor for battery change detection sensor_by_name_room = {} @@ -206,10 +232,77 @@ def refresh_sensor_cache(): if sensor.room: sensor_by_name_room[(base_name, sensor.room)] = sensor + # Cache pool sensors by node_id for dynamic processing + pool_sensors_cache = {} + for sensor in sensors: + if sensor.mqtt_name == 'pool' and sensor.node_id is not None: + if sensor.node_id not in pool_sensors_cache: + pool_sensors_cache[sensor.node_id] = {} + # Map by sensor_type to easily identify BME vs DS + pool_sensors_cache[sensor.node_id][sensor.sensor_type] = sensor + return sensors +def build_sensor_lists_from_db(): + """Build sensor configuration lists from database instead of hardcoding. + Populates KNOWN_DEVICES, allowed_sensors_for_time, and ignored_sensors_for_time. + Call after refresh_sensor_cache() and whenever sensors are added/removed. + """ + global KNOWN_DEVICES, allowed_sensors_for_time, ignored_sensors_for_time + sensors = session.query(Sensor).all() + + # Build KNOWN_DEVICES - unique model names + KNOWN_DEVICES = list(set([sensor.mqtt_name for sensor in sensors])) + logger.info(f"Built KNOWN_DEVICES from database: {KNOWN_DEVICES}") + + # Build allowed_sensors_for_time - sensors that should be monitored for health + # These are the radio sensors that transmit regularly + allowed_types = ['Bresser-6in1', 'LaCrosse-TX35DTHIT'] + allowed_sensors_for_time = [ + f"{sensor.mqtt_name}_{sensor.mqtt_id}" + for sensor in sensors + if any(sensor_type in sensor.mqtt_name for sensor_type in allowed_types) + ] + logger.info(f"Built allowed_sensors_for_time from database: {allowed_sensors_for_time}") + + # Build ignored_sensors_for_time - sensors that don't need health monitoring + # These are local/wired sensors (BME280 I2C, Oregon sensors, etc.) + ignored_types = ['Bosch-BME280', 'Oregon-v1', 'Oregon-THGR122N', 'inFactory-TH'] + ignored_sensors_for_time = [ + f"{sensor.mqtt_name}_{sensor.mqtt_id}" + for sensor in sensors + if any(sensor_type in sensor.mqtt_name for sensor_type in ignored_types) + ] + logger.info(f"Built ignored_sensors_for_time from database: {ignored_sensors_for_time}") + +def get_sensor_keys(sensor_type): + """Return the list of MQTT keys to average for each sensor type. + This ensures we only extract fields that the sensor actually provides. + """ + keys_map = { + 'Bresser-6in1': ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok'], + 'Bosch-BME280': ['temperature_C', 'humidity', 'pressure_rel'], + 'LaCrosse-TX35DTHIT': ['temperature_C', 'humidity', 'battery_ok'], + 'Oregon-v1': ['temperature_C', 'battery_ok'], + 'Oregon-THGR122N': ['temperature_C', 'humidity', 'battery_ok'], + 'inFactory-TH': ['temperature_C', 'humidity', 'battery_ok'], + 'BME280': ['temperature_C', 'humidity', 'pressure_rel'], # Pool BME280 + 'DS18B20': ['temperature_C'], # Pool DS18B20 + } + # Fallback for unknown types - try to match by substring + for key_name, keys in keys_map.items(): + if key_name in sensor_type: + return keys + # Ultimate fallback - temperature only + logger.warning(f"Unknown sensor type '{sensor_type}' - defaulting to temperature only") + return ['temperature_C'] + sensors = refresh_sensor_cache() sensor_by_name_room = {} +pool_sensors_cache = {} + +# Build sensor lists from database at startup +build_sensor_lists_from_db() warte = '' @@ -421,6 +514,8 @@ def get_or_update_sensor(mqtt_name, mqtt_id): then update the mqtt_id (battery change scenario). Uses timing heuristic: if exactly one sensor of the type stopped transmitting recently, assume that's the one with battery change. + + For pool sensors: handles nodeId changes by updating both sensor_type rows if detected. """ # Try to find exact match first sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first() @@ -428,13 +523,27 @@ def get_or_update_sensor(mqtt_name, mqtt_id): if sensor: return sensor - # If not found, this might be a battery change - # For pool sensors, require exact match + # Special handling for pool sensors - check for nodeId changes if mqtt_name == 'pool': + # Try to find any pool sensor and check if nodeId changed + mqtt_id_int = int(mqtt_id) + old_node_id = mqtt_id_int // 10 # Extract nodeId from mqtt_id + + # Look for sensors with this node_id + pool_sensor = session.query(Sensor).filter_by(mqtt_name='pool', node_id=old_node_id).first() + if pool_sensor and pool_sensor.mqtt_id != mqtt_id: + # NodeId changed - update all sensors for this node + logger.warning(f"Pool sensor nodeId change detected: nodeId={old_node_id}, old mqtt_id={pool_sensor.mqtt_id}, new mqtt_id={mqtt_id}") + handle_pool_nodeid_change(old_node_id, mqtt_id_int) + # Try again after update + sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first() + if sensor: + return sensor + logger.warning(f"Pool sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.") return None - # Extract base name (e.g., "LaCrosse-TX35DTHIT" from "LaCrosse-TX35DTHIT_28") + # For non-pool sensors: Extract base name and try to find by model base_name = mqtt_name.rsplit('_', 1)[0] if '_' in mqtt_name else mqtt_name # Find all sensors with matching base name @@ -470,10 +579,13 @@ def get_or_update_sensor(mqtt_name, mqtt_id): if sensor_key in last_transmission_times: last_seen = last_transmission_times[sensor_key] time_since_last = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds() - # Consider sensors that stopped in last 10 minutes but were active before - if 60 < time_since_last < 600: # Between 1 and 10 minutes + # Consider sensors that stopped within configured timing window + with config_lock: + min_silence = BATTERY_CHANGE_MIN_SILENCE + max_silence = BATTERY_CHANGE_MAX_SILENCE + if min_silence < time_since_last < max_silence: recent_silent.append((candidate, time_since_last)) - logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago") + logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago (battery change window: {min_silence}-{max_silence}s)") if len(recent_silent) == 1: # Exactly one sensor went silent recently - assume battery change @@ -510,13 +622,49 @@ def get_or_update_sensor(mqtt_name, mqtt_id): logger.warning(f" - {sensor_key} in room '{candidate.room}' (never seen)") if len(recent_silent) == 0: - logger.warning(f"No sensors stopped recently (1-10 min). Cannot determine which sensor changed battery.") + with config_lock: + min_silence = BATTERY_CHANGE_MIN_SILENCE + max_silence = BATTERY_CHANGE_MAX_SILENCE + logger.warning(f"No sensors stopped recently ({min_silence}-{max_silence}s window). Cannot determine which sensor changed battery.") else: logger.warning(f"{len(recent_silent)} sensors stopped recently. Cannot determine which one changed battery.") logger.warning(f"Please update manually: UPDATE sensors SET mqtt_name='{mqtt_name}', mqtt_id='{mqtt_id}' WHERE mqtt_name='[old_name]' AND mqtt_id='[old_id]' AND room='[room]';") return None +def handle_pool_nodeid_change(old_node_id, new_mqtt_id): + """Handle pool sensor nodeId change (battery reset). Updates both BME280 and DS18B20 sensors.""" + new_node_id = new_mqtt_id // 10 + + # Update BME280 sensor (nodeId*10+1) + bme_sensor = session.query(Sensor).filter_by( + mqtt_name='pool', + node_id=old_node_id, + sensor_type='BME280' + ).first() + + if bme_sensor: + old_mqtt_id = bme_sensor.mqtt_id + bme_sensor.mqtt_id = new_node_id * 10 + 1 + bme_sensor.node_id = new_node_id + logger.info(f"Pool BME280 sensor updated: node_id {old_node_id}→{new_node_id}, mqtt_id {old_mqtt_id}→{bme_sensor.mqtt_id}") + + # Update DS18B20 sensor (nodeId*10+2) + ds_sensor = session.query(Sensor).filter_by( + mqtt_name='pool', + node_id=old_node_id, + sensor_type='DS18B20' + ).first() + + if ds_sensor: + old_mqtt_id = ds_sensor.mqtt_id + ds_sensor.mqtt_id = new_node_id * 10 + 2 + ds_sensor.node_id = new_node_id + logger.info(f"Pool DS18B20 sensor updated: node_id {old_node_id}→{new_node_id}, mqtt_id {old_mqtt_id}→{ds_sensor.mqtt_id}") + + session.commit() + refresh_sensor_cache() + def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores @@ -540,11 +688,17 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b if temperature_inside is None or temperature_inside.temperature_c != temperature_c: temperature_inside = TemperatureInside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) session.add(temperature_inside) + logger.debug(f"{mqtt_name_id}: Stored new temperature (inside): {temperature_c}°C") + else: + logger.debug(f"{mqtt_name_id}: Skipped temperature (inside) - unchanged: {temperature_c}°C") elif position == "outside": temperature_outside = session.query(TemperatureOutside).filter_by(sensor_id=sensor.id).order_by(TemperatureOutside.timestamp.desc()).first() if temperature_outside is None or temperature_outside.temperature_c != temperature_c: temperature_outside = TemperatureOutside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) session.add(temperature_outside) + logger.debug(f"{mqtt_name_id}: Stored new temperature (outside): {temperature_c}°C") + else: + logger.debug(f"{mqtt_name_id}: Skipped temperature (outside) - unchanged: {temperature_c}°C") # Update the humidity data if humidity is not None: @@ -674,8 +828,9 @@ def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_i if 'Bresser-6in1' in sensor_key: with config_lock: offset = BRESSER_6IN1_TEMP_OFFSET - temperature_c = temperature_c - offset - logger.debug(f"Applied Bresser-6in1 temperature offset: {offset}°C, corrected value: {temperature_c}°C") + original_temp = temperature_c + temperature_c = round(temperature_c - offset, 1) # Round to 1 decimal to avoid floating-point errors + logger.info(f"Applied Bresser-6in1 temperature offset: raw={original_temp}°C offset={offset}°C corrected={temperature_c}°C") update_data( utc_time, @@ -693,6 +848,9 @@ def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_i return remaining def process_mqtt_messages(seen_messages): + """Process all received MQTT messages dynamically based on database configuration. + No hardcoded sensor checks - all sensors are processed based on what's in the database. + """ for sensor, data in seen_messages.items(): if data: # Try to get 'time' from first message, or use None as fallback @@ -701,66 +859,34 @@ def process_mqtt_messages(seen_messages): utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - # Process pool sensors (BME280 and DS18B20) - pool_sensors = [k for k in seen_messages.keys() if k.startswith('pool_')] - for pool_key in pool_sensors: - if seen_messages[pool_key]: - sensor_id = pool_key.split('_')[1] - if sensor_id.endswith('1'): # BME280 (nodeId*10+1) - keys = ['temperature_C', 'humidity', 'pressure_rel'] - else: # DS18B20 (nodeId*10+2) - keys = ['temperature_C'] + # Process pool sensors dynamically from cache + for node_id, sensors_by_type in pool_sensors_cache.items(): + for sensor_type, db_sensor in sensors_by_type.items(): + sensor_key = f'pool_{db_sensor.mqtt_id}' + if sensor_key in seen_messages and seen_messages[sensor_key]: + # Get appropriate keys for this pool sensor type + keys = get_sensor_keys(sensor_type) + seen_messages[sensor_key] = process_sensor_data(utc_time, sensor_key, seen_messages[sensor_key], keys) + + # Process all non-pool sensors dynamically from database + # Query all non-pool sensors and process any that have received messages + all_sensors = session.query(Sensor).filter(Sensor.mqtt_name != 'pool').all() + + for db_sensor in all_sensors: + sensor_key = f"{db_sensor.mqtt_name}_{db_sensor.mqtt_id}" + + if sensor_key in seen_messages and seen_messages[sensor_key]: + # Get the appropriate keys for this sensor type + keys = get_sensor_keys(db_sensor.mqtt_name) - seen_messages[pool_key] = process_sensor_data(utc_time, pool_key, seen_messages[pool_key], keys) - - if 'Bresser-6in1_-2021550075' in seen_messages.keys(): - seen_messages['Bresser-6in1_-2021550075'] = process_sensor_data( - utc_time, 'Bresser-6in1_-2021550075', seen_messages['Bresser-6in1_-2021550075'], - ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok'] - ) - - if 'Bosch-BME280_1' in seen_messages.keys(): - seen_messages['Bosch-BME280_1'] = process_sensor_data( - utc_time, 'Bosch-BME280_1', seen_messages['Bosch-BME280_1'], - ['temperature_C', 'humidity', 'pressure_rel'] - ) - - if 'Oregon-v1_0' in seen_messages.keys(): - seen_messages['Oregon-v1_0'] = process_sensor_data( - utc_time, 'Oregon-v1_0', seen_messages['Oregon-v1_0'], - ['temperature_C', 'battery_ok'] - ) - - if 'Oregon-THGR122N_233' in seen_messages.keys(): - seen_messages['Oregon-THGR122N_233'] = process_sensor_data( - utc_time, 'Oregon-THGR122N_233', seen_messages['Oregon-THGR122N_233'], - ['temperature_C', 'humidity', 'battery_ok'] - ) - - if 'LaCrosse-TX35DTHIT_52' in seen_messages.keys(): - seen_messages['LaCrosse-TX35DTHIT_52'] = process_sensor_data( - utc_time, 'LaCrosse-TX35DTHIT_52', seen_messages['LaCrosse-TX35DTHIT_52'], - ['temperature_C', 'humidity', 'battery_ok'] - ) - - if 'LaCrosse-TX35DTHIT_20' in seen_messages.keys(): - seen_messages['LaCrosse-TX35DTHIT_20'] = process_sensor_data( - utc_time, 'LaCrosse-TX35DTHIT_20', seen_messages['LaCrosse-TX35DTHIT_20'], - ['temperature_C', 'humidity', 'battery_ok'] - ) - - if 'LaCrosse-TX35DTHIT_28' in seen_messages.keys(): - seen_messages['LaCrosse-TX35DTHIT_28'] = process_sensor_data( - utc_time, 'LaCrosse-TX35DTHIT_28', seen_messages['LaCrosse-TX35DTHIT_28'], - ['temperature_C', 'humidity', 'battery_ok'] - ) - - if 'LaCrosse-TX35DTHIT_31' in seen_messages.keys(): - seen_messages['LaCrosse-TX35DTHIT_31'] = process_sensor_data( - utc_time, 'LaCrosse-TX35DTHIT_31', seen_messages['LaCrosse-TX35DTHIT_31'], - ['temperature_C', 'humidity', 'battery_ok'] - ) - + # Process the sensor data with dynamically determined keys + seen_messages[sensor_key] = process_sensor_data( + utc_time, + sensor_key, + seen_messages[sensor_key], + keys + ) + # Seen messages already logged in main loop when devices are active pass diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index d78563e..be633a9 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -2,6 +2,8 @@ version: '3.8' services: data-collector: + # Override command to prevent auto-start in dev environment + command: sleep infinity # Override volumes for local dev; host workspace is mounted to /workspace. volumes: - ./:/workspace diff --git a/requirements.txt b/requirements.txt index c491c35..c17efab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ paho-mqtt>=2.1.0 mysql-connector-python>=9.0.0 requests>=2.25.0 sqlalchemy>=1.4.0 -watchdog>=4.0.0 \ No newline at end of file +watchdog>=4.0.0 +python-dotenv \ No newline at end of file