#!/usr/bin/env python3
"""Test pool payload decoding with the new MQTT message format.

The pool payload is a fixed 15-byte little-endian record:

    magic1, magic2, version_byte, nodeId   4 x uint8
    seq                                    uint16
    t_ds10, t_bme10                        2 x int16  (value * 10)
    hum10, pres1                           2 x uint16 (value * 10)
    crc                                    uint8, XOR of the preceding 14 bytes

Newer hardware prefixes the record with a 0xAA preamble and a two-byte sync
word; legacy hardware sends the bare record.
"""

import json
import struct
from typing import Dict, List, Optional, Tuple

PAYLOAD_SIZE = 15
MAGIC1 = 0x42
MAGIC2 = 0x99

# Low nibble of the version byte written by build_payload().
PROTOCOL_VERSION = 0x01

# NOTE(review): build_payload() takes no node-id argument, so a fixed id is
# packed. The value 1 is an assumption -- confirm against the firmware.
DEFAULT_NODE_ID = 0x01

RESET_FLAG_MAP = [
    (0x1, "PORF (power-on)"),
    (0x2, "EXTRF (external reset)"),
    (0x4, "BORF (brown-out)"),
    (0x8, "WDRF (watchdog)"),
]

# Field widths are fixed by the ten decoded fields and PAYLOAD_SIZE:
# 4*uint8 + uint16 + 2*int16 + 2*uint16 + uint8 = 15 bytes.
_PAYLOAD_STRUCT = struct.Struct("<BBBBHhhHHB")
_HEADER_STRUCT = struct.Struct("<BBBBHhhHH")

# Plausibility windows, in tenths of a unit (the decoder divides by 10.0).
# They only rank CRC-valid candidates; they never reject one.
# NOTE(review): the limits are assumptions -- confirm against the sensors.
_TEMP10_RANGE = (-400, 850)
_HUM10_RANGE = (0, 1000)
_PRES10_RANGE = (3000, 11000)

_PREAMBLE_BYTE = b"\xaa"
_SYNC_WORDS = (b"\x39\x14", b"\xd3\x91")


def crc8_xor(data: bytes) -> int:
    """Return the XOR of all bytes in *data* (the pool payload checksum)."""
    checksum = 0
    for byte in data:
        checksum ^= byte
    return checksum


def parse_version_and_reset_flags(version_byte: int) -> Tuple[int, int, List[str]]:
    """Split the version byte into protocol version and reset information.

    Returns ``(protocol_version, reset_flags, reset_causes)`` where the
    version is the low nibble, the flags are the high nibble, and the causes
    are the RESET_FLAG_MAP descriptions of every flag bit that is set.
    """
    protocol_version = version_byte & 0x0F
    reset_flags = (version_byte >> 4) & 0x0F
    reset_causes = [desc for bit, desc in RESET_FLAG_MAP if reset_flags & bit]
    return protocol_version, reset_flags, reset_causes


def _in_range(value: int, bounds: Tuple[int, int]) -> bool:
    """Return True when *value* lies inside the inclusive *bounds*."""
    low, high = bounds
    return low <= value <= high


