diff --git a/datacollector.py b/datacollector.py index 5ee6da5..b11fd9a 100644 --- a/datacollector.py +++ b/datacollector.py @@ -17,7 +17,7 @@ from watchdog.events import FileSystemEventHandler from sqlalchemy import create_engine, exc, text from sqlalchemy.engine import URL # from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, scoped_session from datetime import datetime, timezone from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation, Voltage, Base @@ -40,7 +40,6 @@ logger.addHandler(handler) try: logger.info( f"Startup config: LOG_LEVEL={LOG_LEVEL}, " - f"BRESSER_6IN1_TEMP_OFFSET={float(os.getenv('BRESSER_6IN1_TEMP_OFFSET', '0'))}°C, " f"BATTERY_CHANGE_MIN_SILENCE={os.getenv('BATTERY_CHANGE_MIN_SILENCE', '60')}s, " f"BATTERY_CHANGE_MAX_SILENCE={os.getenv('BATTERY_CHANGE_MAX_SILENCE', '600')}s" ) @@ -88,9 +87,6 @@ STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300")) # 5 minute RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600")) # 1 hour last_restart_time = None -# Load sensor-specific temperature offsets from environment -BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) - # Load battery change detection timing from environment BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) # seconds BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) # seconds @@ -106,18 +102,12 @@ class EnvFileWatcher(FileSystemEventHandler): def reload_config(): """Reload configuration values from .env file""" - global BRESSER_6IN1_TEMP_OFFSET, BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE + global BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE try: # Re-read .env so changes on disk propagate into os.environ load_dotenv(override=True) with config_lock: - # Reload temperature offset - old_offset = BRESSER_6IN1_TEMP_OFFSET - BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) - if old_offset != BRESSER_6IN1_TEMP_OFFSET: - logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C") - # Reload battery change detection timing old_min = BATTERY_CHANGE_MIN_SILENCE old_max = BATTERY_CHANGE_MAX_SILENCE @@ -227,11 +217,16 @@ db_available = False last_db_check = 0.0 DB_RETRY_SECONDS = 30 # Retry DB connection every 30 seconds if down -# Create a configured "Session" class -Session = sessionmaker(bind=sql_engine) +# Create a thread-local session registry +Session = scoped_session(sessionmaker(bind=sql_engine)) +session = Session -# Create a session to interact with the database -session = Session() +def close_db_session(): + """Close the current thread-local session to return connections to the pool.""" + try: + Session.remove() + except Exception: + pass def parse_radio_frame(byte_data): """Parse radio frame with structure: @@ -485,7 +480,11 @@ def refresh_sensor_cache(): # Extract base name (before last underscore if it contains the ID) base_name = sensor.mqtt_name.rsplit('_', 1)[0] if '_' in sensor.mqtt_name else sensor.mqtt_name if sensor.room: - sensor_by_name_room[(base_name, sensor.room)] = sensor + sensor_by_name_room[(base_name, sensor.room)] = { + "mqtt_name": sensor.mqtt_name, + "mqtt_id": sensor.mqtt_id, + "room": sensor.room, + } # Cache pool sensors by node_id for dynamic processing pool_sensors_cache = {} @@ -494,8 +493,11 @@ def refresh_sensor_cache(): if sensor.node_id not in pool_sensors_cache: pool_sensors_cache[sensor.node_id] = {} # Map by sensor_type to easily identify BME vs DS - pool_sensors_cache[sensor.node_id][sensor.sensor_type] = sensor - + pool_sensors_cache[sensor.node_id][sensor.sensor_type] = { + "mqtt_id": sensor.mqtt_id, + "sensor_type": sensor.sensor_type, + } + close_db_session() return sensors def build_sensor_lists_from_db(): @@ -532,6 +534,7 @@ def build_sensor_lists_from_db(): if any(sensor_type in sensor.mqtt_name for sensor_type in ignored_types) ] logger.info(f"Built ignored_sensors_for_time from database: {ignored_sensors_for_time}") + close_db_session() def get_sensor_keys(sensor_type): """Return the list of MQTT keys to average for each sensor type. @@ -581,7 +584,7 @@ def ensure_db_connection(force: bool = False) -> bool: This function tests the connection and reinitializes the session if needed. On failure, it disposes the pool to force reconnection next attempt. """ - global db_available, last_db_check, session + global db_available, last_db_check now = time.time() if not force and (now - last_db_check) < DB_RETRY_SECONDS: return db_available @@ -594,9 +597,8 @@ def ensure_db_connection(force: bool = False) -> bool: if not db_available: logger.info(f"Database reachable again at {DB_HOST}:{DB_PORT}") db_available = True - # Recreate session to ensure fresh connections - session = Session() except exc.OperationalError as e: + close_db_session() # Connection failed - dispose pool to force fresh connections on next attempt sql_engine.dispose() if db_available: @@ -605,6 +607,7 @@ def ensure_db_connection(force: bool = False) -> bool: logger.info(f"Database still unreachable: {e}") db_available = False except Exception as e: + close_db_session() sql_engine.dispose() if db_available: logger.warning(f"Unexpected database error: {type(e).__name__}: {e}") @@ -619,12 +622,18 @@ def on_connect(client, userdata, flags, reason_code, properties): # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe(MQTT_TOPIC_PREFIX) - logger.info(f"Connected with result code {reason_code}") + logger.info(f"MQTT - Connected with result code {reason_code}") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): if msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]): - d = json.loads(msg.payload.decode()) + try: + payload_text = msg.payload.decode(errors='replace') + d = json.loads(payload_text) + except json.JSONDecodeError as e: + logger.warning(f"Malformed JSON payload on {msg.topic}: {e}") + logger.debug(f"Raw payload: {payload_text}") + return model = d['model'] if model in sensor_names: if model == 'pool': @@ -962,6 +971,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'rain_mm': rain_mm, 'vcc_mv': vcc_mv, }) + close_db_session() return sensor = get_or_update_sensor(mqtt_name, mqtt_id) @@ -981,6 +991,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'rain_mm': rain_mm, 'vcc_mv': vcc_mv, }) + close_db_session() return position = sensor.position @@ -1115,6 +1126,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'rain_mm': rain_mm, 'vcc_mv': vcc_mv, }) + close_db_session() mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) @@ -1185,15 +1197,7 @@ def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_i if averages: mqtt_id = mqtt_id_override if mqtt_id_override else sensor_key - # Apply sensor-specific temperature corrections temperature_c = averages.get('temperature_C') - if temperature_c is not None: - if 'Bresser-6in1' in sensor_key: - with config_lock: - offset = BRESSER_6IN1_TEMP_OFFSET - original_temp = temperature_c - temperature_c = round(temperature_c - offset, 1) # Round to 1 decimal to avoid floating-point errors - logger.info(f"Applied Bresser-6in1 temperature offset: raw={original_temp}°C offset={offset}°C corrected={temperature_c}°C") update_data( utc_time, @@ -1228,8 +1232,8 @@ def process_mqtt_messages(seen_messages): # Process pool sensors dynamically from cache for node_id, sensors_by_type in pool_sensors_cache.items(): - for sensor_type, db_sensor in sensors_by_type.items(): - sensor_key = f'pool_{db_sensor.mqtt_id}' + for sensor_type, pool_entry in sensors_by_type.items(): + sensor_key = f"pool_{pool_entry['mqtt_id']}" if sensor_key in seen_messages and seen_messages[sensor_key]: # Get appropriate keys for this pool sensor type keys = get_sensor_keys(sensor_type) @@ -1237,14 +1241,14 @@ def process_mqtt_messages(seen_messages): # Process all non-pool sensors dynamically from database # Query all non-pool sensors and process any that have received messages - all_sensors = session.query(Sensor).filter(Sensor.mqtt_name != 'pool').all() - - for db_sensor in all_sensors: - sensor_key = f"{db_sensor.mqtt_name}_{db_sensor.mqtt_id}" + all_sensors = session.query(Sensor.mqtt_name, Sensor.mqtt_id).filter(Sensor.mqtt_name != 'pool').all() + + for mqtt_name, mqtt_id in all_sensors: + sensor_key = f"{mqtt_name}_{mqtt_id}" if sensor_key in seen_messages and seen_messages[sensor_key]: # Get the appropriate keys for this sensor type - keys = get_sensor_keys(db_sensor.mqtt_name) + keys = get_sensor_keys(mqtt_name) # Process the sensor data with dynamically determined keys seen_messages[sensor_key] = process_sensor_data( @@ -1255,7 +1259,7 @@ def process_mqtt_messages(seen_messages): ) # Seen messages already logged in main loop when devices are active - pass + close_db_session() # Funktion zum Abrufen und Löschen der JSON-Daten aus SQLite def get_and_delete_json_data(): @@ -1279,63 +1283,65 @@ def get_and_delete_json_data(): # Funktion zum Synchronisieren der Daten def sync_data(): global session - - if not ensure_db_connection(force=True): - # MariaDB nicht verfügbar - speichere in SQLite + try: + if not ensure_db_connection(force=True): + # MariaDB nicht verfügbar - speichere in SQLite + while new_data_queue: + data = new_data_queue.pop(0) + save_json_locally(data) + return + + local_data_written = False + + # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) + local_data = get_and_delete_json_data() + for data in local_data: + try: + if isinstance(data, dict) and 'utc_time' in data: + # Einzelner Sensor-Eintrag + if " UTC" in str(data.get('utc_time', '')): + data['utc_time'] = data['utc_time'].replace(" UTC", "") + + store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), + data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), + data.get('average_speed'), data.get('direction'), + data.get('gust'), data.get('rain_mm'), data.get('vcc_mv')) + if not local_data_written: + utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") + logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") + local_data_written = True + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError syncing local data: {e}") + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost + # Rette den Datensatz zurück in SQLite + save_json_locally(data) + except Exception as e: + logger.error(f"Error syncing local data: {e}") + save_json_locally(data) + + # Danach neue Daten aus der Warteschlange synchronisieren while new_data_queue: data = new_data_queue.pop(0) - save_json_locally(data) - return - - local_data_written = False - - # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) - local_data = get_and_delete_json_data() - for data in local_data: - try: - if isinstance(data, dict) and 'utc_time' in data: - # Einzelner Sensor-Eintrag - if " UTC" in str(data.get('utc_time', '')): - data['utc_time'] = data['utc_time'].replace(" UTC", "") - - store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), - data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), - data.get('average_speed'), data.get('direction'), - data.get('gust'), data.get('rain_mm'), data.get('vcc_mv')) - if not local_data_written: - utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") - logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") - local_data_written = True - except exc.SQLAlchemyError as e: - logger.error(f"SQLAlchemyError syncing local data: {e}") try: - session.rollback() - except Exception: - pass # Ignore rollback errors if connection is lost - # Rette den Datensatz zurück in SQLite - save_json_locally(data) - except Exception as e: - logger.error(f"Error syncing local data: {e}") - save_json_locally(data) - - # Danach neue Daten aus der Warteschlange synchronisieren - while new_data_queue: - data = new_data_queue.pop(0) - try: - if isinstance(data, dict) and 'mqtt_id' in data: - store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], - data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], - data['gust'], data['rain_mm'], data.get('vcc_mv')) - except exc.SQLAlchemyError as e: - logger.error(f"SQLAlchemyError: {e}") - try: - session.rollback() - except Exception: - pass # Ignore rollback errors if connection is lost - save_json_locally(data) - except Exception as e: - logger.error(f"Error writing data: {e}") - save_json_locally(data) + if isinstance(data, dict) and 'mqtt_id' in data: + store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], + data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], + data['gust'], data['rain_mm'], data.get('vcc_mv')) + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError: {e}") + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost + save_json_locally(data) + except Exception as e: + logger.error(f"Error writing data: {e}") + save_json_locally(data) + finally: + close_db_session() def update_last_transmission_time(sensor, time_value): """Update last transmission time for a sensor. Uses provided time or falls back to current time if not available."""