changed: data-logging configuration to rotate more frequently
fixed: database connection retry interval and storing in local SQLite when DB is down
This commit is contained in:
589
datacollector.py
589
datacollector.py
@@ -7,12 +7,15 @@ import logging
|
||||
import struct
|
||||
import subprocess
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from typing import Optional
|
||||
from dotenv import load_dotenv
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from sqlalchemy import create_engine, exc
|
||||
from sqlalchemy import create_engine, exc, text
|
||||
from sqlalchemy.engine import URL
|
||||
# from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from datetime import datetime, timezone
|
||||
@@ -25,7 +28,7 @@ load_dotenv()
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
|
||||
handler = RotatingFileHandler('datacollector.log', maxBytes=100000000, backupCount=1)
|
||||
handler = RotatingFileHandler('datacollector.log', maxBytes=5242880, backupCount=10)
|
||||
handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
|
||||
logger.addHandler(handler)
|
||||
|
||||
@@ -65,6 +68,12 @@ ignored_sensors_for_time = ['Bosch-BME280_1', 'Oregon-v1_0', 'inFactory-TH_252',
|
||||
allowed_sensors_for_time = ['Bresser-6in1_-2021550075', 'LaCrosse-TX35DTHIT_20', 'LaCrosse-TX35DTHIT_28', 'LaCrosse-TX35DTHIT_52', 'LaCrosse-TX35DTHIT_31']
|
||||
debug = False
|
||||
|
||||
# Safe defaults for DB-derived caches
|
||||
sensor_ids = []
|
||||
sensor_names = []
|
||||
sensor_by_name_room = {}
|
||||
pool_sensors_cache = {}
|
||||
|
||||
# Track sensor failure states to avoid repeated logging
|
||||
sensor_failure_logged = {}
|
||||
|
||||
@@ -155,8 +164,16 @@ sqlite_conn.commit()
|
||||
|
||||
logger.info(f"SQLite database initialized at: {sqlite_db_path}")
|
||||
|
||||
# Create a connection to the database
|
||||
sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata')
|
||||
# Database connection (hardcoded for private intranet)
|
||||
DB_HOST = "192.168.43.102"
|
||||
DB_PORT = 3306
|
||||
sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata',
|
||||
connect_args={"connection_timeout": 5})
|
||||
|
||||
# DB availability tracking for resilient mode
|
||||
db_available = False
|
||||
last_db_check = 0.0
|
||||
DB_RETRY_SECONDS = 30 # Retry DB connection every 30 seconds if down
|
||||
|
||||
# Create a configured "Session" class
|
||||
Session = sessionmaker(bind=sql_engine)
|
||||
@@ -176,6 +193,84 @@ def parse_radio_frame(byte_data):
|
||||
if not byte_data:
|
||||
return None
|
||||
|
||||
|
||||
PAYLOAD_SIZE = 15 # bytes in pool payload
|
||||
MAGIC1 = 0x42
|
||||
MAGIC2 = 0x99
|
||||
|
||||
|
||||
def crc8_xor(data: bytes) -> int:
|
||||
"""Simple XOR checksum used by the pool payload."""
|
||||
c = 0
|
||||
for b in data:
|
||||
c ^= b
|
||||
return c
|
||||
|
||||
|
||||
def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None):
|
||||
"""Scan a byte stream for a plausible pool payload.
|
||||
|
||||
Slides a 15-byte window, validates with CRC, version/nodeId, and range checks,
|
||||
and scores candidates. Returns the best decoded dict or None.
|
||||
"""
|
||||
# Drop leading preamble (0xAA) if present
|
||||
while candidate_bytes.startswith(b"\xaa"):
|
||||
candidate_bytes = candidate_bytes[1:]
|
||||
|
||||
best = None
|
||||
best_score = -1
|
||||
|
||||
for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1):
|
||||
chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE]
|
||||
try:
|
||||
magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(
|
||||
'<BBBBHhhHHB', chunk
|
||||
)
|
||||
except struct.error:
|
||||
continue
|
||||
|
||||
crc_calculated = crc8_xor(chunk[:-1])
|
||||
if crc_calculated != crc_received:
|
||||
continue
|
||||
|
||||
if version != 1 or nodeId != 1:
|
||||
continue
|
||||
|
||||
# Plausibility checks (unit scaled)
|
||||
if not (-300 <= t_ds10 <= 600): # -30.0 to 60.0°C
|
||||
continue
|
||||
if not (-300 <= t_bme10 <= 600):
|
||||
continue
|
||||
if not (0 <= hum10 <= 1000): # 0.0–100.0%
|
||||
continue
|
||||
if not (8000 <= pres1 <= 11000): # 800.0–1100.0 hPa
|
||||
continue
|
||||
|
||||
score = 0
|
||||
if magic1 == MAGIC1 and magic2 == MAGIC2:
|
||||
score += 2
|
||||
if expected_seq is not None and seq == expected_seq:
|
||||
score += 1
|
||||
# CRC already validated; reward shorter offset to prefer first valid
|
||||
score -= offset * 0.001
|
||||
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best = {
|
||||
"offset": offset,
|
||||
"magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2,
|
||||
"version": version,
|
||||
"nodeId": nodeId,
|
||||
"sequence": seq,
|
||||
"t_ds_c": t_ds10 / 10.0,
|
||||
"t_bme_c": t_bme10 / 10.0,
|
||||
"humidity": hum10 / 10.0,
|
||||
"pressure_hpa": pres1 / 10.0,
|
||||
"crc_valid": True,
|
||||
}
|
||||
|
||||
return best
|
||||
|
||||
try:
|
||||
# Find sync 0x2D followed by networkId (second byte)
|
||||
sync_index = byte_data.find(b"\x2d")
|
||||
@@ -220,6 +315,9 @@ def parse_radio_frame(byte_data):
|
||||
def refresh_sensor_cache():
|
||||
"""Refresh the sensor cache from database"""
|
||||
global sensor_ids, sensor_names, sensor_by_name_room, pool_sensors_cache
|
||||
if not ensure_db_connection():
|
||||
return []
|
||||
|
||||
sensors = session.query(Sensor).all()
|
||||
sensor_ids = [f'{sensor.mqtt_name}_{sensor.mqtt_id}' for sensor in sensors]
|
||||
sensor_names = list(set([sensor.mqtt_name for sensor in sensors])) # Unique model names
|
||||
@@ -249,6 +347,9 @@ def build_sensor_lists_from_db():
|
||||
Call after refresh_sensor_cache() and whenever sensors are added/removed.
|
||||
"""
|
||||
global KNOWN_DEVICES, allowed_sensors_for_time, ignored_sensors_for_time
|
||||
if not ensure_db_connection():
|
||||
return
|
||||
|
||||
sensors = session.query(Sensor).all()
|
||||
|
||||
# Build KNOWN_DEVICES - unique model names
|
||||
@@ -297,23 +398,14 @@ def get_sensor_keys(sensor_type):
|
||||
logger.warning(f"Unknown sensor type '{sensor_type}' - defaulting to temperature only")
|
||||
return ['temperature_C']
|
||||
|
||||
sensors = refresh_sensor_cache()
|
||||
sensor_by_name_room = {}
|
||||
pool_sensors_cache = {}
|
||||
|
||||
# Build sensor lists from database at startup
|
||||
build_sensor_lists_from_db()
|
||||
sensors = []
|
||||
|
||||
warte = ''
|
||||
|
||||
# Funktion zum Überprüfen der Remote-Server-Verbindung
|
||||
def is_remote_server_available():
|
||||
try:
|
||||
response = requests.get('http://192.168.43.102')
|
||||
return response.status_code == 200
|
||||
except requests.ConnectionError:
|
||||
return False
|
||||
# return True
|
||||
# If DB is up, we consider the remote server available for writes.
|
||||
return db_available
|
||||
|
||||
# Funktion zum Speichern der JSON-Daten in SQLite
|
||||
def save_json_locally(json_dict):
|
||||
@@ -326,6 +418,31 @@ def save_json_locally(json_dict):
|
||||
logger.error(f"Error saving to SQLite: {e}")
|
||||
|
||||
|
||||
def ensure_db_connection(force: bool = False) -> bool:
|
||||
"""Try to establish DB connectivity with throttling. Returns True if DB is reachable."""
|
||||
global db_available, last_db_check, session
|
||||
now = time.time()
|
||||
if not force and (now - last_db_check) < DB_RETRY_SECONDS:
|
||||
return db_available
|
||||
|
||||
last_db_check = now
|
||||
try:
|
||||
with sql_engine.connect() as conn:
|
||||
conn.execute(text('SELECT 1'))
|
||||
if not db_available:
|
||||
logger.info(f"Database reachable again at {DB_HOST}:{DB_PORT}")
|
||||
db_available = True
|
||||
# Recreate session to ensure fresh connections
|
||||
session = Session()
|
||||
except Exception as e:
|
||||
if db_available:
|
||||
logger.warning(f"Lost database connectivity: {e}")
|
||||
else:
|
||||
logger.info(f"Database still unreachable: {e}")
|
||||
db_available = False
|
||||
return db_available
|
||||
|
||||
|
||||
# The callback for when the client receives a CONNACK response from the server.
|
||||
def on_connect(client, userdata, flags, reason_code, properties):
|
||||
# Subscribing in on_connect() means that if we lose the connection and
|
||||
@@ -340,149 +457,123 @@ def on_message(client, userdata, msg):
|
||||
model = d['model']
|
||||
if model in sensor_names:
|
||||
if model == 'pool':
|
||||
test_value = d['rows'][0]['data']
|
||||
if not test_value.startswith('aaaaaa'):
|
||||
# Extract hex data from rows array
|
||||
if not d.get('rows') or len(d['rows']) == 0:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"Pool message missing rows: {d}")
|
||||
return
|
||||
else:
|
||||
# Decode the hex-encoded binary payload and scan for a plausible struct
|
||||
hex_data = test_value[6:] # Remove 'aaaaaa' prefix
|
||||
try:
|
||||
byte_data = bytes.fromhex(hex_data)
|
||||
except ValueError:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"Invalid hex: {hex_data}")
|
||||
print(f"Invalid hex data: {hex_data}")
|
||||
print(d)
|
||||
warte = ''
|
||||
return
|
||||
|
||||
# Attempt to parse the radio frame first (preamble/sync/header/data/crc)
|
||||
frame = parse_radio_frame(byte_data)
|
||||
if frame and frame.get('data'):
|
||||
print(
|
||||
f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, "
|
||||
f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}"
|
||||
)
|
||||
candidate_bytes = frame['data']
|
||||
else:
|
||||
# Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream
|
||||
if LOG_MALFORMED_HEX and not frame:
|
||||
malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}")
|
||||
tmp = byte_data
|
||||
while tmp.startswith(b"\xaa"):
|
||||
tmp = tmp[1:]
|
||||
candidate_bytes = tmp
|
||||
hex_data = d['rows'][0].get('data')
|
||||
if not hex_data:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"Pool message missing data: {d}")
|
||||
return
|
||||
|
||||
print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}")
|
||||
print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}")
|
||||
|
||||
# Decode payload struct with magic bytes and CRC
|
||||
# struct: magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc
|
||||
PAYLOAD_SIZE = 15
|
||||
MAGIC1 = 0x42
|
||||
MAGIC2 = 0x99
|
||||
|
||||
def calculate_crc8(data):
|
||||
"""Simple XOR checksum"""
|
||||
c = 0
|
||||
for byte in data:
|
||||
c ^= byte
|
||||
return c
|
||||
|
||||
# Scan for magic bytes within candidate payload
|
||||
magic_offset = -1
|
||||
for i in range(len(candidate_bytes) - 1):
|
||||
if candidate_bytes[i] == MAGIC1 and candidate_bytes[i+1] == MAGIC2:
|
||||
magic_offset = i
|
||||
break
|
||||
|
||||
if magic_offset == -1:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"Magic bytes not found: {candidate_bytes.hex()}")
|
||||
print(f"Magic bytes {MAGIC1:02x} {MAGIC2:02x} not found in payload")
|
||||
elif len(candidate_bytes) - magic_offset < PAYLOAD_SIZE:
|
||||
print(f"Payload too short after magic bytes: {len(candidate_bytes) - magic_offset} bytes (need {PAYLOAD_SIZE})")
|
||||
else:
|
||||
try:
|
||||
# Extract payload starting from magic bytes
|
||||
payload_data = candidate_bytes[magic_offset:magic_offset + PAYLOAD_SIZE]
|
||||
|
||||
# Unpack: BBBBHhhhHB = magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc
|
||||
magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack('<BBBBHhhhHB', payload_data)
|
||||
|
||||
print(f"Found magic bytes at offset {magic_offset}")
|
||||
|
||||
# Verify CRC
|
||||
crc_calculated = calculate_crc8(payload_data[:PAYLOAD_SIZE-1])
|
||||
if crc_received != crc_calculated:
|
||||
print(f"CRC mismatch! Received: {crc_received:02x}, Calculated: {crc_calculated:02x}")
|
||||
print("Data may be corrupted - displaying anyway:")
|
||||
|
||||
# Convert to human-readable values
|
||||
parsed_struct = {
|
||||
"version": version,
|
||||
"nodeId": nodeId,
|
||||
"sequence": seq,
|
||||
"t_ds_c": t_ds10 / 10.0,
|
||||
"t_bme_c": t_bme10 / 10.0,
|
||||
"humidity": hum10 / 10.0,
|
||||
"pressure_hpa": pres1 / 10.0,
|
||||
"crc_valid": crc_received == crc_calculated
|
||||
}
|
||||
print(f"Decoded payload: {json.dumps(parsed_struct, indent=2)}")
|
||||
|
||||
# Create two separate sensor messages matching the standard format
|
||||
original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
|
||||
|
||||
# Message 1: BME280 sensor (temp, humidity, pressure)
|
||||
bme_msg = {
|
||||
'time': original_time,
|
||||
'model': 'pool',
|
||||
'id': nodeId * 10 + 1, # e.g., nodeId=1 -> id=11 for BME
|
||||
'battery_ok': 1,
|
||||
'temperature_C': parsed_struct['t_bme_c'],
|
||||
'humidity': parsed_struct['humidity'],
|
||||
'pressure_rel': parsed_struct['pressure_hpa'],
|
||||
'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM'
|
||||
}
|
||||
|
||||
# Message 2: DS18B20 sensor (temp only)
|
||||
ds_msg = {
|
||||
'time': original_time,
|
||||
'model': 'pool',
|
||||
'id': nodeId * 10 + 2, # e.g., nodeId=1 -> id=12 for DS
|
||||
'battery_ok': 1,
|
||||
'temperature_C': parsed_struct['t_ds_c'],
|
||||
'mic': 'CRC' if parsed_struct['crc_valid'] else 'CHECKSUM'
|
||||
}
|
||||
|
||||
# Process both messages through the existing logic
|
||||
for msg_data in [bme_msg, ds_msg]:
|
||||
print(f"Received message from {msg_data['model']}: \n {msg_data}")
|
||||
sensor_id = msg_data['id']
|
||||
sensor_key = f"{msg_data['model']}_{sensor_id}"
|
||||
|
||||
if sensor_key not in seen_messages.keys():
|
||||
seen_messages[sensor_key] = [msg_data]
|
||||
else:
|
||||
seen_messages[sensor_key].append(msg_data)
|
||||
|
||||
except struct.error as e:
|
||||
print(f"Struct unpack error: {e}")
|
||||
# Strip optional 'aaaaaa' prefix if present (old format)
|
||||
if hex_data.startswith('aaaaaa'):
|
||||
hex_data = hex_data[6:]
|
||||
|
||||
try:
|
||||
byte_data = bytes.fromhex(hex_data)
|
||||
except ValueError:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"Invalid hex: {hex_data}")
|
||||
print(f"Invalid hex data: {hex_data}")
|
||||
print(d)
|
||||
warte = ''
|
||||
return
|
||||
print(f"Received message from {model}: \n {d}")
|
||||
id = d['id']
|
||||
if model == 'Bresser-6in1':
|
||||
if d['flags'] == 0:
|
||||
if 'rain_mm' in d.keys():
|
||||
del d['rain_mm']
|
||||
sensor_key = f'{model}_{id}'
|
||||
if sensor_key not in seen_messages.keys():
|
||||
seen_messages[sensor_key] = [d]
|
||||
|
||||
# Attempt to parse the radio frame first (preamble/sync/header/data/crc)
|
||||
frame = parse_radio_frame(byte_data)
|
||||
if frame and frame.get('data'):
|
||||
print(
|
||||
f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, "
|
||||
f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}"
|
||||
)
|
||||
candidate_bytes = frame['data']
|
||||
else:
|
||||
# Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream
|
||||
if LOG_MALFORMED_HEX and not frame:
|
||||
malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}")
|
||||
tmp = byte_data
|
||||
while tmp.startswith(b"\xaa"):
|
||||
tmp = tmp[1:]
|
||||
candidate_bytes = tmp
|
||||
|
||||
print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}")
|
||||
print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}")
|
||||
|
||||
# Decode payload by sliding-window detection (magic bytes may be missing)
|
||||
expected_seq = None
|
||||
if new_data_queue:
|
||||
try:
|
||||
expected_seq = (new_data_queue[-1].get("sequence") or 0) + 1
|
||||
except Exception:
|
||||
expected_seq = None
|
||||
|
||||
decoded = decode_pool_payload(candidate_bytes, expected_seq=expected_seq)
|
||||
|
||||
if not decoded:
|
||||
if LOG_MALFORMED_HEX:
|
||||
malformed_hex_logger.info(f"No valid payload found: {candidate_bytes.hex()}")
|
||||
print("No valid pool payload found in candidate bytes")
|
||||
warte = ''
|
||||
return
|
||||
|
||||
print(
|
||||
f"Decoded payload at offset {decoded['offset']}: seq={decoded['sequence']}, "
|
||||
f"t_ds={decoded['t_ds_c']}C, t_bme={decoded['t_bme_c']}C, "
|
||||
f"hum={decoded['humidity']}%, pres={decoded['pressure_hpa']}hPa, "
|
||||
f"magic_ok={decoded['magic_ok']}"
|
||||
)
|
||||
|
||||
original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S'))
|
||||
|
||||
bme_msg = {
|
||||
'time': original_time,
|
||||
'model': 'pool',
|
||||
'id': decoded['nodeId'] * 10 + 1,
|
||||
'battery_ok': 1,
|
||||
'temperature_C': decoded['t_bme_c'],
|
||||
'humidity': decoded['humidity'],
|
||||
'pressure_rel': decoded['pressure_hpa'],
|
||||
'mic': 'CRC'
|
||||
}
|
||||
|
||||
ds_msg = {
|
||||
'time': original_time,
|
||||
'model': 'pool',
|
||||
'id': decoded['nodeId'] * 10 + 2,
|
||||
'battery_ok': 1,
|
||||
'temperature_C': decoded['t_ds_c'],
|
||||
'mic': 'CRC'
|
||||
}
|
||||
|
||||
for msg_data in [bme_msg, ds_msg]:
|
||||
print(f"Received message from {msg_data['model']}: \n {msg_data}")
|
||||
sensor_id = msg_data['id']
|
||||
sensor_key = f"{msg_data['model']}_{sensor_id}"
|
||||
|
||||
if sensor_key not in seen_messages.keys():
|
||||
seen_messages[sensor_key] = [msg_data]
|
||||
else:
|
||||
seen_messages[sensor_key].append(msg_data)
|
||||
|
||||
warte = ''
|
||||
return
|
||||
else:
|
||||
seen_messages[sensor_key].append(d)
|
||||
# Process non-pool sensors
|
||||
print(f"Received message from {model}: \n {d}")
|
||||
id = d['id']
|
||||
if model == 'Bresser-6in1':
|
||||
if d['flags'] == 0:
|
||||
if 'rain_mm' in d.keys():
|
||||
del d['rain_mm']
|
||||
sensor_key = f'{model}_{id}'
|
||||
if sensor_key not in seen_messages.keys():
|
||||
seen_messages[sensor_key] = [d]
|
||||
else:
|
||||
seen_messages[sensor_key].append(d)
|
||||
|
||||
|
||||
# Define a function to update the data in the database
|
||||
@@ -499,12 +590,12 @@ def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, batter
|
||||
"gust": gust,
|
||||
"rain_mm": rain_mm
|
||||
}
|
||||
if is_remote_server_available():
|
||||
if ensure_db_connection():
|
||||
new_data_queue.append(values)
|
||||
sync_data()
|
||||
# Data sent - no logging needed for normal operation
|
||||
else:
|
||||
logger.warning(f"{utc_time}: Remote server unavailable - storing locally for {mqtt_id}")
|
||||
logger.warning(f"{utc_time}: Database unavailable - storing locally for {mqtt_id}")
|
||||
save_json_locally(values)
|
||||
|
||||
|
||||
@@ -670,10 +761,38 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b
|
||||
mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores
|
||||
|
||||
# Get the sensor object from the database (with auto-update for battery changes)
|
||||
if not ensure_db_connection():
|
||||
# DB unavailable: stash the data to SQLite and return
|
||||
save_json_locally({
|
||||
'utc_time': utc_time,
|
||||
'mqtt_id': mqtt_name_id,
|
||||
'temperature_c': temperature_c,
|
||||
'humidity': humidity,
|
||||
'pressure_rel': pressure_rel,
|
||||
'battery': battery,
|
||||
'average_speed': average_speed,
|
||||
'direction': direction,
|
||||
'gust': gust,
|
||||
'rain_mm': rain_mm
|
||||
})
|
||||
return
|
||||
|
||||
sensor = get_or_update_sensor(mqtt_name, mqtt_id)
|
||||
|
||||
if not sensor:
|
||||
logger.error(f"Cannot store data for {mqtt_name_id} - sensor not found in database")
|
||||
save_json_locally({
|
||||
'utc_time': utc_time,
|
||||
'mqtt_id': mqtt_name_id,
|
||||
'temperature_c': temperature_c,
|
||||
'humidity': humidity,
|
||||
'pressure_rel': pressure_rel,
|
||||
'battery': battery,
|
||||
'average_speed': average_speed,
|
||||
'direction': direction,
|
||||
'gust': gust,
|
||||
'rain_mm': rain_mm
|
||||
})
|
||||
return
|
||||
|
||||
position = sensor.position
|
||||
@@ -750,8 +869,39 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b
|
||||
else:
|
||||
logger.info(f"{utc_time}: Precipitation value is too high: {rain_mm}")
|
||||
|
||||
# Commit the changes
|
||||
session.commit()
|
||||
try:
|
||||
# Commit the changes
|
||||
session.commit()
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError on commit: {e}")
|
||||
session.rollback()
|
||||
save_json_locally({
|
||||
'utc_time': utc_time,
|
||||
'mqtt_id': mqtt_name_id,
|
||||
'temperature_c': temperature_c,
|
||||
'humidity': humidity,
|
||||
'pressure_rel': pressure_rel,
|
||||
'battery': battery,
|
||||
'average_speed': average_speed,
|
||||
'direction': direction,
|
||||
'gust': gust,
|
||||
'rain_mm': rain_mm
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Error on commit: {e}")
|
||||
session.rollback()
|
||||
save_json_locally({
|
||||
'utc_time': utc_time,
|
||||
'mqtt_id': mqtt_name_id,
|
||||
'temperature_c': temperature_c,
|
||||
'humidity': humidity,
|
||||
'pressure_rel': pressure_rel,
|
||||
'battery': battery,
|
||||
'average_speed': average_speed,
|
||||
'direction': direction,
|
||||
'gust': gust,
|
||||
'rain_mm': rain_mm
|
||||
})
|
||||
|
||||
|
||||
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
@@ -851,6 +1001,9 @@ def process_mqtt_messages(seen_messages):
|
||||
"""Process all received MQTT messages dynamically based on database configuration.
|
||||
No hardcoded sensor checks - all sensors are processed based on what's in the database.
|
||||
"""
|
||||
if not ensure_db_connection():
|
||||
# DB not available; skip processing but keep messages cached for later.
|
||||
return
|
||||
for sensor, data in seen_messages.items():
|
||||
if data:
|
||||
# Try to get 'time' from first message, or use None as fallback
|
||||
@@ -911,55 +1064,56 @@ def get_and_delete_json_data():
|
||||
|
||||
# Funktion zum Synchronisieren der Daten
|
||||
def sync_data():
|
||||
if is_remote_server_available():
|
||||
local_data_written = False
|
||||
|
||||
# Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback)
|
||||
local_data = get_and_delete_json_data()
|
||||
for data in local_data:
|
||||
try:
|
||||
if isinstance(data, dict) and 'utc_time' in data:
|
||||
# Einzelner Sensor-Eintrag
|
||||
if " UTC" in str(data.get('utc_time', '')):
|
||||
data['utc_time'] = data['utc_time'].replace(" UTC", "")
|
||||
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
|
||||
data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
|
||||
data.get('average_speed'), data.get('direction'),
|
||||
data.get('gust'), data.get('rain_mm'))
|
||||
if not local_data_written:
|
||||
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
|
||||
local_data_written = True
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError syncing local data: {e}")
|
||||
session.rollback()
|
||||
# Rette den Datensatz zurück in SQLite
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing local data: {e}")
|
||||
save_json_locally(data)
|
||||
|
||||
# Danach neue Daten aus der Warteschlange synchronisieren
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
try:
|
||||
if isinstance(data, dict) and 'mqtt_id' in data:
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
|
||||
data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
|
||||
data['gust'], data['rain_mm'])
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError: {e}")
|
||||
session.rollback()
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing data: {e}")
|
||||
save_json_locally(data)
|
||||
else:
|
||||
if not ensure_db_connection(force=True):
|
||||
# MariaDB nicht verfügbar - speichere in SQLite
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
save_json_locally(data)
|
||||
return
|
||||
|
||||
local_data_written = False
|
||||
|
||||
# Zuerst lokal gespeicherte Daten synchronisieren (SQLite Fallback)
|
||||
local_data = get_and_delete_json_data()
|
||||
for data in local_data:
|
||||
try:
|
||||
if isinstance(data, dict) and 'utc_time' in data:
|
||||
# Einzelner Sensor-Eintrag
|
||||
if " UTC" in str(data.get('utc_time', '')):
|
||||
data['utc_time'] = data['utc_time'].replace(" UTC", "")
|
||||
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'),
|
||||
data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1),
|
||||
data.get('average_speed'), data.get('direction'),
|
||||
data.get('gust'), data.get('rain_mm'))
|
||||
if not local_data_written:
|
||||
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB")
|
||||
local_data_written = True
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError syncing local data: {e}")
|
||||
session.rollback()
|
||||
# Rette den Datensatz zurück in SQLite
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing local data: {e}")
|
||||
save_json_locally(data)
|
||||
|
||||
# Danach neue Daten aus der Warteschlange synchronisieren
|
||||
while new_data_queue:
|
||||
data = new_data_queue.pop(0)
|
||||
try:
|
||||
if isinstance(data, dict) and 'mqtt_id' in data:
|
||||
store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'],
|
||||
data['pressure_rel'], data['battery'], data['average_speed'], data['direction'],
|
||||
data['gust'], data['rain_mm'])
|
||||
except exc.SQLAlchemyError as e:
|
||||
logger.error(f"SQLAlchemyError: {e}")
|
||||
session.rollback()
|
||||
save_json_locally(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing data: {e}")
|
||||
save_json_locally(data)
|
||||
|
||||
def update_last_transmission_time(sensor, time_value):
|
||||
"""Update last transmission time for a sensor. Uses provided time or falls back to current time if not available."""
|
||||
@@ -1065,16 +1219,31 @@ if __name__ == '__main__':
|
||||
# Start environment file watcher for runtime config changes
|
||||
start_env_watcher()
|
||||
|
||||
# Initial DB check (non-fatal). If unreachable, we will retry in loop.
|
||||
ensure_db_connection(force=True)
|
||||
if db_available:
|
||||
try:
|
||||
sensors = refresh_sensor_cache()
|
||||
build_sensor_lists_from_db()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load sensor configuration after DB connection: {e}")
|
||||
else:
|
||||
logger.warning(f"Starting without database; will cache data locally and retry every {DB_RETRY_SECONDS}s")
|
||||
|
||||
print('start data collection')
|
||||
try:
|
||||
while True:
|
||||
check_last_transmission_time()
|
||||
# if count >= 600:
|
||||
# mqttc.loop_stop()
|
||||
# break
|
||||
# Periodically retry DB connection if currently down
|
||||
if not db_available:
|
||||
if ensure_db_connection(force=True):
|
||||
try:
|
||||
sensors = refresh_sensor_cache()
|
||||
build_sensor_lists_from_db()
|
||||
sync_data() # flush cached data once DB is back
|
||||
except Exception as e:
|
||||
logger.error(f"DB reconnected but failed to refresh caches/sync: {e}")
|
||||
|
||||
# count += 1
|
||||
# print(count)
|
||||
check_last_transmission_time()
|
||||
time.sleep(60)
|
||||
utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
|
||||
|
||||
@@ -1093,14 +1262,12 @@ if __name__ == '__main__':
|
||||
|
||||
if status_lines:
|
||||
logger.info(f"{utc_time}: Last seen:\n" + "\n".join(status_lines))
|
||||
# logger.info(f"{utc_time}: last seen at: {', '.join(f'{k}: {v.strftime('%H:%M:%S')} ({(datetime.now(timezone.utc) - v).total_seconds():.0f} Sekunden ago)' if isinstance(v, datetime) else f'{k}: {v}' for k, v in last_transmission_times.items())}")
|
||||
process_mqtt_messages(seen_messages)
|
||||
if is_remote_server_available():
|
||||
if ensure_db_connection():
|
||||
new_data_queue.append(seen_messages)
|
||||
sync_data()
|
||||
# Data synced - no logging needed for normal operation
|
||||
else:
|
||||
logger.warning(f"{utc_time}: Remote server unavailable - storing batch locally")
|
||||
logger.warning(f"{utc_time}: Database unavailable - storing batch locally")
|
||||
save_json_locally(seen_messages)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Shutting down gracefully...")
|
||||
|
||||
@@ -17,3 +17,10 @@ services:
|
||||
- SSH_KEY_PATH=/workspace/.ssh/id_rsa
|
||||
- STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300}
|
||||
- RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600}
|
||||
# DB override for dev testing; defaults to an invalid host to simulate DB-down
|
||||
- DB_HOST=${DB_HOST:-invalid-host}
|
||||
- DB_PORT=${DB_PORT:-3306}
|
||||
- DB_USER=${DB_USER:-weatherdata}
|
||||
- DB_PASSWORD=${DB_PASSWORD:-cfCU$swM!HfK82%*}
|
||||
- DB_NAME=${DB_NAME:-weatherdata}
|
||||
- DB_CONNECT_TIMEOUT=${DB_CONNECT_TIMEOUT:-5}
|
||||
|
||||
154
test_pool_decode.py
Normal file
154
test_pool_decode.py
Normal file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Test pool payload decoding with the new MQTT message format"""
|
||||
|
||||
import json
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
PAYLOAD_SIZE = 15
|
||||
MAGIC1 = 0x42
|
||||
MAGIC2 = 0x99
|
||||
|
||||
|
||||
def crc8_xor(data: bytes) -> int:
|
||||
"""Simple XOR checksum used by the pool payload."""
|
||||
c = 0
|
||||
for b in data:
|
||||
c ^= b
|
||||
return c
|
||||
|
||||
|
||||
def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None):
|
||||
"""Scan a byte stream for a plausible pool payload.
|
||||
|
||||
Slides a 15-byte window, validates with CRC, version/nodeId, and range checks,
|
||||
and scores candidates. Returns the best decoded dict or None.
|
||||
"""
|
||||
# Drop leading preamble (0xAA) if present
|
||||
while candidate_bytes.startswith(b"\xaa"):
|
||||
candidate_bytes = candidate_bytes[1:]
|
||||
|
||||
best = None
|
||||
best_score = -1
|
||||
|
||||
for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1):
|
||||
chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE]
|
||||
try:
|
||||
magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(
|
||||
'<BBBBHhhHHB', chunk
|
||||
)
|
||||
except struct.error:
|
||||
continue
|
||||
|
||||
crc_calculated = crc8_xor(chunk[:-1])
|
||||
if crc_calculated != crc_received:
|
||||
continue
|
||||
|
||||
if version != 1 or nodeId != 1:
|
||||
continue
|
||||
|
||||
# Plausibility checks (unit scaled)
|
||||
if not (-300 <= t_ds10 <= 600): # -30.0 to 60.0°C
|
||||
continue
|
||||
if not (-300 <= t_bme10 <= 600):
|
||||
continue
|
||||
if not (0 <= hum10 <= 1000): # 0.0–100.0%
|
||||
continue
|
||||
if not (8000 <= pres1 <= 11000): # 800.0–1100.0 hPa
|
||||
continue
|
||||
|
||||
score = 0
|
||||
if magic1 == MAGIC1 and magic2 == MAGIC2:
|
||||
score += 2
|
||||
if expected_seq is not None and seq == expected_seq:
|
||||
score += 1
|
||||
# CRC already validated; reward shorter offset to prefer first valid
|
||||
score -= offset * 0.001
|
||||
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best = {
|
||||
"offset": offset,
|
||||
"magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2,
|
||||
"version": version,
|
||||
"nodeId": nodeId,
|
||||
"sequence": seq,
|
||||
"t_ds_c": t_ds10 / 10.0,
|
||||
"t_bme_c": t_bme10 / 10.0,
|
||||
"humidity": hum10 / 10.0,
|
||||
"pressure_hpa": pres1 / 10.0,
|
||||
"crc_valid": True,
|
||||
}
|
||||
|
||||
return best
|
||||
|
||||
|
||||
# Test with the actual MQTT message
|
||||
mqtt_message = {
|
||||
"time": "2025-12-27T13:26:47",
|
||||
"model": "pool",
|
||||
"count": 1,
|
||||
"num_rows": 1,
|
||||
"rows": [
|
||||
{
|
||||
"len": 143,
|
||||
"data": "429901013400a801f6002b0294272a000000"
|
||||
}
|
||||
],
|
||||
"codes": ["{143}429901013400a801f6002b0294272a000000"]
|
||||
}
|
||||
|
||||
print("Testing pool payload decode with new MQTT format:")
|
||||
print(f"MQTT message: {json.dumps(mqtt_message, indent=2)}")
|
||||
print()
|
||||
|
||||
hex_data = mqtt_message['rows'][0]['data']
|
||||
print(f"Hex data from rows[0]['data']: {hex_data}")
|
||||
print(f"Hex data length: {len(hex_data)} chars ({len(hex_data)//2} bytes)")
|
||||
|
||||
# Strip 'aaaaaa' prefix if present
|
||||
if hex_data.startswith('aaaaaa'):
|
||||
hex_data = hex_data[6:]
|
||||
print(f"Stripped 'aaaaaa' prefix, remaining: {hex_data}")
|
||||
|
||||
byte_data = bytes.fromhex(hex_data)
|
||||
print(f"Byte data: {byte_data.hex()}")
|
||||
print()
|
||||
|
||||
# Decode with sliding window
|
||||
decoded = decode_pool_payload(byte_data)
|
||||
|
||||
if decoded:
|
||||
print("✓ Payload decoded successfully!")
|
||||
print(json.dumps(decoded, indent=2))
|
||||
|
||||
print()
|
||||
print("Generated sensor messages:")
|
||||
|
||||
bme_msg = {
|
||||
'time': mqtt_message['time'],
|
||||
'model': 'pool',
|
||||
'id': decoded['nodeId'] * 10 + 1,
|
||||
'battery_ok': 1,
|
||||
'temperature_C': decoded['t_bme_c'],
|
||||
'humidity': decoded['humidity'],
|
||||
'pressure_rel': decoded['pressure_hpa'],
|
||||
'mic': 'CRC'
|
||||
}
|
||||
|
||||
ds_msg = {
|
||||
'time': mqtt_message['time'],
|
||||
'model': 'pool',
|
||||
'id': decoded['nodeId'] * 10 + 2,
|
||||
'battery_ok': 1,
|
||||
'temperature_C': decoded['t_ds_c'],
|
||||
'mic': 'CRC'
|
||||
}
|
||||
|
||||
print("BME280 message:")
|
||||
print(json.dumps(bme_msg, indent=2))
|
||||
print()
|
||||
print("DS18B20 message:")
|
||||
print(json.dumps(ds_msg, indent=2))
|
||||
else:
|
||||
print("✗ Failed to decode payload!")
|
||||
Reference in New Issue
Block a user