Initial commit

This commit is contained in:
User
2025-12-21 07:26:30 +00:00
commit 1d1036cf2a
7 changed files with 1210 additions and 0 deletions

986
datacollector.py Normal file
View File

@@ -0,0 +1,986 @@
# import debugpy
# debugpy.listen(('0.0.0.0', 5678))
# print("Waiting for debugger attach")
# debugpy.wait_for_client()
import json
import time
import paho.mqtt.client as mqtt
import sqlite3
import requests
import logging
import struct
import subprocess
import os
import threading
from logging.handlers import RotatingFileHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine, exc
# from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timezone
from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation
# Configure logging with environment-based log level.
# LOG_LEVEL comes from the environment (default "INFO"); a name that is not an
# attribute of the logging module falls back to logging.INFO.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger(__name__)
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
# Main log file: rotated once it reaches 100,000,000 bytes, one backup is kept.
handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCount=1)
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# Malformed hex logging (controlled via environment). Enable with LOG_MALFORMED_HEX=true|1|yes|on
LOG_MALFORMED_HEX = os.getenv("LOG_MALFORMED_HEX", "false").strip().lower() in ("1", "true", "yes", "on")
malformed_hex_logger = logging.getLogger('malformed_hex')
malformed_hex_logger.setLevel(logging.INFO)
# The dedicated file handler is attached only when the feature is enabled.
if LOG_MALFORMED_HEX:
    malformed_hex_handler = RotatingFileHandler('mal-hex.log', maxBytes=10000000, backupCount=1)
    malformed_hex_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    malformed_hex_logger.addHandler(malformed_hex_handler)
