From f55c1fe6f1d203d7b5bbaad5e8f8458c8e8e8af4 Mon Sep 17 00:00:00 2001 From: olaf Date: Sun, 25 Jan 2026 11:25:15 +0000 Subject: [PATCH] Pool sensor v2: VCC monitoring, database resilience, receiver improvements - Added voltage monitoring table and storage pipeline - Extended pool payload to 17 bytes with VCC field (protocol v2) - Improved database connection pool resilience (reduced pool size, aggressive recycling, pool disposal on failure) - Added environment variable support for database configuration - Fixed receiver MQTT deprecation warning (CallbackAPIVersion.VERSION2) - Silenced excessive RSSI status logging in receiver - Added reset flag tracking and reporting - Updated Docker compose with DB config and log rotation limits --- .gitignore | 1 + DATABASE_CONNECTIVITY_FIXES.md | 176 ++++++++ data_tables.py | 11 +- datacollector.py | 377 +++++++++++++----- docker-compose.dev.yml | 6 +- docker-compose.yml | 11 + main.cpp | 706 +++++++++++++++++++++++++++++++++ receiver_diy.py | 296 ++++++++++++++ test_pool_decode.py | 29 +- 9 files changed, 1512 insertions(+), 101 deletions(-) create mode 100644 DATABASE_CONNECTIVITY_FIXES.md create mode 100644 main.cpp create mode 100644 receiver_diy.py diff --git a/.gitignore b/.gitignore index f1b510c..7077a35 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,4 @@ logs/ # OS Thumbs.db .DS_Store +.env diff --git a/DATABASE_CONNECTIVITY_FIXES.md b/DATABASE_CONNECTIVITY_FIXES.md new file mode 100644 index 0000000..abb816a --- /dev/null +++ b/DATABASE_CONNECTIVITY_FIXES.md @@ -0,0 +1,176 @@ +# Database Connectivity Issues - Analysis & Fixes + +## Problem Summary + +The NAS container experiences **intermittent database connectivity** failures with the error: +``` +Exception during reset or similar +_mysql_connector.MySQLInterfaceError: Lost connection to MySQL server during query +``` + +While Docker for Desktop works reliably and MySQL Workbench can connect without issues. + +--- + +## Root Causes Identified + +### 1. **Aggressive Connection Pool Settings** +- **Old config**: `pool_size=5` + `max_overflow=10` = up to 15 simultaneous connections +- **Problem**: Creates excessive connections that exhaust database resources or trigger connection limits +- **Result**: Pool reset failures when trying to return/reset dead connections + +### 2. **Insufficient Connection Recycling** +- **Old config**: `pool_recycle=1800` (30 minutes) +- **Problem**: Connections held too long; database may timeout/close them due to `wait_timeout` or network issues +- **Result**: When SQLAlchemy tries to reuse connections, they're already dead + +### 3. **Conflicting autocommit Setting** +- **Old config**: `autocommit=True` in connect_args +- **Problem**: When autocommit is enabled, there's nothing to rollback, but SQLAlchemy still tries during pool reset +- **Result**: Rollback fails on dead connections → traceback logged + +### 4. **Pool Reset on Dead Connections** +- **Config**: `pool_reset_on_return="none"` (correct) but **didn't dispose pool on failure** +- **Problem**: When a connection dies, the pool kept trying to reuse it +- **Result**: Repeated failures until the next retry window (30 seconds) + +### 5. **Network/Database Timeout Issues (NAS-specific)** +- **Likely cause**: NAS MariaDB has aggressive connection timeouts +- **Or**: Container network has higher packet loss/latency than Docker Desktop +- **Or**: Pool exhaustion prevents new connections from being established + +--- + +## Applied Fixes + +### ✅ Fix 1: Conservative Connection Pool (Lines 183-195) + +```python +pool_size=3, # Reduced from 5 +max_overflow=5, # Reduced from 10 +pool_recycle=300, # Reduced from 1800 (every 5 mins vs 30 mins) +autocommit=False, # Removed - let SQLAlchemy manage transactions +``` + +**Why this works:** +- Fewer simultaneous connections = less resource contention +- Aggressive recycling = avoids stale connections killed by database +- Proper transaction management = cleaner rollback handling + +### ✅ Fix 2: Pool Disposal on Connection Failure (Lines 530-533) + +```python +except exc.OperationalError as e: + sql_engine.dispose() # ← CRITICAL: Force all connections to be closed/recreated + logger.warning(f"Lost database connectivity: {e}") +``` + +**Why this works:** +- When connection fails, dump the entire pool +- Next connection attempt gets fresh connections +- Avoids repeated failures trying to reuse dead connections + +### ✅ Fix 3: Environment Variable Support (Lines 169-175) + +```python +DB_HOST = os.getenv("DB_HOST", "192.168.43.102") +DB_PORT = int(os.getenv("DB_PORT", "3306")) +# ... etc +``` + +**Why this matters:** +- Different deployments can now use different database hosts +- Docker Desktop can use `192.168.43.102` +- NAS can use `mariadb` (Docker DNS) or different IP if needed + +--- + +## Recommended MariaDB Configuration + +The NAS MariaDB should have appropriate timeout settings: + +```sql +-- Check current settings +SHOW VARIABLES LIKE 'wait_timeout'; +SHOW VARIABLES LIKE 'interactive_timeout'; +SHOW VARIABLES LIKE 'max_connections'; +SHOW VARIABLES LIKE 'max_allowed_packet'; + +-- Recommended settings (in /etc/mysql/mariadb.conf.d/50-server.cnf) +[mysqld] +wait_timeout = 600 # 10 minutes (allow idle connections longer) +interactive_timeout = 600 +max_connections = 100 # Ensure enough for pool + workbench +max_allowed_packet = 64M +``` + +--- + +## Deployment Instructions + +### For Docker Desktop: +```bash +# Use default or override in your compose +docker-compose -f docker-compose.yml up +``` + +### For NAS: +Update your docker-compose or environment file: +```yaml +environment: + - DB_HOST=192.168.43.102 # or your NAS's actual IP/hostname + - DB_PORT=3306 + - DB_USER=weatherdata + - DB_PASSWORD=cfCU$swM!HfK82%* + - DB_NAME=weatherdata + - DB_CONNECT_TIMEOUT=5 +``` + +--- + +## Monitoring + +The application now logs database configuration at startup: +``` +DB config: host=192.168.43.102:3306, user=weatherdata, db=weatherdata +``` + +Monitor the logs for: +- **"Database reachable again"** → Connection recovered +- **"Lost database connectivity"** → Transient failure detected and pool disposed +- **"Stored batch locally"** → Data queued to SQLite while DB unavailable + +--- + +## Testing + +### Test 1: Verify Environment Variables +```bash +# Run container with override +docker run -e DB_HOST=test-host ... python datacollector.py +# Check log: "DB config: host=test-host:3306" +``` + +### Test 2: Simulate Connection Loss +```python +# In Python shell connected to container +import requests +requests.get('http://container:port/shutdown') # Reconnect simulation +# Should see: "Database still unreachable" → "Database reachable again" +``` + +### Test 3: Monitor Pool State +Enable pool logging: +```python +echo_pool=True # Line 195 in datacollector.py +``` + +--- + +## Expected Behavior After Fix + +- ✅ Connection pool adapts to transient failures +- ✅ Stale connections are recycled frequently +- ✅ Pool is disposed on failure to prevent cascading errors +- ✅ Different environments can specify different hosts +- ✅ Data is cached locally if database is temporarily unavailable diff --git a/data_tables.py b/data_tables.py index 4ef483a..3016dc7 100644 --- a/data_tables.py +++ b/data_tables.py @@ -85,4 +85,13 @@ class Precipitation(Base): sensor_id = Column(Integer, ForeignKey('sensors.id')) sensor = relationship('Sensor', backref='precipitation') timestamp = Column(DateTime, default=datetime.utcnow) - precipitation = Column(Float) \ No newline at end of file + precipitation = Column(Float) + +# Define the Voltage table +class Voltage(Base): + __tablename__ = 'voltage' + id = Column(Integer, primary_key=True) + sensor_id = Column(Integer, ForeignKey('sensors.id')) + sensor = relationship('Sensor', backref='voltage') + timestamp = Column(DateTime, default=datetime.utcnow) + vcc_mv = Column(Integer) \ No newline at end of file diff --git a/datacollector.py b/datacollector.py index 57e271a..5ee6da5 100644 --- a/datacollector.py +++ b/datacollector.py @@ -19,11 +19,15 @@ from sqlalchemy.engine import URL # from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from datetime import datetime, timezone -from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation +from data_tables import Sensor, TemperatureInside,TemperatureOutside, HumidityOutside, HumidityInside, AirPressure, Wind, Precipitation, Voltage, Base # Load .env file so environment variables from .env are available at startup load_dotenv() +# Strip quotes from password if they were included (defensive) +DB_PASSWORD_RAW = os.getenv("DB_PASSWORD", "cfCUswMHfK82!") +DB_PASSWORD = DB_PASSWORD_RAW.strip("'\"") # Remove surrounding quotes if present + # Configure logging with environment-based log level LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() logger = logging.getLogger(__name__) @@ -73,6 +77,7 @@ sensor_ids = [] sensor_names = [] sensor_by_name_room = {} pool_sensors_cache = {} +pool_reset_flags_seen = {} # Track sensor failure states to avoid repeated logging sensor_failure_logged = {} @@ -164,11 +169,58 @@ sqlite_conn.commit() logger.info(f"SQLite database initialized at: {sqlite_db_path}") -# Database connection (hardcoded for private intranet) -DB_HOST = "192.168.43.102" -DB_PORT = 3306 -sql_engine = create_engine('mysql+mysqlconnector://weatherdata:cfCU$swM!HfK82%*@192.168.43.102/weatherdata', - connect_args={"connection_timeout": 5}) +# Database connection configuration (read from environment variables or use defaults) +DB_HOST = os.getenv("DB_HOST", "192.168.43.102") +DB_PORT = int(os.getenv("DB_PORT", "3306")) +DB_USER = os.getenv("DB_USER", "weatherdata") +DB_PASSWORD_RAW = os.getenv("DB_PASSWORD", "cfCUswMHfK82!") +DB_PASSWORD = DB_PASSWORD_RAW.strip("'\"") # Remove any surrounding quotes +DB_NAME = os.getenv("DB_NAME", "weatherdata") +DB_CONNECT_TIMEOUT = int(os.getenv("DB_CONNECT_TIMEOUT", "5")) + +# Log database configuration at startup +logger.info(f"DB config: host={DB_HOST}:{DB_PORT}, user={DB_USER}, db={DB_NAME}") +if DB_PASSWORD: + logger.debug(f"DB_PASSWORD length: {len(DB_PASSWORD)}, chars: {[c for c in DB_PASSWORD]}") + +# Build connection URL (password will be passed separately in connect_args for proper handling) +db_url = f"mysql+mysqlconnector://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}" + +# Create engine with connection pool resilience +# Key settings to handle intermittent network issues: +# - pool_pre_ping: Verify connection is alive before using +# - pool_recycle: Aggressively recycle to avoid stale connections +# - pool_size: Conservative pool to avoid resource exhaustion +# - max_overflow: Limited to prevent connection thrashing +# - pool_reset_on_return: Use "none" to avoid failed rollback on dead connections +sql_engine = create_engine( + db_url, + connect_args={ + "user": DB_USER, + "password": DB_PASSWORD, # Pass password separately to avoid URL encoding issues + "host": DB_HOST, + "port": DB_PORT, + "database": DB_NAME, + "connection_timeout": DB_CONNECT_TIMEOUT, + "autocommit": False, # Let SQLAlchemy manage transactions properly + "raise_on_warnings": False, # Suppress MySQL warnings that clutter logs + }, + pool_pre_ping=True, # Test connection before using it (detects stale connections) + pool_recycle=300, # Recycle connections every 5 minutes (aggressive, handles server timeouts) + pool_timeout=10, # Wait up to 10 seconds to get a connection from pool + pool_size=3, # Keep only 3 steady connections (was 5) + max_overflow=5, # Allow only 5 overflow connections (was 10, prevents thrashing) + pool_reset_on_return="none", # Avoid rollback on return to prevent "Lost connection" errors + echo=False, # Set to True for SQL logging if debugging + echo_pool=False, # Set to True for connection pool logging if debugging +) + +# Ensure tables exist (safe: creates only missing ones) +try: + Base.metadata.create_all(sql_engine) + logger.info("Verified/created database tables") +except Exception as e: + logger.warning(f"Could not auto-create tables: {e}") # DB availability tracking for resilient mode db_available = False @@ -234,10 +286,32 @@ def parse_radio_frame(byte_data): return None -PAYLOAD_SIZE = 15 # bytes in pool payload +POOL_PAYLOAD_FORMATS = [ + { + "size": 17, # New payload with VCC + "struct": " int: """Simple XOR checksum used by the pool payload.""" @@ -247,67 +321,121 @@ def crc8_xor(data: bytes) -> int: return c -def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None): - """Scan a byte stream for a plausible pool payload. +def parse_version_and_reset_flags(version_byte: int): + """Return protocol version (low nibble) and decoded reset flags (high nibble).""" + protocol_version = version_byte & 0x0F + reset_flags = (version_byte >> 4) & 0x0F + reset_causes = [desc for bit, desc in RESET_FLAG_MAP if reset_flags & bit] + return protocol_version, reset_flags, reset_causes - Slides a 15-byte window, validates with CRC, version/nodeId, and range checks, - and scores candidates. Returns the best decoded dict or None. + +def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None): + """Scan a byte stream for a plausible pool payload (v1 legacy + v2 with VCC). + + Slides a window across the stream for each supported payload size, validates CRC, + performs plausibility checks, and scores candidates. Returns the best decoded dict + or None. VCC is decoded for the new format but ignored when storing for now. """ # Drop leading preamble (0xAA) if present while candidate_bytes.startswith(b"\xaa"): candidate_bytes = candidate_bytes[1:] best = None - best_score = -1 + best_score = float('-inf') - for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1): - chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE] - try: - magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack( - ' len(candidate_bytes): + continue - crc_calculated = crc8_xor(chunk[:-1]) - if crc_calculated != crc_received: - continue + chunk = candidate_bytes[offset:offset + fmt["size"]] + try: + if fmt["includes_vcc"]: + ( + magic1, + magic2, + version_byte, + nodeId, + seq, + t_ds10, + t_bme10, + hum10, + pres1, + vcc_mv, + crc_received, + ) = struct.unpack(fmt["struct"], chunk) + else: + ( + magic1, + magic2, + version_byte, + nodeId, + seq, + t_ds10, + t_bme10, + hum10, + pres1, + crc_received, + ) = struct.unpack(fmt["struct"], chunk) + vcc_mv = None + except struct.error: + continue - if version != 1 or nodeId != 1: - continue + crc_calculated = crc8_xor(chunk[:-1]) + crc_valid = crc_calculated == crc_received - # Plausibility checks (unit scaled) - if not (-300 <= t_ds10 <= 600): # -30.0 to 60.0°C - continue - if not (-300 <= t_bme10 <= 600): - continue - if not (0 <= hum10 <= 1000): # 0.0–100.0% - continue - if not (8000 <= pres1 <= 11000): # 800.0–1100.0 hPa - continue + protocol_version, reset_flags, reset_causes = parse_version_and_reset_flags(version_byte) - score = 0 - if magic1 == MAGIC1 and magic2 == MAGIC2: - score += 2 - if expected_seq is not None and seq == expected_seq: - score += 1 - # CRC already validated; reward shorter offset to prefer first valid - score -= offset * 0.001 + # Accept protocol version 1 (legacy) and 2 (future) to tolerate FW bumps + if protocol_version not in (1, 2) or nodeId != 1: + continue - if score > best_score: - best_score = score - best = { - "offset": offset, - "magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2, - "version": version, - "nodeId": nodeId, - "sequence": seq, - "t_ds_c": t_ds10 / 10.0, - "t_bme_c": t_bme10 / 10.0, - "humidity": hum10 / 10.0, - "pressure_hpa": pres1 / 10.0, - "crc_valid": True, - } + # Plausibility checks (unit scaled) + if not (-300 <= t_ds10 <= 600): # -30.0 to 60.0°C + continue + if not (-300 <= t_bme10 <= 600): + continue + if not (0 <= hum10 <= 1000): # 0.0–100.0% + continue + if not (8000 <= pres1 <= 11000): # 800.0–1100.0 hPa + continue + if vcc_mv is not None and not (1000 <= vcc_mv <= 5000): + continue + + score = 0 + if magic1 == MAGIC1 and magic2 == MAGIC2: + score += 2 + if expected_seq is not None and seq == expected_seq: + score += 1 + if fmt["includes_vcc"]: + score += 0.5 # Prefer new payload when both are valid + if crc_valid: + score += 3 + else: + score -= 3 # Keep but penalize invalid CRC + # CRC already validated; reward shorter offset to prefer first valid + score -= offset * 0.001 + + if score > best_score: + best_score = score + best = { + "offset": offset, + "magic_ok": magic1 == MAGIC1 and magic2 == MAGIC2, + "version": protocol_version, + "version_byte": version_byte, + "reset_flags": reset_flags, + "reset_causes": reset_causes, + "nodeId": nodeId, + "sequence": seq, + "t_ds_c": t_ds10 / 10.0, + "t_bme_c": t_bme10 / 10.0, + "humidity": hum10 / 10.0, + "pressure_hpa": pres1 / 10.0, + "vcc_mv": vcc_mv, + "crc_valid": crc_valid, + "crc_expected": crc_calculated, + "format": fmt["label"], + } return best @@ -416,7 +544,7 @@ def get_sensor_keys(sensor_type): 'Oregon-v1': ['temperature_C', 'battery_ok'], 'Oregon-THGR122N': ['temperature_C', 'humidity', 'battery_ok'], 'inFactory-TH': ['temperature_C', 'humidity', 'battery_ok'], - 'BME280': ['temperature_C', 'humidity', 'pressure_rel'], # Pool BME280 + 'BME280': ['temperature_C', 'humidity', 'pressure_rel', 'vcc_mv'], # Pool BME280 includes VCC 'DS18B20': ['temperature_C'], # Pool DS18B20 } # Fallback for unknown types - try to match by substring @@ -448,7 +576,11 @@ def save_json_locally(json_dict): def ensure_db_connection(force: bool = False) -> bool: - """Try to establish DB connectivity with throttling. Returns True if DB is reachable.""" + """Try to establish DB connectivity with throttling. Returns True if DB is reachable. + + This function tests the connection and reinitializes the session if needed. + On failure, it disposes the pool to force reconnection next attempt. + """ global db_available, last_db_check, session now = time.time() if not force and (now - last_db_check) < DB_RETRY_SECONDS: @@ -456,6 +588,7 @@ def ensure_db_connection(force: bool = False) -> bool: last_db_check = now try: + # Test connection with explicit timeout with sql_engine.connect() as conn: conn.execute(text('SELECT 1')) if not db_available: @@ -463,12 +596,21 @@ def ensure_db_connection(force: bool = False) -> bool: db_available = True # Recreate session to ensure fresh connections session = Session() - except Exception as e: + except exc.OperationalError as e: + # Connection failed - dispose pool to force fresh connections on next attempt + sql_engine.dispose() if db_available: logger.warning(f"Lost database connectivity: {e}") else: logger.info(f"Database still unreachable: {e}") db_available = False + except Exception as e: + sql_engine.dispose() + if db_available: + logger.warning(f"Unexpected database error: {type(e).__name__}: {e}") + else: + logger.info(f"Database still unreachable: {type(e).__name__}: {e}") + db_available = False return db_available @@ -477,7 +619,7 @@ def on_connect(client, userdata, flags, reason_code, properties): # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe(MQTT_TOPIC_PREFIX) - print(f"Connected with result code {reason_code}") + logger.info(f"Connected with result code {reason_code}") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): @@ -513,8 +655,8 @@ def on_message(client, userdata, msg): except ValueError: if LOG_MALFORMED_HEX: malformed_hex_logger.info(f"Invalid hex: {hex_data}") - print(f"Invalid hex data: {hex_data}") - print(d) + logger.debug(f"Invalid hex data: {hex_data}") + logger.debug(f"Full message: {d}") warte = '' return @@ -522,8 +664,8 @@ def on_message(client, userdata, msg): if LOG_MALFORMED_HEX and candidate_meta.get("source") == "raw": malformed_hex_logger.info(f"Pool using raw bytes (no sync match): {byte_data.hex()}") - print(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") - print( + logger.debug(f"Raw bytes ({len(byte_data)}): {byte_data.hex()}") + logger.debug( f"Candidate payload ({len(candidate_bytes)}), source={candidate_meta.get('source')}: " f"{candidate_bytes.hex()}" ) @@ -541,17 +683,34 @@ def on_message(client, userdata, msg): if not decoded: if LOG_MALFORMED_HEX: malformed_hex_logger.info(f"No valid payload found: {candidate_bytes.hex()}") - print("No valid pool payload found in candidate bytes") + logger.debug("No valid pool payload found in candidate bytes") warte = '' return - print( - f"Decoded payload at offset {decoded['offset']}: seq={decoded['sequence']}, " - f"t_ds={decoded['t_ds_c']}C, t_bme={decoded['t_bme_c']}C, " - f"hum={decoded['humidity']}%, pres={decoded['pressure_hpa']}hPa, " - f"magic_ok={decoded['magic_ok']}" + logger.debug( + f"Decoded payload at offset {decoded['offset']} ({decoded.get('format','')})" + f": seq={decoded['sequence']}, t_ds={decoded['t_ds_c']}C, " + f"t_bme={decoded['t_bme_c']}C, hum={decoded['humidity']}%, " + f"pres={decoded['pressure_hpa']}hPa, vcc={decoded.get('vcc_mv','n/a')}mV, " + f"magic_ok={decoded['magic_ok']}, crc_valid={decoded['crc_valid']}, " + f"crc_exp={decoded.get('crc_expected')}, reset_flags=0x{decoded['reset_flags']:X}" ) + reset_flags = decoded.get('reset_flags', 0) + reset_causes = decoded.get('reset_causes', []) + last_flags = pool_reset_flags_seen.get(decoded['nodeId']) + + if last_flags != reset_flags: + causes_text = ", ".join(reset_causes) if reset_causes else "none set" + reset_msg = ( + f"Pool node {decoded['nodeId']} MCU reset flags 0x{reset_flags:X}: {causes_text}" + ) + if reset_flags & 0x0C: # BORF or WDRF -> warn + logger.warning(reset_msg) + else: + logger.info(reset_msg) + pool_reset_flags_seen[decoded['nodeId']] = reset_flags + original_time = d.get('time', datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')) bme_msg = { @@ -562,6 +721,7 @@ def on_message(client, userdata, msg): 'temperature_C': decoded['t_bme_c'], 'humidity': decoded['humidity'], 'pressure_rel': decoded['pressure_hpa'], + 'vcc_mv': decoded.get('vcc_mv'), 'mic': 'CRC' } @@ -575,7 +735,7 @@ def on_message(client, userdata, msg): } for msg_data in [bme_msg, ds_msg]: - print(f"Received message from {msg_data['model']}: \n {msg_data}") + logger.debug(f"Received message from {msg_data['model']}: {msg_data}") sensor_id = msg_data['id'] sensor_key = f"{msg_data['model']}_{sensor_id}" @@ -588,7 +748,7 @@ def on_message(client, userdata, msg): return else: # Process non-pool sensors - print(f"Received message from {model}: \n {d}") + logger.debug(f"Received message from {model}: {d}") id = d['id'] if model == 'Bresser-6in1': if d['flags'] == 0: @@ -602,7 +762,7 @@ def on_message(client, userdata, msg): # Define a function to update the data in the database -def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): +def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm, vcc_mv=None): values = { "utc_time": utc_time, "mqtt_id": mqtt_id, @@ -613,7 +773,8 @@ def update_data(utc_time, mqtt_id, temperature_c, humidity, pressure_rel, batter "average_speed": average_speed, "direction": direction, "gust": gust, - "rain_mm": rain_mm + "rain_mm": rain_mm, + "vcc_mv": vcc_mv, } if ensure_db_connection(): new_data_queue.append(values) @@ -782,7 +943,7 @@ def handle_pool_nodeid_change(old_node_id, new_mqtt_id): refresh_sensor_cache() -def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm): +def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, battery, average_speed, direction, gust, rain_mm, vcc_mv=None): mqtt_name, mqtt_id = mqtt_name_id.split('_', 1) # Use maxsplit=1 to handle IDs with underscores # Get the sensor object from the database (with auto-update for battery changes) @@ -798,7 +959,8 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'average_speed': average_speed, 'direction': direction, 'gust': gust, - 'rain_mm': rain_mm + 'rain_mm': rain_mm, + 'vcc_mv': vcc_mv, }) return @@ -816,7 +978,8 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'average_speed': average_speed, 'direction': direction, 'gust': gust, - 'rain_mm': rain_mm + 'rain_mm': rain_mm, + 'vcc_mv': vcc_mv, }) return @@ -825,9 +988,14 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b # Update the sensor's battery level sensor.battery = battery - # Update last contact time for pool sensors + # Update last contact time + # Pool sensors: update every contact (critical for monitoring) + # Other sensors: only update if >5 minutes to reduce DB writes + now = datetime.now(timezone.utc) if mqtt_name == 'pool': - sensor.last_contact = datetime.now(timezone.utc) + sensor.last_contact = now + elif sensor.last_contact is None or (now - sensor.last_contact.replace(tzinfo=timezone.utc)).total_seconds() > 300: + sensor.last_contact = now # Update the temperature data if temperature_c is not None: @@ -868,6 +1036,14 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b air_pressure = AirPressure(timestamp=utc_time, sensor_id=sensor.id, pressure_rel=pressure_rel) session.add(air_pressure) + # Store voltage if provided (associate with this sensor) + if vcc_mv is not None: + try: + voltage_entry = Voltage(timestamp=utc_time, sensor_id=sensor.id, vcc_mv=int(vcc_mv)) + session.add(voltage_entry) + except Exception as e: + logger.warning(f"Failed to store voltage for {mqtt_name_id}: {e}") + if average_speed is not None or gust is not None or direction is not None: wind_value = session.query(Wind).filter_by(sensor_id=sensor.id).order_by(Wind.timestamp.desc()).first() if wind_value is None or (average_speed is not None and wind_value.average_speed != average_speed) or (gust is not None and wind_value.gust != gust) or (direction is not None and wind_value.direction != direction): @@ -903,7 +1079,10 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b session.commit() except exc.SQLAlchemyError as e: logger.error(f"SQLAlchemyError on commit: {e}") - session.rollback() + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost save_json_locally({ 'utc_time': utc_time, 'mqtt_id': mqtt_name_id, @@ -914,11 +1093,15 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'average_speed': average_speed, 'direction': direction, 'gust': gust, - 'rain_mm': rain_mm + 'rain_mm': rain_mm, + 'vcc_mv': vcc_mv, }) except Exception as e: logger.error(f"Error on commit: {e}") - session.rollback() + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost save_json_locally({ 'utc_time': utc_time, 'mqtt_id': mqtt_name_id, @@ -929,7 +1112,8 @@ def store_in_db(utc_time, mqtt_name_id, temperature_c, humidity, pressure_rel, b 'average_speed': average_speed, 'direction': direction, 'gust': gust, - 'rain_mm': rain_mm + 'rain_mm': rain_mm, + 'vcc_mv': vcc_mv, }) @@ -988,12 +1172,12 @@ def debug_sended_data(seen_messages, averages, sensor): if not debug: return - print(f'Averages for {sensor}:') + logger.debug(f'Averages for {sensor}:') for key, value in averages.items(): - print(f"{key}: {value}") + logger.debug(f"{key}: {value}") - print(f"Remaining data {sensor}:") - print(seen_messages[sensor]) + logger.debug(f"Remaining data {sensor}:") + logger.debug(f"{seen_messages[sensor]}") def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_id_override=None): """Helper function to process any sensor data consistently""" @@ -1021,7 +1205,8 @@ def process_sensor_data(utc_time, sensor_key, data_list, keys_to_average, mqtt_i averages.get('wind_avg_m_s'), averages.get('wind_dir_deg'), averages.get('wind_max_m_s'), - averages.get('rain_mm') + averages.get('rain_mm'), + averages.get('vcc_mv') ) debug_sended_data({sensor_key: remaining}, averages, sensor_key) return remaining @@ -1093,6 +1278,8 @@ def get_and_delete_json_data(): # Funktion zum Synchronisieren der Daten def sync_data(): + global session + if not ensure_db_connection(force=True): # MariaDB nicht verfügbar - speichere in SQLite while new_data_queue: @@ -1114,14 +1301,17 @@ def sync_data(): store_in_db(data['utc_time'], data['mqtt_id'], data.get('temperature_c'), data.get('humidity'), data.get('pressure_rel'), data.get('battery', 1), data.get('average_speed'), data.get('direction'), - data.get('gust'), data.get('rain_mm')) + data.get('gust'), data.get('rain_mm'), data.get('vcc_mv')) if not local_data_written: utc_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z") logger.info(f"{utc_time}: Restoring data from local SQLite backup to MariaDB") local_data_written = True except exc.SQLAlchemyError as e: logger.error(f"SQLAlchemyError syncing local data: {e}") - session.rollback() + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost # Rette den Datensatz zurück in SQLite save_json_locally(data) except Exception as e: @@ -1135,10 +1325,13 @@ def sync_data(): if isinstance(data, dict) and 'mqtt_id' in data: store_in_db(data['utc_time'], data['mqtt_id'], data['temperature_c'], data['humidity'], data['pressure_rel'], data['battery'], data['average_speed'], data['direction'], - data['gust'], data['rain_mm']) + data['gust'], data['rain_mm'], data.get('vcc_mv')) except exc.SQLAlchemyError as e: logger.error(f"SQLAlchemyError: {e}") - session.rollback() + try: + session.rollback() + except Exception: + pass # Ignore rollback errors if connection is lost save_json_locally(data) except Exception as e: logger.error(f"Error writing data: {e}") @@ -1259,7 +1452,7 @@ if __name__ == '__main__': else: logger.warning(f"Starting without database; will cache data locally and retry every {DB_RETRY_SECONDS}s") - print('start data collection') + logger.info('Starting data collection') try: while True: # Periodically retry DB connection if currently down diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 5207264..a7a5c55 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -17,10 +17,10 @@ services: - SSH_KEY_PATH=/workspace/.ssh/id_rsa - STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300} - RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600} - # DB override for dev testing; defaults to an invalid host to simulate DB-down - - DB_HOST=${DB_HOST:-invalid-host} + # DB config for dev; connect to real database + - DB_HOST=${DB_HOST:-192.168.43.102} - DB_PORT=${DB_PORT:-3306} - DB_USER=${DB_USER:-weatherdata} - - DB_PASSWORD=${DB_PASSWORD:-cfCU$swM!HfK82%*} + - DB_PASSWORD=${DB_PASSWORD:-cfCUswMHfK82!} - DB_NAME=${DB_NAME:-weatherdata} - DB_CONNECT_TIMEOUT=${DB_CONNECT_TIMEOUT:-5} diff --git a/docker-compose.yml b/docker-compose.yml index cead945..70ee154 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,12 @@ services: - SSH_KEY_PATH=/workspace/.ssh/id_rsa - STALL_WINDOW_SECONDS=${STALL_WINDOW_SECONDS:-300} - RESTART_COOLDOWN_SECONDS=${RESTART_COOLDOWN_SECONDS:-3600} + - DB_HOST=${DB_HOST:-192.168.43.102} + - DB_PORT=${DB_PORT:-3306} + - DB_USER=${DB_USER:-weatherdata} + - DB_PASSWORD=${DB_PASSWORD:-cfCUswMHfK82!} + - DB_NAME=${DB_NAME:-weatherdata} + - DB_CONNECT_TIMEOUT=${DB_CONNECT_TIMEOUT:-5} volumes: - /volume1/docker/weatherstation:/workspace - /volume1/docker/weatherstation/data:/workspace/data @@ -17,6 +23,11 @@ services: - /volume1/docker/weatherstation/secrets/known_hosts:/workspace/.ssh/known_hosts:ro restart: unless-stopped command: python datacollector.py + logging: + driver: json-file + options: + max-size: "5m" + max-file: "3" networks: - data-net stdin_open: true diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..d25e51d --- /dev/null +++ b/main.cpp @@ -0,0 +1,706 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Capture and persist reset cause as early as possible +// Store MCUSR in a noinit section so C runtime doesn't clear it +static uint8_t mcusr_mirror __attribute__((section(".noinit"))); + +void wdt_init(void) __attribute__((naked)) __attribute__((section(".init3"))); +void wdt_init(void) { + mcusr_mirror = MCUSR; + MCUSR = 0; + wdt_disable(); + __asm__ __volatile__ ("ret"); +} + +static uint8_t lastResetFlagsGlobal = 0; + +// Battery Optimization Settings +#define ENABLE_CHANGE_DETECTION 1 // Only transmit when values change +#define TEMP_CHANGE_THRESHOLD 0.1 // °C - transmit if temp changes by this amount (reduced from 0.2 for better noise filtering) +#define HUM_CHANGE_THRESHOLD 2.0 // % - transmit if humidity changes by this amount +#define PRES_CHANGE_THRESHOLD 1.0 // hPa - transmit if pressure changes by this amount +#define ALIVE_INTERVAL 10 // Force transmission every N cycles even if no change (10*30s = 5min) +#define SENSOR_INTERVAL 30 // Read sensors every N seconds (30s = good balance) + +// CC1101 Register addresses +#define CC1101_IOCFG0 0x02 +#define CC1101_PKTLEN 0x06 +#define CC1101_PKTCTRL1 0x07 +#define CC1101_SYNC1 0x04 +#define CC1101_SYNC0 0x05 +#define CC1101_PKTCTRL0 0x08 +#define CC1101_CHANNR 0x0A +#define CC1101_FSCTRL1 0x0B +#define CC1101_FREQ2 0x0D +#define CC1101_FREQ1 0x0E +#define CC1101_FREQ0 0x0F +#define CC1101_MDMCFG4 0x10 +#define CC1101_MDMCFG3 0x11 +#define CC1101_MDMCFG2 0x12 +#define CC1101_MDMCFG1 0x13 +#define CC1101_MDMCFG0 0x14 +#define CC1101_DEVIATN 0x15 +#define CC1101_MCSM1 0x17 +#define CC1101_MCSM0 0x18 +#define CC1101_FOCCFG 0x19 +#define CC1101_BSCFG 0x1A +#define CC1101_AGCCTRL2 0x1B +#define CC1101_AGCCTRL1 0x1C +#define CC1101_AGCCTRL0 0x1D +#define CC1101_FREND1 0x21 +#define CC1101_FREND0 0x22 +#define CC1101_FSCAL3 0x23 +#define CC1101_FSCAL2 0x24 +#define CC1101_FSCAL1 0x25 +#define CC1101_FSCAL0 0x26 +#define CC1101_TEST2 0x2C +#define CC1101_TEST1 0x2D +#define CC1101_TEST0 0x2E +#define CC1101_PATABLE 0x3E +#define CC1101_TXFIFO 0x3F + +// Command strobes +#define CC1101_SRES 0x30 +#define CC1101_SCAL 0x33 +#define CC1101_STX 0x35 +#define CC1101_SIDLE 0x36 +#define CC1101_SFTX 0x3B + +// Status registers +#define CC1101_PARTNUM 0x30 +#define CC1101_VERSION 0x31 +#define CC1101_RSSI 0x34 +#define CC1101_MARCSTATE 0x35 + +// Pin definitions +#define CC1101_CS 10 +#define LED_PIN 4 +#define ONE_WIRE_BUS 3 +#define PRES_OFFSET 41.0F + +#define MYNODEID 1 + +// Change detection variables +float lastTxDsTempC = -999.0; +float lastTxBmeTempC = -999.0; +float lastTxBmeHum = -999.0; +float lastTxBmePres = -999.0; +uint8_t cyclesSinceLastTx = 0; + +// Sensor interval tracking +uint16_t wakeCount = 0; +volatile uint8_t watchdog_wakeups = 0; // Counter for watchdog interrupts + +// Watchdog ISR - wakes MCU from sleep +ISR(WDT_vect) { + watchdog_wakeups++; + // Note: do NOT disable watchdog here - it will be re-enabled before next sleep +} + +void sleep_1s(void) { + // Ensure serial is flushed before sleep + Serial.flush(); + + // Disable UART to save power during sleep (~2-3mA) + power_usart0_disable(); + + // Configure watchdog for 1-second interrupt-based wake + cli(); // Disable interrupts during setup + wdt_reset(); + WDTCSR |= (1 << WDCE) | (1 << WDE); // Enable watchdog configuration change + WDTCSR = (1 << WDIE) | (1 << WDP2) | (1 << WDP1); // 1-second, interrupt only + sei(); // Re-enable interrupts + + // Set sleep mode to Power Down (lowest power: ~5-10µA) + set_sleep_mode(SLEEP_MODE_PWR_DOWN); + sleep_enable(); + + // Enter power-down sleep - wakes on watchdog interrupt + sleep_cpu(); + + // Execution resumes here after watchdog interrupt + sleep_disable(); + wdt_disable(); // Stop watchdog after waking + + // Re-enable UART after sleep + power_usart0_enable(); + + // Small delay to let UART stabilize + delayMicroseconds(100); +} + +// Payload structure +struct Payload { + uint8_t magic1; + uint8_t magic2; + uint8_t version; + uint8_t nodeId; + uint16_t seq; + int16_t t_ds10; + int16_t t_bme10; + uint16_t hum10; + uint16_t pres1; + uint16_t vcc; // VCC in millivolts + uint8_t crc; +}; + +// Ensure payload size matches radio PKTLEN expectations (17 bytes) +static_assert(sizeof(Payload) == 17, "Payload size must be 17 bytes"); + +Payload p; +static uint16_t seq = 0; + +OneWire oneWire(ONE_WIRE_BUS); +DallasTemperature ds18(&oneWire); +Adafruit_BME280 bme; +static bool bme_ok = false; + +struct BmeSample { + float tempC; + float hum; + float presHpa; +}; + +uint8_t calcCrc(const uint8_t *data, uint8_t len) { + uint8_t c = 0; + for (uint8_t i = 0; i < len; i++) c ^= data[i]; + return c; +} + +// Read VCC voltage in millivolts using internal 1.1V reference +uint16_t readVcc() { + // Set ADC reference to internal 1.1V and measure VCC + ADMUX = _BV(REFS0) | _BV(MUX3) | _BV(MUX2) | _BV(MUX1); + delay(2); // Wait for Vref to settle + ADCSRA |= _BV(ADSC); // Start conversion + while (bit_is_set(ADCSRA, ADSC)); // Wait for completion + uint16_t result = ADC; + // Calculate VCC in millivolts: internal_ref * 1024 / ADC reading + // Calibrated: 1450512 gives 3300mV on this chip (internal ref ~1.42V) + uint16_t vcc = (1450512L / result); + + // Disable ADC to save power (~200µA) + power_adc_disable(); + + return vcc; +} + +// Check if sensor values have changed enough to warrant transmission +// NOTE: This function ONLY checks for change. Alive/heartbeat logic is handled in loop(). +bool valuesChanged(float dsTempC, float bmeTempC, float bmeHum, float bmePres) { + // Note: Pressure changes do NOT trigger a transmission. Pressure is sent only + // when temperature/humidity trigger or during alive/heartbeat transmissions. + #if ENABLE_CHANGE_DETECTION + if (abs(dsTempC - lastTxDsTempC) >= TEMP_CHANGE_THRESHOLD) return true; + if (abs(bmeTempC - lastTxBmeTempC) >= TEMP_CHANGE_THRESHOLD) return true; + if (abs(bmeHum - lastTxBmeHum) >= HUM_CHANGE_THRESHOLD) return true; + return false; // No significant changes in temp/humidity + #else + return true; // Always transmit if change detection disabled + #endif +} + +// CC1101 SPI functions +void cc1101_WriteReg(uint8_t addr, uint8_t value) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(addr); + SPI.transfer(value); + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); +} + +uint8_t cc1101_ReadReg(uint8_t addr) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(addr | 0x80); // Read bit + uint8_t val = SPI.transfer(0); + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); + return val; +} + +void cc1101_WriteStrobe(uint8_t strobe) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(strobe); + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); +} + +uint8_t cc1101_ReadStatus(uint8_t addr) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(addr | 0xC0); // Burst read bit + uint8_t val = SPI.transfer(0); + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); + return val; +} + +void cc1101_SetPower(uint8_t paValue) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(CC1101_PATABLE | 0x40); // Burst write to PATABLE + for (uint8_t i = 0; i < 8; i++) { + SPI.transfer(paValue); + } + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); +} + +// Select TX power based on battery voltage to prevent brown-out +uint8_t getAdaptiveTxPower(uint16_t vccMv) { + if (vccMv >= 3000) return 0xC0; // Full power at 3.0V+ + else if (vccMv >= 2800) return 0x60; // Medium power at 2.8-3.0V (lowered from 0x84) + else return 0x40; // Low power below 2.8V +} + +void cc1101_WriteTxFifo(uint8_t *data, uint8_t len) { + digitalWrite(CC1101_CS, LOW); + delayMicroseconds(10); + SPI.transfer(CC1101_TXFIFO | 0x40); // Burst write + for (uint8_t i = 0; i < len; i++) { + SPI.transfer(data[i]); + } + delayMicroseconds(10); + digitalWrite(CC1101_CS, HIGH); +} + +BmeSample readBmeOnce() { + if (!bme_ok) { + BmeSample s{}; + s.tempC = NAN; s.hum = NAN; s.presHpa = NAN; + return s; + } + BmeSample s{}; + + // Force a single reading + bme.setSampling(Adafruit_BME280::MODE_FORCED, + Adafruit_BME280::SAMPLING_X2, + Adafruit_BME280::SAMPLING_X16, + Adafruit_BME280::SAMPLING_X1); + delay(10); + + s.tempC = bme.readTemperature(); + s.hum = bme.readHumidity(); + s.presHpa = (bme.readPressure() / 100.0F) + PRES_OFFSET; + + // Return to sleep to save power + bme.setSampling(Adafruit_BME280::MODE_SLEEP, + Adafruit_BME280::SAMPLING_X2, + Adafruit_BME280::SAMPLING_X16, + Adafruit_BME280::SAMPLING_X1); + + return s; +} + +// Attempt to recover a stuck I2C bus by toggling SCL until SDA is released +static void i2cBusRecover() { + pinMode(A4, INPUT_PULLUP); // SDA + pinMode(A5, INPUT_PULLUP); // SCL + delay(5); + if (digitalRead(A4) == LOW) { + pinMode(A5, OUTPUT); + for (uint8_t i = 0; i < 9 && digitalRead(A4) == LOW; i++) { + digitalWrite(A5, LOW); + delayMicroseconds(5); + digitalWrite(A5, HIGH); + delayMicroseconds(5); + } + } +} + +float readDs18Median(uint8_t samples = 3) { + float buf[5]; // up to 5 samples + samples = (samples > 5) ? 5 : samples; + for (uint8_t i = 0; i < samples; i++) { + wdt_reset(); // Feed watchdog during slow sensor reads + ds18.requestTemperatures(); + ds18.setWaitForConversion(true); // block until done + buf[i] = ds18.getTempCByIndex(0); + } + // simple insertion sort for small N + for (uint8_t i = 1; i < samples; i++) { + float v = buf[i]; + uint8_t j = i; + while (j > 0 && buf[j - 1] > v) { buf[j] = buf[j - 1]; j--; } + buf[j] = v; + } + return buf[samples / 2]; +} + +float smoothEma(float raw, float alpha = 0.2f) { + static float ema = NAN; + if (isnan(ema)) ema = raw; + else ema = alpha * raw + (1.0f - alpha) * ema; + return ema; +} + +bool waitForIdle(uint16_t timeoutMs) { + unsigned long start = millis(); + while (millis() - start < timeoutMs) { + if ((cc1101_ReadStatus(CC1101_MARCSTATE) & 0x1F) == 0x01) return true; // 0x01 = IDLE + delay(1); + } + return false; +} + +void cc1101_InitFSK() { + Serial.println("Initializing CC1101 in GFSK mode (1.2 kBaud, 5 kHz dev, 58 kHz BW)..."); + + // Reset CC1101 + cc1101_WriteStrobe(CC1101_SRES); + delay(10); + + // Configure for GFSK modulation at 868.3 MHz, 1.2 kBaud, 5 kHz deviation + cc1101_WriteReg(CC1101_IOCFG0, 0x06); + cc1101_WriteReg(CC1101_PKTCTRL0, 0x00); + cc1101_WriteReg(CC1101_PKTCTRL1, 0x00); + cc1101_WriteReg(CC1101_PKTLEN, sizeof(Payload)); + // Debug check: confirm PKTLEN register matches struct size + uint8_t pktlenReg = cc1101_ReadReg(CC1101_PKTLEN); + Serial.print("PKTLEN(struct/reg): "); + Serial.print(sizeof(Payload)); + Serial.print("/"); + Serial.println(pktlenReg); + cc1101_WriteReg(CC1101_SYNC1, 0xD3); + cc1101_WriteReg(CC1101_SYNC0, 0x91); + cc1101_WriteReg(CC1101_CHANNR, 0x00); + cc1101_WriteReg(CC1101_FSCTRL1, 0x06); + + // Set frequency to 868.3 MHz + cc1101_WriteReg(CC1101_FREQ2, 0x21); + cc1101_WriteReg(CC1101_FREQ1, 0x65); + cc1101_WriteReg(CC1101_FREQ0, 0x6A); + + // Modem configuration + cc1101_WriteReg(CC1101_MDMCFG4, 0xF8); + cc1101_WriteReg(CC1101_MDMCFG3, 0x83); + cc1101_WriteReg(CC1101_MDMCFG2, 0x12); + cc1101_WriteReg(CC1101_MDMCFG1, 0x22); + cc1101_WriteReg(CC1101_MDMCFG0, 0xF8); + cc1101_WriteReg(CC1101_DEVIATN, 0x15); + cc1101_WriteReg(CC1101_MCSM0, 0x18); + cc1101_WriteReg(CC1101_FOCCFG, 0x1D); + cc1101_WriteReg(CC1101_BSCFG, 0x1C); + cc1101_WriteReg(CC1101_MCSM1, 0x30); + + // AGC control + cc1101_WriteReg(CC1101_AGCCTRL2, 0xC7); + cc1101_WriteReg(CC1101_AGCCTRL1, 0x00); + cc1101_WriteReg(CC1101_AGCCTRL0, 0xB0); + + // Front-end configuration + cc1101_WriteReg(CC1101_FREND1, 0xB6); + cc1101_WriteReg(CC1101_FREND0, 0x17); + cc1101_WriteReg(CC1101_FSCAL3, 0xEA); + cc1101_WriteReg(CC1101_FSCAL2, 0x2A); + cc1101_WriteReg(CC1101_FSCAL1, 0x00); + cc1101_WriteReg(CC1101_FSCAL0, 0x1F); + + cc1101_WriteReg(CC1101_TEST2, 0x88); + cc1101_WriteReg(CC1101_TEST1, 0x31); + cc1101_WriteReg(CC1101_TEST0, 0x09); + + // Set PA table for maximum TX power (~10dBm) + cc1101_SetPower(0xC0); + + // Calibrate frequency synthesizer + cc1101_WriteStrobe(CC1101_SIDLE); + delay(1); + cc1101_WriteStrobe(CC1101_SCAL); + delay(3); + for (uint16_t i = 0; i < 500; i++) { + uint8_t st = cc1101_ReadStatus(CC1101_MARCSTATE) & 0x1F; + if (st != 0x13 && st != 0x14) break; + delay(1); + } + + Serial.println("CC1101 initialization complete"); +} + +void setup() { + // Disable watchdog immediately + wdt_reset(); + wdt_disable(); + wdt_reset(); + + // Power optimization - disable unused peripherals + power_twi_disable(); // Will re-enable for I2C + power_spi_disable(); // Will re-enable for SPI + ACSR |= (1 << ACD); // Disable analog comparator (~25µA saved) + + // Set up LED pin FIRST + pinMode(LED_PIN, OUTPUT); + digitalWrite(LED_PIN, HIGH); + + wdt_reset(); + + // Initialize serial WITHOUT any delay + Serial.begin(9600); + // NO DELAY - immediate operation + + wdt_reset(); + + Serial.println("\n========== BOOT =========="); + wdt_reset(); + Serial.flush(); + wdt_reset(); + + // Re-enable I2C and SPI for sensor initialization + power_twi_enable(); + power_spi_enable(); + + // Report and persist the last reset cause + uint8_t lastResetFlags = mcusr_mirror; + lastResetFlagsGlobal = lastResetFlags; + EEPROM.update(0, lastResetFlags); + Serial.print("Reset flags: 0x"); + Serial.print(lastResetFlags, HEX); + Serial.print(" ("); + if (lastResetFlags & (1<= ALIVE_INTERVAL) || (lastTxDsTempC == -999.0); + bool shouldTransmit = changed || alive; + + wdt_reset(); + + if (!shouldTransmit) { + Serial.print("No change (cycle "); + Serial.print(cyclesSinceLastTx); + Serial.println("), skipping TX"); + wdt_reset(); + + // Flash LED briefly to show we're alive but not transmitting + digitalWrite(LED_PIN, HIGH); + delay(50); + digitalWrite(LED_PIN, LOW); + wdt_reset(); + + // Sleep for remaining time of this cycle + Serial.flush(); + sleep_1s(); + return; // Skip transmission + } + + // Values changed or alive signal - proceed with transmission + if (changed) { + Serial.print("Values changed, transmitting seq="); + } else { + Serial.print("ALIVE signal, transmitting seq="); + } + wdt_reset(); + Serial.println(seq); + wdt_reset(); + + // Update last transmitted values + lastTxDsTempC = dsTempC; + lastTxBmeTempC = bmeSample.tempC; + lastTxBmeHum = bmeSample.hum; + lastTxBmePres = bmeSample.presHpa; + cyclesSinceLastTx = 0; + + wdt_reset(); + + // Set fixed TX power to 0xC0 (~10 dBm - maximum) + cc1101_SetPower(0xC0); + + // Transmit + cc1101_WriteStrobe(CC1101_SIDLE); + delay(5); + wdt_reset(); + + // Flush TX FIFO + cc1101_WriteStrobe(CC1101_SFTX); + delay(2); + + // Write data to TX FIFO + cc1101_WriteTxFifo((uint8_t*)&p, sizeof(Payload)); + wdt_reset(); + + // Start transmission + cc1101_WriteStrobe(CC1101_STX); + + // Wait until radio returns to IDLE or timeout + bool txDone = waitForIdle(150); + wdt_reset(); + if (!txDone) { + Serial.print("[tx-timeout]"); + } else { + Serial.print("."); + } + wdt_reset(); + + // Go back to idle + cc1101_WriteStrobe(CC1101_SIDLE); + delay(5); + + // Long blink to indicate TX occurred + digitalWrite(LED_PIN, HIGH); + delay(300); + digitalWrite(LED_PIN, LOW); + delay(100); + wdt_reset(); + + Serial.println(" Done!"); + Serial.print("DS: "); Serial.print(dsTempC, 1); + Serial.print(" BME: "); Serial.print(bmeSample.tempC, 1); + Serial.print(" H: "); Serial.print(bmeSample.hum, 1); + Serial.print(" P: "); Serial.print(bmeSample.presHpa, 1); + Serial.print(" VCC: "); Serial.print(vccMv); Serial.println("mV"); + Serial.flush(); + wdt_reset(); + + sleep_1s(); // Sleep for 1 second after transmission +} diff --git a/receiver_diy.py b/receiver_diy.py new file mode 100644 index 0000000..81de17a --- /dev/null +++ b/receiver_diy.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +""" +Minimal CC1101 GFSK receiver @868.3 MHz +Optimized for sensitivity: 1.2 kBaud, 5 kHz deviation, 58 kHz RX BW +Prints timestamp, RSSI, payload (hex) on each packet. +""" + +import time +import struct +import json +from datetime import datetime, timezone +import spidev +import RPi.GPIO as GPIO +import paho.mqtt.client as mqtt +from cc1101_config import ( + IOCFG2, IOCFG0, FIFOTHR, PKTLEN, PKTCTRL0, PKTCTRL1, + FSCTRL1, FREQ2, FREQ1, FREQ0, MDMCFG4, MDMCFG3, MDMCFG2, MDMCFG1, MDMCFG0, + DEVIATN, MCSM1, MCSM0, FOCCFG, AGCCTRL2, AGCCTRL1, AGCCTRL0, FREND0, + FSCAL3, FSCAL2, FSCAL1, FSCAL0, + SRX, SIDLE, SFRX, SRES, MARCSTATE, RXBYTES, calculate_freq_registers +) + +DIY_FREQ_HZ = 868_300_000 +DIY_SYNC1 = 0xD3 +DIY_SYNC0 = 0x91 +DIY_PACKET_LEN = 17 # Payload with VCC: 17 bytes (magic1, magic2, version, nodeId, seq, temps, hum, pres, vcc, crc) + + +class CC1101DIY: + def __init__(self, bus=0, device=0, gdo0_pin=25): # default BCM25 (pin 22) + self.spi = spidev.SpiDev() + self.spi.open(bus, device) + self.spi.max_speed_hz = 50000 + self.spi.mode = 0 + self.gdo0_pin = gdo0_pin + GPIO.setmode(GPIO.BCM) + GPIO.setup(self.gdo0_pin, GPIO.IN) + print("CC1101 DIY receiver initialized") + + def close(self): + self.spi.close() + GPIO.cleanup() + + def send_strobe(self, strobe): + self.spi.xfer([strobe]) + + def reset(self): + self.send_strobe(SRES) + time.sleep(0.1) + + def write_reg(self, addr, val): + self.spi.xfer([addr, val]) + + def read_status(self, addr): + return self.spi.xfer([addr | 0xC0, 0x00])[1] + + def read_burst(self, addr, length): + return self.spi.xfer([addr | 0xC0] + [0x00] * length)[1:] + + def verify_chip(self): + """Verify CC1101 chip is present and responding correctly""" + # Read PARTNUM (should be 0x00 for CC1101) + partnum = self.read_status(0x30) + # Read VERSION (typically 0x04 or 0x14 for CC1101) + version = self.read_status(0x31) + + print(f"CC1101 Chip Detection:") + print(f" PARTNUM: 0x{partnum:02X} (expected: 0x00)") + print(f" VERSION: 0x{version:02X} (expected: 0x04 or 0x14)") + + if partnum != 0x00: + print(f"ERROR: Invalid PARTNUM. Expected 0x00, got 0x{partnum:02X}") + return False + + if version not in (0x04, 0x14): + if version in (0x00, 0xFF): + print(f"ERROR: No chip detected (VERSION=0x{version:02X}). Check SPI wiring.") + else: + print(f"WARNING: Unexpected VERSION 0x{version:02X}, but proceeding...") + return False + + print("✓ CC1101 chip detected and verified") + return True + + def get_rssi(self): + rssi_dec = self.read_status(0x34) + return ((rssi_dec - 256) / 2 - 74) if rssi_dec >= 128 else (rssi_dec / 2 - 74) + + def get_marc_state(self): + return self.read_status(MARCSTATE) & 0x1F + + def get_rx_bytes(self): + return self.read_status(RXBYTES) & 0x7F + + def flush_rx(self): + self.send_strobe(SFRX) + + def enter_rx(self): + self.send_strobe(SRX) + + def configure(self): + print("Configuring CC1101 for DIY GFSK @868.3 MHz (1.2kBaud, 5kHz dev, 58kHz BW)...") + freq_regs = calculate_freq_registers(DIY_FREQ_HZ) + + # GPIO mapping + self.write_reg(IOCFG2, 0x2E) # GDO2 hi-Z (not wired) + self.write_reg(IOCFG0, 0x06) # GDO0: assert on sync word detected + + # FIFO threshold - low to read quickly and prevent overflow + self.write_reg(FIFOTHR, 0x07) # Trigger at 32 bytes + + # Packet control - fixed length, capture sync'd packets (17-byte payload including VCC) + self.write_reg(PKTLEN, DIY_PACKET_LEN) + self.write_reg(PKTCTRL1, 0x00) # Do NOT append status bytes (matches sender) + self.write_reg(PKTCTRL0, 0x00) # Fixed length, CRC disabled + + # Frequency synthesizer + self.write_reg(FSCTRL1, 0x06) # IF frequency control (matches sender) + self.write_reg(FREQ2, freq_regs[FREQ2]) + self.write_reg(FREQ1, freq_regs[FREQ1]) + self.write_reg(FREQ0, freq_regs[FREQ0]) + + # Modem configuration for GFSK - sensitivity optimized + # MDMCFG4: CHANBW_E=3, CHANBW_M=3 → 58 kHz BW, DRATE_E=8 + self.write_reg(MDMCFG4, 0xF8) # BW=58kHz, DRATE_E=8 (matches sender) + self.write_reg(MDMCFG3, 0x83) # DRATE_M=131 for 1.2 kBaud + self.write_reg(MDMCFG2, 0x12) # GFSK, 16/16 sync word, DC filter ON + self.write_reg(MDMCFG1, 0x22) # 4 preamble bytes, CHANSPC_E=2 + self.write_reg(MDMCFG0, 0xF8) # CHANSPC_M=248 + + # Sync word configuration (0xD391 - matches sender) + self.write_reg(0x04, DIY_SYNC1) # SYNC1 = 0xD3 + self.write_reg(0x05, DIY_SYNC0) # SYNC0 = 0x91 + + # Deviation - 5 kHz for narrow channel + # DEVIATN calculation: (5000 * 2^17) / 26MHz = 25.2 ≈ 0x15 + self.write_reg(DEVIATN, 0x15) # ~5 kHz deviation + + # State machine - auto-calibrate, stay in RX + self.write_reg(MCSM1, 0x3F) # CCA always, stay in RX after RX/TX + self.write_reg(MCSM0, 0x18) # Auto-calibrate from IDLE to RX/TX + + # Frequency offset compensation - enable AFC for better lock + self.write_reg(FOCCFG, 0x1D) # FOC_BS_CS_GATE, FOC_PRE_K=3K, FOC_POST_K=K/2, FOC_LIMIT=BW/4 + self.write_reg(0x1A, 0x1C) # BSCFG: Bit sync config (matches sender) + + # AGC for GFSK sensitivity - match sender settings + self.write_reg(AGCCTRL2, 0xC7) # Max DVGA gain, target 42 dB (matches sender) + self.write_reg(AGCCTRL1, 0x00) # LNA priority, AGC relative threshold (matches sender) + self.write_reg(AGCCTRL0, 0xB0) # Medium hysteresis, 16 samples + + # Front end - match sender configuration exactly + self.write_reg(0x21, 0xB6) # FREND1: PA current = max (matches sender) + self.write_reg(FREND0, 0x17) # FREND0: PA_POWER index = 7 (matches sender) + + # Calibration + self.write_reg(FSCAL3, 0xE9) + self.write_reg(FSCAL2, 0x2A) + self.write_reg(FSCAL1, 0x00) + self.write_reg(FSCAL0, 0x1F) + + print("Configuration applied") + + def read_packet(self): + # Fixed-length: expect 17-byte payload (with VCC) + num_bytes = self.get_rx_bytes() + + if self.read_status(RXBYTES) & 0x80: + self.flush_rx() + return None, False + + if num_bytes >= DIY_PACKET_LEN: + data = self.read_burst(0x3F, DIY_PACKET_LEN) + return data, True + return None, False + + +def reverse_bits_byte(b): + """Reverse bit order in a byte (MSB<->LSB)""" + result = 0 + for i in range(8): + result = (result << 1) | ((b >> i) & 1) + return result + + +def main(): + # Setup MQTT client + mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv311) + mqtt_host = "192.168.43.102" + mqtt_topic = "rtl_433/DietPi/events" + + try: + mqtt_client.connect(mqtt_host, 1883, 60) + mqtt_client.loop_start() + print(f"Connected to MQTT broker at {mqtt_host}") + except Exception as e: + print(f"Warning: Could not connect to MQTT broker: {e}") + print("Continuing without MQTT...") + + radio = CC1101DIY() + try: + radio.reset() + + # Verify chip is correctly recognized + if not radio.verify_chip(): + print("\nTroubleshooting:") + print(" 1. Check SPI is enabled: sudo raspi-config") + print(" 2. Verify wiring: MOSI, MISO, SCLK, CSN, GND, VCC") + print(" 3. Check power supply (3.3V, sufficient current)") + print(" 4. Try running with sudo for GPIO/SPI permissions") + return + + radio.configure() + radio.enter_rx() + print("Listening... Press Ctrl+C to stop\n") + + packet_count = 0 + last_status = time.time() + + while True: + payload, crc_ok = radio.read_packet() + if payload and len(payload) == DIY_PACKET_LEN: + fmt = '= 5: + state = radio.get_marc_state() + + # Check if stuck (not in RX state) and recover + if state != 13: # 13 = RX + ts_status = datetime.now(timezone.utc).isoformat() + print(f"[{ts_status}] WARNING: Not in RX state ({state}) - recovering...") + radio.flush_rx() + radio.send_strobe(SIDLE) + time.sleep(0.01) + radio.enter_rx() + time.sleep(0.01) # Let RX stabilize + # Normal operation: silent (no status spam) + + last_status = time.time() + + time.sleep(0.0001) # Poll at 10kHz to catch all packets + + except KeyboardInterrupt: + print("\nStopping...") + finally: + mqtt_client.loop_stop() + mqtt_client.disconnect() + radio.close() + + +if __name__ == "__main__": + main() diff --git a/test_pool_decode.py b/test_pool_decode.py index a2df765..3914a0f 100644 --- a/test_pool_decode.py +++ b/test_pool_decode.py @@ -8,6 +8,12 @@ from typing import Optional, Tuple, Dict PAYLOAD_SIZE = 15 MAGIC1 = 0x42 MAGIC2 = 0x99 +RESET_FLAG_MAP = [ + (0x1, "PORF (power-on)"), + (0x2, "EXTRF (external reset)"), + (0x4, "BORF (brown-out)"), + (0x8, "WDRF (watchdog)"), +] def crc8_xor(data: bytes) -> int: @@ -18,6 +24,13 @@ def crc8_xor(data: bytes) -> int: return c +def parse_version_and_reset_flags(version_byte: int): + protocol_version = version_byte & 0x0F + reset_flags = (version_byte >> 4) & 0x0F + reset_causes = [desc for bit, desc in RESET_FLAG_MAP if reset_flags & bit] + return protocol_version, reset_flags, reset_causes + + def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = None): """Scan a byte stream for a plausible pool payload. @@ -34,7 +47,7 @@ def decode_pool_payload(candidate_bytes: bytes, expected_seq: Optional[int] = No for offset in range(0, len(candidate_bytes) - PAYLOAD_SIZE + 1): chunk = candidate_bytes[offset:offset + PAYLOAD_SIZE] try: - magic1, magic2, version, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack( + magic1, magic2, version_byte, nodeId, seq, t_ds10, t_bme10, hum10, pres1, crc_received = struct.unpack( ' bytes: +def build_payload(seq: int, t_ds10: int, t_bme10: int, hum10: int, pres1: int, reset_flags: int = 0) -> bytes: """Build a valid payload with CRC appended.""" - header = struct.pack('