#!/usr/bin/env python3 """ Minimal CC1101 GFSK receiver @868.3 MHz Optimized for sensitivity: 1.2 kBaud, 5 kHz deviation, 58 kHz RX BW Prints timestamp, RSSI, payload (hex) on each packet. """ import time import struct import json from datetime import datetime, timezone import spidev import RPi.GPIO as GPIO import paho.mqtt.client as mqtt from cc1101_config import ( IOCFG2, IOCFG0, FIFOTHR, PKTLEN, PKTCTRL0, PKTCTRL1, FSCTRL1, FREQ2, FREQ1, FREQ0, MDMCFG4, MDMCFG3, MDMCFG2, MDMCFG1, MDMCFG0, DEVIATN, MCSM1, MCSM0, FOCCFG, AGCCTRL2, AGCCTRL1, AGCCTRL0, FREND0, FSCAL3, FSCAL2, FSCAL1, FSCAL0, SRX, SIDLE, SFRX, SRES, MARCSTATE, RXBYTES, calculate_freq_registers ) DIY_FREQ_HZ = 868_300_000 DIY_SYNC1 = 0xD3 DIY_SYNC0 = 0x91 DIY_PACKET_LEN = 17 # Payload with VCC: 17 bytes (magic1, magic2, version, nodeId, seq, temps, hum, pres, vcc, crc) class CC1101DIY: def __init__(self, bus=0, device=0, gdo0_pin=25): # default BCM25 (pin 22) self.spi = spidev.SpiDev() self.spi.open(bus, device) self.spi.max_speed_hz = 50000 self.spi.mode = 0 self.gdo0_pin = gdo0_pin GPIO.setmode(GPIO.BCM) GPIO.setup(self.gdo0_pin, GPIO.IN) print("CC1101 DIY receiver initialized") def close(self): self.spi.close() GPIO.cleanup() def send_strobe(self, strobe): self.spi.xfer([strobe]) def reset(self): self.send_strobe(SRES) time.sleep(0.1) def write_reg(self, addr, val): self.spi.xfer([addr, val]) def read_status(self, addr): return self.spi.xfer([addr | 0xC0, 0x00])[1] def read_burst(self, addr, length): return self.spi.xfer([addr | 0xC0] + [0x00] * length)[1:] def verify_chip(self): """Verify CC1101 chip is present and responding correctly""" # Read PARTNUM (should be 0x00 for CC1101) partnum = self.read_status(0x30) # Read VERSION (typically 0x04 or 0x14 for CC1101) version = self.read_status(0x31) print(f"CC1101 Chip Detection:") print(f" PARTNUM: 0x{partnum:02X} (expected: 0x00)") print(f" VERSION: 0x{version:02X} (expected: 0x04 or 0x14)") if partnum != 0x00: print(f"ERROR: Invalid PARTNUM. Expected 0x00, got 0x{partnum:02X}") return False if version not in (0x04, 0x14): if version in (0x00, 0xFF): print(f"ERROR: No chip detected (VERSION=0x{version:02X}). Check SPI wiring.") else: print(f"WARNING: Unexpected VERSION 0x{version:02X}, but proceeding...") return False print("✓ CC1101 chip detected and verified") return True def get_rssi(self): rssi_dec = self.read_status(0x34) return ((rssi_dec - 256) / 2 - 74) if rssi_dec >= 128 else (rssi_dec / 2 - 74) def get_marc_state(self): return self.read_status(MARCSTATE) & 0x1F def get_rx_bytes(self): return self.read_status(RXBYTES) & 0x7F def flush_rx(self): self.send_strobe(SFRX) def enter_rx(self): self.send_strobe(SRX) def configure(self): print("Configuring CC1101 for DIY GFSK @868.3 MHz (1.2kBaud, 5kHz dev, 58kHz BW)...") freq_regs = calculate_freq_registers(DIY_FREQ_HZ) # GPIO mapping self.write_reg(IOCFG2, 0x2E) # GDO2 hi-Z (not wired) self.write_reg(IOCFG0, 0x06) # GDO0: assert on sync word detected # FIFO threshold - low to read quickly and prevent overflow self.write_reg(FIFOTHR, 0x07) # Trigger at 32 bytes # Packet control - fixed length, capture sync'd packets (17-byte payload including VCC) self.write_reg(PKTLEN, DIY_PACKET_LEN) self.write_reg(PKTCTRL1, 0x00) # Do NOT append status bytes (matches sender) self.write_reg(PKTCTRL0, 0x00) # Fixed length, CRC disabled # Frequency synthesizer self.write_reg(FSCTRL1, 0x06) # IF frequency control (matches sender) self.write_reg(FREQ2, freq_regs[FREQ2]) self.write_reg(FREQ1, freq_regs[FREQ1]) self.write_reg(FREQ0, freq_regs[FREQ0]) # Modem configuration for GFSK - sensitivity optimized # MDMCFG4: CHANBW_E=3, CHANBW_M=3 → 58 kHz BW, DRATE_E=8 self.write_reg(MDMCFG4, 0xF8) # BW=58kHz, DRATE_E=8 (matches sender) self.write_reg(MDMCFG3, 0x83) # DRATE_M=131 for 1.2 kBaud self.write_reg(MDMCFG2, 0x12) # GFSK, 16/16 sync word, DC filter ON self.write_reg(MDMCFG1, 0x22) # 4 preamble bytes, CHANSPC_E=2 self.write_reg(MDMCFG0, 0xF8) # CHANSPC_M=248 # Sync word configuration (0xD391 - matches sender) self.write_reg(0x04, DIY_SYNC1) # SYNC1 = 0xD3 self.write_reg(0x05, DIY_SYNC0) # SYNC0 = 0x91 # Deviation - 5 kHz for narrow channel # DEVIATN calculation: (5000 * 2^17) / 26MHz = 25.2 ≈ 0x15 self.write_reg(DEVIATN, 0x15) # ~5 kHz deviation # State machine - auto-calibrate, stay in RX self.write_reg(MCSM1, 0x3F) # CCA always, stay in RX after RX/TX self.write_reg(MCSM0, 0x18) # Auto-calibrate from IDLE to RX/TX # Frequency offset compensation - enable AFC for better lock self.write_reg(FOCCFG, 0x1D) # FOC_BS_CS_GATE, FOC_PRE_K=3K, FOC_POST_K=K/2, FOC_LIMIT=BW/4 self.write_reg(0x1A, 0x1C) # BSCFG: Bit sync config (matches sender) # AGC for GFSK sensitivity - match sender settings self.write_reg(AGCCTRL2, 0xC7) # Max DVGA gain, target 42 dB (matches sender) self.write_reg(AGCCTRL1, 0x00) # LNA priority, AGC relative threshold (matches sender) self.write_reg(AGCCTRL0, 0xB0) # Medium hysteresis, 16 samples # Front end - match sender configuration exactly self.write_reg(0x21, 0xB6) # FREND1: PA current = max (matches sender) self.write_reg(FREND0, 0x17) # FREND0: PA_POWER index = 7 (matches sender) # Calibration self.write_reg(FSCAL3, 0xE9) self.write_reg(FSCAL2, 0x2A) self.write_reg(FSCAL1, 0x00) self.write_reg(FSCAL0, 0x1F) print("Configuration applied") def read_packet(self): # Fixed-length: expect 17-byte payload (with VCC) num_bytes = self.get_rx_bytes() if self.read_status(RXBYTES) & 0x80: self.flush_rx() return None, False if num_bytes >= DIY_PACKET_LEN: data = self.read_burst(0x3F, DIY_PACKET_LEN) return data, True return None, False def reverse_bits_byte(b): """Reverse bit order in a byte (MSB<->LSB)""" result = 0 for i in range(8): result = (result << 1) | ((b >> i) & 1) return result def main(): # Setup MQTT client mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv311) mqtt_host = "192.168.43.102" mqtt_topic = "rtl_433/DietPi/events" try: mqtt_client.connect(mqtt_host, 1883, 60) mqtt_client.loop_start() print(f"Connected to MQTT broker at {mqtt_host}") except Exception as e: print(f"Warning: Could not connect to MQTT broker: {e}") print("Continuing without MQTT...") radio = CC1101DIY() try: radio.reset() # Verify chip is correctly recognized if not radio.verify_chip(): print("\nTroubleshooting:") print(" 1. Check SPI is enabled: sudo raspi-config") print(" 2. Verify wiring: MOSI, MISO, SCLK, CSN, GND, VCC") print(" 3. Check power supply (3.3V, sufficient current)") print(" 4. Try running with sudo for GPIO/SPI permissions") return radio.configure() radio.enter_rx() print("Listening... Press Ctrl+C to stop\n") packet_count = 0 last_status = time.time() while True: payload, crc_ok = radio.read_packet() if payload and len(payload) == DIY_PACKET_LEN: fmt = '= 5: state = radio.get_marc_state() # Check if stuck (not in RX state) and recover if state != 13: # 13 = RX ts_status = datetime.now(timezone.utc).isoformat() print(f"[{ts_status}] WARNING: Not in RX state ({state}) - recovering...") radio.flush_rx() radio.send_strobe(SIDLE) time.sleep(0.01) radio.enter_rx() time.sleep(0.01) # Let RX stabilize # Normal operation: silent (no status spam) last_status = time.time() time.sleep(0.0001) # Poll at 10kHz to catch all packets except KeyboardInterrupt: print("\nStopping...") finally: mqtt_client.loop_stop() mqtt_client.disconnect() radio.close() if __name__ == "__main__": main()