# weatherstation-datacollector/datacollector.py
#
# Commit f55c1fe6f1 by olaf, 2026-01-25 11:25:15 +00:00
# Pool sensor v2: VCC monitoring, database resilience, receiver improvements
# - Added voltage monitoring table and storage pipeline
# - Extended pool payload to 17 bytes with VCC field (protocol v2)
# - Improved database connection pool resilience (reduced pool size, aggressive recycling, pool disposal on failure)
# - Added environment variable support for database configuration
# - Fixed receiver MQTT deprecation warning (CallbackAPIVersion.VERSION2)
# - Silenced excessive RSSI status logging in receiver
# - Added reset flag tracking and reporting
# - Updated Docker compose with DB config and log rotation limits

import json
import time
import paho.mqtt.client as mqtt
import sqlite3
import requests
import logging
import struct
import subprocess
import os
import sys
import threading
from typing import Optional
from dotenv import load_dotenv
from logging.handlers import RotatingFileHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine, exc, text
from sqlalchemy.engine import URL
# from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timezone
from data_tables import Sensor, TemperatureInside, TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation, Voltage, Base
# Load .env file so environment variables from .env are available at startup
load_dotenv()
# Strip quotes from password if they were included (defensive)
DB_PASSWORD_RAW = os.getenv("DB_PASSWORD", "cfCUswMHfK82!")
DB_PASSWORD = DB_PASSWORD_RAW.strip("'\"") # Remove surrounding quotes if present
# Configure logging with environment-based log level
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger(__name__)
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
handler = RotatingFileHandler('datacollector.log', maxBytes=5242880, backupCount=10)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# Log loaded configuration values at startup (after logger setup)
try:
    logger.info(
        f"Startup config: LOG_LEVEL={LOG_LEVEL}, "
        f"BRESSER_6IN1_TEMP_OFFSET={float(os.getenv('BRESSER_6IN1_TEMP_OFFSET', '0'))}°C, "
        f"BATTERY_CHANGE_MIN_SILENCE={os.getenv('BATTERY_CHANGE_MIN_SILENCE', '60')}s, "
        f"BATTERY_CHANGE_MAX_SILENCE={os.getenv('BATTERY_CHANGE_MAX_SILENCE', '600')}s"
    )
except Exception:
    pass
# Malformed hex logging (controlled via environment). Enable with LOG_MALFORMED_HEX=true|1|yes|on
LOG_MALFORMED_HEX = os.getenv("LOG_MALFORMED_HEX", "false").strip().lower() in ("1", "true", "yes", "on")
malformed_hex_logger = logging.getLogger('malformed_hex')
malformed_hex_logger.setLevel(logging.INFO)
if LOG_MALFORMED_HEX:
    malformed_hex_handler = RotatingFileHandler('mal-hex.log', maxBytes=10000000, backupCount=1)
    malformed_hex_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    malformed_hex_logger.addHandler(malformed_hex_handler)
MQTT_SERVER = "192.168.43.102"
MQTT_TOPIC_PREFIX = "rtl_433/DietPi/events"
KNOWN_DEVICES = ["inFactory-TH_25", "Oregon-THGR122N_233", "Oregon-v1_0", "Bresser-6in1_-2021550075", "Bosch-BME280_1", "pool"]
# Remote Pi management (container runs on NAS)
PI_HOST = os.getenv("PI_HOST")
PI_USER = os.getenv("PI_USER", "pi")
SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "/workspace/.ssh/id_rsa")
seen_messages = {}
new_data_queue = []
last_transmission_times = {}
ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252', 'Oregon-THGR122N_233']
allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31']
debug = False
# Safe defaults for DB-derived caches
sensor_ids = []
sensor_names = []
sensor_by_name_room = {}
pool_sensors_cache = {}
pool_reset_flags_seen = {}
# Track sensor failure states to avoid repeated logging
sensor_failure_logged = {}
# Watchdog configuration
# If only BME280 is active within this window and radio sensors are silent, restart
STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300")) # 5 minutes
RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600")) # 1 hour
last_restart_time = None
# Load sensor-specific temperature offsets from environment
BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0"))
# Load battery change detection timing from environment
BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) # seconds
BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) # seconds
config_lock = threading.Lock()
# File watcher for runtime configuration changes
class EnvFileWatcher(FileSystemEventHandler):
    """Watch for changes to .env file and reload configuration"""

    def on_modified(self, event):
        if event.src_path.endswith('.env'):
            reload_config()
def reload_config():
    """Reload configuration values from .env file"""
    global BRESSER_6IN1_TEMP_OFFSET, BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE
    try:
        # Re-read .env so changes on disk propagate into os.environ
        load_dotenv(override=True)
        with config_lock:
            # Reload temperature offset
            old_offset = BRESSER_6IN1_TEMP_OFFSET
            BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0"))
            if old_offset != BRESSER_6IN1_TEMP_OFFSET:
                logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C")
            # Reload battery change detection timing
            old_min = BATTERY_CHANGE_MIN_SILENCE
            old_max = BATTERY_CHANGE_MAX_SILENCE
            BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60"))
            BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600"))
            if old_min != BATTERY_CHANGE_MIN_SILENCE or old_max != BATTERY_CHANGE_MAX_SILENCE:
                logger.info(f"Configuration reloaded: BATTERY_CHANGE timing changed from {old_min}-{old_max}s to {BATTERY_CHANGE_MIN_SILENCE}-{BATTERY_CHANGE_MAX_SILENCE}s")
    except Exception as e:
        logger.error(f"Error reloading configuration from .env: {e}")
# Initialize file watcher
env_observer = None
def start_env_watcher():
    """Start watching .env file for changes"""
    global env_observer
    try:
        env_observer = Observer()
        event_handler = EnvFileWatcher()
        env_observer.schedule(event_handler, path='.', recursive=False)
        env_observer.start()
        logger.info("Environment file watcher started")
    except Exception as e:
        logger.warning(f"Failed to start environment file watcher: {e}")
def stop_env_watcher():
    """Stop watching .env file"""
    global env_observer
    if env_observer:
        env_observer.stop()
        env_observer.join(timeout=5)
        logger.info("Environment file watcher stopped")
# Connect to the SQLite database (fallback when MariaDB is down)
sqlite_db_path = 'data/local_backup.db'
os.makedirs('data', exist_ok=True)
sqlite_conn = sqlite3.connect(sqlite_db_path, check_same_thread=False)
sqlite_cursor = sqlite_conn.cursor()
# Create the buffer table if it does not exist
sqlite_cursor.execute('''
    CREATE TABLE IF NOT EXISTS json_data (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        data TEXT NOT NULL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
    )
''')
sqlite_conn.commit()
logger.info(f"SQLite database initialized at: {sqlite_db_path}")
# Database connection configuration (read from environment variables or use defaults)
DB_HOST = os.getenv("DB_HOST", "192.168.43.102")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.getenv("DB_USER", "weatherdata")
DB_PASSWORD_RAW = os.getenv("DB_PASSWORD", "cfCUswMHfK82!")
DB_PASSWORD = DB_PASSWORD_RAW.strip("'\"") # Remove any surrounding quotes
DB_NAME = os.getenv("DB_NAME", "weatherdata")
DB_CONNECT_TIMEOUT = int(os.getenv("DB_CONNECT_TIMEOUT", "5"))
# Log database configuration at startup
logger.info(f"DB config: host={DB_HOST}:{DB_PORT}, user={DB_USER}, db={DB_NAME}")
if DB_PASSWORD:
    logger.debug(f"DB_PASSWORD length: {len(DB_PASSWORD)}, chars: {[c for c in DB_PASSWORD]}")
# Build connection URL (password will be passed separately in connect_args for proper handling)
db_url = f"mysql+mysqlconnector://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# Create engine with connection pool resilience
# Key settings to handle intermittent network issues:
# - pool_pre_ping: Verify connection is alive before using
# - pool_recycle: Aggressively recycle to avoid stale connections
# - pool_size: Conservative pool to avoid resource exhaustion
# - max_overflow: Limited to prevent connection thrashing
# - pool_reset_on_return: Use "none" to avoid failed rollback on dead connections
sql_engine = create_engine(
    db_url,
    connect_args={
        "user": DB_USER,
        "password": DB_PASSWORD,  # Pass the password separately to avoid URL-encoding issues
        "host": DB_HOST,
        "port": DB_PORT,
        "database": DB_NAME,
        "connection_timeout": DB_CONNECT_TIMEOUT,
        "autocommit": False,  # Let SQLAlchemy manage transactions properly
        "raise_on_warnings": False,  # Suppress MySQL warnings that clutter the logs
    },
    pool_pre_ping=True,  # Test each connection before using it (detects stale connections)
    pool_recycle=300,  # Recycle connections every 5 minutes (aggressive; handles server timeouts)
    pool_timeout=10,  # Wait up to 10 seconds to get a connection from the pool
    pool_size=3,  # Keep only 3 steady connections (was 5)
    max_overflow=5,  # Allow only 5 overflow connections (was 10; prevents thrashing)
    pool_reset_on_return="none",  # Avoid rollback on return to prevent "Lost connection" errors
    echo=False,  # Set to True for SQL logging when debugging
    echo_pool=False,  # Set to True for connection pool logging when debugging
)
# Ensure tables exist (safe: creates only missing ones)
try:
    Base.metadata.create_all(sql_engine)
    logger.info("Verified/created database tables")
except Exception as e:
    logger.warning(f"Could not auto-create tables: {e}")
# DB availability tracking for resilient mode
db_available = False
last_db_check = 0.0
DB_RETRY_SECONDS = 30 # Retry DB connection every 30 seconds if down
# Create a configured "Session" class
Session = sessionmaker(bind=sql_engine)
# Create a session to interact with the database
session = Session()
def parse_radio_frame(byte_data):
    """Parse a radio frame with the structure:
    Preamble: 0xAA repeated (often 3x)
    Sync: 0x2D (optionally followed by 0xD4)
    Header (4 bytes): payload_len (u8), dest_id (u8), sender_id (u8), ctl (u8)
    Data: payload_len bytes
    CRC: 2 bytes (polynomial unknown), returned but not verified here
    Returns a dict with the extracted 'data' and header fields, or None if not found/invalid.
    """
    if not byte_data:
        return None
    try:
        sync_index = byte_data.find(b"\x2d")
        if sync_index == -1:
            return None
        if sync_index + 1 >= len(byte_data):
            # No room for the network id byte
            return None
        network_id = byte_data[sync_index + 1]
        sync_len = 2
        header_start = sync_index + sync_len
        if header_start + 4 > len(byte_data):
            return None
        payload_len, dest_id, sender_id, ctl = struct.unpack_from('<BBBB', byte_data, header_start)
        data_start = header_start + 4
        data_end = data_start + payload_len
        if data_end + 2 > len(byte_data):
            # Not enough bytes for data + 2-byte CRC
            return None
        data = byte_data[data_start:data_end]
        crc_bytes = byte_data[data_end:data_end + 2]
        return {
            'data': data,
            'payload_len': payload_len,
            'dest_id': dest_id,
            'sender_id': sender_id,
            'ctl': ctl,
            'network_id': network_id,
            'crc_bytes': crc_bytes,
            'sync_index': sync_index,
            'sync_len': sync_len,
        }
    except Exception:
        return None
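# Worked example (synthetic bytes, not a captured frame): for
#   b"\xaa\xaa\x2d\x64\x02\x00\x01\x00\x11\x22\xcc\xcc"
# the sync 0x2D is found at index 2, network_id is 0x64, the header unpacks to
# payload_len=2, dest_id=0, sender_id=1, ctl=0, and the function returns
# data=b"\x11\x22" with crc_bytes=b"\xcc\xcc" (CRC left unverified).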
POOL_PAYLOAD_FORMATS = [
    {
        "size": 17,  # New payload with VCC
        "struct": "<BBBBHhhHHHB",
        "includes_vcc": True,
        "label": "v2_vcc",
    },
    {
        "size": 15,  # Legacy payload without VCC
        "struct": "<BBBBHhhHHB",
        "includes_vcc": False,
        "label": "v1_legacy",
    },
]
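# Field order shared by both struct strings (little-endian), matching the
# unpack order in decode_pool_payload() below:
#   magic1(u8) magic2(u8) version(u8) nodeId(u8) seq(u16)
#   t_ds10(s16) t_bme10(s16) hum10(u16) pres1(u16) [vcc_mv(u16), v2 only] crc(u8)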
MIN_POOL_PAYLOAD_SIZE = min(fmt["size"] for fmt in POOL_PAYLOAD_FORMATS)
MAGIC1 = 0x42
MAGIC2 = 0x99
# Reset flags use AVR MCUSR bit mapping in the version byte's upper nibble
RESET_FLAG_MAP = [
    (0x1, "PORF (power-on)"),
    (0x2, "EXTRF (external reset)"),
    (0x4, "BORF (brown-out)"),
    (0x8, "WDRF (watchdog)"),
]
def crc8_xor(data: bytes) -> int:
    """Simple XOR checksum used by the pool payload."""
    c = 0
    for b in data:
        c ^= b
    return c
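# Example: crc8_xor(b"\x42\x99") == 0x42 ^ 0x99 == 0xdb. The pool payload's
# final byte is this XOR over all preceding bytes (chunk[:-1] in
# decode_pool_payload below).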
def parse_version_and_reset_flags(version_byte: int):
    """Return protocol version (low nibble) and decoded reset flags (high nibble)."""
    protocol_version = version_byte & 0x0F
    reset_flags = (version_byte >> 4) & 0x0F
    reset_causes = [desc for bit, desc in RESET_FLAG_MAP if reset_flags & bit]
    return protocol_version, reset_flags, reset_causes
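# Example: a version byte of 0x21 decodes to protocol_version=1 with
# reset_flags=0x2, i.e. ["EXTRF (external reset)"].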
def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None):
    """Scan a byte stream for a plausible pool payload (v1 legacy and v2 with VCC).

    Slides a window across the stream for each supported payload size, validates the CRC,
    performs plausibility checks, and scores the candidates. Returns the best decoded dict
    or None. VCC is decoded for the new format but ignored when storing for now.
    """
    # Drop leading preamble (0xAA) if present
    while candidate_bytes.startswith(b"\xaa"):
        candidate_bytes = candidate_bytes[1:]
    best = None
    best_score = float('-inf')
    for offset in range(0, len(candidate_bytes) - MIN_POOL_PAYLOAD_SIZE + 1):
        for fmt in POOL_PAYLOAD_FORMATS:
            if offset + fmt["size"] > len(candidate_bytes):
                continue
            chunk = candidate_bytes[offset:offset + fmt["size"]]
            try:
                if fmt["includes_vcc"]:
                    (
                        magic1,
                        magic2,
                        version_byte,
                        nodeId,
                        seq,
                        t_ds10,
                        t_bme10,
                        hum10,
                        pres1,
                        vcc_mv,
                        crc_received,
                    ) = struct.unpack(fmt["struct"], chunk)
                else:
                    (
                        magic1,
                        magic2,
                        version_byte,
                        nodeId,
                        seq,
                        t_ds10,
                        t_bme10,
                        hum10,
                        pres1,
                        crc_received,
                    ) = struct.unpack(fmt["struct"], chunk)
                    vcc_mv = None
            except struct.error:
                continue
            crc_calculated = crc8_xor(chunk[:-1])
            crc_valid = crc_calculated == crc_received
            protocol_version, reset_flags, reset_causes = parse_version_and_reset_flags(version_byte)
            # Accept protocol version 1 (legacy) and 2 (future) to tolerate FW bumps
            if protocol_version not in (1, 2) or nodeId != 1:
                continue
            # Plausibility checks (unit scaled)
            if not (-300 <= t_ds10 <= 600):  # -30.0 to 60.0 °C
                continue
            if not (-300 <= t_bme10 <= 600):
                continue
            if not (0 <= hum10 <= 1000):  # 0.0 to 100.0 %
                continue
            if not (8000 <= pres1 <= 11000):  # 800.0 to 1100.0 hPa
                continue
            if vcc_mv is not None and not (1000 <= vcc_mv <= 5000):
                continue
            score = 0
            if magic1 == MAGIC1 and magic2 == MAGIC2:
                score += 2
            if expected_seq is not None and seq == expected_seq:
                score += 1
            if fmt["includes_vcc"]:
                score += 0.5  # Prefer the new payload when both are valid
            if crc_valid:
                score += 3
            else:
                score -= 3  # Keep but penalize an invalid CRC
            # Tie-break: prefer the earliest valid offset in the stream
            score -= offset * 0.001
            if score > best_score:
                best_score = score
                best = {
                    "offset": offset,
                    "magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2,
                    "version": protocol_version,
                    "version_byte": version_byte,
                    "reset_flags": reset_flags,
                    "reset_causes": reset_causes,
                    "nodeId": nodeId,
                    "sequence": seq,
                    "t_ds_c": t_ds10 / 10.0,
                    "t_bme_c": t_bme10 / 10.0,
                    "humidity": hum10 / 10.0,
                    "pressure_hpa": pres1 / 10.0,
                    "vcc_mv": vcc_mv,
                    "crc_valid": crc_valid,
                    "crc_expected": crc_calculated,
                    "format": fmt["label"],
                }
    return best
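# Worked example (synthetic v2 payload, XOR CRC computed by hand):
#   42 99 02 01 01 00 fa 00 f5 00 26 02 94 27 e4 0c a9
# decodes at offset 0 to nodeId=1, seq=1, t_ds_c=25.0, t_bme_c=24.5,
# humidity=55.0, pressure_hpa=1013.2, vcc_mv=3300, magic_ok=True,
# crc_valid=True (0xA9 is the XOR of the first 16 bytes), no reset flags set.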
def extract_pool_candidate_bytes(raw_bytes: bytes):
    """Return the payload bytes for pool sensors, handling legacy and new radio framing.
    Tries legacy framed parsing first, then strips a 0xAA preamble and optional
    sync bytes (0x39 0x14) used by the new hardware. Falls back to the stripped
    raw stream so old payloads continue to work.
    """
    if not raw_bytes:
        return b"", {"source": "empty"}
    # Legacy framed format (rarely used but kept for compatibility)
    frame = parse_radio_frame(raw_bytes)
    if frame and frame.get('data'):
        return frame['data'], {"source": "legacy_frame", "network_id": frame.get('network_id')}
    trimmed = raw_bytes
    while trimmed.startswith(b"\xaa"):
        trimmed = trimmed[1:]
    # New hardware emits sync bytes; support both the 0x39 0x14 and 0xD3 0x91 variants
    sync_patterns = [b"\x39\x14", b"\xd3\x91"]
    for sync in sync_patterns:
        idx = trimmed.find(sync)
        if idx != -1 and idx + len(sync) < len(trimmed):
            return trimmed[idx + len(sync):], {"source": "sync", "offset": idx, "sync": sync.hex()}
    # Fallback: use the stripped raw stream (old hardware behaviour)
    return trimmed, {"source": "raw"}
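# Worked example (synthetic bytes, assuming the legacy 0x2D frame parse fails
# first): for b"\xaa\xaa\x39\x14\x42\x99..." the 0xAA preamble is stripped, the
# 0x39 0x14 sync matches at offset 0, and everything after it (b"\x42\x99...")
# is returned with meta {"source": "sync", "offset": 0, "sync": "3914"}.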
def refresh_sensor_cache():
    """Refresh the sensor cache from the database"""
    global sensor_ids, sensor_names, sensor_by_name_room, pool_sensors_cache
    if not ensure_db_connection():
        return []
    sensors = session.query(Sensor).all()
    sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors]
    sensor_names = list(set([sensor.mqtt_name for sensor in sensors]))  # Unique model names
    # Create a mapping of (mqtt_name_base, room) -> sensor for battery change detection
    sensor_by_name_room = {}
    for sensor in sensors:
        # Extract the base name (before the last underscore if it contains the ID)
        base_name = sensor.mqtt_name.rsplit('_', 1)[0] if '_' in sensor.mqtt_name else sensor.mqtt_name
        if sensor.room:
            sensor_by_name_room[(base_name, sensor.room)] = sensor
    # Cache pool sensors by node_id for dynamic processing
    pool_sensors_cache = {}
    for sensor in sensors:
        if sensor.mqtt_name == 'pool' and sensor.node_id is not None:
            if sensor.node_id not in pool_sensors_cache:
                pool_sensors_cache[sensor.node_id] = {}
            # Map by sensor_type to easily tell BME280 from DS18B20
            pool_sensors_cache[sensor.node_id][sensor.sensor_type] = sensor
    return sensors
def build_sensor_lists_from_db():
    """Build sensor configuration lists from the database instead of hardcoding them.
    Populates KNOWN_DEVICES, allowed_sensors_for_time, and ignored_sensors_for_time.
    Call after refresh_sensor_cache() and whenever sensors are added or removed.
    """
    global KNOWN_DEVICES, allowed_sensors_for_time, ignored_sensors_for_time
    if not ensure_db_connection():
        return
    sensors = session.query(Sensor).all()
    # Build KNOWN_DEVICES - unique model names
    KNOWN_DEVICES = list(set([sensor.mqtt_name for sensor in sensors]))
    logger.info(f"Built KNOWN_DEVICES from database: {KNOWN_DEVICES}")
    # Build allowed_sensors_for_time - sensors that should be monitored for health.
    # These are the radio sensors that transmit regularly.
    allowed_types = ['Bresser-6in1', 'LaCrosse-TX35DTHIT']
    allowed_sensors_for_time = [
        f"{sensor.mqtt_name}_{sensor.mqtt_id}"
        for sensor in sensors
        if any(sensor_type in sensor.mqtt_name for sensor_type in allowed_types)
    ]
    logger.info(f"Built allowed_sensors_for_time from database: {allowed_sensors_for_time}")
    # Build ignored_sensors_for_time - sensors that don't need health monitoring.
    # These are local/wired sensors (BME280 I2C, Oregon sensors, etc.).
    ignored_types = ['Bosch-BME280', 'Oregon-v1', 'Oregon-THGR122N', 'inFactory-TH']
    ignored_sensors_for_time = [
        f"{sensor.mqtt_name}_{sensor.mqtt_id}"
        for sensor in sensors
        if any(sensor_type in sensor.mqtt_name for sensor_type in ignored_types)
    ]
    logger.info(f"Built ignored_sensors_for_time from database: {ignored_sensors_for_time}")
def get_sensor_keys(sensor_type):
    """Return the list of MQTT keys to average for each sensor type.
    This ensures we only extract fields that the sensor actually provides.
    """
    keys_map = {
        'Bresser-6in1': ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok'],
        'Bosch-BME280': ['temperature_C', 'humidity', 'pressure_rel'],
        'LaCrosse-TX35DTHIT': ['temperature_C', 'humidity', 'battery_ok'],
        'Oregon-v1': ['temperature_C', 'battery_ok'],
        'Oregon-THGR122N': ['temperature_C', 'humidity', 'battery_ok'],
        'inFactory-TH': ['temperature_C', 'humidity', 'battery_ok'],
        'BME280': ['temperature_C', 'humidity', 'pressure_rel', 'vcc_mv'],  # Pool BME280 includes VCC
        'DS18B20': ['temperature_C'],  # Pool DS18B20
    }
    # Fallback for unknown types - try to match by substring
    for key_name, keys in keys_map.items():
        if key_name in sensor_type:
            return keys
    # Ultimate fallback - temperature only
    logger.warning(f"Unknown sensor type '{sensor_type}' - defaulting to temperature only")
    return ['temperature_C']
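# Example: get_sensor_keys('Bresser-6in1') hits the 'Bresser-6in1' entry via
# the substring match; a hypothetical unknown type such as 'Acme-TH_7' falls
# through to the ['temperature_C'] fallback and logs a warning.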
sensors = []
# Check whether the remote server is reachable
def is_remote_server_available():
    # If the DB is up, we consider the remote server available for writes.
    return db_available
# Save JSON data to SQLite
def save_json_locally(json_dict):
    try:
        json_str = json.dumps(json_dict)
        sqlite_cursor.execute('INSERT INTO json_data (data) VALUES (?)', (json_str,))
        sqlite_conn.commit()
        logger.info("Data saved to local SQLite database")
    except Exception as e:
        logger.error(f"Error saving to SQLite: {e}")
def ensure_db_connection(force: bool = False) -> bool:
    """Try to establish DB connectivity, with throttling. Returns True if the DB is reachable.
    This function tests the connection and reinitializes the session if needed.
    On failure, it disposes the pool to force a reconnect on the next attempt.
    """
    global db_available, last_db_check, session
    now = time.time()
    if not force and (now - last_db_check) < DB_RETRY_SECONDS:
        return db_available
    last_db_check = now
    try:
        # Test the connection with an explicit timeout
        with sql_engine.connect() as conn:
            conn.execute(text('SELECT 1'))
        if not db_available:
            logger.info(f"Database reachable again at {DB_HOST}:{DB_PORT}")
            db_available = True
            # Recreate the session to ensure fresh connections
            session = Session()
    except exc.OperationalError as e:
        # Connection failed - dispose the pool to force fresh connections on the next attempt
        sql_engine.dispose()
        if db_available:
            logger.warning(f"Lost database connectivity: {e}")
        else:
            logger.info(f"Database still unreachable: {e}")
        db_available = False
    except Exception as e:
        sql_engine.dispose()
        if db_available:
            logger.warning(f"Unexpected database error: {type(e).__name__}: {e}")
        else:
            logger.info(f"Database still unreachable: {type(e).__name__}: {e}")
        db_available = False
    return db_available
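# Note: with force=False, repeated calls within DB_RETRY_SECONDS return the
# cached db_available flag without touching the network; sync_data() and the
# startup/retry paths pass force=True to test the connection immediately.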
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(MQTT_TOPIC_PREFIX)
    logger.info(f"Connected with result code {reason_code}")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    if msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]):
        d = json.loads(msg.payload.decode())
        model = d['model']
        if model in sensor_names:
            if model == 'pool':
                # Extract hex data from the rows array
                if not d.get('rows') or len(d['rows']) == 0:
                    if LOG_MALFORMED_HEX:
                        malformed_hex_logger.info(f"Pool message missing rows: {d}")
                    return
                hex_data = d['rows'][0].get('data')
                if not hex_data:
                    if LOG_MALFORMED_HEX:
                        malformed_hex_logger.info(f"Pool message missing data: {d}")
                    return
                # Strip any length of 'aa' preamble (old/new formats)
                while hex_data.startswith('aa'):
                    hex_data = hex_data[2:]
                # Some rtl_433 captures occasionally lose the final nibble; drop it to keep the hex even-length
                if len(hex_data) % 2 == 1:
                    if LOG_MALFORMED_HEX:
                        malformed_hex_logger.info(f"Trimming odd-length hex (dropped last nibble): {hex_data}")
                    hex_data = hex_data[:-1]
                try:
                    byte_data = bytes.fromhex(hex_data)
                except ValueError:
                    if LOG_MALFORMED_HEX:
                        malformed_hex_logger.info(f"Invalid hex: {hex_data}")
                    logger.debug(f"Invalid hex data: {hex_data}")
                    logger.debug(f"Full message: {d}")
                    return
                candidate_bytes, candidate_meta = extract_pool_candidate_bytes(byte_data)
                if LOG_MALFORMED_HEX and candidate_meta.get("source") == "raw":
                    malformed_hex_logger.info(f"Pool using raw bytes (no sync match): {byte_data.hex()}")
                logger.debug(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}")
                logger.debug(
                    f"Candidate payload ({len(candidate_bytes)}), source={candidate_meta.get('source')}: "
                    f"{candidate_bytes.hex()}"
                )
                # Decode the payload by sliding-window detection (magic bytes may be missing)
                expected_seq = None
                if new_data_queue:
                    try:
                        expected_seq = (new_data_queue[-1].get("sequence") or 0) + 1
                    except Exception:
                        expected_seq = None
                decoded = decode_pool_payload(candidate_bytes, expected_seq=expected_seq)
                if not decoded:
                    if LOG_MALFORMED_HEX:
                        malformed_hex_logger.info(f"No valid payload found: {candidate_bytes.hex()}")
                    logger.debug("No valid pool payload found in candidate bytes")
                    return
                logger.debug(
                    f"Decoded payload at offset {decoded['offset']} ({decoded.get('format','')})"
                    f": seq={decoded['sequence']}, t_ds={decoded['t_ds_c']}C, "
                    f"t_bme={decoded['t_bme_c']}C, hum={decoded['humidity']}%, "
                    f"pres={decoded['pressure_hpa']}hPa, vcc={decoded.get('vcc_mv','n/a')}mV, "
                    f"magic_ok={decoded['magic_ok']}, crc_valid={decoded['crc_valid']}, "
                    f"crc_exp={decoded.get('crc_expected')}, reset_flags=0x{decoded['reset_flags']:X}"
                )
                reset_flags = decoded.get('reset_flags', 0)
                reset_causes = decoded.get('reset_causes', [])
                last_flags = pool_reset_flags_seen.get(decoded['nodeId'])
                if last_flags != reset_flags:
                    causes_text = ", ".join(reset_causes) if reset_causes else "none set"
                    reset_msg = (
                        f"Pool node {decoded['nodeId']} MCU reset flags 0x{reset_flags:X}: {causes_text}"
                    )
                    if reset_flags & 0x0C:  # BORF or WDRF -> warn
                        logger.warning(reset_msg)
                    else:
                        logger.info(reset_msg)
                    pool_reset_flags_seen[decoded['nodeId']] = reset_flags
                original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
                # One radio frame fans out into two logical sensors:
                # BME280 (id = nodeId*10+1) and DS18B20 (id = nodeId*10+2)
                bme_msg = {
                    'time': original_time,
                    'model': 'pool',
                    'id': decoded['nodeId'] * 10 + 1,
                    'battery_ok': 1,
                    'temperature_C': decoded['t_bme_c'],
                    'humidity': decoded['humidity'],
                    'pressure_rel': decoded['pressure_hpa'],
                    'vcc_mv': decoded.get('vcc_mv'),
                    'mic': 'CRC'
                }
                ds_msg = {
                    'time': original_time,
                    'model': 'pool',
                    'id': decoded['nodeId'] * 10 + 2,
                    'battery_ok': 1,
                    'temperature_C': decoded['t_ds_c'],
                    'mic': 'CRC'
                }
                for msg_data in [bme_msg, ds_msg]:
                    logger.debug(f"Received message from {msg_data['model']}: {msg_data}")
                    sensor_id = msg_data['id']
                    sensor_key = f"{msg_data['model']}_{sensor_id}"
                    if sensor_key not in seen_messages.keys():
                        seen_messages[sensor_key] = [msg_data]
                    else:
                        seen_messages[sensor_key].append(msg_data)
                return
            else:
                # Process non-pool sensors
                logger.debug(f"Received message from {model}: {d}")
                id = d['id']
                if model == 'Bresser-6in1':
                    if d['flags'] == 0:
                        if 'rain_mm' in d.keys():
                            del d['rain_mm']
                sensor_key = f'{model}_{id}'
                if sensor_key not in seen_messages.keys():
                    seen_messages[sensor_key] = [d]
                else:
                    seen_messages[sensor_key].append(d)
# Define a function to update the data in the database
def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm, vcc_mv=None):
    values = {
        "utc_time": utc_time,
        "mqtt_id": mqtt_id,
        "temperature_c": temperature_c,
        "humidity": humidity,
        "pressure_rel": pressure_rel,
        "battery": battery,
        "average_speed": average_speed,
        "direction": direction,
        "gust": gust,
        "rain_mm": rain_mm,
        "vcc_mv": vcc_mv,
    }
    if ensure_db_connection():
        new_data_queue.append(values)
        sync_data()
        # Data sent - no logging needed for normal operation
    else:
        logger.warning(f"{utc_time}: Database unavailable - storing locally for {mqtt_id}")
        save_json_locally(values)
def get_or_update_sensor(mqtt_name, mqtt_id):
    """
    Get a sensor by mqtt_name and mqtt_id. If not found, try to find it by base mqtt_name,
    then update the mqtt_id (battery change scenario).
    Uses a timing heuristic: if exactly one sensor of the type stopped transmitting recently,
    assume that is the one whose battery was changed.
    For pool sensors: handles nodeId changes by updating both sensor_type rows if detected.
    """
    # Try to find an exact match first
    sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first()
    if sensor:
        return sensor
    # Special handling for pool sensors - check for nodeId changes
    if mqtt_name == 'pool':
        # Try to find any pool sensor and check whether the nodeId changed
        mqtt_id_int = int(mqtt_id)
        old_node_id = mqtt_id_int // 10  # Extract the nodeId from the mqtt_id
        # Look for sensors with this node_id
        pool_sensor = session.query(Sensor).filter_by(mqtt_name='pool', node_id=old_node_id).first()
        if pool_sensor and pool_sensor.mqtt_id != mqtt_id:
            # The nodeId changed - update all sensors for this node
            logger.warning(f"Pool sensor nodeId change detected: nodeId={old_node_id}, old mqtt_id={pool_sensor.mqtt_id}, new mqtt_id={mqtt_id}")
            handle_pool_nodeid_change(old_node_id, mqtt_id_int)
            # Try again after the update
            sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first()
            if sensor:
                return sensor
        logger.warning(f"Pool sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.")
        return None
    # For non-pool sensors: extract the base name and try to find by model
    base_name = mqtt_name.rsplit('_', 1)[0] if '_' in mqtt_name else mqtt_name
    # Find all sensors with a matching base name
    candidates = session.query(Sensor).filter(
        Sensor.mqtt_name.like(f'{base_name}%')
    ).all()
    if not candidates:
        logger.warning(f"Sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.")
        return None
    # If there is only one candidate, update it
    if len(candidates) == 1:
        old_id = candidates[0].mqtt_id
        old_name = candidates[0].mqtt_name
        candidates[0].mqtt_name = mqtt_name
        candidates[0].mqtt_id = mqtt_id
        session.commit()
        logger.info(f"Updated sensor in room '{candidates[0].room}': {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)")
        # Refresh the sensor cache to reflect the updated ID
        refresh_sensor_cache()
        return candidates[0]
    # Multiple candidates - use the timing heuristic:
    # check which sensors stopped transmitting within the configured battery-change window
    current_time = datetime.now(timezone.utc)
    recent_silent = []
    for candidate in candidates:
        sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}"
        if sensor_key in last_transmission_times:
            last_seen = last_transmission_times[sensor_key]
            time_since_last = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds()
            # Consider sensors that stopped within the configured timing window
            with config_lock:
                min_silence = BATTERY_CHANGE_MIN_SILENCE
                max_silence = BATTERY_CHANGE_MAX_SILENCE
            if min_silence < time_since_last < max_silence:
                recent_silent.append((candidate, time_since_last))
                logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago (battery change window: {min_silence}-{max_silence}s)")
    if len(recent_silent) == 1:
        # Exactly one sensor went silent recently - assume a battery change
        sensor_to_update, time_ago = recent_silent[0]
        old_id = sensor_to_update.mqtt_id
        old_name = sensor_to_update.mqtt_name
        old_key = f"{old_name}_{old_id}"
        sensor_to_update.mqtt_name = mqtt_name
        sensor_to_update.mqtt_id = mqtt_id
        session.commit()
        logger.info(f"AUTO-UPDATE: Sensor in room '{sensor_to_update.room}' (last seen {time_ago:.0f}s ago)")
        logger.info(f"  Changed: {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)")
        # Update the last_transmission_times key
        if old_key in last_transmission_times:
            del last_transmission_times[old_key]
        # Refresh the sensor cache
        refresh_sensor_cache()
        return sensor_to_update
    # Multiple or no recently silent sensors - manual intervention needed
    logger.warning(f"Cannot auto-update mqtt_id for {mqtt_name}_{mqtt_id}. Found {len(candidates)} candidates:")
    for candidate in candidates:
        sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}"
        if sensor_key in last_transmission_times:
            last_seen = last_transmission_times[sensor_key]
            time_ago = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds()
            logger.warning(f"  - {sensor_key} in room '{candidate.room}' (last seen {time_ago:.0f}s ago)")
        else:
            logger.warning(f"  - {sensor_key} in room '{candidate.room}' (never seen)")
    if len(recent_silent) == 0:
        with config_lock:
            min_silence = BATTERY_CHANGE_MIN_SILENCE
            max_silence = BATTERY_CHANGE_MAX_SILENCE
        logger.warning(f"No sensors stopped recently ({min_silence}-{max_silence}s window). Cannot determine which sensor changed battery.")
    else:
        logger.warning(f"{len(recent_silent)} sensors stopped recently. Cannot determine which one changed battery.")
    logger.warning(f"Please update manually: UPDATE sensors SET mqtt_name='{mqtt_name}', mqtt_id='{mqtt_id}' WHERE mqtt_name='[old_name]' AND mqtt_id='[old_id]' AND room='[room]';")
    return None
def handle_pool_nodeid_change(old_node_id, new_mqtt_id):
    """Handle a pool sensor nodeId change (battery reset). Updates both the BME280 and DS18B20 sensors."""
    new_node_id = new_mqtt_id // 10
    # Update the BME280 sensor (nodeId*10+1)
    bme_sensor = session.query(Sensor).filter_by(
        mqtt_name='pool',
        node_id=old_node_id,
        sensor_type='BME280'
    ).first()
    if bme_sensor:
        old_mqtt_id = bme_sensor.mqtt_id
        bme_sensor.mqtt_id = new_node_id * 10 + 1
        bme_sensor.node_id = new_node_id
        logger.info(f"Pool BME280 sensor updated: node_id {old_node_id} -> {new_node_id}, mqtt_id {old_mqtt_id} -> {bme_sensor.mqtt_id}")
    # Update the DS18B20 sensor (nodeId*10+2)
    ds_sensor = session.query(Sensor).filter_by(
        mqtt_name='pool',
        node_id=old_node_id,
        sensor_type='DS18B20'
    ).first()
    if ds_sensor:
        old_mqtt_id = ds_sensor.mqtt_id
        ds_sensor.mqtt_id = new_node_id * 10 + 2
        ds_sensor.node_id = new_node_id
        logger.info(f"Pool DS18B20 sensor updated: node_id {old_node_id} -> {new_node_id}, mqtt_id {old_mqtt_id} -> {ds_sensor.mqtt_id}")
    session.commit()
    refresh_sensor_cache()
def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm, vcc_mv=None):
    mqtt_name, mqtt_id = mqtt_name_id.split('_', 1)  # Use maxsplit=1 to handle IDs with underscores
    # Get the sensor object from the database (with auto-update for battery changes)
    if not ensure_db_connection():
        # DB unavailable: stash the data in SQLite and return
        save_json_locally({
            'utc_time': utc_time,
            'mqtt_id': mqtt_name_id,
            'temperature_c': temperature_c,
            'humidity': humidity,
            'pressure_rel': pressure_rel,
            'battery': battery,
            'average_speed': average_speed,
            'direction': direction,
            'gust': gust,
            'rain_mm': rain_mm,
            'vcc_mv': vcc_mv,
        })
        return
    sensor = get_or_update_sensor(mqtt_name, mqtt_id)
    if not sensor:
        logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database")
        save_json_locally({
            'utc_time': utc_time,
            'mqtt_id': mqtt_name_id,
            'temperature_c': temperature_c,
            'humidity': humidity,
            'pressure_rel': pressure_rel,
            'battery': battery,
            'average_speed': average_speed,
            'direction': direction,
            'gust': gust,
            'rain_mm': rain_mm,
            'vcc_mv': vcc_mv,
        })
        return
    position = sensor.position
    # Update the sensor's battery level
    sensor.battery = battery
    # Update the last contact time.
    # Pool sensors: update on every contact (critical for monitoring).
    # Other sensors: only update if >5 minutes have passed, to reduce DB writes.
    now = datetime.now(timezone.utc)
    if mqtt_name == 'pool':
        sensor.last_contact = now
    elif sensor.last_contact is None or (now - sensor.last_contact.replace(tzinfo=timezone.utc)).total_seconds() > 300:
        sensor.last_contact = now
    # Update the temperature data
    if temperature_c is not None:
        if position == "inside":
            temperature_inside = session.query(TemperatureInside).filter_by(sensor_id=sensor.id).order_by(TemperatureInside.timestamp.desc()).first()
            if temperature_inside is None or temperature_inside.temperature_c != temperature_c:
                temperature_inside = TemperatureInside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c)
                session.add(temperature_inside)
                logger.debug(f"{mqtt_name_id}: Stored new temperature (inside): {temperature_c}°C")
            else:
                logger.debug(f"{mqtt_name_id}: Skipped temperature (inside) - unchanged: {temperature_c}°C")
        elif position == "outside":
            temperature_outside = session.query(TemperatureOutside).filter_by(sensor_id=sensor.id).order_by(TemperatureOutside.timestamp.desc()).first()
            if temperature_outside is None or temperature_outside.temperature_c != temperature_c:
                temperature_outside = TemperatureOutside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c)
                session.add(temperature_outside)
                logger.debug(f"{mqtt_name_id}: Stored new temperature (outside): {temperature_c}°C")
            else:
                logger.debug(f"{mqtt_name_id}: Skipped temperature (outside) - unchanged: {temperature_c}°C")
    # Update the humidity data
    if humidity is not None:
        if position == "inside":
            humidity_inside = session.query(HumidityInside).filter_by(sensor_id=sensor.id).order_by(HumidityInside.timestamp.desc()).first()
            if humidity_inside is None or humidity_inside.humidity != humidity:
                humidity_inside = HumidityInside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity)
                session.add(humidity_inside)
        elif position == "outside":
            humidity_outside = session.query(HumidityOutside).filter_by(sensor_id=sensor.id).order_by(HumidityOutside.timestamp.desc()).first()
            if humidity_outside is None or humidity_outside.humidity != humidity:
                humidity_outside = HumidityOutside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity)
                session.add(humidity_outside)
    # Update the air pressure data
    if pressure_rel is not None:
        air_pressure = session.query(AirPressure).filter_by(sensor_id=sensor.id).order_by(AirPressure.timestamp.desc()).first()
        if air_pressure is None or air_pressure.pressure_rel != pressure_rel:
            air_pressure = AirPressure(timestamp=utc_time, sensor_id=sensor.id, pressure_rel=pressure_rel)
            session.add(air_pressure)
    # Store the voltage if provided (associated with this sensor)
    if vcc_mv is not None:
        try:
            voltage_entry = Voltage(timestamp=utc_time, sensor_id=sensor.id, vcc_mv=int(vcc_mv))
            session.add(voltage_entry)
        except Exception as e:
            logger.warning(f"Failed to store voltage for {mqtt_name_id}: {e}")
    if average_speed is not None or gust is not None or direction is not None:
        wind_value = session.query(Wind).filter_by(sensor_id=sensor.id).order_by(Wind.timestamp.desc()).first()
        if wind_value is None or (average_speed is not None and wind_value.average_speed != average_speed) or (gust is not None and wind_value.gust != gust) or (direction is not None and wind_value.direction != direction):
            wind_value = Wind(timestamp=utc_time, sensor_id=sensor.id, average_speed=average_speed, direction=direction, gust=gust)
            session.add(wind_value)
    # Update the precipitation data with cumulative offset handling
    if rain_mm is not None:
        if rain_mm <= 1000:
            # Check for a rain sensor reset (battery change):
            # if the current value is significantly less than the last value, it is a reset
            if sensor.last_rain_value is not None and sensor.last_rain_value > 0:
                if rain_mm < sensor.last_rain_value - 1.0:  # Dropped by more than 1 mm (reset detected)
                    # Battery was changed; add the last value to the offset
                    sensor.rain_offset = (sensor.rain_offset or 0.0) + sensor.last_rain_value
                    logger.info(f"Rain sensor reset detected for {mqtt_name_id}. Last value: {sensor.last_rain_value}mm, New offset: {sensor.rain_offset}mm")
            # Calculate the actual cumulative rain including the offset
            actual_rain = rain_mm + (sensor.rain_offset or 0.0)
            # Update the last rain value for the next comparison
            sensor.last_rain_value = rain_mm
            precipitation = session.query(Precipitation).filter_by(sensor_id=sensor.id).order_by(Precipitation.timestamp.desc()).first()
            if precipitation is None or precipitation.precipitation != actual_rain:
                precipitation = Precipitation(timestamp=utc_time, sensor_id=sensor.id, precipitation=actual_rain)
                session.add(precipitation)
        else:
            logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}")
    try:
        # Commit the changes
        session.commit()
    except exc.SQLAlchemyError as e:
        logger.error(f"SQLAlchemyError on commit: {e}")
        try:
            session.rollback()
        except Exception:
            pass  # Ignore rollback errors if the connection is lost
        save_json_locally({
            'utc_time': utc_time,
            'mqtt_id': mqtt_name_id,
            'temperature_c': temperature_c,
            'humidity': humidity,
            'pressure_rel': pressure_rel,
            'battery': battery,
            'average_speed': average_speed,
            'direction': direction,
            'gust': gust,
            'rain_mm': rain_mm,
            'vcc_mv': vcc_mv,
        })
    except Exception as e:
        logger.error(f"Error on commit: {e}")
        try:
            session.rollback()
        except Exception:
            pass  # Ignore rollback errors if the connection is lost
        save_json_locally({
            'utc_time': utc_time,
            'mqtt_id': mqtt_name_id,
            'temperature_c': temperature_c,
            'humidity': humidity,
            'pressure_rel': pressure_rel,
            'battery': battery,
            'average_speed': average_speed,
            'direction': direction,
            'gust': gust,
            'rain_mm': rain_mm,
            'vcc_mv': vcc_mv,
        })
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# mqttc.username_pw_set("olafn", "weather")
mqttc.connect(MQTT_SERVER)
mqttc.loop_start()
def average_values(data, keys_to_average):
    if not data:
        return {}, []
    # Select the messages that contain at least one of the keys
    period_data = [d for d in data if any(key in d for key in keys_to_average)]
    # Calculate the average of the period data
    averages = {}
    for key in keys_to_average:
        values = [d.get(key) for d in period_data if d.get(key) is not None]
        average = sum(values) / len(values) if values else None
        if average is not None:
            if key == 'humidity':
                average = int(round(average, 0))
            elif key == 'temperature_C':
                average = round(average, 1)
            elif key == 'wind_max_m_s':
                average = round(average, 1)
            elif key == 'wind_avg_m_s':
                average = round(average, 1)
            elif key == 'wind_dir_deg':
                average = int(round(average, 0))
            elif key == 'rain_mm':
                average = round(average, 1)
            elif key == 'pressure_rel':
                average = int(round(average, 0))
            elif key == 'temperature_F':
                averages['temperature_C'] = round((average - 32) * 5 / 9, 1)
                continue
            averages[key] = average
        else:
            averages[key] = None
    # Remove the period data from the original data
    data = [d for d in data if d not in period_data]
    return averages, data
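# Worked example (synthetic messages): average_values(
#     [{'temperature_C': 20.0}, {'temperature_C': 21.0}], ['temperature_C'])
# returns ({'temperature_C': 20.5}, []) - both messages carry the key, so both
# are consumed and the remaining list is empty.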
def debug_sended_data(seen_messages, averages, sensor):
    global debug
    if not debug:
        return
    logger.debug(f'Averages for {sensor}:')
    for key, value in averages.items():
        logger.debug(f"{key}: {value}")
    logger.debug(f"Remaining data {sensor}:")
    logger.debug(f"{seen_messages[sensor]}")
def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_id_override=None):
    """Helper function to process any sensor's data consistently"""
    averages, remaining = average_values(data_list, keys_to_average)
    if averages:
        mqtt_id = mqtt_id_override if mqtt_id_override else sensor_key
        # Apply sensor-specific temperature corrections
        temperature_c = averages.get('temperature_C')
        if temperature_c is not None:
            if 'Bresser-6in1' in sensor_key:
                with config_lock:
                    offset = BRESSER_6IN1_TEMP_OFFSET
                original_temp = temperature_c
                temperature_c = round(temperature_c - offset, 1)  # Round to 1 decimal to avoid floating-point artifacts
                logger.info(f"Applied Bresser-6in1 temperature offset: raw={original_temp}°C offset={offset}°C corrected={temperature_c}°C")
        update_data(
            utc_time,
            mqtt_id,
            temperature_c,
            averages.get('humidity'),
            averages.get('pressure_rel'),
            averages.get('battery_ok', 1) if averages.get('battery_ok') is not None else 1,
            averages.get('wind_avg_m_s'),
            averages.get('wind_dir_deg'),
            averages.get('wind_max_m_s'),
            averages.get('rain_mm'),
            averages.get('vcc_mv')
        )
        debug_sended_data({sensor_key: remaining}, averages, sensor_key)
    return remaining
def process_mqtt_messages(seen_messages):
    """Process all received MQTT messages dynamically based on the database configuration.
    No hardcoded sensor checks - all sensors are processed based on what is in the database.
    """
    if not ensure_db_connection():
        # DB not available; skip processing but keep messages cached for later.
        return
    for sensor, data in seen_messages.items():
        if data:
            # Try to get 'time' from the first message, or use None as a fallback
            time_value = data[0].get('time') if isinstance(data[0], dict) else None
            update_last_transmission_time(sensor, time_value)
    utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    # Process pool sensors dynamically from the cache
    for node_id, sensors_by_type in pool_sensors_cache.items():
        for sensor_type, db_sensor in sensors_by_type.items():
            sensor_key = f'pool_{db_sensor.mqtt_id}'
            if sensor_key in seen_messages and seen_messages[sensor_key]:
                # Get the appropriate keys for this pool sensor type
                keys = get_sensor_keys(sensor_type)
                seen_messages[sensor_key] = process_sensor_data(utc_time, sensor_key, seen_messages[sensor_key], keys)
    # Process all non-pool sensors dynamically from the database:
    # query all non-pool sensors and process any that have received messages
    all_sensors = session.query(Sensor).filter(Sensor.mqtt_name != 'pool').all()
    for db_sensor in all_sensors:
        sensor_key = f"{db_sensor.mqtt_name}_{db_sensor.mqtt_id}"
        if sensor_key in seen_messages and seen_messages[sensor_key]:
            # Get the appropriate keys for this sensor type
            keys = get_sensor_keys(db_sensor.mqtt_name)
            # Process the sensor data with dynamically determined keys
            seen_messages[sensor_key] = process_sensor_data(
                utc_time,
                sensor_key,
                seen_messages[sensor_key],
                keys
            )
    # Seen messages are already logged in the main loop when devices are active
# Retrieve and delete the JSON data from SQLite
def get_and_delete_json_data():
    try:
        sqlite_cursor.execute('SELECT id, data FROM json_data ORDER BY id ASC')
        rows = sqlite_cursor.fetchall()
        json_data_list = [json.loads(row[1]) for row in rows]
        if rows:
            ids = [row[0] for row in rows]
            placeholders = ','.join('?' * len(ids))
            sqlite_cursor.execute(f'DELETE FROM json_data WHERE id IN ({placeholders})', ids)
            sqlite_conn.commit()
            logger.info(f"Retrieved and deleted {len(rows)} records from SQLite backup")
        return json_data_list
    except Exception as e:
        logger.error(f"Error retrieving from SQLite: {e}")
        return []
# Synchronize data to MariaDB
def sync_data():
    global session
    if not ensure_db_connection(force=True):
        # MariaDB unavailable - store in SQLite
        while new_data_queue:
            data = new_data_queue.pop(0)
            save_json_locally(data)
        return
    local_data_written = False
    # First sync locally stored data (SQLite fallback)
    local_data = get_and_delete_json_data()
    for data in local_data:
        try:
            if isinstance(data, dict) and 'utc_time' in data:
                # Single sensor entry
                if " UTC" in str(data.get('utc_time', '')):
                    data['utc_time'] = data['utc_time'].replace(" UTC", "")
                store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
                            data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
                            data.get('average_speed'), data.get('direction'),
                            data.get('gust'), data.get('rain_mm'), data.get('vcc_mv'))
                if not local_data_written:
                    utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
                    logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
                    local_data_written = True
        except exc.SQLAlchemyError as e:
            logger.error(f"SQLAlchemyError syncing local data: {e}")
            try:
                session.rollback()
            except Exception:
                pass  # Ignore rollback errors if the connection is lost
            # Put the record back into SQLite
            save_json_locally(data)
        except Exception as e:
            logger.error(f"Error syncing local data: {e}")
            save_json_locally(data)
    # Then sync new data from the queue
    while new_data_queue:
        data = new_data_queue.pop(0)
        try:
            if isinstance(data, dict) and 'mqtt_id' in data:
                store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
                            data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
                            data['gust'], data['rain_mm'], data.get('vcc_mv'))
        except exc.SQLAlchemyError as e:
            logger.error(f"SQLAlchemyError: {e}")
            try:
                session.rollback()
            except Exception:
                pass  # Ignore rollback errors if the connection is lost
            save_json_locally(data)
        except Exception as e:
            logger.error(f"Error writing data: {e}")
            save_json_locally(data)
def update_last_transmission_time(sensor, time_value):
    """Update the last transmission time for a sensor. Uses the provided time, or falls back to the current time."""
    try:
        if time_value:
            last_transmission_times[sensor] = datetime.fromisoformat(time_value.replace('Z', '+00:00'))
        else:
            last_transmission_times[sensor] = datetime.now(timezone.utc)
    except (ValueError, AttributeError, TypeError):
        # Fall back to the current time if time parsing fails
        last_transmission_times[sensor] = datetime.now(timezone.utc)
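# Example (hypothetical sensor key): update_last_transmission_time('pool_11',
# '2026-01-25T11:25:15Z') stores a timezone-aware datetime (the trailing 'Z'
# is rewritten to '+00:00' so datetime.fromisoformat() accepts it); a missing
# or unparseable time value falls back to the current UTC time.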
def check_last_transmission_time():
    global last_restart_time
    now = datetime.now(timezone.utc)
    # Determine recent activity for BME280 (local I2C) and radio sensors (868 MHz)
    bme_active_recent = any(
        (now - t.replace(tzinfo=timezone.utc)).total_seconds() <= STALL_WINDOW_SECONDS
        for k, t in last_transmission_times.items() if k.startswith('Bosch-BME280_')
    )
    radio_active_recent = False
    radio_nonresponding = 0
    # Only evaluate the sensors explicitly listed as radio sensors
    for sensor in allowed_sensors_for_time:
        t = last_transmission_times.get(sensor)
        if not t:
            # Not seen in this runtime; skip counting to avoid false positives at startup
            continue
        t = t.replace(tzinfo=timezone.utc)
        age = (now - t).total_seconds()
        if age <= STALL_WINDOW_SECONDS:
            radio_active_recent = True
            # Clear the failure flag when a sensor recovers
            if sensor_failure_logged.get(sensor):
                logger.info(f'Sensor {sensor} has recovered')
                sensor_failure_logged[sensor] = False
        else:
            radio_nonresponding += 1
            # Only log the failure once, when it first occurs
            if not sensor_failure_logged.get(sensor):
                logger.warning(f'Sensor {sensor} not responding (last seen {age:.0f}s ago)')
                sensor_failure_logged[sensor] = True
    # Condition 1: only the BME280 is active (dongle likely stalled)
    if bme_active_recent and not radio_active_recent:
        if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS:
            logger.warning('Only BME280 active; 868 MHz sensors silent. Restarting Pi...')
            restart_pi()
            last_restart_time = now
        # Only log the cooldown every 5 minutes to reduce spam
        elif (now - last_restart_time).total_seconds() % 300 < 60:
            remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds()
            logger.info(f'BME-only state continues, restart cooldown: {remaining:.0f}s remaining')
        return
    # Condition 2: fallback, multiple radio sensors not responding
    if radio_nonresponding >= 2:
        if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS:
            logger.warning(f'{radio_nonresponding} sensors not responding. Restarting Pi...')
            restart_pi()
            last_restart_time = now
        # Only log the cooldown every 5 minutes to reduce spam
        elif (now - last_restart_time).total_seconds() % 300 < 60:
            remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds()
            logger.info(f'{radio_nonresponding} sensors down, restart cooldown: {remaining:.0f}s remaining')
def restart_pi():
    """Restart the Raspberry Pi remotely via SSH (the container runs on the NAS)."""
    logger.warning("Attempting to restart Raspberry Pi due to sensor failure...")
    if not PI_HOST:
        logger.error("PI_HOST env var not set; cannot SSH to Pi. Set PI_HOST, PI_USER, SSH_KEY_PATH.")
        return
    ssh_target = f"{PI_USER}@{PI_HOST}"
    ssh_cmd = [
        'ssh',
        '-i', SSH_KEY_PATH,
        '-o', 'StrictHostKeyChecking=accept-new',
        '-o', 'BatchMode=yes',
        '-o', 'ConnectTimeout=8',
        ssh_target,
        'sudo', '-n', 'reboot'
    ]
    try:
        result = subprocess.run(ssh_cmd, capture_output=True, timeout=15)
        if result.returncode == 0:
            logger.info("Pi restart command sent via SSH successfully")
            return
        else:
            logger.error(f"SSH reboot failed (code {result.returncode}): {result.stderr.decode(errors='ignore')}")
    except Exception as e:
        logger.error(f"SSH reboot exception: {e}")
    logger.error("Pi restart via SSH failed. Manual intervention may be required.")
if __name__ == '__main__':
    count = 0
    # Start the environment file watcher for runtime config changes
    start_env_watcher()
    # Initial DB check (non-fatal). If unreachable, we will retry in the loop.
    ensure_db_connection(force=True)
    if db_available:
        try:
            sensors = refresh_sensor_cache()
            build_sensor_lists_from_db()
        except Exception as e:
            logger.warning(f"Failed to load sensor configuration after DB connection: {e}")
    else:
        logger.warning(f"Starting without database; will cache data locally and retry every {DB_RETRY_SECONDS}s")
    logger.info('Starting data collection')
    try:
        while True:
            # Periodically retry the DB connection if it is currently down
            if not db_available:
                if ensure_db_connection(force=True):
                    try:
                        sensors = refresh_sensor_cache()
                        build_sensor_lists_from_db()
                        sync_data()  # flush cached data once the DB is back
                    except Exception as e:
                        logger.error(f"DB reconnected but failed to refresh caches/sync: {e}")
            check_last_transmission_time()
            time.sleep(60)
            utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
            # Only log when devices are actually seen
            active_devices = [key for key in seen_messages.keys() if seen_messages[key]]
            if active_devices:
                logger.info(f"{utc_time}: Seen devices: {active_devices}")
            # Only log sensor status if there are transmission times to report
            if last_transmission_times:
                status_lines = []
                for k, v in last_transmission_times.items():
                    if k not in ignored_sensors_for_time:
                        age_seconds = (datetime.now(timezone.utc) - v.replace(tzinfo=timezone.utc)).total_seconds()
                        status_lines.append(f"{k}: {v.strftime('%H:%M:%S')} ({age_seconds:.0f}s ago)")
                if status_lines:
                    logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines))
            process_mqtt_messages(seen_messages)
            if ensure_db_connection():
                new_data_queue.append(seen_messages)
                sync_data()
            else:
                logger.warning(f"{utc_time}: Database unavailable - storing batch locally")
                save_json_locally(seen_messages)
    except KeyboardInterrupt:
        logger.info("Shutting down gracefully...")
        stop_env_watcher()
        if mqttc:
            mqttc.loop_stop()
        raise