import json import time import paho.mqtt.client as mqtt import sqlite3 import requests import logging import struct import subprocess import os import threading from dotenv import load_dotenv from logging.handlers import RotatingFileHandler from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from sqlalchemy import create_engine, exc # from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime, timezone from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation # Load .env file so environment variables from .env are available at startup load_dotenv() # Configure logging with environment-based log level LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logger = logging.getLogger(__name__) logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCount=1) handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(handler) # Log loaded configuration values at startup (after logger setup) try: logger.info( f"Startup config: LOG_LEVEL={LOG_LEVEL}, " f"BRESSER_6IN1_TEMP_OFFSET={float(os.getenv('BRESSER_6IN1_TEMP_OFFSET', '0'))}°C, " f"BATTERY_CHANGE_MIN_SILENCE={os.getenv('BATTERY_CHANGE_MIN_SILENCE', '60')}s, " f"BATTERY_CHANGE_MAX_SILENCE={os.getenv('BATTERY_CHANGE_MAX_SILENCE', '600')}s" ) except Exception: pass # Malformed hex logging (controlled via environment). Enable with LOG_MALFORMED_HEX=true|1|yes|on LOG_MALFORMED_HEX = os.getenv("LOG_MALFORMED_HEX", "false").strip().lower() in ("1", "true", "yes", "on") malformed_hex_logger = logging.getLogger('malformed_hex') malformed_hex_logger.setLevel(logging.INFO) if LOG_MALFORMED_HEX: malformed_hex_handler = RotatingFileHandler('mal-hex.log', maxBytes=10000000, backupCount=1) malformed_hex_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s')) malformed_hex_logger.addHandler(malformed_hex_handler) MQTT_SERVER = "192.168.43.102" MQTT_TOPIC_PREFIX = "rtl_433/DietPi/events" KNOWN_DEVICES = ["inFactory-TH_25", "Oregon-THGR122N_233", "Oregon-v1_0", "Bresser-6in1_-2021550075", "Bosch-BME280_1", "pool"] # Remote Pi management (container runs on NAS) PI_HOST = os.getenv("PI_HOST") PI_USER = os.getenv("PI_USER", "pi") SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "/workspace/.ssh/id_rsa") seen_messages = {} new_data_queue = [] last_transmission_times = {} ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252', 'Oregon-THGR122N_233'] allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31'] debug = False # Track sensor failure states to avoid repeated logging sensor_failure_logged = {} # Watchdog configuration # If only BME280 is active within this window and radio sensors are silent, restart STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300")) # 5 minutes RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600")) # 1 hour last_restart_time = None # Load sensor-specific temperature offsets from environment BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) # Load battery change detection timing from environment BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) # seconds BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) # seconds config_lock = threading.Lock() # File watcher for runtime configuration changes class EnvFileWatcher(FileSystemEventHandler): """Watch for changes to .env file and reload configuration""" def on_modified(self, event): if event.src_path.endswith('.env'): reload_config() def reload_config(): """Reload configuration values from .env file""" global BRESSER_6IN1_TEMP_OFFSET, BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE try: # Re-read .env so changes on disk propagate into os.environ load_dotenv(override=True) with config_lock: # Reload temperature offset old_offset = BRESSER_6IN1_TEMP_OFFSET BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) if old_offset != BRESSER_6IN1_TEMP_OFFSET: logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C") # Reload battery change detection timing old_min = BATTERY_CHANGE_MIN_SILENCE old_max = BATTERY_CHANGE_MAX_SILENCE BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) if old_min != BATTERY_CHANGE_MIN_SILENCE or old_max != BATTERY_CHANGE_MAX_SILENCE: logger.info(f"Configuration reloaded: BATTERY_CHANGE timing changed from {old_min}-{old_max}s to {BATTERY_CHANGE_MIN_SILENCE}-{BATTERY_CHANGE_MAX_SILENCE}s") except Exception as e: logger.error(f"Error reloading configuration from .env: {e}") # Initialize file watcher env_observer = None def start_env_watcher(): """Start watching .env file for changes""" global env_observer try: env_observer = Observer() event_handler = EnvFileWatcher() env_observer.schedule(event_handler, path='.', recursive=False) env_observer.start() logger.info("Environment file watcher started") except Exception as e: logger.warning(f"Failed to start environment file watcher: {e}") def stop_env_watcher(): """Stop watching .env file""" global env_observer if env_observer: env_observer.stop() env_observer.join(timeout=5) logger.info("Environment file watcher stopped") # Verbindung zur SQLite-Datenbank herstellen (Fallback wenn MariaDB ausfällt) sqlite_db_path = 'data/local_backup.db' os.makedirs('data', exist_ok=True) sqlite_conn = sqlite3.connect(sqlite_db_path, check_same_thread=False) sqlite_cursor = sqlite_conn.cursor() # Tabelle für Anfragen erstellen, falls sie nicht existiert sqlite_cursor.execute(''' CREATE TABLE IF NOT EXISTS json_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') sqlite_conn.commit() logger.info(f"SQLite database initialized at: {sqlite_db_path}") # Create a connection to the database sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata') # Create a configured "Session" class Session = sessionmaker(bind=sql_engine) # Create a session to interact with the database session = Session() def parse_radio_frame(byte_data): """Parse radio frame with structure: Preamble: 0xAA repeated (often 3x) Sync: 0x2D (optionally followed by 0xD4) Header (4 bytes): payload_len (u8), dest_id (u8), sender_id (u8), ctl (u8) Data: payload_len bytes CRC: 2 bytes (polynomial unknown) – returned but not verified here Returns dict with extracted 'data' and header fields, or None if not found/invalid. """ if not byte_data: return None try: # Find sync 0x2D followed by networkId (second byte) sync_index = byte_data.find(b"\x2d") if sync_index == -1: return None if sync_index + 1 >= len(byte_data): # No room for networkId byte return None network_id = byte_data[sync_index + 1] sync_len = 2 header_start = sync_index + sync_len if header_start + 4 > len(byte_data): return None payload_len, dest_id, sender_id, ctl = struct.unpack_from(' len(byte_data): # Not enough bytes for data + 2-byte CRC return None data = byte_data[data_start:data_end] crc_bytes = byte_data[data_end:data_end + 2] return { 'data': data, 'payload_len': payload_len, 'dest_id': dest_id, 'sender_id': sender_id, 'ctl': ctl, 'network_id': network_id, 'crc_bytes': crc_bytes, 'sync_index': sync_index, 'sync_len': sync_len, } except Exception: return None def refresh_sensor_cache(): """Refresh the sensor cache from database""" global sensor_ids, sensor_names, sensor_by_name_room, pool_sensors_cache sensors = session.query(Sensor).all() sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors] sensor_names = list(set([sensor.mqtt_name for sensor in sensors])) # Unique model names # Create a mapping of (mqtt_name_base, room) -> sensor for battery change detection sensor_by_name_room = {} for sensor in sensors: # Extract base name (before last underscore if it contains the ID) base_name = sensor.mqtt_name.rsplit('_', 1)[0] if '_' in sensor.mqtt_name else sensor.mqtt_name if sensor.room: sensor_by_name_room[(base_name, sensor.room)] = sensor # Cache pool sensors by node_id for dynamic processing pool_sensors_cache = {} for sensor in sensors: if sensor.mqtt_name == 'pool' and sensor.node_id is not None: if sensor.node_id not in pool_sensors_cache: pool_sensors_cache[sensor.node_id] = {} # Map by sensor_type to easily identify BME vs DS pool_sensors_cache[sensor.node_id][sensor.sensor_type] = sensor return sensors def build_sensor_lists_from_db(): """Build sensor configuration lists from database instead of hardcoding. Populates KNOWN_DEVICES, allowed_sensors_for_time, and ignored_sensors_for_time. Call after refresh_sensor_cache() and whenever sensors are added/removed. """ global KNOWN_DEVICES, allowed_sensors_for_time, ignored_sensors_for_time sensors = session.query(Sensor).all() # Build KNOWN_DEVICES - unique model names KNOWN_DEVICES = list(set([sensor.mqtt_name for sensor in sensors])) logger.info(f"Built KNOWN_DEVICES from database: {KNOWN_DEVICES}") # Build allowed_sensors_for_time - sensors that should be monitored for health # These are the radio sensors that transmit regularly allowed_types = ['Bresser-6in1', 'LaCrosse-TX35DTHIT'] allowed_sensors_for_time = [ f"{sensor.mqtt_name}_{sensor.mqtt_id}" for sensor in sensors if any(sensor_type in sensor.mqtt_name for sensor_type in allowed_types) ] logger.info(f"Built allowed_sensors_for_time from database: {allowed_sensors_for_time}") # Build ignored_sensors_for_time - sensors that don't need health monitoring # These are local/wired sensors (BME280 I2C, Oregon sensors, etc.) ignored_types = ['Bosch-BME280', 'Oregon-v1', 'Oregon-THGR122N', 'inFactory-TH'] ignored_sensors_for_time = [ f"{sensor.mqtt_name}_{sensor.mqtt_id}" for sensor in sensors if any(sensor_type in sensor.mqtt_name for sensor_type in ignored_types) ] logger.info(f"Built ignored_sensors_for_time from database: {ignored_sensors_for_time}") def get_sensor_keys(sensor_type): """Return the list of MQTT keys to average for each sensor type. This ensures we only extract fields that the sensor actually provides. """ keys_map = { 'Bresser-6in1': ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok'], 'Bosch-BME280': ['temperature_C', 'humidity', 'pressure_rel'], 'LaCrosse-TX35DTHIT': ['temperature_C', 'humidity', 'battery_ok'], 'Oregon-v1': ['temperature_C', 'battery_ok'], 'Oregon-THGR122N': ['temperature_C', 'humidity', 'battery_ok'], 'inFactory-TH': ['temperature_C', 'humidity', 'battery_ok'], 'BME280': ['temperature_C', 'humidity', 'pressure_rel'], # Pool BME280 'DS18B20': ['temperature_C'], # Pool DS18B20 } # Fallback for unknown types - try to match by substring for key_name, keys in keys_map.items(): if key_name in sensor_type: return keys # Ultimate fallback - temperature only logger.warning(f"Unknown sensor type '{sensor_type}' - defaulting to temperature only") return ['temperature_C'] sensors = refresh_sensor_cache() sensor_by_name_room = {} pool_sensors_cache = {} # Build sensor lists from database at startup build_sensor_lists_from_db() warte = '' # Funktion zum Überprüfen der Remote-Server-Verbindung def is_remote_server_available(): try: response = requests.get('http://192.168.43.102') return response.status_code == 200 except requests.ConnectionError: return False # return True # Funktion zum Speichern der JSON-Daten in SQLite def save_json_locally(json_dict): try: json_str = json.dumps(json_dict) sqlite_cursor.execute('INSERT INTO json_data (data) VALUES (?)', (json_str,)) sqlite_conn.commit() logger.info(f"Data saved to local SQLite database") except Exception as e: logger.error(f"Error saving to SQLite: {e}") # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, reason_code, properties): # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe(MQTT_TOPIC_PREFIX) print(f"Connected with result code {reason_code}") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): if msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]): d = json.loads(msg.payload.decode()) model = d['model'] if model in sensor_names: if model == 'pool': test_value = d['rows'][0]['data'] if not test_value.startswith('aaaaaa'): return else: # Decode the hex-encoded binary payload and scan for a plausible struct hex_data = test_value[6:] # Remove 'aaaaaa' prefix try: byte_data = bytes.fromhex(hex_data) except ValueError: if LOG_MALFORMED_HEX: malformed_hex_logger.info(f"Invalid hex: {hex_data}") print(f"Invalid hex data: {hex_data}") print(d) warte = '' return # Attempt to parse the radio frame first (preamble/sync/header/data/crc) frame = parse_radio_frame(byte_data) if frame and frame.get('data'): print( f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, " f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}" ) candidate_bytes = frame['data'] else: # Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream if LOG_MALFORMED_HEX and not frame: malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}") tmp = byte_data while tmp.startswith(b"\xaa"): tmp = tmp[1:] candidate_bytes = tmp print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}") # Decode payload struct with magic bytes and CRC # struct: magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc PAYLOAD_SIZE = 15 MAGIC1 = 0x42 MAGIC2 = 0x99 def calculate_crc8(data): """Simple XOR checksum""" c = 0 for byte in data: c ^= byte return c # Scan for magic bytes within candidate payload magic_offset = -1 for i in range(len(candidate_bytes) - 1): if candidate_bytes[i] == MAGIC1 and candidate_bytes[i+1] == MAGIC2: magic_offset = i break if magic_offset == -1: if LOG_MALFORMED_HEX: malformed_hex_logger.info(f"Magic bytes not found: {candidate_bytes.hex()}") print(f"Magic bytes {MAGIC1:02x} {MAGIC2:02x} not found in payload") elif len(candidate_bytes) - magic_offset < PAYLOAD_SIZE: print(f"Payload too short after magic bytes: {len(candidate_bytes) - magic_offset} bytes (need {PAYLOAD_SIZE})") else: try: # Extract payload starting from magic bytes payload_data = candidate_bytes[magic_offset:magic_offset + PAYLOAD_SIZE] # Unpack: BBBBHhhhHB = magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(' id=11 for BME 'battery_ok': 1, 'temperature_C': parsed_struct['t_bme_c'], 'humidity': parsed_struct['humidity'], 'pressure_rel': parsed_struct['pressure_hpa'], 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' } # Message 2: DS18B20 sensor (temp only) ds_msg = { 'time': original_time, 'model': 'pool', 'id': nodeId * 10 + 2, # e.g., nodeId=1 -> id=12 for DS 'battery_ok': 1, 'temperature_C': parsed_struct['t_ds_c'], 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' } # Process both messages through the existing logic for msg_data in [bme_msg, ds_msg]: print(f"Received message from {msg_data['model']}: \n {msg_data}") sensor_id = msg_data['id'] sensor_key = f"{msg_data['model']}_{sensor_id}" if sensor_key not in seen_messages.keys(): seen_messages[sensor_key] = [msg_data] else: seen_messages[sensor_key].append(msg_data) except struct.error as e: print(f"Struct unpack error: {e}") warte = '' return print(f"Received message from {model}: \n {d}") id = d['id'] if model == 'Bresser-6in1': if d['flags'] == 0: if 'rain_mm' in d.keys(): del d['rain_mm'] sensor_key = f'{model}_{id}' if sensor_key not in seen_messages.keys(): seen_messages[sensor_key] = [d] else: seen_messages[sensor_key].append(d) # Define a function to update the data in the database def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): values = { "utc_time": utc_time, "mqtt_id": mqtt_id, "temperature_c": temperature_c, "humidity": humidity, "pressure_rel": pressure_rel, "battery": battery, "average_speed": average_speed, "direction": direction, "gust": gust, "rain_mm": rain_mm } if is_remote_server_available(): new_data_queue.append(values) sync_data() # Data sent - no logging needed for normal operation else: logger.warning(f"{utc_time}: Remote server unavailable - storing locally for {mqtt_id}") save_json_locally(values) def get_or_update_sensor(mqtt_name, mqtt_id): """ Get sensor by mqtt_name and mqtt_id. If not found, try to find by base mqtt_name, then update the mqtt_id (battery change scenario). Uses timing heuristic: if exactly one sensor of the type stopped transmitting recently, assume that's the one with battery change. For pool sensors: handles nodeId changes by updating both sensor_type rows if detected. """ # Try to find exact match first sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first() if sensor: return sensor # Special handling for pool sensors - check for nodeId changes if mqtt_name == 'pool': # Try to find any pool sensor and check if nodeId changed mqtt_id_int = int(mqtt_id) old_node_id = mqtt_id_int // 10 # Extract nodeId from mqtt_id # Look for sensors with this node_id pool_sensor = session.query(Sensor).filter_by(mqtt_name='pool', node_id=old_node_id).first() if pool_sensor and pool_sensor.mqtt_id != mqtt_id: # NodeId changed - update all sensors for this node logger.warning(f"Pool sensor nodeId change detected: nodeId={old_node_id}, old mqtt_id={pool_sensor.mqtt_id}, new mqtt_id={mqtt_id}") handle_pool_nodeid_change(old_node_id, mqtt_id_int) # Try again after update sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first() if sensor: return sensor logger.warning(f"Pool sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.") return None # For non-pool sensors: Extract base name and try to find by model base_name = mqtt_name.rsplit('_', 1)[0] if '_' in mqtt_name else mqtt_name # Find all sensors with matching base name candidates = session.query(Sensor).filter( Sensor.mqtt_name.like(f'{base_name}%') ).all() if not candidates: logger.warning(f"Sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.") return None # If only one candidate, update it if len(candidates) == 1: old_id = candidates[0].mqtt_id old_name = candidates[0].mqtt_name candidates[0].mqtt_name = mqtt_name candidates[0].mqtt_id = mqtt_id session.commit() logger.info(f"Updated sensor in room '{candidates[0].room}': {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)") # Refresh the sensor cache to reflect the updated ID refresh_sensor_cache() return candidates[0] # Multiple candidates - use timing heuristic # Check which sensors have stopped transmitting recently (within last 10 minutes) current_time = datetime.now(timezone.utc) recent_silent = [] for candidate in candidates: sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}" if sensor_key in last_transmission_times: last_seen = last_transmission_times[sensor_key] time_since_last = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds() # Consider sensors that stopped within configured timing window with config_lock: min_silence = BATTERY_CHANGE_MIN_SILENCE max_silence = BATTERY_CHANGE_MAX_SILENCE if min_silence < time_since_last < max_silence: recent_silent.append((candidate, time_since_last)) logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago (battery change window: {min_silence}-{max_silence}s)") if len(recent_silent) == 1: # Exactly one sensor went silent recently - assume battery change sensor_to_update, time_ago = recent_silent[0] old_id = sensor_to_update.mqtt_id old_name = sensor_to_update.mqtt_name old_key = f"{old_name}_{old_id}" sensor_to_update.mqtt_name = mqtt_name sensor_to_update.mqtt_id = mqtt_id session.commit() logger.info(f"AUTO-UPDATE: Sensor in room '{sensor_to_update.room}' (last seen {time_ago:.0f}s ago)") logger.info(f" Changed: {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)") # Update last_transmission_times key if old_key in last_transmission_times: del last_transmission_times[old_key] # Refresh the sensor cache refresh_sensor_cache() return sensor_to_update # Multiple or no recent silent sensors - need manual intervention logger.warning(f"Cannot auto-update mqtt_id for {mqtt_name}_{mqtt_id}. Found {len(candidates)} candidates:") for candidate in candidates: sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}" if sensor_key in last_transmission_times: last_seen = last_transmission_times[sensor_key] time_ago = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds() logger.warning(f" - {sensor_key} in room '{candidate.room}' (last seen {time_ago:.0f}s ago)") else: logger.warning(f" - {sensor_key} in room '{candidate.room}' (never seen)") if len(recent_silent) == 0: with config_lock: min_silence = BATTERY_CHANGE_MIN_SILENCE max_silence = BATTERY_CHANGE_MAX_SILENCE logger.warning(f"No sensors stopped recently ({min_silence}-{max_silence}s window). Cannot determine which sensor changed battery.") else: logger.warning(f"{len(recent_silent)} sensors stopped recently. Cannot determine which one changed battery.") logger.warning(f"Please update manually: UPDATE sensors SET mqtt_name='{mqtt_name}', mqtt_id='{mqtt_id}' WHERE mqtt_name='[old_name]' AND mqtt_id='[old_id]' AND room='[room]';") return None def handle_pool_nodeid_change(old_node_id, new_mqtt_id): """Handle pool sensor nodeId change (battery reset). Updates both BME280 and DS18B20 sensors.""" new_node_id = new_mqtt_id // 10 # Update BME280 sensor (nodeId*10+1) bme_sensor = session.query(Sensor).filter_by( mqtt_name='pool', node_id=old_node_id, sensor_type='BME280' ).first() if bme_sensor: old_mqtt_id = bme_sensor.mqtt_id bme_sensor.mqtt_id = new_node_id * 10 + 1 bme_sensor.node_id = new_node_id logger.info(f"Pool BME280 sensor updated: node_id {old_node_id}→{new_node_id}, mqtt_id {old_mqtt_id}→{bme_sensor.mqtt_id}") # Update DS18B20 sensor (nodeId*10+2) ds_sensor = session.query(Sensor).filter_by( mqtt_name='pool', node_id=old_node_id, sensor_type='DS18B20' ).first() if ds_sensor: old_mqtt_id = ds_sensor.mqtt_id ds_sensor.mqtt_id = new_node_id * 10 + 2 ds_sensor.node_id = new_node_id logger.info(f"Pool DS18B20 sensor updated: node_id {old_node_id}→{new_node_id}, mqtt_id {old_mqtt_id}→{ds_sensor.mqtt_id}") session.commit() refresh_sensor_cache() def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores # Get the sensor object from the database (with auto-update for battery changes) sensor = get_or_update_sensor(mqtt_name, mqtt_id) if not sensor: logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database") return position = sensor.position # Update the sensor's battery level sensor.battery = battery # Update the temperature data if temperature_c is not None: if position == "inside": temperature_inside = session.query(TemperatureInside).filter_by(sensor_id=sensor.id).order_by(TemperatureInside.timestamp.desc()).first() if temperature_inside is None or temperature_inside.temperature_c != temperature_c: temperature_inside = TemperatureInside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) session.add(temperature_inside) logger.debug(f"{mqtt_name_id}: Stored new temperature (inside): {temperature_c}°C") else: logger.debug(f"{mqtt_name_id}: Skipped temperature (inside) - unchanged: {temperature_c}°C") elif position == "outside": temperature_outside = session.query(TemperatureOutside).filter_by(sensor_id=sensor.id).order_by(TemperatureOutside.timestamp.desc()).first() if temperature_outside is None or temperature_outside.temperature_c != temperature_c: temperature_outside = TemperatureOutside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) session.add(temperature_outside) logger.debug(f"{mqtt_name_id}: Stored new temperature (outside): {temperature_c}°C") else: logger.debug(f"{mqtt_name_id}: Skipped temperature (outside) - unchanged: {temperature_c}°C") # Update the humidity data if humidity is not None: if position == "inside": humidity_inside = session.query(HumidityInside).filter_by(sensor_id=sensor.id).order_by(HumidityInside.timestamp.desc()).first() if humidity_inside is None or humidity_inside.humidity != humidity: humidity_inside = HumidityInside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity) session.add(humidity_inside) elif position == "outside": humidity_outside = session.query(HumidityOutside).filter_by(sensor_id=sensor.id).order_by(HumidityOutside.timestamp.desc()).first() if humidity_outside is None or humidity_outside.humidity != humidity: humidity_outside = HumidityOutside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity) session.add(humidity_outside) # Update the air pressure data if pressure_rel is not None: air_pressure = session.query(AirPressure).filter_by(sensor_id=sensor.id).order_by(AirPressure.timestamp.desc()).first() if air_pressure is None or air_pressure.pressure_rel != pressure_rel: air_pressure = AirPressure(timestamp=utc_time, sensor_id=sensor.id, pressure_rel=pressure_rel) session.add(air_pressure) if average_speed is not None or gust is not None or direction is not None: wind_value = session.query(Wind).filter_by(sensor_id=sensor.id).order_by(Wind.timestamp.desc()).first() if wind_value is None or (average_speed is not None and wind_value.average_speed != average_speed) or (gust is not None and wind_value.gust != gust) or (direction is not None and wind_value.direction != direction): wind_value = Wind(timestamp=utc_time, sensor_id=sensor.id, average_speed=average_speed, direction=direction, gust=gust) session.add(wind_value) # Update Precipitation data with cumulative offset handling if rain_mm is not None: if rain_mm <= 1000: # Check for rain sensor reset (battery change) # If current value is significantly less than last value, it's a reset if sensor.last_rain_value is not None and sensor.last_rain_value > 0: if rain_mm < sensor.last_rain_value - 1.0: # Dropped by more than 1mm (reset detected) # Battery was changed, add last value to offset sensor.rain_offset = (sensor.rain_offset or 0.0) + sensor.last_rain_value logger.info(f"Rain sensor reset detected for {mqtt_name_id}. Last value: {sensor.last_rain_value}mm, New offset: {sensor.rain_offset}mm") # Calculate actual cumulative rain including offset actual_rain = rain_mm + (sensor.rain_offset or 0.0) # Update last rain value for next comparison sensor.last_rain_value = rain_mm precipitation = session.query(Precipitation).filter_by(sensor_id=sensor.id).order_by(Precipitation.timestamp.desc()).first() if precipitation is None or precipitation.precipitation != actual_rain: precipitation = Precipitation(timestamp=utc_time, sensor_id=sensor.id, precipitation=actual_rain) session.add(precipitation) else: logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}") # Commit the changes session.commit() mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) mqttc.on_connect = on_connect mqttc.on_message = on_message # mqttc.username_pw_set("olafn", "weather") mqttc.connect(MQTT_SERVER) mqttc.loop_start() def average_values(data, keys_to_average): if not data: return {}, [] # Filter out the data that contains all the keys period_data = [d for d in data if any(key in d for key in keys_to_average)] # Calculate the average of the period data averages = {} for key in keys_to_average: values = [d.get(key) for d in period_data if d.get(key) is not None] average = sum(values) / len(values) if values else None if average is not None: if key == 'humidity': average = int(round(average, 0)) elif key == 'temperature_C': average = round(average, 1) elif key == 'wind_max_m_s': average = round(average, 1) elif key == 'wind_avg_m_s': average = round(average, 1) elif key == 'wind_dir_deg': average = int(round(average, 0)) elif key == 'rain_mm': average = round(average, 1) elif key == 'pressure_rel': average = int(round(average, 0)) elif key == 'temperature_F': averages['temperature_C'] = round((average - 32) * 5 / 9, 1) continue averages[key] = average else: averages[key] = None # Remove the period data from the original data data = [d for d in data if d not in period_data] return averages, data def debug_sended_data(seen_messages, averages, sensor): global debug if not debug: return print(f'Averages for {sensor}:') for key, value in averages.items(): print(f"{key}: {value}") print(f"Remaining data {sensor}:") print(seen_messages[sensor]) def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_id_override=None): """Helper function to process any sensor data consistently""" averages, remaining = average_values(data_list, keys_to_average) if averages: mqtt_id = mqtt_id_override if mqtt_id_override else sensor_key # Apply sensor-specific temperature corrections temperature_c = averages.get('temperature_C') if temperature_c is not None: if 'Bresser-6in1' in sensor_key: with config_lock: offset = BRESSER_6IN1_TEMP_OFFSET original_temp = temperature_c temperature_c = round(temperature_c - offset, 1) # Round to 1 decimal to avoid floating-point errors logger.info(f"Applied Bresser-6in1 temperature offset: raw={original_temp}°C offset={offset}°C corrected={temperature_c}°C") update_data( utc_time, mqtt_id, temperature_c, averages.get('humidity'), averages.get('pressure_rel'), averages.get('battery_ok', 1) if averages.get('battery_ok') is not None else 1, averages.get('wind_avg_m_s'), averages.get('wind_dir_deg'), averages.get('wind_max_m_s'), averages.get('rain_mm') ) debug_sended_data({sensor_key: remaining}, averages, sensor_key) return remaining def process_mqtt_messages(seen_messages): """Process all received MQTT messages dynamically based on database configuration. No hardcoded sensor checks - all sensors are processed based on what's in the database. """ for sensor, data in seen_messages.items(): if data: # Try to get 'time' from first message, or use None as fallback time_value = data[0].get('time') if isinstance(data[0], dict) else None update_last_transmission_time(sensor, time_value) utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") # Process pool sensors dynamically from cache for node_id, sensors_by_type in pool_sensors_cache.items(): for sensor_type, db_sensor in sensors_by_type.items(): sensor_key = f'pool_{db_sensor.mqtt_id}' if sensor_key in seen_messages and seen_messages[sensor_key]: # Get appropriate keys for this pool sensor type keys = get_sensor_keys(sensor_type) seen_messages[sensor_key] = process_sensor_data(utc_time, sensor_key, seen_messages[sensor_key], keys) # Process all non-pool sensors dynamically from database # Query all non-pool sensors and process any that have received messages all_sensors = session.query(Sensor).filter(Sensor.mqtt_name != 'pool').all() for db_sensor in all_sensors: sensor_key = f"{db_sensor.mqtt_name}_{db_sensor.mqtt_id}" if sensor_key in seen_messages and seen_messages[sensor_key]: # Get the appropriate keys for this sensor type keys = get_sensor_keys(db_sensor.mqtt_name) # Process the sensor data with dynamically determined keys seen_messages[sensor_key] = process_sensor_data( utc_time, sensor_key, seen_messages[sensor_key], keys ) # Seen messages already logged in main loop when devices are active pass # Funktion zum Abrufen und Löschen der JSON-Daten aus SQLite def get_and_delete_json_data(): try: sqlite_cursor.execute('SELECT id, data FROM json_data ORDER BY id ASC') rows = sqlite_cursor.fetchall() json_data_list = [json.loads(row[1]) for row in rows] if rows: ids = [row[0] for row in rows] placeholders = ','.join('?' * len(ids)) sqlite_cursor.execute(f'DELETE FROM json_data WHERE id IN ({placeholders})', ids) sqlite_conn.commit() logger.info(f"Retrieved and deleted {len(rows)} records from SQLite backup") return json_data_list except Exception as e: logger.error(f"Error retrieving from SQLite: {e}") return [] # Funktion zum Synchronisieren der Daten def sync_data(): if is_remote_server_available(): local_data_written = False # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) local_data = get_and_delete_json_data() for data in local_data: try: if isinstance(data, dict) and 'utc_time' in data: # Einzelner Sensor-Eintrag if " UTC" in str(data.get('utc_time', '')): data['utc_time'] = data['utc_time'].replace(" UTC", "") store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), data.get('average_speed'), data.get('direction'), data.get('gust'), data.get('rain_mm')) if not local_data_written: utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") local_data_written = True except exc.SQLAlchemyError as e: logger.error(f"SQLAlchemyError syncing local data: {e}") session.rollback() # Rette den Datensatz zurück in SQLite save_json_locally(data) except Exception as e: logger.error(f"Error syncing local data: {e}") save_json_locally(data) # Danach neue Daten aus der Warteschlange synchronisieren while new_data_queue: data = new_data_queue.pop(0) try: if isinstance(data, dict) and 'mqtt_id' in data: store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], data['gust'], data['rain_mm']) except exc.SQLAlchemyError as e: logger.error(f"SQLAlchemyError: {e}") session.rollback() save_json_locally(data) except Exception as e: logger.error(f"Error writing data: {e}") save_json_locally(data) else: # MariaDB nicht verfügbar - speichere in SQLite while new_data_queue: data = new_data_queue.pop(0) save_json_locally(data) def update_last_transmission_time(sensor, time_value): """Update last transmission time for a sensor. Uses provided time or falls back to current time if not available.""" try: if time_value: last_transmission_times[sensor] = datetime.fromisoformat(time_value.replace('Z', '+00:00')) else: last_transmission_times[sensor] = datetime.now(timezone.utc) except (ValueError, AttributeError, TypeError): # Fallback to current time if time parsing fails last_transmission_times[sensor] = datetime.now(timezone.utc) def check_last_transmission_time(): global last_restart_time now = datetime.now(timezone.utc) # Determine recent activity for BME280 (local I2C) and radio sensors (868 MHz) bme_active_recent = any( (now - t.replace(tzinfo=timezone.utc)).total_seconds() <= STALL_WINDOW_SECONDS for k, t in last_transmission_times.items() if k.startswith('Bosch-BME280_') ) radio_active_recent = False radio_nonresponding = 0 # Only evaluate the sensors explicitly listed as radio sensors for sensor in allowed_sensors_for_time: t = last_transmission_times.get(sensor) if not t: # Not seen in this runtime; skip counting to avoid false positives at startup continue t = t.replace(tzinfo=timezone.utc) age = (now - t).total_seconds() if age <= STALL_WINDOW_SECONDS: radio_active_recent = True # Clear failure flag when sensor recovers if sensor_failure_logged.get(sensor): logger.info(f'Sensor {sensor} has recovered') sensor_failure_logged[sensor] = False else: radio_nonresponding += 1 # Only log failure once when it first occurs if not sensor_failure_logged.get(sensor): logger.warning(f'Sensor {sensor} not responding (last seen {age:.0f}s ago)') sensor_failure_logged[sensor] = True # Condition 1: Only BME is active (dongle likely stalled) if bme_active_recent and not radio_active_recent: if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS: logger.warning('Only BME280 active; 868 MHz sensors silent. Restarting Pi...') restart_pi() last_restart_time = now # Only log cooldown every 5 minutes to reduce spam elif (now - last_restart_time).total_seconds() % 300 < 60: remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds() logger.info(f'BME-only state continues, restart cooldown: {remaining:.0f}s remaining') return # Condition 2: Fallback – multiple radio sensors not responding if radio_nonresponding >= 2: if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS: logger.warning(f'{radio_nonresponding} sensors not responding. Restarting Pi...') restart_pi() last_restart_time = now # Only log cooldown every 5 minutes to reduce spam elif (now - last_restart_time).total_seconds() % 300 < 60: remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds() logger.info(f'{radio_nonresponding} sensors down, restart cooldown: {remaining:.0f}s remaining') def restart_pi(): """Restart the Raspberry Pi remotely via SSH (container runs on NAS).""" logger.warning("Attempting to restart Raspberry Pi due to sensor failure...") if not PI_HOST: logger.error("PI_HOST env var not set; cannot SSH to Pi. Set PI_HOST, PI_USER, SSH_KEY_PATH.") return ssh_target = f"{PI_USER}@{PI_HOST}" ssh_cmd = [ 'ssh', '-i', SSH_KEY_PATH, '-o', 'StrictHostKeyChecking=accept-new', '-o', 'BatchMode=yes', '-o', 'ConnectTimeout=8', ssh_target, 'sudo', '-n', 'reboot' ] try: result = subprocess.run(ssh_cmd, capture_output=True, timeout=15) if result.returncode == 0: logger.info("Pi restart command sent via SSH successfully") return else: logger.error(f"SSH reboot failed (code {result.returncode}): {result.stderr.decode(errors='ignore')}") except Exception as e: logger.error(f"SSH reboot exception: {e}") logger.error("Pi restart via SSH failed. Manual intervention may be required.") if __name__ == '__main__': count = 0 # Start environment file watcher for runtime config changes start_env_watcher() print('start data collection') try: while True: check_last_transmission_time() # if count >= 600: # mqttc.loop_stop() # break # count += 1 # print(count) time.sleep(60) utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") # Only log when devices are actually seen active_devices = [key for key in seen_messages.keys() if seen_messages[key]] if active_devices: logger.info(f"{utc_time}: Seen devices: {active_devices}") # Only log sensor status if there are transmission times to report if last_transmission_times: status_lines = [] for k, v in last_transmission_times.items(): if k not in ignored_sensors_for_time: age_seconds = (datetime.now(timezone.utc) - v.replace(tzinfo=timezone.utc)).total_seconds() status_lines.append(f"{k}: {v.strftime('%H:%M:%S')} ({age_seconds:.0f}s ago)") if status_lines: logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines)) # logger.info(f"{utc_time}: last seen at: {', '.join(f'{k}: {v.strftime('%H:%M:%S')} ({(datetime.now(timezone.utc) - v).total_seconds():.0f} Sekunden ago)' if isinstance(v, datetime) else f'{k}: {v}' for k, v in last_transmission_times.items())}") process_mqtt_messages(seen_messages) if is_remote_server_available(): new_data_queue.append(seen_messages) sync_data() # Data synced - no logging needed for normal operation else: logger.warning(f"{utc_time}: Remote server unavailable - storing batch locally") save_json_locally(seen_messages) except KeyboardInterrupt: logger.info("Shutting down gracefully...") stop_env_watcher() if mqttc: mqttc.loop_stop() raise