def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None):
    """Scan a byte stream for a plausible pool payload.

    Slides a 15-byte window over *candidate_bytes* (after dropping any leading
    0xAA preamble). A window is a candidate only if its trailing XOR checksum
    matches. Candidates are ranked by magic bytes, protocol version, node id,
    sensor-range plausibility and, when *expected_seq* is given, a matching
    sequence number. On a tie the earliest offset wins.

    NOTE(review): the individual score weights are a reconstruction; only the
    CRC gate and the "highest score wins" rule are certain.

    Returns the best decoded dict, or None when no window passes the CRC.
    """
    # Drop leading preamble (0xAA) if present
    candidate_bytes = candidate_bytes.lstrip(_PREAMBLE_BYTE)

    best = None
    best_score = -1

    for offset in range(len(candidate_bytes) - PAYLOAD_SIZE + 1):
        chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE]
        (magic1, magic2, version_byte, nodeId, seq,
         t_ds10, t_bme10, hum10, pres1, crc_received) = _PAYLOAD_STRUCT.unpack(chunk)

        # The checksum covers everything except the checksum byte itself.
        if crc8_xor(chunk[:-1]) != crc_received:
            continue

        protocol_version, reset_flags, reset_causes = parse_version_and_reset_flags(version_byte)
        magic_ok = magic1 == MAGIC1 and magic2 == MAGIC2

        # A one-byte XOR passes by chance about 1 window in 256, so use the
        # remaining fields to prefer the most plausible alignment.
        score = 0
        if magic_ok:
            score += 4
        if protocol_version == PROTOCOL_VERSION:
            score += 2
        if nodeId == DEFAULT_NODE_ID:
            score += 1
        score += _in_range(t_ds10, _TEMP10_RANGE)
        score += _in_range(t_bme10, _TEMP10_RANGE)
        score += _in_range(hum10, _HUM10_RANGE)
        score += _in_range(pres1, _PRES10_RANGE)
        if expected_seq is not None and seq == expected_seq:
            score += 2

        if score > best_score:
            best_score = score
            best = {
                "offset": offset,
                "magic_ok": magic_ok,
                "version": protocol_version,
                "version_byte": version_byte,
                "reset_flags": reset_flags,
                "reset_causes": reset_causes,
                "nodeId": nodeId,
                "sequence": seq,
                "t_ds_c": t_ds10 / 10.0,
                "t_bme_c": t_bme10 / 10.0,
                "humidity": hum10 / 10.0,
                "pressure_hpa": pres1 / 10.0,
                "crc_valid": True,
            }

    return best


def build_payload(seq: int, t_ds10: int, t_bme10: int, hum10: int, pres1: int,
                  reset_flags: int = 0) -> bytes:
    """Build a valid payload with CRC appended.

    *reset_flags* goes into the high nibble of the version byte; the low
    nibble is the protocol version. All measurements are passed pre-scaled
    (value * 10). Raises ``struct.error`` if a value does not fit its field.
    """
    version_byte = ((reset_flags & 0x0F) << 4) | PROTOCOL_VERSION
    header = _HEADER_STRUCT.pack(
        MAGIC1, MAGIC2, version_byte, DEFAULT_NODE_ID,
        seq, t_ds10, t_bme10, hum10, pres1,
    )
    return header + bytes([crc8_xor(header)])


def extract_pool_candidate_bytes(raw_bytes: bytes) -> Tuple[bytes, Dict[str, object]]:
    """Mirror runtime extraction: strip 0xAA preamble and optional sync bytes.

    Returns ``(candidate_bytes, meta)``. When a known sync word is found with
    at least one byte after it, the candidate is everything after the sync
    word and *meta* records ``source="sync"``, the sync offset and its hex.
    Otherwise the preamble-stripped input is returned with ``source="raw"``.
    """
    trimmed = raw_bytes.lstrip(_PREAMBLE_BYTE)

    for sync in _SYNC_WORDS:
        idx = trimmed.find(sync)
        # Require data after the sync word; a trailing sync is not a frame.
        if idx != -1 and idx + len(sync) < len(trimmed):
            return trimmed[idx + len(sync):], {"source": "sync", "offset": idx, "sync": sync.hex()}

    return trimmed, {"source": "raw"}


def demo_decode(hex_stream: str, label: str):
    """Decode *hex_stream* and print the result under the heading *label*."""
    print(f"\n--- {label} ---")
    byte_stream = bytes.fromhex(hex_stream)
    candidate_bytes, meta = extract_pool_candidate_bytes(byte_stream)
    print(f"candidate source={meta['source']}, len={len(candidate_bytes)}, hex={candidate_bytes.hex()}")
    decoded = decode_pool_payload(candidate_bytes)
    if decoded:
        print("✓ Decoded:")
        print(json.dumps(decoded, indent=2))
    else:
        print("✗ Failed to decode")


# Construct a known-good payload and embed it in the new frame layout (aa preamble + sync 0x39 0x14)
payload = build_payload(seq=0x013d, t_ds10=231, t_bme10=223, hum10=550, pres1=10123)
preamble = b"\xaa" * 8
sync = b"\x39\x14"
new_frame_hex = (preamble + sync + payload).hex()

# Legacy frame: just the payload bytes
legacy_hex = payload.hex()


def main() -> None:
    """Run the decode demo for both frame layouts."""
    print("Testing pool payload decoding for both hardware generations")
    demo_decode(new_frame_hex, "New hardware (preamble + sync)")
    demo_decode(legacy_hex, "Legacy hardware (bare payload)")


if __name__ == "__main__":
    main()