From d1c1f63cb971ed37d079a73022970c6e1a60b3f4 Mon Sep 17 00:00:00 2001 From: olaf Date: Thu, 8 Jan 2026 21:26:05 +0000 Subject: [PATCH] Updating to new sending format of diy-sender --- data_tables.py | 1 + datacollector.py | 137 +++++++++++++++++++++++++++----------------- test_pool_decode.py | 105 +++++++++++++-------------------- 3 files changed, 125 insertions(+), 118 deletions(-) diff --git a/data_tables.py b/data_tables.py index d533c6a..4ef483a 100644 --- a/data_tables.py +++ b/data_tables.py @@ -19,6 +19,7 @@ class Sensor(Base): last_rain_value = Column(Float, default=0.0) # Last reported rain value (for reset detection) node_id = Column(Integer, nullable=True) # For pool sensors: the nodeId (1, 2, etc.) that generates mqtt_id sensor_type = Column(String(50), nullable=True) # Sensor type: 'BME280', 'DS18B20', 'Bresser-6in1', etc. + last_contact = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=True) # Last transmission received from sensor # Define the TemperatureInside table class TemperatureInside(Base): diff --git a/datacollector.py b/datacollector.py index 41c3ac1..57e271a 100644 --- a/datacollector.py +++ b/datacollector.py @@ -193,6 +193,46 @@ def parse_radio_frame(byte_data): if not byte_data: return None + try: + sync_index = byte_data.find(b"\x2d") + if sync_index == -1: + return None + + if sync_index + 1 >= len(byte_data): + # No room for networkId byte + return None + + network_id = byte_data[sync_index + 1] + sync_len = 2 + + header_start = sync_index + sync_len + if header_start + 4 > len(byte_data): + return None + + payload_len, dest_id, sender_id, ctl = struct.unpack_from(' len(byte_data): + # Not enough bytes for data + 2-byte CRC + return None + + data = byte_data[data_start:data_end] + crc_bytes = byte_data[data_end:data_end + 2] + + return { + 'data': data, + 'payload_len': payload_len, + 'dest_id': dest_id, + 'sender_id': sender_id, + 'ctl': ctl, + 'network_id': network_id, + 'crc_bytes': crc_bytes, + 'sync_index': sync_index, + 'sync_len': sync_len, + } + except Exception: + return None + PAYLOAD_SIZE = 15 # bytes in pool payload MAGIC1 = 0x42 @@ -271,46 +311,35 @@ def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = No return best - try: - # Find sync 0x2D followed by networkId (second byte) - sync_index = byte_data.find(b"\x2d") - if sync_index == -1: - return None - if sync_index + 1 >= len(byte_data): - # No room for networkId byte - return None +def extract_pool_candidate_bytes(raw_bytes: bytes): + """Return payload bytes for pool sensors, handling legacy and new radio framing. - network_id = byte_data[sync_index + 1] - sync_len = 2 + Tries legacy framed parsing first, then strips a 0xAA preamble and optional + sync bytes (0x39 0x14) used by the new hardware. Falls back to the stripped + raw stream so old payloads continue to work. + """ + if not raw_bytes: + return b"", {"source": "empty"} - header_start = sync_index + sync_len - if header_start + 4 > len(byte_data): - return None + # Legacy framed format (rarely used but kept for compatibility) + frame = parse_radio_frame(raw_bytes) + if frame and frame.get('data'): + return frame['data'], {"source": "legacy_frame", "network_id": frame.get('network_id')} - payload_len, dest_id, sender_id, ctl = struct.unpack_from(' len(byte_data): - # Not enough bytes for data + 2-byte CRC - return None + trimmed = raw_bytes + while trimmed.startswith(b"\xaa"): + trimmed = trimmed[1:] - data = byte_data[data_start:data_end] - crc_bytes = byte_data[data_end:data_end + 2] + # New hardware emits sync bytes; support both 0x39 0x14 and 0xD3 0x91 variants + sync_patterns = [b"\x39\x14", b"\xd3\x91"] + for sync in sync_patterns: + idx = trimmed.find(sync) + if idx != -1 and idx + len(sync) < len(trimmed): + return trimmed[idx + len(sync):], {"source": "sync", "offset": idx, "sync": sync.hex()} - return { - 'data': data, - 'payload_len': payload_len, - 'dest_id': dest_id, - 'sender_id': sender_id, - 'ctl': ctl, - 'network_id': network_id, - 'crc_bytes': crc_bytes, - 'sync_index': sync_index, - 'sync_len': sync_len, - } - except Exception: - return None + # Fallback: use stripped raw stream (old hardware behaviour) + return trimmed, {"source": "raw"} def refresh_sensor_cache(): """Refresh the sensor cache from database""" @@ -469,9 +498,15 @@ def on_message(client, userdata, msg): malformed_hex_logger.info(f"Pool message missing data: {d}") return - # Strip optional 'aaaaaa' prefix if present (old format) - if hex_data.startswith('aaaaaa'): - hex_data = hex_data[6:] + # Strip any length of 'aa' preamble (old/new formats) + while hex_data.startswith('aa'): + hex_data = hex_data[2:] + + # Some rtl_433 captures occasionally lose the final nibble; drop it to keep hex even-length + if len(hex_data) % 2 == 1: + if LOG_MALFORMED_HEX: + malformed_hex_logger.info(f"Trimming odd-length hex (dropped last nibble): {hex_data}") + hex_data = hex_data[:-1] try: byte_data = bytes.fromhex(hex_data) @@ -483,25 +518,15 @@ def on_message(client, userdata, msg): warte = '' return - # Attempt to parse the radio frame first (preamble/sync/header/data/crc) - frame = parse_radio_frame(byte_data) - if frame and frame.get('data'): - print( - f"Parsed radio frame: netId={frame.get('network_id')}, len={frame['payload_len']}, " - f"dest={frame['dest_id']}, sender={frame['sender_id']}, ctl={frame['ctl']}, crc={frame['crc_bytes'].hex()}" - ) - candidate_bytes = frame['data'] - else: - # Fallback: Drop optional leading 0xAA bytes from hardware and use raw stream - if LOG_MALFORMED_HEX and not frame: - malformed_hex_logger.info(f"Frame parse failed: {byte_data.hex()}") - tmp = byte_data - while tmp.startswith(b"\xaa"): - tmp = tmp[1:] - candidate_bytes = tmp + candidate_bytes, candidate_meta = extract_pool_candidate_bytes(byte_data) + if LOG_MALFORMED_HEX and candidate_meta.get("source") == "raw": + malformed_hex_logger.info(f"Pool using raw bytes (no sync match): {byte_data.hex()}") print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") - print(f"Candidate payload for app decode ({len(candidate_bytes)}): {candidate_bytes.hex()}") + print( + f"Candidate payload ({len(candidate_bytes)}), source={candidate_meta.get('source')}: " + f"{candidate_bytes.hex()}" + ) # Decode payload by sliding-window detection (magic bytes may be missing) expected_seq = None @@ -799,6 +824,10 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b # Update the sensor's battery level sensor.battery = battery + + # Update last contact time for pool sensors + if mqtt_name == 'pool': + sensor.last_contact = datetime.now(timezone.utc) # Update the temperature data if temperature_c is not None: diff --git a/test_pool_decode.py b/test_pool_decode.py index 546316d..a2df765 100644 --- a/test_pool_decode.py +++ b/test_pool_decode.py @@ -3,7 +3,7 @@ import json import struct -from typing import Optional +from typing import Optional, Tuple, Dict PAYLOAD_SIZE = 15 MAGIC1 = 0x42 @@ -83,72 +83,49 @@ def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = No return best -# Test with the actual MQTT message -mqtt_message = { - "time": "2025-12-27T13:26:47", - "model": "pool", - "count": 1, - "num_rows": 1, - "rows": [ - { - "len": 143, - "data": "429901013400a801f6002b0294272a000000" - } - ], - "codes": ["{143}429901013400a801f6002b0294272a000000"] -} +def build_payload(seq: int, t_ds10: int, t_bme10: int, hum10: int, pres1: int) -> bytes: + """Build a valid payload with CRC appended.""" + header = struct.pack(' Tuple[bytes, Dict[str, str]]: + """Mirror runtime extraction: strip 0xAA preamble and optional sync bytes.""" + trimmed = raw_bytes + while trimmed.startswith(b"\xaa"): + trimmed = trimmed[1:] -# Strip 'aaaaaa' prefix if present -if hex_data.startswith('aaaaaa'): - hex_data = hex_data[6:] - print(f"Stripped 'aaaaaa' prefix, remaining: {hex_data}") + for sync in (b"\x39\x14", b"\xd3\x91"): + idx = trimmed.find(sync) + if idx != -1 and idx + len(sync) < len(trimmed): + return trimmed[idx + len(sync):], {"source": "sync", "offset": idx, "sync": sync.hex()} -byte_data = bytes.fromhex(hex_data) -print(f"Byte data: {byte_data.hex()}") -print() + return trimmed, {"source": "raw"} -# Decode with sliding window -decoded = decode_pool_payload(byte_data) -if decoded: - print("✓ Payload decoded successfully!") - print(json.dumps(decoded, indent=2)) - - print() - print("Generated sensor messages:") - - bme_msg = { - 'time': mqtt_message['time'], - 'model': 'pool', - 'id': decoded['nodeId'] * 10 + 1, - 'battery_ok': 1, - 'temperature_C': decoded['t_bme_c'], - 'humidity': decoded['humidity'], - 'pressure_rel': decoded['pressure_hpa'], - 'mic': 'CRC' - } - - ds_msg = { - 'time': mqtt_message['time'], - 'model': 'pool', - 'id': decoded['nodeId'] * 10 + 2, - 'battery_ok': 1, - 'temperature_C': decoded['t_ds_c'], - 'mic': 'CRC' - } - - print("BME280 message:") - print(json.dumps(bme_msg, indent=2)) - print() - print("DS18B20 message:") - print(json.dumps(ds_msg, indent=2)) -else: - print("✗ Failed to decode payload!") +def demo_decode(hex_stream: str, label: str): + print(f"\n--- {label} ---") + byte_stream = bytes.fromhex(hex_stream) + candidate_bytes, meta = extract_pool_candidate_bytes(byte_stream) + print(f"candidate source={meta['source']}, len={len(candidate_bytes)}, hex={candidate_bytes.hex()}") + decoded = decode_pool_payload(candidate_bytes) + if decoded: + print("✓ Decoded:") + print(json.dumps(decoded, indent=2)) + else: + print("✗ Failed to decode") + + +# Construct a known-good payload and embed it in the new frame layout (aa preamble + sync 0x39 0x14) +payload = build_payload(seq=0x013d, t_ds10=231, t_bme10=223, hum10=550, pres1=10123) +preamble = b"\xaa" * 8 +sync = b"\x39\x14" +new_frame_hex = (preamble + sync + payload).hex() + +# Legacy frame: just the payload bytes +legacy_hex = payload.hex() + +print("Testing pool payload decoding for both hardware generations") +demo_decode(new_frame_hex, "New hardware (preamble + sync)") +demo_decode(legacy_hex, "Legacy hardware (bare payload)")