commit 1d1036cf2a5ec37b95cd3d654bd6f4ca274b9632 Author: User Date: Sun Dec 21 07:26:30 2025 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1b510c --- /dev/null +++ b/.gitignore @@ -0,0 +1,70 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +venv/ +ENV/ +env/ +.venv + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Jupyter Notebook +.ipynb_checkpoints + +# Environment variables +.env +.env.local + +# Data files +data/ +*.csv +*.xlsx +*.json +*.db +*.sqlite + +# Secrets and sensitive files +secrets/ +*.key +*.pem +id_rsa +id_rsa.pub +known_hosts + +# Docker +docker-compose.override.yml + +# Logs +*.log +logs/ + +# OS +Thumbs.db +.DS_Store diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..060adcf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.13-slim + +WORKDIR /workspace + +# Install system dependencies for mysql-connector-python and git +RUN apt-get update && apt-get install -y \ + default-libmysqlclient-dev \ + pkg-config \ + git \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Create volume for SQLite database persistence +VOLUME ["/workspace/data"] + +EXPOSE 1883 3306 + +CMD ["python", "datacollector.py"] diff --git a/data_tables.py b/data_tables.py new file mode 100644 index 0000000..f220ce9 --- /dev/null +++ b/data_tables.py @@ -0,0 +1,85 @@ +from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, ForeignKey +# from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, relationship, declarative_base +from datetime import datetime, timezone + +# Create a base class for declarative class definitions +Base = declarative_base() + +# Define the Sensor table +class Sensor(Base): + __tablename__ = 'sensors' + id = Column(Integer, primary_key=True) + mqtt_name = Column(String(50), unique=True) + mqtt_id = Column(String(50)) + position = Column(String(50)) + room = Column(String(50)) + battery = Column(Float) + rain_offset = Column(Float, default=0.0) # Cumulative offset for rain sensor resets + last_rain_value = Column(Float, default=0.0) # Last reported rain value (for reset detection) + +# Define the TemperatureInside table +class TemperatureInside(Base): + __tablename__ = 'temperature_inside' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='temperature_inside') + timestamp = Column(DateTime, default=datetime.utcnow) + temperature_c = Column(Float) + +# Define the TemperatureOutside table +class TemperatureOutside(Base): + __tablename__ = 'temperature_outside' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='temperature_outside') + timestamp = Column(DateTime, default=datetime.utcnow) + temperature_c = Column(Float) + + +# Define the HumidityInside table +class HumidityInside(Base): + __tablename__ = 'humidity_inside' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='humidity_inside') + timestamp = Column(DateTime, default=datetime.utcnow) + humidity = Column(Float) + +# Define the HumidityInside table +class HumidityOutside(Base): + __tablename__ = 'humidity_outside' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='humidity_outside') + timestamp = Column(DateTime, default=datetime.utcnow) + humidity = Column(Float) + +# Define the AirPressure table +class AirPressure(Base): + __tablename__ = 'air_pressure' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='air_pressure') + timestamp = Column(DateTime, default=datetime.utcnow) + pressure_rel = Column(Float) + +# Define the wind table +class Wind(Base): + __tablename__ = 'wind' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='wind') + timestamp = Column(DateTime, default=datetime.utcnow) + average_speed = Column(Float) + direction = Column(Float) + gust = Column(Float) + +# Define the precipitation table +class Precipitation(Base): + __tablename__ = 'precipitation' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='precipitation') + timestamp = Column(DateTime, default=datetime.utcnow) + precipitation = Column(Float) \ No newline at end of file diff --git a/datacollector.py b/datacollector.py new file mode 100644 index 0000000..bbd2550 --- /dev/null +++ b/datacollector.py @@ -0,0 +1,986 @@ + +# import debugpy +# debugpy.listen(('0.0.0.0', 5678)) +# print("Waiting for debugger attach") +# debugpy.wait_for_client() + +import json +import time +import paho.mqtt.client as mqtt +import sqlite3 +import requests +import logging +import struct +import subprocess +import os +import threading +from logging.handlers import RotatingFileHandler +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from sqlalchemy import create_engine, exc +# from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from datetime import datetime, timezone +from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation + +# Configure logging with environment-based log level +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +logger = logging.getLogger(__name__) +logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO)) +handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCount=1) +handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) +logger.addHandler(handler) + +# Malformed hex logging (controlled via environment). Enable with LOG_MALFORMED_HEX=true|1|yes|on +LOG_MALFORMED_HEX = os.getenv("LOG_MALFORMED_HEX", "false").strip().lower() in ("1", "true", "yes", "on") +malformed_hex_logger = logging.getLogger('malformed_hex') +malformed_hex_logger.setLevel(logging.INFO) +if LOG_MALFORMED_HEX: + malformed_hex_handler = RotatingFileHandler('mal-hex.log', maxBytes=10000000, backupCount=1) + malformed_hex_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s')) + malformed_hex_logger.addHandler(malformed_hex_handler) + +MQTT_SERVER = "192.168.43.102" +MQTT_TOPIC_PREFIX = "rtl_433/DietPi/events" +KNOWN_DEVICES = ["inFactory-TH_25", "Oregon-THGR122N_233", "Oregon-v1_0", "Bresser-6in1_-2021550075", "Bosch-BME280_1", "pool"] + +# Remote Pi management (container runs on NAS) +PI_HOST = os.getenv("PI_HOST") +PI_USER = os.getenv("PI_USER", "pi") +SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "/workspace/.ssh/id_rsa") + +seen_messages = {} +new_data_queue = [] +last_transmission_times = {} +ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252', 'Oregon-THGR122N_233'] +allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31'] +debug = False + +# Track sensor failure states to avoid repeated logging +sensor_failure_logged = {} + +# Watchdog configuration +# If only BME280 is active within this window and radio sensors are silent, restart +STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300")) # 5 minutes +RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600")) # 1 hour +last_restart_time = None + +# Load sensor-specific temperature offsets from environment +BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) +config_lock = threading.Lock() + +# File watcher for runtime configuration changes +class EnvFileWatcher(FileSystemEventHandler): + """Watch for changes to .env file and reload configuration""" + def on_modified(self, event): + if event.src_path.endswith('.env'): + reload_config() + +def reload_config(): + """Reload configuration values from .env file""" + global BRESSER_6IN1_TEMP_OFFSET + try: + with config_lock: + old_offset = BRESSER_6IN1_TEMP_OFFSET + BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")) + if old_offset != BRESSER_6IN1_TEMP_OFFSET: + logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C") + except Exception as e: + logger.error(f"Error reloading configuration from .env: {e}") + +# Initialize file watcher +env_observer = None + +def start_env_watcher(): + """Start watching .env file for changes""" + global env_observer + try: + env_observer = Observer() + event_handler = EnvFileWatcher() + env_observer.schedule(event_handler, path='.', recursive=False) + env_observer.start() + logger.info("Environment file watcher started") + except Exception as e: + logger.warning(f"Failed to start environment file watcher: {e}") + +def stop_env_watcher(): + """Stop watching .env file""" + global env_observer + if env_observer: + env_observer.stop() + env_observer.join(timeout=5) + logger.info("Environment file watcher stopped") + +# Verbindung zur SQLite-Datenbank herstellen (Fallback wenn MariaDB ausfällt) +sqlite_db_path = 'data/local_backup.db' +os.makedirs('data', exist_ok=True) +sqlite_conn = sqlite3.connect(sqlite_db_path, check_same_thread=False) +sqlite_cursor = sqlite_conn.cursor() + +# Tabelle für Anfragen erstellen, falls sie nicht existiert +sqlite_cursor.execute(''' +CREATE TABLE IF NOT EXISTS json_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + data TEXT NOT NULL, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP +) +''') +sqlite_conn.commit() + +logger.info(f"SQLite database initialized at: {sqlite_db_path}") + +# Create a connection to the database +sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata') + +# Create a configured "Session" class +Session = sessionmaker(bind=sql_engine) + +# Create a session to interact with the database +session = Session() + +def parse_radio_frame(byte_data): + """Parse radio frame with structure: + Preamble: 0xAA repeated (often 3x) + Sync: 0x2D (optionally followed by 0xD4) + Header (4 bytes): payload_len (u8), dest_id (u8), sender_id (u8), ctl (u8) + Data: payload_len bytes + CRC: 2 bytes (polynomial unknown) – returned but not verified here + Returns dict with extracted 'data' and header fields, or None if not found/invalid. + """ + if not byte_data: + return None + + try: + # Find sync 0x2D followed by networkId (second byte) + sync_index = byte_data.find(b"\x2d") + if sync_index == -1: + return None + + if sync_index + 1 >= len(byte_data): + # No room for networkId byte + return None + + network_id = byte_data[sync_index + 1] + sync_len = 2 + + header_start = sync_index + sync_len + if header_start + 4 > len(byte_data): + return None + + payload_len, dest_id, sender_id, ctl = struct.unpack_from(' len(byte_data): + # Not enough bytes for data + 2-byte CRC + return None + + data = byte_data[data_start:data_end] + crc_bytes = byte_data[data_end:data_end + 2] + + return { + 'data': data, + 'payload_len': payload_len, + 'dest_id': dest_id, + 'sender_id': sender_id, + 'ctl': ctl, + 'network_id': network_id, + 'crc_bytes': crc_bytes, + 'sync_index': sync_index, + 'sync_len': sync_len, + } + except Exception: + return None + +def refresh_sensor_cache(): + """Refresh the sensor cache from database""" + global sensor_ids, sensor_names, sensor_by_name_room + sensors = session.query(Sensor).all() + sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors] + sensor_names = [sensor.mqtt_name for sensor in sensors] + + # Create a mapping of (mqtt_name_base, room) -> sensor for battery change detection + sensor_by_name_room = {} + for sensor in sensors: + # Extract base name (before last underscore if it contains the ID) + base_name = sensor.mqtt_name.rsplit('_', 1)[0] if '_' in sensor.mqtt_name else sensor.mqtt_name + if sensor.room: + sensor_by_name_room[(base_name, sensor.room)] = sensor + + return sensors + +sensors = refresh_sensor_cache() +sensor_by_name_room = {} + +warte = '' + +# Funktion zum Überprüfen der Remote-Server-Verbindung +def is_remote_server_available(): + try: + response = requests.get('http://192.168.43.102') + return response.status_code == 200 + except requests.ConnectionError: + return False + # return True + +# Funktion zum Speichern der JSON-Daten in SQLite +def save_json_locally(json_dict): + try: + json_str = json.dumps(json_dict) + sqlite_cursor.execute('INSERT INTO json_data (data) VALUES (?)', (json_str,)) + sqlite_conn.commit() + logger.info(f"Data saved to local SQLite database") + except Exception as e: + logger.error(f"Error saving to SQLite: {e}") + + +# The callback for when the client receives a CONNACK response from the server. +def on_connect(client, userdata, flags, reason_code, properties): + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe(MQTT_TOPIC_PREFIX) + print(f"Connected with result code {reason_code}") + +# The callback for when a PUBLISH message is received from the server. +def on_message(client, userdata, msg): + if msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]): + d = json.loads(msg.payload.decode()) + model = d['model'] + if model in sensor_names: + if model == 'pool': + test_value = d['rows'][0]['data'] + if not test_value.startswith('aaaaaa'): + return + else: + # Decode the hex-encoded binary payload and scan for a plausible struct + hex_data = test_value[6:] # Remove 'aaaaaa' prefix + try: + byte_data = bytes.fromhex(hex_data) + except ValueError: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Invalid hex: {hex_data}") + print(f"Invalid hex data: {hex_data}") + print(d) + warte = '' + return + + # Attempt to parse the radio frame first (preamble/sync/header/data/crc) + frame = parse_radio_frame(byte_data) + if frame and frame.get('data'): + print( + f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, " + f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}" + ) + candidate_bytes = frame['data'] + else: + # Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream + if LOG_MALFORMED_HEX and not frame: + malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}") + tmp = byte_data + while tmp.startswith(b"\xaa"): + tmp = tmp[1:] + candidate_bytes = tmp + + print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") + print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}") + + # Decode payload struct with magic bytes and CRC + # struct: magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc + PAYLOAD_SIZE = 15 + MAGIC1 = 0x42 + MAGIC2 = 0x99 + + def calculate_crc8(data): + """Simple XOR checksum""" + c = 0 + for byte in data: + c ^= byte + return c + + # Scan for magic bytes within candidate payload + magic_offset = -1 + for i in range(len(candidate_bytes) - 1): + if candidate_bytes[i] == MAGIC1 and candidate_bytes[i+1] == MAGIC2: + magic_offset = i + break + + if magic_offset == -1: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Magic bytes not found: {candidate_bytes.hex()}") + print(f"Magic bytes {MAGIC1:02x} {MAGIC2:02x} not found in payload") + elif len(candidate_bytes) - magic_offset < PAYLOAD_SIZE: + print(f"Payload too short after magic bytes: {len(candidate_bytes) - magic_offset} bytes (need {PAYLOAD_SIZE})") + else: + try: + # Extract payload starting from magic bytes + payload_data = candidate_bytes[magic_offset:magic_offset + PAYLOAD_SIZE] + + # Unpack: BBBBHhhhHB = magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc + magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(' id=11 for BME + 'battery_ok': 1, + 'temperature_C': parsed_struct['t_bme_c'], + 'humidity': parsed_struct['humidity'], + 'pressure_rel': parsed_struct['pressure_hpa'], + 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' + } + + # Message 2: DS18B20 sensor (temp only) + ds_msg = { + 'time': original_time, + 'model': 'pool', + 'id': nodeId * 10 + 2, # e.g., nodeId=1 -> id=12 for DS + 'battery_ok': 1, + 'temperature_C': parsed_struct['t_ds_c'], + 'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM' + } + + # Process both messages through the existing logic + for msg_data in [bme_msg, ds_msg]: + print(f"Received message from {msg_data['model']}: \n {msg_data}") + sensor_id = msg_data['id'] + sensor_key = f"{msg_data['model']}_{sensor_id}" + + if sensor_key not in seen_messages.keys(): + seen_messages[sensor_key] = [msg_data] + else: + seen_messages[sensor_key].append(msg_data) + + except struct.error as e: + print(f"Struct unpack error: {e}") + + warte = '' + return + print(f"Received message from {model}: \n {d}") + id = d['id'] + if model == 'Bresser-6in1': + if d['flags'] == 0: + if 'rain_mm' in d.keys(): + del d['rain_mm'] + sensor_key = f'{model}_{id}' + if sensor_key not in seen_messages.keys(): + seen_messages[sensor_key] = [d] + else: + seen_messages[sensor_key].append(d) + + +# Define a function to update the data in the database +def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): + values = { + "utc_time": utc_time, + "mqtt_id": mqtt_id, + "temperature_c": temperature_c, + "humidity": humidity, + "pressure_rel": pressure_rel, + "battery": battery, + "average_speed": average_speed, + "direction": direction, + "gust": gust, + "rain_mm": rain_mm + } + if is_remote_server_available(): + new_data_queue.append(values) + sync_data() + # Data sent - no logging needed for normal operation + else: + logger.warning(f"{utc_time}: Remote server unavailable - storing locally for {mqtt_id}") + save_json_locally(values) + + +def get_or_update_sensor(mqtt_name, mqtt_id): + """ + Get sensor by mqtt_name and mqtt_id. If not found, try to find by base mqtt_name, + then update the mqtt_id (battery change scenario). + Uses timing heuristic: if exactly one sensor of the type stopped transmitting recently, + assume that's the one with battery change. + """ + # Try to find exact match first + sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first() + + if sensor: + return sensor + + # If not found, this might be a battery change + # For pool sensors, require exact match + if mqtt_name == 'pool': + logger.warning(f"Pool sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.") + return None + + # Extract base name (e.g., "LaCrosse-TX35DTHIT" from "LaCrosse-TX35DTHIT_28") + base_name = mqtt_name.rsplit('_', 1)[0] if '_' in mqtt_name else mqtt_name + + # Find all sensors with matching base name + candidates = session.query(Sensor).filter( + Sensor.mqtt_name.like(f'{base_name}%') + ).all() + + if not candidates: + logger.warning(f"Sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.") + return None + + # If only one candidate, update it + if len(candidates) == 1: + old_id = candidates[0].mqtt_id + old_name = candidates[0].mqtt_name + candidates[0].mqtt_name = mqtt_name + candidates[0].mqtt_id = mqtt_id + session.commit() + logger.info(f"Updated sensor in room '{candidates[0].room}': {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)") + + # Refresh the sensor cache to reflect the updated ID + refresh_sensor_cache() + + return candidates[0] + + # Multiple candidates - use timing heuristic + # Check which sensors have stopped transmitting recently (within last 10 minutes) + current_time = datetime.now(timezone.utc) + recent_silent = [] + + for candidate in candidates: + sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}" + if sensor_key in last_transmission_times: + last_seen = last_transmission_times[sensor_key] + time_since_last = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds() + # Consider sensors that stopped in last 10 minutes but were active before + if 60 < time_since_last < 600: # Between 1 and 10 minutes + recent_silent.append((candidate, time_since_last)) + logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago") + + if len(recent_silent) == 1: + # Exactly one sensor went silent recently - assume battery change + sensor_to_update, time_ago = recent_silent[0] + old_id = sensor_to_update.mqtt_id + old_name = sensor_to_update.mqtt_name + old_key = f"{old_name}_{old_id}" + + sensor_to_update.mqtt_name = mqtt_name + sensor_to_update.mqtt_id = mqtt_id + session.commit() + + logger.info(f"AUTO-UPDATE: Sensor in room '{sensor_to_update.room}' (last seen {time_ago:.0f}s ago)") + logger.info(f" Changed: {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)") + + # Update last_transmission_times key + if old_key in last_transmission_times: + del last_transmission_times[old_key] + + # Refresh the sensor cache + refresh_sensor_cache() + + return sensor_to_update + + # Multiple or no recent silent sensors - need manual intervention + logger.warning(f"Cannot auto-update mqtt_id for {mqtt_name}_{mqtt_id}. Found {len(candidates)} candidates:") + for candidate in candidates: + sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}" + if sensor_key in last_transmission_times: + last_seen = last_transmission_times[sensor_key] + time_ago = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds() + logger.warning(f" - {sensor_key} in room '{candidate.room}' (last seen {time_ago:.0f}s ago)") + else: + logger.warning(f" - {sensor_key} in room '{candidate.room}' (never seen)") + + if len(recent_silent) == 0: + logger.warning(f"No sensors stopped recently (1-10 min). Cannot determine which sensor changed battery.") + else: + logger.warning(f"{len(recent_silent)} sensors stopped recently. Cannot determine which one changed battery.") + + logger.warning(f"Please update manually: UPDATE sensors SET mqtt_name='{mqtt_name}', mqtt_id='{mqtt_id}' WHERE mqtt_name='[old_name]' AND mqtt_id='[old_id]' AND room='[room]';") + return None + + +def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): + mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores + + # Get the sensor object from the database (with auto-update for battery changes) + sensor = get_or_update_sensor(mqtt_name, mqtt_id) + + if not sensor: + logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database") + return + + position = sensor.position + + # Update the sensor's battery level + sensor.battery = battery + + # Update the temperature data + if temperature_c is not None: + if position == "inside": + temperature_inside = session.query(TemperatureInside).filter_by(sensor_id=sensor.id).order_by(TemperatureInside.timestamp.desc()).first() + if temperature_inside is None or temperature_inside.temperature_c != temperature_c: + temperature_inside = TemperatureInside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) + session.add(temperature_inside) + elif position == "outside": + temperature_outside = session.query(TemperatureOutside).filter_by(sensor_id=sensor.id).order_by(TemperatureOutside.timestamp.desc()).first() + if temperature_outside is None or temperature_outside.temperature_c != temperature_c: + temperature_outside = TemperatureOutside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c) + session.add(temperature_outside) + + # Update the humidity data + if humidity is not None: + if position == "inside": + humidity_inside = session.query(HumidityInside).filter_by(sensor_id=sensor.id).order_by(HumidityInside.timestamp.desc()).first() + if humidity_inside is None or humidity_inside.humidity != humidity: + humidity_inside = HumidityInside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity) + session.add(humidity_inside) + elif position == "outside": + humidity_outside = session.query(HumidityOutside).filter_by(sensor_id=sensor.id).order_by(HumidityOutside.timestamp.desc()).first() + if humidity_outside is None or humidity_outside.humidity != humidity: + humidity_outside = HumidityOutside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity) + session.add(humidity_outside) + + # Update the air pressure data + if pressure_rel is not None: + air_pressure = session.query(AirPressure).filter_by(sensor_id=sensor.id).order_by(AirPressure.timestamp.desc()).first() + if air_pressure is None or air_pressure.pressure_rel != pressure_rel: + air_pressure = AirPressure(timestamp=utc_time, sensor_id=sensor.id, pressure_rel=pressure_rel) + session.add(air_pressure) + + if average_speed is not None or gust is not None or direction is not None: + wind_value = session.query(Wind).filter_by(sensor_id=sensor.id).order_by(Wind.timestamp.desc()).first() + if wind_value is None or (average_speed is not None and wind_value.average_speed != average_speed) or (gust is not None and wind_value.gust != gust) or (direction is not None and wind_value.direction != direction): + wind_value = Wind(timestamp=utc_time, sensor_id=sensor.id, average_speed=average_speed, direction=direction, gust=gust) + session.add(wind_value) + + # Update Precipitation data with cumulative offset handling + if rain_mm is not None: + if rain_mm <= 1000: + # Check for rain sensor reset (battery change) + # If current value is significantly less than last value, it's a reset + if sensor.last_rain_value is not None and sensor.last_rain_value > 0: + if rain_mm < sensor.last_rain_value - 1.0: # Dropped by more than 1mm (reset detected) + # Battery was changed, add last value to offset + sensor.rain_offset = (sensor.rain_offset or 0.0) + sensor.last_rain_value + logger.info(f"Rain sensor reset detected for {mqtt_name_id}. Last value: {sensor.last_rain_value}mm, New offset: {sensor.rain_offset}mm") + + # Calculate actual cumulative rain including offset + actual_rain = rain_mm + (sensor.rain_offset or 0.0) + + # Update last rain value for next comparison + sensor.last_rain_value = rain_mm + + precipitation = session.query(Precipitation).filter_by(sensor_id=sensor.id).order_by(Precipitation.timestamp.desc()).first() + if precipitation is None or precipitation.precipitation != actual_rain: + precipitation = Precipitation(timestamp=utc_time, sensor_id=sensor.id, precipitation=actual_rain) + session.add(precipitation) + else: + logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}") + + # Commit the changes + session.commit() + + +mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) +mqttc.on_connect = on_connect +mqttc.on_message = on_message +# mqttc.username_pw_set("olafn", "weather") + +mqttc.connect(MQTT_SERVER) + + +mqttc.loop_start() + +def average_values(data, keys_to_average): + if not data: + return {}, [] + + # Filter out the data that contains all the keys + period_data = [d for d in data if any(key in d for key in keys_to_average)] + + # Calculate the average of the period data + averages = {} + for key in keys_to_average: + values = [d.get(key) for d in period_data if d.get(key) is not None] + average = sum(values) / len(values) if values else None + if average is not None: + if key == 'humidity': + average = int(round(average, 0)) + elif key == 'temperature_C': + average = round(average, 1) + elif key == 'wind_max_m_s': + average = round(average, 1) + elif key == 'wind_avg_m_s': + average = round(average, 1) + elif key == 'wind_dir_deg': + average = int(round(average, 0)) + elif key == 'rain_mm': + average = round(average, 1) + elif key == 'pressure_rel': + average = int(round(average, 0)) + elif key == 'temperature_F': + averages['temperature_C'] = round((average - 32) * 5 / 9, 1) + continue + averages[key] = average + else: + averages[key] = None + + # Remove the period data from the original data + data = [d for d in data if d not in period_data] + + return averages, data + + +def debug_sended_data(seen_messages, averages, sensor): + global debug + if not debug: + return + + print(f'Averages for {sensor}:') + for key, value in averages.items(): + print(f"{key}: {value}") + + print(f"Remaining data {sensor}:") + print(seen_messages[sensor]) + +def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_id_override=None): + """Helper function to process any sensor data consistently""" + averages, remaining = average_values(data_list, keys_to_average) + if averages: + mqtt_id = mqtt_id_override if mqtt_id_override else sensor_key + + # Apply sensor-specific temperature corrections + temperature_c = averages.get('temperature_C') + if temperature_c is not None: + if 'Bresser-6in1' in sensor_key: + with config_lock: + offset = BRESSER_6IN1_TEMP_OFFSET + temperature_c = temperature_c - offset + logger.debug(f"Applied Bresser-6in1 temperature offset: {offset}°C, corrected value: {temperature_c}°C") + + update_data( + utc_time, + mqtt_id, + temperature_c, + averages.get('humidity'), + averages.get('pressure_rel'), + averages.get('battery_ok', 1) if averages.get('battery_ok') is not None else 1, + averages.get('wind_avg_m_s'), + averages.get('wind_dir_deg'), + averages.get('wind_max_m_s'), + averages.get('rain_mm') + ) + debug_sended_data({sensor_key: remaining}, averages, sensor_key) + return remaining + +def process_mqtt_messages(seen_messages): + for sensor, data in seen_messages.items(): + if data: + # Try to get 'time' from first message, or use None as fallback + time_value = data[0].get('time') if isinstance(data[0], dict) else None + update_last_transmission_time(sensor, time_value) + + utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + # Process pool sensors (BME280 and DS18B20) + pool_sensors = [k for k in seen_messages.keys() if k.startswith('pool_')] + for pool_key in pool_sensors: + if seen_messages[pool_key]: + sensor_id = pool_key.split('_')[1] + if sensor_id.endswith('1'): # BME280 (nodeId*10+1) + keys = ['temperature_C', 'humidity', 'pressure_rel'] + else: # DS18B20 (nodeId*10+2) + keys = ['temperature_C'] + + seen_messages[pool_key] = process_sensor_data(utc_time, pool_key, seen_messages[pool_key], keys) + + if 'Bresser-6in1_-2021550075' in seen_messages.keys(): + seen_messages['Bresser-6in1_-2021550075'] = process_sensor_data( + utc_time, 'Bresser-6in1_-2021550075', seen_messages['Bresser-6in1_-2021550075'], + ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok'] + ) + + if 'Bosch-BME280_1' in seen_messages.keys(): + seen_messages['Bosch-BME280_1'] = process_sensor_data( + utc_time, 'Bosch-BME280_1', seen_messages['Bosch-BME280_1'], + ['temperature_C', 'humidity', 'pressure_rel'] + ) + + if 'Oregon-v1_0' in seen_messages.keys(): + seen_messages['Oregon-v1_0'] = process_sensor_data( + utc_time, 'Oregon-v1_0', seen_messages['Oregon-v1_0'], + ['temperature_C', 'battery_ok'] + ) + + if 'Oregon-THGR122N_233' in seen_messages.keys(): + seen_messages['Oregon-THGR122N_233'] = process_sensor_data( + utc_time, 'Oregon-THGR122N_233', seen_messages['Oregon-THGR122N_233'], + ['temperature_C', 'humidity', 'battery_ok'] + ) + + if 'LaCrosse-TX35DTHIT_52' in seen_messages.keys(): + seen_messages['LaCrosse-TX35DTHIT_52'] = process_sensor_data( + utc_time, 'LaCrosse-TX35DTHIT_52', seen_messages['LaCrosse-TX35DTHIT_52'], + ['temperature_C', 'humidity', 'battery_ok'] + ) + + if 'LaCrosse-TX35DTHIT_20' in seen_messages.keys(): + seen_messages['LaCrosse-TX35DTHIT_20'] = process_sensor_data( + utc_time, 'LaCrosse-TX35DTHIT_20', seen_messages['LaCrosse-TX35DTHIT_20'], + ['temperature_C', 'humidity', 'battery_ok'] + ) + + if 'LaCrosse-TX35DTHIT_28' in seen_messages.keys(): + seen_messages['LaCrosse-TX35DTHIT_28'] = process_sensor_data( + utc_time, 'LaCrosse-TX35DTHIT_28', seen_messages['LaCrosse-TX35DTHIT_28'], + ['temperature_C', 'humidity', 'battery_ok'] + ) + + if 'LaCrosse-TX35DTHIT_31' in seen_messages.keys(): + seen_messages['LaCrosse-TX35DTHIT_31'] = process_sensor_data( + utc_time, 'LaCrosse-TX35DTHIT_31', seen_messages['LaCrosse-TX35DTHIT_31'], + ['temperature_C', 'humidity', 'battery_ok'] + ) + + # Seen messages already logged in main loop when devices are active + pass + +# Funktion zum Abrufen und Löschen der JSON-Daten aus SQLite +def get_and_delete_json_data(): + try: + sqlite_cursor.execute('SELECT id, data FROM json_data ORDER BY id ASC') + rows = sqlite_cursor.fetchall() + json_data_list = [json.loads(row[1]) for row in rows] + + if rows: + ids = [row[0] for row in rows] + placeholders = ','.join('?' * len(ids)) + sqlite_cursor.execute(f'DELETE FROM json_data WHERE id IN ({placeholders})', ids) + sqlite_conn.commit() + logger.info(f"Retrieved and deleted {len(rows)} records from SQLite backup") + + return json_data_list + except Exception as e: + logger.error(f"Error retrieving from SQLite: {e}") + return [] + +# Funktion zum Synchronisieren der Daten +def sync_data(): + if is_remote_server_available(): + local_data_written = False + + # Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback) + local_data = get_and_delete_json_data() + for data in local_data: + try: + if isinstance(data, dict) and 'utc_time' in data: + # Einzelner Sensor-Eintrag + if " UTC" in str(data.get('utc_time', '')): + data['utc_time'] = data['utc_time'].replace(" UTC", "") + + store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), + data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), + data.get('average_speed'), data.get('direction'), + data.get('gust'), data.get('rain_mm')) + if not local_data_written: + utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") + logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") + local_data_written = True + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError syncing local data: {e}") + session.rollback() + # Rette den Datensatz zurück in SQLite + save_json_locally(data) + except Exception as e: + logger.error(f"Error syncing local data: {e}") + save_json_locally(data) + + # Danach neue Daten aus der Warteschlange synchronisieren + while new_data_queue: + data = new_data_queue.pop(0) + try: + if isinstance(data, dict) and 'mqtt_id' in data: + store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], + data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], + data['gust'], data['rain_mm']) + except exc.SQLAlchemyError as e: + logger.error(f"SQLAlchemyError: {e}") + session.rollback() + save_json_locally(data) + except Exception as e: + logger.error(f"Error writing data: {e}") + save_json_locally(data) + else: + # MariaDB nicht verfügbar - speichere in SQLite + while new_data_queue: + data = new_data_queue.pop(0) + save_json_locally(data) + +def update_last_transmission_time(sensor, time_value): + """Update last transmission time for a sensor. Uses provided time or falls back to current time if not available.""" + try: + if time_value: + last_transmission_times[sensor] = datetime.fromisoformat(time_value.replace('Z', '+00:00')) + else: + last_transmission_times[sensor] = datetime.now(timezone.utc) + except (ValueError, AttributeError, TypeError): + # Fallback to current time if time parsing fails + last_transmission_times[sensor] = datetime.now(timezone.utc) + +def check_last_transmission_time(): + global last_restart_time + now = datetime.now(timezone.utc) + + # Determine recent activity for BME280 (local I2C) and radio sensors (868 MHz) + bme_active_recent = any( + (now - t.replace(tzinfo=timezone.utc)).total_seconds() <= STALL_WINDOW_SECONDS + for k, t in last_transmission_times.items() if k.startswith('Bosch-BME280_') + ) + + radio_active_recent = False + radio_nonresponding = 0 + + # Only evaluate the sensors explicitly listed as radio sensors + for sensor in allowed_sensors_for_time: + t = last_transmission_times.get(sensor) + if not t: + # Not seen in this runtime; skip counting to avoid false positives at startup + continue + t = t.replace(tzinfo=timezone.utc) + age = (now - t).total_seconds() + if age <= STALL_WINDOW_SECONDS: + radio_active_recent = True + # Clear failure flag when sensor recovers + if sensor_failure_logged.get(sensor): + logger.info(f'Sensor {sensor} has recovered') + sensor_failure_logged[sensor] = False + else: + radio_nonresponding += 1 + # Only log failure once when it first occurs + if not sensor_failure_logged.get(sensor): + logger.warning(f'Sensor {sensor} not responding (last seen {age:.0f}s ago)') + sensor_failure_logged[sensor] = True + + # Condition 1: Only BME is active (dongle likely stalled) + if bme_active_recent and not radio_active_recent: + if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS: + logger.warning('Only BME280 active; 868 MHz sensors silent. Restarting Pi...') + restart_pi() + last_restart_time = now + # Only log cooldown every 5 minutes to reduce spam + elif (now - last_restart_time).total_seconds() % 300 < 60: + remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds() + logger.info(f'BME-only state continues, restart cooldown: {remaining:.0f}s remaining') + return + + # Condition 2: Fallback – multiple radio sensors not responding + if radio_nonresponding >= 2: + if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS: + logger.warning(f'{radio_nonresponding} sensors not responding. Restarting Pi...') + restart_pi() + last_restart_time = now + # Only log cooldown every 5 minutes to reduce spam + elif (now - last_restart_time).total_seconds() % 300 < 60: + remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds() + logger.info(f'{radio_nonresponding} sensors down, restart cooldown: {remaining:.0f}s remaining') +def restart_pi(): + """Restart the Raspberry Pi remotely via SSH (container runs on NAS).""" + logger.warning("Attempting to restart Raspberry Pi due to sensor failure...") + + if not PI_HOST: + logger.error("PI_HOST env var not set; cannot SSH to Pi. Set PI_HOST, PI_USER, SSH_KEY_PATH.") + return + + ssh_target = f"{PI_USER}@{PI_HOST}" + ssh_cmd = [ + 'ssh', + '-i', SSH_KEY_PATH, + '-o', 'StrictHostKeyChecking=accept-new', + '-o', 'BatchMode=yes', + '-o', 'ConnectTimeout=8', + ssh_target, + 'sudo', '-n', 'reboot' + ] + + try: + result = subprocess.run(ssh_cmd, capture_output=True, timeout=15) + if result.returncode == 0: + logger.info("Pi restart command sent via SSH successfully") + return + else: + logger.error(f"SSH reboot failed (code {result.returncode}): {result.stderr.decode(errors='ignore')}") + except Exception as e: + logger.error(f"SSH reboot exception: {e}") + + logger.error("Pi restart via SSH failed. Manual intervention may be required.") + +if __name__ == '__main__': + count = 0 + + # Start environment file watcher for runtime config changes + start_env_watcher() + + print('start data collection') + try: + while True: + check_last_transmission_time() + # if count >= 600: + # mqttc.loop_stop() + # break + + # count += 1 + # print(count) + time.sleep(60) + utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") + + # Only log when devices are actually seen + active_devices = [key for key in seen_messages.keys() if seen_messages[key]] + if active_devices: + logger.info(f"{utc_time}: Seen devices: {active_devices}") + + # Only log sensor status if there are transmission times to report + if last_transmission_times: + status_lines = [] + for k, v in last_transmission_times.items(): + if k not in ignored_sensors_for_time: + age_seconds = (datetime.now(timezone.utc) - v.replace(tzinfo=timezone.utc)).total_seconds() + status_lines.append(f"{k}: {v.strftime('%H:%M:%S')} ({age_seconds:.0f}s ago)") + + if status_lines: + logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines)) + # logger.info(f"{utc_time}: last seen at: {', '.join(f'{k}: {v.strftime('%H:%M:%S')} ({(datetime.now(timezone.utc) - v).total_seconds():.0f} Sekunden ago)' if isinstance(v, datetime) else f'{k}: {v}' for k, v in last_transmission_times.items())}") + process_mqtt_messages(seen_messages) + if is_remote_server_available(): + new_data_queue.append(seen_messages) + sync_data() + # Data synced - no logging needed for normal operation + else: + logger.warning(f"{utc_time}: Remote server unavailable - storing batch locally") + save_json_locally(seen_messages) + except KeyboardInterrupt: + logger.info("Shutting down gracefully...") + stop_env_watcher() + if mqttc: + mqttc.loop_stop() + raise + + diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml new file mode 100644 index 0000000..d78563e --- /dev/null +++ b/docker-compose.dev.yml @@ -0,0 +1,17 @@ +version: '3.8' + +services: + data-collector: + # Override volumes for local dev; host workspace is mounted to /workspace. + volumes: + - ./:/workspace + - ./data:/workspace/data + - ./secrets/id_rsa:/workspace/.ssh/id_rsa:ro + - ./secrets/known_hosts:/workspace/.ssh/known_hosts:ro + environment: + # Safe defaults for local dev + - PI_HOST=${PI_HOST:-localhost} + - PI_USER=${PI_USER:-pi} + - SSH_KEY_PATH=/workspace/.ssh/id_rsa + - STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300} + - RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..cead945 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,27 @@ +version: '3.8' +services: + data-collector: + build: + context: . + dockerfile: Dockerfile + environment: + - PI_HOST=${PI_HOST} + - PI_USER=${PI_USER:-pi} + - SSH_KEY_PATH=/workspace/.ssh/id_rsa + - STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300} + - RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600} + volumes: + - /volume1/docker/weatherstation:/workspace + - /volume1/docker/weatherstation/data:/workspace/data + - /volume1/docker/weatherstation/secrets/id_rsa:/workspace/.ssh/id_rsa:ro + - /volume1/docker/weatherstation/secrets/known_hosts:/workspace/.ssh/known_hosts:ro + restart: unless-stopped + command: python datacollector.py + networks: + - data-net + stdin_open: true + tty: true + +networks: + data-net: + driver: bridge diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c491c35 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +paho-mqtt>=2.1.0 +mysql-connector-python>=9.0.0 +requests>=2.25.0 +sqlalchemy>=1.4.0 +watchdog>=4.0.0 \ No newline at end of file