Bugfixes and improvements to datacollector.py
This commit is contained in:
194
datacollector.py
194
datacollector.py
@@ -17,7 +17,7 @@ from watchdog.events import FileSystemEventHandler
|
||||
from sqlalchemy import create_engine, exc, text
|
||||
from sqlalchemy.engine import URL
|
||||
# from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.orm import sessionmaker, scoped_session
|
||||
from datetime import datetime, timezone
|
||||
from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation, Voltage, Base
|
||||
|
||||
@@ -40,7 +40,6 @@ logger.addHandler(handler)
|
||||
try:
|
||||
logger.info(
|
||||
f"Startup config: LOG_LEVEL={LOG_LEVEL}, "
|
||||
f"BRESSER_6IN1_TEMP_OFFSET={float(os.getenv('BRESSER_6IN1_TEMP_OFFSET', '0'))}°C, "
|
||||
f"BATTERY_CHANGE_MIN_SILENCE={os.getenv('BATTERY_CHANGE_MIN_SILENCE', '60')}s, "
|
||||
f"BATTERY_CHANGE_MAX_SILENCE={os.getenv('BATTERY_CHANGE_MAX_SILENCE', '600')}s"
|
||||
)
|
||||
@@ -88,9 +87,6 @@ STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300")) # 5 minute
|
||||
RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600")) # 1 hour
|
||||
last_restart_time = None
|
||||
|
||||
# Load sensor-specific temperature offsets from environment
|
||||
BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0"))
|
||||
|
||||
# Load battery change detection timing from environment
|
||||
BATTERY_CHANGE_MIN_SILENCE = int(os.getenv("BATTERY_CHANGE_MIN_SILENCE", "60")) # seconds
|
||||
BATTERY_CHANGE_MAX_SILENCE = int(os.getenv("BATTERY_CHANGE_MAX_SILENCE", "600")) # seconds
|
||||
@@ -106,18 +102,12 @@ class EnvFileWatcher(FileSystemEventHandler):
|
||||
|
||||
def reload_config():
|
||||
"""Reload configuration values from .env file"""
|
||||
global BRESSER_6IN1_TEMP_OFFSET, BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE
|
||||
global BATTERY_CHANGE_MIN_SILENCE, BATTERY_CHANGE_MAX_SILENCE
|
||||
try:
|
||||
# Re-read .env so changes on disk propagate into os.environ
|
||||
load_dotenv(override=True)
|
||||
|
||||
with config_lock:
|
||||
# Reload temperature offset
|
||||
old_offset = BRESSER_6IN1_TEMP_OFFSET
|
||||
BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0"))
|
||||
if old_offset != BRESSER_6IN1_TEMP_OFFSET:
|
||||
logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {BRESSER_6IN1_TEMP_OFFSET}°C")
|
||||
|
||||
# Reload battery change detection timing
|
||||
old_min = BATTERY_CHANGE_MIN_SILENCE
|
||||
old_max = BATTERY_CHANGE_MAX_SILENCE
|
||||
@@ -227,11 +217,16 @@ db_available = False
|
||||
last_db_check = 0.0
|
||||
DB_RETRY_SECONDS = 30 # Retry DB connection every 30 seconds if down
|
||||
|
||||
# Create a configured "Session" class
|
||||
Session = sessionmaker(bind=sql_engine)
|
||||
# Create a thread-local session registry
|
||||
Session = scoped_session(sessionmaker(bind=sql_engine))
|
||||
session = Session
|
||||
|
||||
# Create a session to interact with the database
|
||||
session = Session()
|
||||
def close_db_session():
|
||||
"""Close the current thread-local session to return connections to the pool."""
|
||||
try:
|
||||
Session.remove()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def parse_radio_frame(byte_data):
|
||||
"""Parse radio frame with structure:
|
||||
@@ -485,7 +480,11 @@ def refresh_sensor_cache():
|
||||
# Extract base name (before last underscore if it contains the ID)
|
||||
base_name = sensor.mqtt_name.rsplit('_', 1)[0] if '_' in sensor.mqtt_name else sensor.mqtt_name
|
||||
if sensor.room:
|
||||
sensor_by_name_room[(base_name, sensor.room)] = sensor
|
||||
sensor_by_name_room[(base_name, sensor.room)] = {
|
||||
"mqtt_name": sensor.mqtt_name,
|
||||
"mqtt_id": sensor.mqtt_id,
|
||||
"room": sensor.room,
|
||||
}
|
||||
|
||||
# Cache pool sensors by node_id for dynamic processing
|
||||
pool_sensors_cache = {}
|
||||
@@ -494,8 +493,11 @@ def refresh_sensor_cache():
|
||||
if sensor.node_id not in pool_sensors_cache:
|
||||
pool_sensors_cache[sensor.node_id] = {}
|
||||
# Map by sensor_type to easily identify BME vs DS
|
||||
pool_sensors_cache[sensor.node_id][sensor.sensor_type] = sensor
|
||||
|
||||
pool_sensors_cache[sensor.node_id][sensor.sensor_type] = {
|
||||
"mqtt_id": sensor.mqtt_id,
|
||||
"sensor_type": sensor.sensor_type,
|
||||
}
|
||||
close_db_session()
|
||||
return sensors
|
||||
|
||||
def build_sensor_lists_from_db():
|
||||
@@ -532,6 +534,7 @@ def build_sensor_lists_from_db():
|
||||
if any(sensor_type in sensor.mqtt_name for sensor_type in ignored_types)
|
||||
]
|
||||
logger.info(f"Built ignored_sensors_for_time from database: {ignored_sensors_for_time}")
|
||||
close_db_session()
|
||||
|
||||
def get_sensor_keys(sensor_type):
|
||||
"""Return the list of MQTT keys to average for each sensor type.
|
||||
@@ -581,7 +584,7 @@ def ensure_db_connection(force: bool = False) -> bool:
|
||||
This function tests the connection and reinitializes the session if needed.
|
||||
On failure, it disposes the pool to force reconnection next attempt.
|
||||
"""
|
||||
global db_available, last_db_check, session
|
||||
global db_available, last_db_check
|
||||
now = time.time()
|
||||
if not force and (now - last_db_check) < DB_RETRY_SECONDS:
|
||||
return db_available
|
||||
@@ -594,9 +597,8 @@ def ensure_db_connection(force: bool = False) -> bool:
|
||||
if not db_available:
|
||||
logger.info(f"Database reachable again at {DB_HOST}:{DB_PORT}")
|
||||
db_available = True
|
||||
# Recreate session to ensure fresh connections
|
||||
session = Session()
|
||||
except exc.OperationalError as e:
|
||||
close_db_session()
|
||||
# Connection failed - dispose pool to force fresh connections on next attempt
|
||||
sql_engine.dispose()
|
||||
if db_available:
|
||||
@@ -605,6 +607,7 @@ def ensure_db_connection(force: bool = False) -> bool:
|
||||
logger.info(f"Database still unreachable: {e}")
|
||||
db_available = False
|
||||
except Exception as e:
|
||||
close_db_session()
|
||||
sql_engine.dispose()
|
||||
if db_available:
|
||||
logger.warning(f"Unexpected database error: {type(e).__name__}: {e}")
|
||||
@@ -619,12 +622,18 @@ def on_connect(client, userdata, flags, reason_code, properties):
|
||||
# Subscribing in on_connect() means that if we lose the connection and
|
||||
# reconnect then subscriptions will be renewed.
|
||||
client.subscribe(MQTT_TOPIC_PREFIX)
|
||||
logger.info(f"Connected with result code {reason_code}")
|
||||
logger.info(f"MQTT - Connected with result code {reason_code}")
|
||||
|
||||
# The callback for when a PUBLISH message is received from the server.
|
||||
def on_message(client, userdata, msg):
|
||||
if msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]):
|
||||
d = json.loads(msg.payload.decode())
|
||||
try:
|
||||
payload_text = msg.payload.decode(errors='replace')
|
||||
d = json.loads(payload_text)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning(f"Malformed JSON payload on {msg.topic}: {e}")
|
||||
logger.debug(f"Raw payload: {payload_text}")
|
||||
return
|
||||
model = d['model']
|
||||
if model in sensor_names:
|
||||
if model == 'pool':
|
||||
@@ -962,6 +971,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b
|
||||
'rain_mm': rain_mm,
|
||||
'vcc_mv': vcc_mv,
|
||||
})
|
||||
close_db_session()
|
||||
return
|
||||
|
||||
sensor = get_or_update_sensor(mqtt_name, mqtt_id)
|
||||
@@ -981,6 +991,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b
|
||||
'rain_mm': rain_mm,
|
||||
'vcc_mv': vcc_mv,
|
||||
})
|
||||
close_db_session()
|
||||
return
|
||||
|
||||
position = sensor.position
|
||||
@@ -1115,6 +1126,7 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b
|
||||
'rain_mm': rain_mm,
|
||||
'vcc_mv': vcc_mv,
|
||||
})
|
||||
close_db_session()
|
||||
|
||||
|
||||
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
@@ -1185,15 +1197,7 @@ def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_i
|
||||
if averages:
|
||||
mqtt_id = mqtt_id_override if mqtt_id_override else sensor_key
|
||||
|
||||
# Apply sensor-specific temperature corrections
|
||||
temperature_c = averages.get('temperature_C')
|
||||
if temperature_c is not None:
|
||||
if 'Bresser-6in1' in sensor_key:
|
||||
with config_lock:
|
||||
offset = BRESSER_6IN1_TEMP_OFFSET
|
||||
original_temp = temperature_c
|
||||
temperature_c = round(temperature_c - offset, 1) # Round to 1 decimal to avoid floating-point errors
|
||||
logger.info(f"Applied Bresser-6in1 temperature offset: raw={original_temp}°C offset={offset}°C corrected={temperature_c}°C")
|
||||
|
||||
update_data(
|
||||
utc_time,
|
||||
@@ -1228,8 +1232,8 @@ def process_mqtt_messages(seen_messages):
|
||||
|
||||
# Process pool sensors dynamically from cache
|
||||
for node_id, sensors_by_type in pool_sensors_cache.items():
|
||||
for sensor_type, db_sensor in sensors_by_type.items():
|
||||
sensor_key = f'pool_{db_sensor.mqtt_id}'
|
||||
for sensor_type, pool_entry in sensors_by_type.items():
|
||||
sensor_key = f"pool_{pool_entry['mqtt_id']}"
|
||||
if sensor_key in seen_messages and seen_messages[sensor_key]:
|
||||
# Get appropriate keys for this pool sensor type
|
||||
keys = get_sensor_keys(sensor_type)
|
||||
@@ -1237,14 +1241,14 @@ def process_mqtt_messages(seen_messages):
|
||||
|
||||
# Process all non-pool sensors dynamically from database
|
||||
# Query all non-pool sensors and process any that have received messages
|
||||
all_sensors = session.query(Sensor).filter(Sensor.mqtt_name != 'pool').all()
|
||||
|
||||
for db_sensor in all_sensors:
|
||||
sensor_key = f"{db_sensor.mqtt_name}_{db_sensor.mqtt_id}"
|
||||
all_sensors = session.query(Sensor.mqtt_name, Sensor.mqtt_id).filter(Sensor.mqtt_name != 'pool').all()
|
||||
|
||||
for mqtt_name, mqtt_id in all_sensors:
|
||||
sensor_key = f"{mqtt_name}_{mqtt_id}"
|
||||
|
||||
if sensor_key in seen_messages and seen_messages[sensor_key]:
|
||||
# Get the appropriate keys for this sensor type
|
||||
keys = get_sensor_keys(db_sensor.mqtt_name)
|
||||
keys = get_sensor_keys(mqtt_name)
|
||||
|
||||
# Process the sensor data with dynamically determined keys
|
||||
seen_messages[sensor_key] = process_sensor_data(
|
||||
@@ -1255,7 +1259,7 @@ def process_mqtt_messages(seen_messages):
|
||||
)
|
||||
|
||||
# Seen messages already logged in main loop when devices are active
|
||||
pass
|
||||
close_db_session()
|
||||
|
||||
# Funktion zum Abrufen und Löschen der JSON-Daten aus SQLite
|
||||
def get_and_delete_json_data():
|
||||
@@ -1279,63 +1283,65 @@ def get_and_delete_json_data():
|
||||
# Funktion zum Synchronisieren der Daten
|
||||
def sync_data():
|
||||
global session
|
||||
|
||||
if not ensure_db_connection(force=True):
|
||||
# MariaDB nicht verfügbar - speichere in SQLite
|
||||
try:
|
||||
if not ensure_db_connection(force=True):
|
||||
# MariaDB nicht verfügbar - speichere in SQLite
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
save_json_locally(data)
|
||||
return
|
||||
|
||||
local_data_written = False
|
||||
|
||||
# Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback)
|
||||
local_data = get_and_delete_json_data()
|
||||
for data in local_data:
|
||||
try:
|
||||
if isinstance(data, dict) and 'utc_time' in data:
|
||||
# Einzelner Sensor-Eintrag
|
||||
if " UTC" in str(data.get('utc_time', '')):
|
||||
data['utc_time'] = data['utc_time'].replace(" UTC", "")
|
||||
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
|
||||
data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
|
||||
data.get('average_speed'), data.get('direction'),
|
||||
data.get('gust'), data.get('rain_mm'), data.get('vcc_mv'))
|
||||
if not local_data_written:
|
||||
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
|
||||
local_data_written = True
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError syncing local data: {e}")
|
||||
try:
|
||||
session.rollback()
|
||||
except Exception:
|
||||
pass # Ignore rollback errors if connection is lost
|
||||
# Rette den Datensatz zurück in SQLite
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing local data: {e}")
|
||||
save_json_locally(data)
|
||||
|
||||
# Danach neue Daten aus der Warteschlange synchronisieren
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
save_json_locally(data)
|
||||
return
|
||||
|
||||
local_data_written = False
|
||||
|
||||
# Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback)
|
||||
local_data = get_and_delete_json_data()
|
||||
for data in local_data:
|
||||
try:
|
||||
if isinstance(data, dict) and 'utc_time' in data:
|
||||
# Einzelner Sensor-Eintrag
|
||||
if " UTC" in str(data.get('utc_time', '')):
|
||||
data['utc_time'] = data['utc_time'].replace(" UTC", "")
|
||||
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
|
||||
data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
|
||||
data.get('average_speed'), data.get('direction'),
|
||||
data.get('gust'), data.get('rain_mm'), data.get('vcc_mv'))
|
||||
if not local_data_written:
|
||||
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
|
||||
local_data_written = True
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError syncing local data: {e}")
|
||||
try:
|
||||
session.rollback()
|
||||
except Exception:
|
||||
pass # Ignore rollback errors if connection is lost
|
||||
# Rette den Datensatz zurück in SQLite
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing local data: {e}")
|
||||
save_json_locally(data)
|
||||
|
||||
# Danach neue Daten aus der Warteschlange synchronisieren
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
try:
|
||||
if isinstance(data, dict) and 'mqtt_id' in data:
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
|
||||
data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
|
||||
data['gust'], data['rain_mm'], data.get('vcc_mv'))
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError: {e}")
|
||||
try:
|
||||
session.rollback()
|
||||
except Exception:
|
||||
pass # Ignore rollback errors if connection is lost
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing data: {e}")
|
||||
save_json_locally(data)
|
||||
if isinstance(data, dict) and 'mqtt_id' in data:
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
|
||||
data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
|
||||
data['gust'], data['rain_mm'], data.get('vcc_mv'))
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError: {e}")
|
||||
try:
|
||||
session.rollback()
|
||||
except Exception:
|
||||
pass # Ignore rollback errors if connection is lost
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing data: {e}")
|
||||
save_json_locally(data)
|
||||
finally:
|
||||
close_db_session()
|
||||
|
||||
def update_last_transmission_time(sensor, time_value):
|
||||
"""Update last transmission time for a sensor. Uses provided time or falls back to current time if not available."""
|
||||
|
||||
Reference in New Issue
Block a user