# Address of the MQTT broker handed to mqttc.connect().
MQTT_SERVER = "192.168.43.102"
# Topic subscribed in on_connect(); on_message() also checks incoming topics against it.
MQTT_TOPIC_PREFIX = "rtl_433/DietPi/events"
# NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
KNOWN_DEVICES = ["inFactory-TH_25", "Oregon-THGR122N_233", "Oregon-v1_0", "Bresser-6in1_-2021550075", "Bosch-BME280_1", "pool"]
# Remote Pi management (container runs on NAS)
PI_HOST = os.getenv("PI_HOST")
PI_USER = os.getenv("PI_USER", "pi")
SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "/workspace/.ssh/id_rsa")
# "<model>_<id>" -> list of message dicts collected by on_message() until they are processed.
seen_messages = {}
# Records waiting to be written by sync_data().
new_data_queue = []
# "<model>_<id>" -> datetime of the last transmission (see update_last_transmission_time()).
last_transmission_times = {}
# Sensors left out of the periodic "Last seen" status log in the main loop.
# NOTE(review): 'inFactory-TH_252' differs from 'inFactory-TH_25' in KNOWN_DEVICES — confirm which is intended.
ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252', 'Oregon-THGR122N_233']
# Radio sensors evaluated by check_last_transmission_time().
allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31']
# When True, debug_sended_data() prints averages and leftover messages.
debug = False
# Track sensor failure states to avoid repeated logging
sensor_failure_logged = {}
# Watchdog configuration
# If only BME280 is active within this window and radio sensors are silent, restart
STALL_WINDOW_SECONDS = int(os.getenv("STALL_WINDOW_SECONDS", "300"))  # 5 minutes
RESTART_COOLDOWN_SECONDS = int(os.getenv("RESTART_COOLDOWN_SECONDS", "3600"))  # 1 hour
# datetime of the last restart triggered by check_last_transmission_time(); None until then.
last_restart_time = None
# Load sensor-specific temperature offsets from environment
BRESSER_6IN1_TEMP_OFFSET = float(os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0"))
# Guards reads and writes of BRESSER_6IN1_TEMP_OFFSET.
config_lock = threading.Lock()
# File watcher for runtime configuration changes
class EnvFileWatcher(FileSystemEventHandler):
    """Watchdog handler that triggers a configuration reload for ``.env`` paths."""

    def on_modified(self, event):
        """Call ``reload_config()`` when the modified path ends in ``.env``."""
        changed_path = event.src_path
        if not changed_path.endswith('.env'):
            return
        reload_config()
def _read_env_file_value(name, env_path='.env'):
    """Return the raw string assigned to *name* in a KEY=VALUE file, or None.

    Blank lines and ``#`` comment lines are skipped, an optional leading
    ``export `` is ignored, surrounding single or double quotes are removed,
    and for unquoted values a trailing `` # comment`` is cut off. When the
    name is assigned more than once the last assignment wins. None is
    returned when the file cannot be read or does not assign *name*.
    """
    try:
        with open(env_path, encoding='utf-8') as env_file:
            lines = env_file.read().splitlines()
    except OSError:
        return None
    found = None
    for raw_line in lines:
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue
        if line.startswith('export '):
            line = line[len('export '):].lstrip()
        key, sep, raw_value = line.partition('=')
        if not sep or key.strip() != name:
            continue
        raw_value = raw_value.strip()
        if len(raw_value) >= 2 and raw_value[0] == raw_value[-1] and raw_value[0] in ('"', "'"):
            raw_value = raw_value[1:-1]
        else:
            raw_value = raw_value.split(' #', 1)[0].strip()
        found = raw_value
    return found


def reload_config():
    """Reload configuration values from .env file.

    The process environment does not change when the ``.env`` file is edited,
    so the file in the working directory (the directory the watcher observes)
    is read directly. If the file is missing or does not define the key, the
    process environment is used, with "0" as the final default.

    Errors (for example a non-numeric value) are logged and leave the current
    offset unchanged.
    """
    global BRESSER_6IN1_TEMP_OFFSET
    try:
        raw_offset = _read_env_file_value("BRESSER_6IN1_TEMP_OFFSET")
        if raw_offset is None:
            raw_offset = os.getenv("BRESSER_6IN1_TEMP_OFFSET", "0")
        # Parse before taking the lock so a bad value never clobbers the old one.
        new_offset = float(raw_offset)
        with config_lock:
            old_offset = BRESSER_6IN1_TEMP_OFFSET
            BRESSER_6IN1_TEMP_OFFSET = new_offset
        if old_offset != new_offset:
            logger.info(f"Configuration reloaded: BRESSER_6IN1_TEMP_OFFSET changed from {old_offset}°C to {new_offset}°C")
    except Exception as e:
        logger.error(f"Error reloading configuration from .env: {e}")
# Observer instance owned by start_env_watcher() / stop_env_watcher()
env_observer = None


def start_env_watcher():
    """Create and start a watchdog observer on the current directory.

    The observer is non-recursive and dispatches to an ``EnvFileWatcher``.
    Any failure is logged as a warning instead of being raised.
    """
    global env_observer
    try:
        env_observer = Observer()
        env_observer.schedule(EnvFileWatcher(), path='.', recursive=False)
        env_observer.start()
        logger.info("Environment file watcher started")
    except Exception as e:
        logger.warning(f"Failed to start environment file watcher: {e}")
def stop_env_watcher():
    """Stop the observer created by ``start_env_watcher()``, if there is one.

    Waits up to five seconds for the observer thread to finish.
    """
    global env_observer
    if not env_observer:
        return
    env_observer.stop()
    env_observer.join(timeout=5)
    logger.info("Environment file watcher stopped")
# Open the SQLite database that serves as a local fallback while MariaDB is unavailable
sqlite_db_path = 'data/local_backup.db'
os.makedirs('data', exist_ok=True)
# check_same_thread=False allows the connection to be used from threads other than its creator
sqlite_conn = sqlite3.connect(sqlite_db_path, check_same_thread=False)
sqlite_cursor = sqlite_conn.cursor()
# Create the table that buffers pending records, if it does not exist yet
sqlite_cursor.execute('''
CREATE TABLE IF NOT EXISTS json_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
sqlite_conn.commit()
logger.info(f"SQLite database initialized at: {sqlite_db_path}")
# Create a connection to the database.
# SECURITY: the connection URL (including the password) can be supplied through the
# DATABASE_URL environment variable. The embedded default is kept only for backward
# compatibility; the credential should be rotated and removed from the source.
sql_engine = create_engine(
    os.getenv(
        'DATABASE_URL',
        'mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata',
    )
)
# Create a configured "Session" class
Session = sessionmaker(bind=sql_engine)
# Create a session to interact with the database
session = Session()
def parse_radio_frame(byte_data):
    """Extract header fields and payload from a raw radio frame.

    Layout expected after any preamble bytes:
        sync byte 0x2D, followed by one byte treated as the network id
        header (4 x u8): payload_len, dest_id, sender_id, ctl
        payload: payload_len bytes
        CRC: 2 bytes, returned as-is and never verified here

    The first 0x2D in the buffer is taken as the sync byte.

    Returns a dict with the keys 'data', 'payload_len', 'dest_id',
    'sender_id', 'ctl', 'network_id', 'crc_bytes', 'sync_index' and
    'sync_len', or None when the input is empty, no sync byte is found,
    the buffer is too short, or any error occurs while parsing.
    """
    if not byte_data:
        return None
    try:
        sync_index = byte_data.find(b"\x2d")
        total = len(byte_data)
        # Need the sync byte itself plus the network id byte right after it
        if sync_index == -1 or sync_index + 1 >= total:
            return None
        sync_len = 2
        header_start = sync_index + sync_len
        data_start = header_start + 4
        if data_start > total:
            return None
        # Slicing a bytes-like object and unpacking yields the four u8 header fields
        payload_len, dest_id, sender_id, ctl = byte_data[header_start:data_start]
        data_end = data_start + payload_len
        crc_end = data_end + 2
        if crc_end > total:
            # Payload plus the two CRC bytes do not fit into the buffer
            return None
        frame = {
            'data': byte_data[data_start:data_end],
            'payload_len': payload_len,
            'dest_id': dest_id,
            'sender_id': sender_id,
            'ctl': ctl,
            'network_id': byte_data[sync_index + 1],
            'crc_bytes': byte_data[data_end:crc_end],
            'sync_index': sync_index,
            'sync_len': sync_len,
        }
        return frame
    except Exception:
        return None
def refresh_sensor_cache():
    """Reload all sensors from the database and rebuild the module-level caches.

    Updates the globals ``sensor_ids`` ("<mqtt_name>_<mqtt_id>" strings),
    ``sensor_names`` (mqtt_name values) and ``sensor_by_name_room``
    ((base name, room) -> sensor, only for sensors with a room).

    Returns the list of sensors that was loaded.
    """
    global sensor_ids, sensor_names, sensor_by_name_room
    loaded = session.query(Sensor).all()
    sensor_ids = []
    sensor_names = []
    for entry in loaded:
        sensor_ids.append(f'{entry.mqtt_name}_{entry.mqtt_id}')
        sensor_names.append(entry.mqtt_name)
    # Base name = mqtt_name up to its last underscore; rsplit returns the whole
    # name unchanged when there is no underscore. Later sensors overwrite
    # earlier ones that share the same (base name, room) key.
    sensor_by_name_room = {
        (entry.mqtt_name.rsplit('_', 1)[0], entry.room): entry
        for entry in loaded
        if entry.room
    }
    return loaded
# Declare the default before the first refresh: refresh_sensor_cache() fills
# sensor_by_name_room, so resetting it afterwards would discard that mapping.
sensor_by_name_room = {}
sensors = refresh_sensor_cache()
warte = ''
# Check whether the remote server can be reached
def is_remote_server_available():
    """Return True when an HTTP GET to the remote host answers with status 200.

    A timeout keeps an unresponsive host from blocking the caller
    indefinitely. Every requests-level failure (connection error, timeout,
    invalid response, ...) is reported as "not available".
    """
    try:
        response = requests.get('http://192.168.43.102', timeout=5)
    except requests.RequestException:
        return False
    return response.status_code == 200
# Store JSON data in the local SQLite database
def save_json_locally(json_dict):
    """Serialize *json_dict* to JSON and insert it into the ``json_data`` table.

    Failures are logged and not raised.
    """
    try:
        row = (json.dumps(json_dict),)
        sqlite_cursor.execute('INSERT INTO json_data (data) VALUES (?)', row)
        sqlite_conn.commit()
        logger.info("Data saved to local SQLite database")
    except Exception as e:
        logger.error(f"Error saving to SQLite: {e}")
# Invoked by the MQTT client once the broker has answered the connection request (CONNACK).
def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to ``MQTT_TOPIC_PREFIX`` and print the connection result.

    Subscribing here means the subscription is renewed after every reconnect.
    """
    topic = MQTT_TOPIC_PREFIX
    client.subscribe(topic)
    print(f"Connected with result code {reason_code}")
# MQTT message handling: pool-frame decoding helpers followed by the on_message callback.

# Hex prefix every accepted pool frame starts with
_POOL_PREAMBLE_HEX = 'aaaaaa'
# Application payload, little-endian, 15 bytes:
# magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc
_POOL_PAYLOAD = struct.Struct('<BBBBHhhhHB')
_POOL_MAGIC1 = 0x42
_POOL_MAGIC2 = 0x99


def _xor_checksum(data):
    """Return the XOR of all bytes in *data* (the payload's simple checksum)."""
    checksum = 0
    for byte in data:
        checksum ^= byte
    return checksum


def _remember_message(sensor_key, message):
    """Append *message* to the list stored for *sensor_key* in ``seen_messages``."""
    seen_messages.setdefault(sensor_key, []).append(message)


def _decode_pool_payload(candidate_bytes):
    """Find the magic bytes in *candidate_bytes* and unpack the payload behind them.

    Returns a dict with version, nodeId, sequence, the two temperatures,
    humidity and pressure (raw values divided by 10) and a ``crc_valid``
    flag, or None when no complete payload is present. A checksum mismatch
    is reported but does not discard the payload.
    """
    magic_offset = candidate_bytes.find(bytes((_POOL_MAGIC1, _POOL_MAGIC2)))
    if magic_offset == -1:
        if LOG_MALFORMED_HEX:
            malformed_hex_logger.info(f"Magic bytes not found: {candidate_bytes.hex()}")
        print(f"Magic bytes {_POOL_MAGIC1:02x} {_POOL_MAGIC2:02x} not found in payload")
        return None
    available = len(candidate_bytes) - magic_offset
    if available < _POOL_PAYLOAD.size:
        print(f"Payload too short after magic bytes: {available} bytes (need {_POOL_PAYLOAD.size})")
        return None
    payload_data = candidate_bytes[magic_offset:magic_offset + _POOL_PAYLOAD.size]
    try:
        (_magic1, _magic2, version, node_id, seq,
         t_ds10, t_bme10, hum10, pres1, crc_received) = _POOL_PAYLOAD.unpack(payload_data)
    except struct.error as e:
        print(f"Struct unpack error: {e}")
        return None
    print(f"Found magic bytes at offset {magic_offset}")
    # The checksum covers every payload byte except the trailing checksum byte itself
    crc_calculated = _xor_checksum(payload_data[:-1])
    if crc_received != crc_calculated:
        print(f"CRC mismatch! Received: {crc_received:02x}, Calculated: {crc_calculated:02x}")
        print("Data may be corrupted - displaying anyway:")
    parsed_struct = {
        "version": version,
        "nodeId": node_id,
        "sequence": seq,
        "t_ds_c": t_ds10 / 10.0,
        "t_bme_c": t_bme10 / 10.0,
        "humidity": hum10 / 10.0,
        "pressure_hpa": pres1 / 10.0,
        "crc_valid": crc_received == crc_calculated
    }
    print(f"Decoded payload: {json.dumps(parsed_struct, indent=2)}")
    return parsed_struct


def _handle_pool_message(d):
    """Decode a 'pool' message and queue one record per physical sensor.

    The hex string in ``d['rows'][0]['data']`` is decoded, the radio frame is
    parsed (falling back to the raw bytes without leading 0xAA), and the
    application payload is split into a BME280 record (id = nodeId*10+1) and
    a DS18B20 record (id = nodeId*10+2). Messages that cannot be decoded are
    dropped.
    """
    try:
        test_value = d['rows'][0]['data']
    except (KeyError, IndexError, TypeError):
        logger.warning(f"Pool message without rows[0]['data'] ignored: {d}")
        return
    if not isinstance(test_value, str) or not test_value.startswith(_POOL_PREAMBLE_HEX):
        return
    hex_data = test_value[len(_POOL_PREAMBLE_HEX):]
    try:
        byte_data = bytes.fromhex(hex_data)
    except ValueError:
        if LOG_MALFORMED_HEX:
            malformed_hex_logger.info(f"Invalid hex: {hex_data}")
        print(f"Invalid hex data: {hex_data}")
        print(d)
        return
    # Attempt to parse the radio frame first (preamble/sync/header/data/crc)
    frame = parse_radio_frame(byte_data)
    if frame and frame.get('data'):
        print(
            f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, "
            f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}"
        )
        candidate_bytes = frame['data']
    else:
        # Fallback: drop optional leading 0xAA bytes and scan the raw stream
        if LOG_MALFORMED_HEX and not frame:
            malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}")
        candidate_bytes = byte_data.lstrip(b"\xaa")
    print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}")
    print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}")
    parsed_struct = _decode_pool_payload(candidate_bytes)
    if parsed_struct is None:
        return
    # Create two separate sensor messages matching the standard format
    original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
    node_id = parsed_struct['nodeId']
    mic = 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM'
    # Message 1: BME280 sensor (temp, humidity, pressure)
    bme_msg = {
        'time': original_time,
        'model': 'pool',
        'id': node_id * 10 + 1,  # e.g., nodeId=1 -> id=11 for BME
        'battery_ok': 1,
        'temperature_C': parsed_struct['t_bme_c'],
        'humidity': parsed_struct['humidity'],
        'pressure_rel': parsed_struct['pressure_hpa'],
        'mic': mic
    }
    # Message 2: DS18B20 sensor (temp only)
    ds_msg = {
        'time': original_time,
        'model': 'pool',
        'id': node_id * 10 + 2,  # e.g., nodeId=1 -> id=12 for DS
        'battery_ok': 1,
        'temperature_C': parsed_struct['t_ds_c'],
        'mic': mic
    }
    for msg_data in (bme_msg, ds_msg):
        print(f"Received message from {msg_data['model']}: \n {msg_data}")
        _remember_message(f"{msg_data['model']}_{msg_data['id']}", msg_data)


def on_message(client, userdata, msg):
    """MQTT callback: collect messages of known sensor models in ``seen_messages``.

    Messages are keyed by "<model>_<id>". 'pool' messages carry a hex-encoded
    radio frame and are decoded by ``_handle_pool_message``. For
    'Bresser-6in1' messages whose ``flags`` field is 0, ``rain_mm`` is removed
    before the message is stored.

    Payloads that are not valid JSON objects, or that lack the fields needed
    here, are ignored (with a warning) instead of raising, so that a single
    bad message cannot propagate an exception into the MQTT client.
    """
    # NOTE(review): the [:-2] slice looks like a leftover from a prefix ending in "/#" — confirm
    if not msg.topic.startswith(MQTT_TOPIC_PREFIX[:-2]):
        return
    try:
        d = json.loads(msg.payload.decode())
    except ValueError as e:  # covers UnicodeDecodeError and json.JSONDecodeError
        logger.warning(f"Ignoring undecodable MQTT payload on {msg.topic}: {e}")
        return
    if not isinstance(d, dict):
        return
    model = d.get('model')
    if model not in sensor_names:
        return
    if model == 'pool':
        _handle_pool_message(d)
        return
    print(f"Received message from {model}: \n {d}")
    if 'id' not in d:
        logger.warning(f"Ignoring message from {model} without 'id': {d}")
        return
    sensor_id = d['id']
    if model == 'Bresser-6in1' and d.get('flags') == 0:
        d.pop('rain_mm', None)
    _remember_message(f'{model}_{sensor_id}', d)
# Hand one averaged reading to the remote database, or buffer it locally
def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm):
    """Bundle one reading into a record and forward it.

    When the remote server is reachable the record is queued and
    ``sync_data()`` is run; otherwise a warning is logged and the record is
    written to the local SQLite backup.
    """
    values = dict(
        utc_time=utc_time,
        mqtt_id=mqtt_id,
        temperature_c=temperature_c,
        humidity=humidity,
        pressure_rel=pressure_rel,
        battery=battery,
        average_speed=average_speed,
        direction=direction,
        gust=gust,
        rain_mm=rain_mm,
    )
    if not is_remote_server_available():
        logger.warning(f"{utc_time}: Remote server unavailable - storing locally for {mqtt_id}")
        save_json_locally(values)
        return
    # Normal operation: queue and flush right away, no logging needed
    new_data_queue.append(values)
    sync_data()
def get_or_update_sensor(mqtt_name, mqtt_id):
    """
    Get sensor by mqtt_name and mqtt_id. If not found, try to find by base mqtt_name,
    then update the mqtt_id (battery change scenario).
    Uses timing heuristic: if exactly one sensor of the type stopped transmitting recently,
    assume that's the one with battery change.

    Returns the matching (possibly updated) Sensor row, or None when no sensor
    can be determined; in that case a warning describing the situation is logged.
    Updating a sensor commits the session and refreshes the sensor cache.
    """
    # Try to find exact match first
    sensor = session.query(Sensor).filter_by(mqtt_name=mqtt_name, mqtt_id=mqtt_id).first()
    if sensor:
        return sensor
    # If not found, this might be a battery change
    # For pool sensors, require exact match
    if mqtt_name == 'pool':
        logger.warning(f"Pool sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.")
        return None
    # Extract base name (e.g., "LaCrosse-TX35DTHIT" from "LaCrosse-TX35DTHIT_28")
    base_name = mqtt_name.rsplit('_', 1)[0] if '_' in mqtt_name else mqtt_name
    # Find all sensors with matching base name (prefix match via SQL LIKE)
    candidates = session.query(Sensor).filter(
        Sensor.mqtt_name.like(f'{base_name}%')
    ).all()
    if not candidates:
        logger.warning(f"Sensor {mqtt_name}_{mqtt_id} not found in database. Please add it manually.")
        return None
    # If only one candidate, update it
    if len(candidates) == 1:
        old_id = candidates[0].mqtt_id
        old_name = candidates[0].mqtt_name
        candidates[0].mqtt_name = mqtt_name
        candidates[0].mqtt_id = mqtt_id
        session.commit()
        logger.info(f"Updated sensor in room '{candidates[0].room}': {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)")
        # Refresh the sensor cache to reflect the updated ID
        refresh_sensor_cache()
        return candidates[0]
    # Multiple candidates - use timing heuristic
    # Check which sensors have stopped transmitting recently (within last 10 minutes)
    current_time = datetime.now(timezone.utc)
    recent_silent = []
    for candidate in candidates:
        sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}"
        if sensor_key in last_transmission_times:
            last_seen = last_transmission_times[sensor_key]
            # The stored timestamp is labelled as UTC before computing its age
            time_since_last = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds()
            # Consider sensors that stopped in last 10 minutes but were active before
            if 60 < time_since_last < 600:  # Between 1 and 10 minutes
                recent_silent.append((candidate, time_since_last))
                logger.info(f"Candidate {sensor_key} in room '{candidate.room}' last seen {time_since_last:.0f}s ago")
    if len(recent_silent) == 1:
        # Exactly one sensor went silent recently - assume battery change
        sensor_to_update, time_ago = recent_silent[0]
        old_id = sensor_to_update.mqtt_id
        old_name = sensor_to_update.mqtt_name
        old_key = f"{old_name}_{old_id}"
        sensor_to_update.mqtt_name = mqtt_name
        sensor_to_update.mqtt_id = mqtt_id
        session.commit()
        logger.info(f"AUTO-UPDATE: Sensor in room '{sensor_to_update.room}' (last seen {time_ago:.0f}s ago)")
        logger.info(f" Changed: {old_name}_{old_id} -> {mqtt_name}_{mqtt_id} (battery change detected)")
        # Drop the timing entry recorded under the old key
        if old_key in last_transmission_times:
            del last_transmission_times[old_key]
        # Refresh the sensor cache
        refresh_sensor_cache()
        return sensor_to_update
    # Multiple or no recent silent sensors - need manual intervention
    logger.warning(f"Cannot auto-update mqtt_id for {mqtt_name}_{mqtt_id}. Found {len(candidates)} candidates:")
    for candidate in candidates:
        sensor_key = f"{candidate.mqtt_name}_{candidate.mqtt_id}"
        if sensor_key in last_transmission_times:
            last_seen = last_transmission_times[sensor_key]
            time_ago = (current_time - last_seen.replace(tzinfo=timezone.utc)).total_seconds()
            logger.warning(f" - {sensor_key} in room '{candidate.room}' (last seen {time_ago:.0f}s ago)")
        else:
            logger.warning(f" - {sensor_key} in room '{candidate.room}' (never seen)")
    if len(recent_silent) == 0:
        logger.warning(f"No sensors stopped recently (1-10 min). Cannot determine which sensor changed battery.")
    else:
        logger.warning(f"{len(recent_silent)} sensors stopped recently. Cannot determine which one changed battery.")
    logger.warning(f"Please update manually: UPDATE sensors SET mqtt_name='{mqtt_name}', mqtt_id='{mqtt_id}' WHERE mqtt_name='[old_name]' AND mqtt_id='[old_id]' AND room='[room]';")
    return None
def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm):
    """Write one reading for the sensor identified by ``mqtt_name_id``.

    ``mqtt_name_id`` has the form "<mqtt_name>_<mqtt_id>". For each measurement
    that is not None a new row is added only when the value differs from the
    sensor's most recent stored row. Temperature and humidity are routed to
    the inside/outside tables according to ``sensor.position``. The sensor's
    battery field is always updated. If the sensor cannot be resolved an error
    is logged and nothing is written. The session is committed at the end.
    """
    mqtt_name, mqtt_id = mqtt_name_id.split('_', 1)  # Use maxsplit=1 to handle IDs with underscores
    # Get the sensor object from the database (with auto-update for battery changes)
    sensor = get_or_update_sensor(mqtt_name, mqtt_id)
    if not sensor:
        logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database")
        return
    position = sensor.position
    # Update the sensor's battery level
    sensor.battery = battery
    # Update the temperature data
    if temperature_c is not None:
        if position == "inside":
            temperature_inside = session.query(TemperatureInside).filter_by(sensor_id=sensor.id).order_by(TemperatureInside.timestamp.desc()).first()
            if temperature_inside is None or temperature_inside.temperature_c != temperature_c:
                temperature_inside = TemperatureInside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c)
                session.add(temperature_inside)
        elif position == "outside":
            temperature_outside = session.query(TemperatureOutside).filter_by(sensor_id=sensor.id).order_by(TemperatureOutside.timestamp.desc()).first()
            if temperature_outside is None or temperature_outside.temperature_c != temperature_c:
                temperature_outside = TemperatureOutside(timestamp=utc_time, sensor_id=sensor.id, temperature_c=temperature_c)
                session.add(temperature_outside)
    # Update the humidity data
    if humidity is not None:
        if position == "inside":
            humidity_inside = session.query(HumidityInside).filter_by(sensor_id=sensor.id).order_by(HumidityInside.timestamp.desc()).first()
            if humidity_inside is None or humidity_inside.humidity != humidity:
                humidity_inside = HumidityInside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity)
                session.add(humidity_inside)
        elif position == "outside":
            humidity_outside = session.query(HumidityOutside).filter_by(sensor_id=sensor.id).order_by(HumidityOutside.timestamp.desc()).first()
            if humidity_outside is None or humidity_outside.humidity != humidity:
                humidity_outside = HumidityOutside(timestamp=utc_time, sensor_id=sensor.id, humidity=humidity)
                session.add(humidity_outside)
    # Update the air pressure data
    if pressure_rel is not None:
        air_pressure = session.query(AirPressure).filter_by(sensor_id=sensor.id).order_by(AirPressure.timestamp.desc()).first()
        if air_pressure is None or air_pressure.pressure_rel != pressure_rel:
            air_pressure = AirPressure(timestamp=utc_time, sensor_id=sensor.id, pressure_rel=pressure_rel)
            session.add(air_pressure)
    # Wind: a new row is written when any supplied component differs from the latest row
    if average_speed is not None or gust is not None or direction is not None:
        wind_value = session.query(Wind).filter_by(sensor_id=sensor.id).order_by(Wind.timestamp.desc()).first()
        if wind_value is None or (average_speed is not None and wind_value.average_speed != average_speed) or (gust is not None and wind_value.gust != gust) or (direction is not None and wind_value.direction != direction):
            wind_value = Wind(timestamp=utc_time, sensor_id=sensor.id, average_speed=average_speed, direction=direction, gust=gust)
            session.add(wind_value)
    # Update Precipitation data with cumulative offset handling
    if rain_mm is not None:
        # Values above 1000 are rejected and only logged
        if rain_mm <= 1000:
            # Check for rain sensor reset (battery change)
            # If current value is significantly less than last value, it's a reset
            if sensor.last_rain_value is not None and sensor.last_rain_value > 0:
                if rain_mm < sensor.last_rain_value - 1.0:  # Dropped by more than 1mm (reset detected)
                    # Battery was changed, add last value to offset
                    sensor.rain_offset = (sensor.rain_offset or 0.0) + sensor.last_rain_value
                    logger.info(f"Rain sensor reset detected for {mqtt_name_id}. Last value: {sensor.last_rain_value}mm, New offset: {sensor.rain_offset}mm")
            # Calculate actual cumulative rain including offset
            actual_rain = rain_mm + (sensor.rain_offset or 0.0)
            # Update last rain value for next comparison
            sensor.last_rain_value = rain_mm
            precipitation = session.query(Precipitation).filter_by(sensor_id=sensor.id).order_by(Precipitation.timestamp.desc()).first()
            if precipitation is None or precipitation.precipitation != actual_rain:
                precipitation = Precipitation(timestamp=utc_time, sensor_id=sensor.id, precipitation=actual_rain)
                session.add(precipitation)
        else:
            logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}")
    # Commit the changes
    session.commit()
# Create the MQTT client (callback API version 2), register the callbacks defined
# above, connect to the broker and start the client's background network loop.
# This runs at import time.
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# mqttc.username_pw_set("olafn", "weather")
mqttc.connect(MQTT_SERVER)
mqttc.loop_start()
def _circular_mean_deg(angles):
    """Return the circular mean of *angles* (degrees) in the range [0, 360].

    Directions are averaged as unit vectors so that, for example, 350 and 10
    average to north rather than to 180. When the vectors cancel out exactly
    (e.g. 0 and 180) the direction is undefined and the result is arbitrary.
    """
    import math

    sin_sum = sum(math.sin(math.radians(angle)) for angle in angles)
    cos_sum = sum(math.cos(math.radians(angle)) for angle in angles)
    return math.degrees(math.atan2(sin_sum, cos_sum)) % 360


def average_values(data, keys_to_average):
    """Average the requested keys over all messages that carry any of them.

    Args:
        data: list of message dicts.
        keys_to_average: names of the fields to average.

    Returns:
        ``(averages, remaining)``. ``averages`` maps each key to its rounded
        average, or to None when no message has a value for it. 'humidity'
        and 'pressure_rel' are rounded to integers; 'temperature_C',
        'wind_max_m_s', 'wind_avg_m_s' and 'rain_mm' to one decimal;
        'wind_dir_deg' is the circular mean as an integer in 0..359; other
        keys are left unrounded. A 'temperature_F' average is converted and
        stored under 'temperature_C'. ``remaining`` holds the messages that
        contain none of the keys. For empty *data* the result is ``({}, [])``.
    """
    if not data:
        return {}, []

    integer_keys = ('humidity', 'pressure_rel')
    one_decimal_keys = ('temperature_C', 'wind_max_m_s', 'wind_avg_m_s', 'rain_mm')

    def has_any_key(entry):
        return any(key in entry for key in keys_to_average)

    # Messages contributing to this period: those containing at least one key
    period_data = [d for d in data if has_any_key(d)]
    averages = {}
    for key in keys_to_average:
        values = [d.get(key) for d in period_data if d.get(key) is not None]
        if not values:
            averages[key] = None
            continue
        if key == 'wind_dir_deg':
            # Modulo after rounding folds a result of 360 back to 0
            averages[key] = int(round(_circular_mean_deg(values))) % 360
            continue
        average = sum(values) / len(values)
        if key == 'temperature_F':
            averages['temperature_C'] = round((average - 32) * 5 / 9, 1)
            continue
        if key in integer_keys:
            average = int(round(average, 0))
        elif key in one_decimal_keys:
            average = round(average, 1)
        averages[key] = average
    # Everything that was not part of the period is handed back to the caller
    remaining = [d for d in data if not has_any_key(d)]
    return averages, remaining
def debug_sended_data(seen_messages, averages, sensor):
    """Print the averages and the leftover messages for *sensor*.

    Does nothing unless the module-level ``debug`` flag is truthy.
    """
    if debug:
        print(f'Averages for {sensor}:')
        for name in averages:
            print(f"{name}: {averages[name]}")
        print(f"Remaining data {sensor}:")
        print(seen_messages[sensor])
def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_id_override=None):
    """Average the collected messages of one sensor and pass the result on.

    The averages are forwarded through ``update_data()`` under
    ``mqtt_id_override`` when it is truthy, otherwise under ``sensor_key``.
    For sensor keys containing 'Bresser-6in1' the configured temperature
    offset is subtracted first. A missing battery average is reported as 1.

    Returns the messages that were not consumed by the averaging.
    """
    averages, remaining = average_values(data_list, keys_to_average)
    if not averages:
        return remaining

    mqtt_id = mqtt_id_override or sensor_key

    # Apply sensor-specific temperature corrections
    temperature_c = averages.get('temperature_C')
    if temperature_c is not None and 'Bresser-6in1' in sensor_key:
        with config_lock:
            offset = BRESSER_6IN1_TEMP_OFFSET
        temperature_c = temperature_c - offset
        logger.debug(f"Applied Bresser-6in1 temperature offset: {offset}°C, corrected value: {temperature_c}°C")

    battery = averages.get('battery_ok')
    if battery is None:
        battery = 1

    update_data(
        utc_time,
        mqtt_id,
        temperature_c,
        averages.get('humidity'),
        averages.get('pressure_rel'),
        battery,
        averages.get('wind_avg_m_s'),
        averages.get('wind_dir_deg'),
        averages.get('wind_max_m_s'),
        averages.get('rain_mm')
    )
    debug_sended_data({sensor_key: remaining}, averages, sensor_key)
    return remaining
def process_mqtt_messages(seen_messages):
    """Record last-seen times and process the collected messages per sensor.

    For every sensor with pending messages the transmission time is updated
    from the first message's 'time' field. Pool sensors (keys starting with
    'pool_') and a fixed list of known sensor keys are then averaged through
    ``process_sensor_data()``; each processed entry in *seen_messages* is
    replaced by the messages left over.
    """
    for sensor, messages in seen_messages.items():
        if not messages:
            continue
        first = messages[0]
        # Take 'time' from the first message when it is a dict, else fall back to None
        time_value = first.get('time') if isinstance(first, dict) else None
        update_last_transmission_time(sensor, time_value)

    utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    # Pool sensors: ids ending in '1' are the BME280 (nodeId*10+1), all others the DS18B20 (nodeId*10+2)
    for pool_key in [name for name in seen_messages if name.startswith('pool_')]:
        if not seen_messages[pool_key]:
            continue
        if pool_key.split('_')[1].endswith('1'):
            keys = ['temperature_C', 'humidity', 'pressure_rel']
        else:
            keys = ['temperature_C']
        seen_messages[pool_key] = process_sensor_data(utc_time, pool_key, seen_messages[pool_key], keys)

    # Fixed sensors, handled in this order, with the fields averaged for each
    fixed_sensors = (
        ('Bresser-6in1_-2021550075',
         ['temperature_C', 'humidity', 'wind_max_m_s', 'wind_avg_m_s', 'wind_dir_deg', 'rain_mm', 'battery_ok']),
        ('Bosch-BME280_1', ['temperature_C', 'humidity', 'pressure_rel']),
        ('Oregon-v1_0', ['temperature_C', 'battery_ok']),
        ('Oregon-THGR122N_233', ['temperature_C', 'humidity', 'battery_ok']),
        ('LaCrosse-TX35DTHIT_52', ['temperature_C', 'humidity', 'battery_ok']),
        ('LaCrosse-TX35DTHIT_20', ['temperature_C', 'humidity', 'battery_ok']),
        ('LaCrosse-TX35DTHIT_28', ['temperature_C', 'humidity', 'battery_ok']),
        ('LaCrosse-TX35DTHIT_31', ['temperature_C', 'humidity', 'battery_ok']),
    )
    for sensor_key, keys in fixed_sensors:
        if sensor_key in seen_messages:
            seen_messages[sensor_key] = process_sensor_data(
                utc_time, sensor_key, seen_messages[sensor_key], keys
            )
    # Seen messages are already logged in the main loop when devices are active
# Fetch the buffered JSON records from SQLite and remove them
def get_and_delete_json_data():
    """Return all buffered records (oldest first) and delete them from SQLite.

    Each row's ``data`` column is decoded from JSON. On any error the problem
    is logged and an empty list is returned.
    """
    try:
        sqlite_cursor.execute('SELECT id, data FROM json_data ORDER BY id ASC')
        rows = sqlite_cursor.fetchall()
        json_data_list = [json.loads(payload) for _, payload in rows]
        if rows:
            ids = [row_id for row_id, _ in rows]
            # One "?" placeholder per id
            placeholders = ','.join(['?'] * len(ids))
            sqlite_cursor.execute(f'DELETE FROM json_data WHERE id IN ({placeholders})', ids)
            sqlite_conn.commit()
            logger.info(f"Retrieved and deleted {len(rows)} records from SQLite backup")
        return json_data_list
    except Exception as e:
        logger.error(f"Error retrieving from SQLite: {e}")
        return []
# Synchronize buffered and queued data with the remote database
def sync_data():
    """Write locally buffered and newly queued records to the remote database.

    When the remote server is available, records from the SQLite backup are
    stored first, followed by everything in ``new_data_queue``. A record whose
    storage fails is written (back) to SQLite. When the server is not
    available, the queue is drained into SQLite instead.
    """
    if is_remote_server_available():
        local_data_written = False
        # First synchronize the locally stored data (SQLite fallback)
        local_data = get_and_delete_json_data()
        for data in local_data:
            try:
                # Entries that are not dicts with a 'utc_time' key are skipped
                if isinstance(data, dict) and 'utc_time' in data:
                    # Single sensor record
                    if " UTC" in str(data.get('utc_time', '')):
                        data['utc_time'] = data['utc_time'].replace(" UTC", "")
                    store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
                                data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
                                data.get('average_speed'), data.get('direction'),
                                data.get('gust'), data.get('rain_mm'))
                    # Log the restore only once per sync run
                    if not local_data_written:
                        utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
                        logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
                        local_data_written = True
            except exc.SQLAlchemyError as e:
                logger.error(f"SQLAlchemyError syncing local data: {e}")
                session.rollback()
                # Put the record back into SQLite so it is not lost
                save_json_locally(data)
            except Exception as e:
                logger.error(f"Error syncing local data: {e}")
                save_json_locally(data)
        # Then synchronize the new data from the queue
        while new_data_queue:
            data = new_data_queue.pop(0)
            try:
                # Queue entries without an 'mqtt_id' key are dropped silently
                if isinstance(data, dict) and 'mqtt_id' in data:
                    store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
                                data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
                                data['gust'], data['rain_mm'])
            except exc.SQLAlchemyError as e:
                logger.error(f"SQLAlchemyError: {e}")
                session.rollback()
                save_json_locally(data)
            except Exception as e:
                logger.error(f"Error writing data: {e}")
                save_json_locally(data)
    else:
        # MariaDB not available - store in SQLite
        while new_data_queue:
            data = new_data_queue.pop(0)
            save_json_locally(data)
def update_last_transmission_time(sensor, time_value):
    """Store the last transmission time of *sensor*.

    *time_value* is parsed as an ISO timestamp (a 'Z' suffix is read as
    '+00:00'). When it is empty or cannot be parsed, the current UTC time is
    stored instead.
    """
    parsed = None
    if time_value:
        try:
            parsed = datetime.fromisoformat(time_value.replace('Z', '+00:00'))
        except (ValueError, AttributeError, TypeError):
            # Unparseable value: fall through to the current time
            parsed = None
    if parsed is None:
        parsed = datetime.now(timezone.utc)
    last_transmission_times[sensor] = parsed
def check_last_transmission_time():
    """Watchdog: restart the Pi when radio sensors have gone silent.

    A restart is requested when (1) a 'Bosch-BME280_*' sensor reported within
    STALL_WINDOW_SECONDS while none of the sensors in
    ``allowed_sensors_for_time`` did, or (2) at least two of those sensors
    have not reported within the window. Restarts are limited to one per
    RESTART_COOLDOWN_SECONDS. Failure and recovery of each sensor are logged
    once per state change.
    """
    global last_restart_time
    now = datetime.now(timezone.utc)
    # Determine recent activity for BME280 (local I2C) and radio sensors (868 MHz)
    bme_active_recent = any(
        (now - t.replace(tzinfo=timezone.utc)).total_seconds() <= STALL_WINDOW_SECONDS
        for k, t in last_transmission_times.items() if k.startswith('Bosch-BME280_')
    )
    radio_active_recent = False
    radio_nonresponding = 0
    # Only evaluate the sensors explicitly listed as radio sensors
    for sensor in allowed_sensors_for_time:
        t = last_transmission_times.get(sensor)
        if not t:
            # Not seen in this runtime; skip counting to avoid false positives at startup
            continue
        t = t.replace(tzinfo=timezone.utc)
        age = (now - t).total_seconds()
        if age <= STALL_WINDOW_SECONDS:
            radio_active_recent = True
            # Clear failure flag when sensor recovers
            if sensor_failure_logged.get(sensor):
                logger.info(f'Sensor {sensor} has recovered')
                sensor_failure_logged[sensor] = False
        else:
            radio_nonresponding += 1
            # Only log failure once when it first occurs
            if not sensor_failure_logged.get(sensor):
                logger.warning(f'Sensor {sensor} not responding (last seen {age:.0f}s ago)')
                sensor_failure_logged[sensor] = True
    # Condition 1: Only BME is active (dongle likely stalled)
    # NOTE(review): this is also true when no radio sensor has been seen yet in this
    # runtime (they are skipped above) — confirm a restart is wanted in that case.
    if bme_active_recent and not radio_active_recent:
        if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS:
            logger.warning('Only BME280 active; 868 MHz sensors silent. Restarting Pi...')
            restart_pi()
            # The cooldown starts regardless of whether the restart command succeeded
            last_restart_time = now
        # Only log cooldown every 5 minutes to reduce spam
        elif (now - last_restart_time).total_seconds() % 300 < 60:
            remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds()
            logger.info(f'BME-only state continues, restart cooldown: {remaining:.0f}s remaining')
        return
    # Condition 2: Fallback multiple radio sensors not responding
    if radio_nonresponding >= 2:
        if not last_restart_time or (now - last_restart_time).total_seconds() >= RESTART_COOLDOWN_SECONDS:
            logger.warning(f'{radio_nonresponding} sensors not responding. Restarting Pi...')
            restart_pi()
            last_restart_time = now
        # Only log cooldown every 5 minutes to reduce spam
        elif (now - last_restart_time).total_seconds() % 300 < 60:
            remaining = RESTART_COOLDOWN_SECONDS - (now - last_restart_time).total_seconds()
            logger.info(f'{radio_nonresponding} sensors down, restart cooldown: {remaining:.0f}s remaining')
def restart_pi():
    """Reboot the Raspberry Pi over SSH by running ``sudo -n reboot`` on it.

    Requires PI_HOST to be set; PI_USER and SSH_KEY_PATH select the account
    and key. Success and every kind of failure are logged; nothing is raised.
    """
    logger.warning("Attempting to restart Raspberry Pi due to sensor failure...")
    if not PI_HOST:
        logger.error("PI_HOST env var not set; cannot SSH to Pi. Set PI_HOST, PI_USER, SSH_KEY_PATH.")
        return

    ssh_cmd = ['ssh', '-i', SSH_KEY_PATH]
    # Non-interactive connection with a short connect timeout
    for option in ('StrictHostKeyChecking=accept-new', 'BatchMode=yes', 'ConnectTimeout=8'):
        ssh_cmd += ['-o', option]
    ssh_cmd += [f"{PI_USER}@{PI_HOST}", 'sudo', '-n', 'reboot']

    try:
        result = subprocess.run(ssh_cmd, capture_output=True, timeout=15)
        if result.returncode == 0:
            logger.info("Pi restart command sent via SSH successfully")
            return
        logger.error(f"SSH reboot failed (code {result.returncode}): {result.stderr.decode(errors='ignore')}")
    except Exception as e:
        logger.error(f"SSH reboot exception: {e}")
    logger.error("Pi restart via SSH failed. Manual intervention may be required.")
if __name__ == '__main__':
    # Script entry point: run the watchdog and process collected messages once a
    # minute until interrupted.
    # Start environment file watcher for runtime config changes
    start_env_watcher()
    print('start data collection')
    try:
        while True:
            check_last_transmission_time()
            time.sleep(60)
            utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
            # Only log when devices are actually seen
            active_devices = [key for key in seen_messages.keys() if seen_messages[key]]
            if active_devices:
                logger.info(f"{utc_time}: Seen devices: {active_devices}")
            # Only log sensor status if there are transmission times to report
            if last_transmission_times:
                status_lines = []
                for k, v in last_transmission_times.items():
                    if k not in ignored_sensors_for_time:
                        age_seconds = (datetime.now(timezone.utc) - v.replace(tzinfo=timezone.utc)).total_seconds()
                        status_lines.append(f"{k}: {v.strftime('%H:%M:%S')} ({age_seconds:.0f}s ago)")
                if status_lines:
                    logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines))
            # Averages each sensor's messages and stores or buffers them via update_data()
            process_mqtt_messages(seen_messages)
            # The raw seen_messages dict is intentionally not queued or saved here:
            # sync_data() only accepts records carrying 'mqtt_id'/'utc_time', so such
            # an entry could never be stored and would only be discarded later.
            if is_remote_server_available():
                # Flushes records that were buffered in SQLite during an outage
                sync_data()
            else:
                logger.warning(f"{utc_time}: Remote server unavailable - readings are buffered locally")
    except KeyboardInterrupt:
        logger.info("Shutting down gracefully...")
        stop_env_watcher()
        if mqttc:
            mqttc.loop_stop()
        raise