Files
weatherstation-datacollector/test_pool_decode.py

132 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Test pool payload decoding with the new MQTT message format"""
import json
import struct
from typing import Optional, Tuple, Dict
PAYLOAD_SIZE = 15
MAGIC1 = 0x42
MAGIC2 = 0x99
def crc8_xor(data: bytes) -> int:
"""Simple XOR checksum used by the pool payload."""
c = 0
for b in data:
c ^= b
return c
def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None):
"""Scan a byte stream for a plausible pool payload.
Slides a 15-byte window, validates with CRC, version/nodeId, and range checks,
and scores candidates. Returns the best decoded dict or None.
"""
# Drop leading preamble (0xAA) if present
while candidate_bytes.startswith(b"\xaa"):
candidate_bytes = candidate_bytes[1:]
best = None
best_score = -1
for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1):
chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE]
try:
magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack(
'<BBBBHhhHHB', chunk
)
except struct.error:
continue
crc_calculated = crc8_xor(chunk[:-1])
if crc_calculated != crc_received:
continue
if version != 1 or nodeId != 1:
continue
# Plausibility checks (unit scaled)
if not (-300 <= t_ds10 <= 600): # -30.0 to 60.0°C
continue
if not (-300 <= t_bme10 <= 600):
continue
if not (0 <= hum10 <= 1000): # 0.0100.0%
continue
if not (8000 <= pres1 <= 11000): # 800.01100.0 hPa
continue
score = 0
if magic1 == MAGIC1 and magic2 == MAGIC2:
score += 2
if expected_seq is not None and seq == expected_seq:
score += 1
# CRC already validated; reward shorter offset to prefer first valid
score -= offset * 0.001
if score > best_score:
best_score = score
best = {
"offset": offset,
"magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2,
"version": version,
"nodeId": nodeId,
"sequence": seq,
"t_ds_c": t_ds10 / 10.0,
"t_bme_c": t_bme10 / 10.0,
"humidity": hum10 / 10.0,
"pressure_hpa": pres1 / 10.0,
"crc_valid": True,
}
return best
def build_payload(seq: int, t_ds10: int, t_bme10: int, hum10: int, pres1: int) -> bytes:
"""Build a valid payload with CRC appended."""
header = struct.pack('<BBBBHhhHH', MAGIC1, MAGIC2, 1, 1, seq, t_ds10, t_bme10, hum10, pres1)
crc = crc8_xor(header)
return header + bytes([crc])
def extract_pool_candidate_bytes(raw_bytes: bytes) -> Tuple[bytes, Dict[str, str]]:
"""Mirror runtime extraction: strip 0xAA preamble and optional sync bytes."""
trimmed = raw_bytes
while trimmed.startswith(b"\xaa"):
trimmed = trimmed[1:]
for sync in (b"\x39\x14", b"\xd3\x91"):
idx = trimmed.find(sync)
if idx != -1 and idx + len(sync) < len(trimmed):
return trimmed[idx + len(sync):], {"source": "sync", "offset": idx, "sync": sync.hex()}
return trimmed, {"source": "raw"}
def demo_decode(hex_stream: str, label: str):
print(f"\n--- {label} ---")
byte_stream = bytes.fromhex(hex_stream)
candidate_bytes, meta = extract_pool_candidate_bytes(byte_stream)
print(f"candidate source={meta['source']}, len={len(candidate_bytes)}, hex={candidate_bytes.hex()}")
decoded = decode_pool_payload(candidate_bytes)
if decoded:
print("✓ Decoded:")
print(json.dumps(decoded, indent=2))
else:
print("✗ Failed to decode")
# Construct a known-good payload and embed it in the new frame layout (aa preamble + sync 0x39 0x14)
payload = build_payload(seq=0x013d, t_ds10=231, t_bme10=223, hum10=550, pres1=10123)
preamble = b"\xaa" * 8
sync = b"\x39\x14"
new_frame_hex = (preamble + sync + payload).hex()
# Legacy frame: just the payload bytes
legacy_hex = payload.hex()
print("Testing pool payload decoding for both hardware generations")
demo_decode(new_frame_hex, "New hardware (preamble + sync)")
demo_decode(legacy_hex, "Legacy hardware (bare payload)")