#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
Updated with improved sleep detection and HR-derived sleep stage
"""

import os
import sqlite3
import json
import logging
import shlex
import subprocess
import sys
import time
import re
import signal
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

# MQTT library - paho-mqtt works well on Termux
try:
    import paho.mqtt.client as mqtt
except ImportError:
    # PYTEST_CURRENT_TEST is only set while a test item is running, not while
    # pytest imports modules during collection, so also look for a loaded
    # pytest module before deciding to abort.
    if "PYTEST_CURRENT_TEST" in os.environ or "pytest" in sys.modules:
        mqtt = None
    else:
        print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
        sys.exit(1)

# --- Configuration ---
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
PUBLISH_INTERVAL = 300  # 5 minutes
STALE_THRESHOLD_SECONDS = 1200  # 20 minutes (Force reconnect if no new data for this long)

GADGETBRIDGE_PACKAGE = "nodomain.freeyourgadget.gadgetbridge"
XIAOMI_ACTIVITY_TABLE = "XIAOMI_ACTIVITY_SAMPLE"
HYBRID_ACTIVITY_TABLE = "HYBRID_HRACTIVITY_SAMPLE"

# --- Logging ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout
)
logger = logging.getLogger(__name__)


def load_config():
    """Load MQTT configuration from CONFIG_FILE.

    Logs an error and exits the process with status 1 when the file is missing.
    Returns the parsed JSON document.
    """
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"Config file not found: {CONFIG_FILE}")
        logger.error("Run setup.py first to configure MQTT settings")
        sys.exit(1)
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def send_gadgetbridge_intent(action, extra_args=""):
    """Send a broadcast intent to Gadgetbridge using the `am` command.

    Args:
        action: Intent action name.
        extra_args: Additional `am broadcast` arguments as one shell-style
            string (split with shlex, so quoted values stay one argument).

    Returns:
        True when `am` exited with status 0, False otherwise.
    """
    try:
        extra = shlex.split(extra_args)
    except ValueError as e:
        logger.warning(f"Malformed intent arguments ({e}); intent not sent")
        return False

    # An argument list (no shell) keeps values such as the device address from
    # being interpreted by a shell.
    cmd = ["am", "broadcast", "-a", action, *extra, "-p", GADGETBRIDGE_PACKAGE]
    logger.info(f"Sending intent: {action}")
    try:
        result = subprocess.run(cmd, check=False).returncode
    except OSError as e:
        logger.warning(f"Intent may have failed (could not run am: {e})")
        return False
    if result != 0:
        logger.warning(f"Intent may have failed (exit code: {result})")
    return result == 0


def trigger_bluetooth_connect(device_mac):
    """Ask Gadgetbridge to connect to the device; returns False without a MAC."""
    if not device_mac:
        logger.warning("No device MAC address configured, skipping Bluetooth connect")
        return False
    action = "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_CONNECT"
    extra = f"-e EXTRA_DEVICE_ADDRESS {shlex.quote(str(device_mac))}"
    return send_gadgetbridge_intent(action, extra)


def trigger_bluetooth_reconnect(device_mac):
    """Force a disconnect for zombie recovery, then wait 15 seconds.

    Only the disconnect intent is sent here; the connect intent is sent by the
    next trigger_gadgetbridge_sync() call. Returns False without a MAC.
    """
    if not device_mac:
        logger.warning("No device MAC address available for reconnection")
        return False
    logger.warning("Initiating Force Disconnect (Zombie Recovery)...")
    send_gadgetbridge_intent(
        "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_DISCONNECT",
        f"-e EXTRA_DEVICE_ADDRESS {shlex.quote(str(device_mac))}"
    )
    logger.info("Waiting 15s for Bluetooth stack to clear...")
    time.sleep(15)
    return True


def trigger_gadgetbridge_sync(device_mac=None):
    """Trigger Gadgetbridge to sync data from band and export database."""
    if device_mac:
        trigger_bluetooth_connect(device_mac)
        time.sleep(10)
    send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC")
    time.sleep(10)
    send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT")


def find_latest_db(export_dir):
    """Return the most recently modified Gadgetbridge ``.db`` file in export_dir.

    Returns None when the directory is missing or holds no matching file.
    """
    if not os.path.exists(export_dir):
        return None
    candidates = []
    for name in os.listdir(export_dir):
        if not (name.endswith(".db") and "Gadgetbridge" in name):
            continue
        path = os.path.join(export_dir, name)
        try:
            candidates.append((path, os.path.getmtime(path)))
        except OSError:
            # File vanished between listing and stat (export being rewritten).
            continue
    if not candidates:
        return None
    return max(candidates, key=lambda item: item[1])[0]


class GadgetbridgeMQTT:
    """Reads a Gadgetbridge database export and publishes its values over MQTT."""

    def __init__(self, config):
        self.config = config
        self.db_path = None
        self.device_name = "fitness_tracker"
        self.device_alias = "Unknown"
        self.device_id = None
        self.device_mac = None
        self.device_type = "unknown"
        self.mqtt_client = None
        self.last_publish_time = 0
        self.last_db_mtime = 0
        self.running = True
        self.last_data_timestamp = 0
        self.is_sleeping = False
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Ask the main loop to stop on SIGTERM/SIGINT."""
        logger.info(f"Received signal {signum}. Shutting down...")
        self.running = False

    @staticmethod
    def _compute_awake(is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms, now_ms,
                       avg_recent_hr=None, resting_hr=None):
        """Enhanced awake logic: Strong bias to sleeping during active sessions.

        Args:
            is_awake_flag: IS_AWAKE column of the session row (currently unused).
            wakeup_raw: Session wake-up time in ms, or None.
            stage_code: Latest sleep stage code (5 = awake, 2/3/4 = asleep), or None.
            stage_timestamp_ms: Timestamp of that stage in ms, or None.
            now_ms: Current time in ms.
            avg_recent_hr: Average heart rate over the recent window, or None.
            resting_hr: Resting heart rate, or None.

        Returns:
            True when the wearer is considered awake, False when asleep.
        """
        # 1. Past wakeup -> definitely awake
        if wakeup_raw is not None and wakeup_raw <= now_ms:
            return True

        stage_age_ms = None if stage_timestamp_ms is None else now_ms - stage_timestamp_ms

        # 2. Active sleep session: only a recent (2h) AWAKE stage counts as
        #    awake; anything else is assumed sleeping (common for Xiaomi).
        if wakeup_raw is not None:
            recent_stage = stage_age_ms is not None and stage_age_ms <= 2 * 60 * 60 * 1000
            return bool(recent_stage and stage_code == 5)

        # 3. Not in session: Use stages (30min recency)
        if stage_age_ms is not None and stage_age_ms <= 30 * 60 * 1000:
            if stage_code == 5:
                return True
            if stage_code in (2, 3, 4):
                return False

        # 4. HR fallback: resting HR + 5 when known, otherwise a flat 60 bpm
        if avg_recent_hr is not None:
            hr_threshold = resting_hr + 5 if resting_hr else 60
            if avg_recent_hr < hr_threshold:
                return False

        # 5. Default: awake
        return True

    @staticmethod
    def _hr_to_sleep_stage(avg_hr, resting_hr):
        """Derive sleep stage from HR (fallback when band stages unreliable)."""
        if not avg_hr or not resting_hr:
            return "unknown"
        ratio = avg_hr / resting_hr
        if ratio < 0.85:
            return "deep_sleep"
        if ratio < 1.1:
            return "light_sleep"
        if ratio < 1.3:
            return "rem_sleep"
        return "awake"

    def connect_mqtt(self):
        """Create the MQTT client, connect and start its loop; True on success."""
        if mqtt is None:
            logger.error("paho-mqtt not available")
            return False
        try:
            self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        except (AttributeError, TypeError):
            # Older paho-mqtt without the callback API version argument.
            self.mqtt_client = mqtt.Client()
        if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
            self.mqtt_client.username_pw_set(
                self.config["mqtt_username"],
                self.config["mqtt_password"]
            )
        try:
            self.mqtt_client.connect(self.config["mqtt_broker"], self.config.get("mqtt_port", 1883), 60)
            self.mqtt_client.loop_start()
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the MQTT loop and disconnect, if a client exists."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    @staticmethod
    def _count_device_rows(cursor, table, device_id):
        """Count rows for device_id in table; 0 when the table cannot be queried."""
        try:
            cursor.execute(f"SELECT COUNT(*) FROM {table} WHERE DEVICE_ID = ?", (device_id,))
            return cursor.fetchone()[0]
        except sqlite3.Error as e:
            logger.debug(f"Device type detection error: {e}")
            return 0

    def detect_device_type(self, cursor, device_id):
        """Detect if device is Xiaomi or Hybrid (Fossil) based on which table has data for this device"""
        # Each table is counted independently so a missing table does not hide
        # data present in the other one.
        hybrid_count = self._count_device_rows(cursor, HYBRID_ACTIVITY_TABLE, device_id)
        xiaomi_count = self._count_device_rows(cursor, XIAOMI_ACTIVITY_TABLE, device_id)
        logger.info(f"Device type detection: HYBRID samples={hybrid_count}, XIAOMI samples={xiaomi_count}")
        if hybrid_count > 0:
            return 'hybrid'
        if xiaomi_count > 0:
            return 'xiaomi'
        return 'unknown'

    def get_device_info(self, cursor):
        """Pick the newest band/watch-like DEVICE row.

        Also stores the detected device type on self.device_type.

        Returns:
            (device_id, alias_or_name, identifier), or (None, "Unknown", None)
            when no device matches or the query fails.
        """
        try:
            cursor.execute("""
                SELECT d._id, d.ALIAS, d.NAME, d.IDENTIFIER
                FROM DEVICE d
                WHERE (LOWER(d.NAME) LIKE '%band%' OR LOWER(d.NAME) LIKE '%watch%'
                       OR LOWER(d.NAME) LIKE '%fossil%' OR LOWER(d.NAME) LIKE '%hybrid%')
                ORDER BY d._id DESC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                device_id = row[0]
                device_alias = row[1] if row[1] else row[2]
                device_mac = row[3]
                device_type = self.detect_device_type(cursor, device_id)
                logger.info(f"Selected device: ID={device_id}, Name={device_alias}, MAC={device_mac}, Type={device_type}")
                self.device_type = device_type
                return device_id, device_alias, device_mac
        except Exception as e:
            logger.error(f"Error getting device info: {e}")
        return None, "Unknown", None

    def get_day_start_timestamp(self):
        """Return the epoch seconds of the most recent 04:00 local time."""
        now = datetime.now()
        day_start = datetime.combine(now.date(), datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp(self):
        """Return the epoch seconds of local midnight (yesterday's before 04:00)."""
        now = datetime.now()
        midnight = datetime.combine(now.date(), datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp())

    def _execute_for_device(self, cursor, xiaomi_sql, hybrid_sql, params):
        """Execute the query matching self.device_type.

        Unknown device types try the Xiaomi query first and fall back to the
        hybrid query when SQLite rejects it (e.g. the table is missing).
        """
        if self.device_type == 'xiaomi':
            cursor.execute(xiaomi_sql, params)
        elif self.device_type == 'hybrid':
            cursor.execute(hybrid_sql, params)
        else:
            try:
                cursor.execute(xiaomi_sql, params)
            except sqlite3.Error:
                cursor.execute(hybrid_sql, params)

    def _execute_activity(self, cursor, sql_template, params):
        """Execute sql_template (with a ``{table}`` placeholder) on the activity table."""
        self._execute_for_device(
            cursor,
            sql_template.format(table=XIAOMI_ACTIVITY_TABLE),
            sql_template.format(table=HYBRID_ACTIVITY_TABLE),
            params,
        )

    def _sum_steps(self, cursor, start_ts, end_ts):
        """Return the step total between two epoch-second timestamps (inclusive)."""
        self._execute_activity(
            cursor,
            "SELECT SUM(STEPS) FROM {table} WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
            (self.device_id, start_ts, end_ts),
        )
        return cursor.fetchone()[0] or 0

    def _fetch_xiaomi_sleep_rows(self, cursor, sql, params):
        """Run a XIAOMI_SLEEP_TIME_SAMPLE query and return its rows.

        Hybrid devices have no sleep tables and yield no rows. For unknown
        device types a failing query also yields no rows; for Xiaomi devices
        the error propagates to the caller.
        """
        if self.device_type == 'hybrid':
            return []
        try:
            cursor.execute(sql, params)
        except sqlite3.Error:
            if self.device_type == 'xiaomi':
                raise
            return []
        return cursor.fetchall()

    def _query_sleep(self, cursor, data, now_ms, stage_code, stage_timestamp_ms,
                     avg_recent_hr, resting_hr):
        """Fill data with sleep duration, awake state and sleep stage values."""
        day_ago_ts_ms = (int(time.time()) - 24 * 3600) * 1000

        sessions = self._fetch_xiaomi_sleep_rows(cursor, """
            SELECT TIMESTAMP, WAKEUP_TIME, TOTAL_DURATION, DEEP_SLEEP_DURATION,
                   LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION
            FROM XIAOMI_SLEEP_TIME_SAMPLE
            WHERE DEVICE_ID = ? AND TIMESTAMP >= ?
            ORDER BY TIMESTAMP DESC
        """, (self.device_id, day_ago_ts_ms))

        # Walk sessions newest-first and merge those separated by at most 2h
        # (start of the later session minus wake-up of the earlier one).
        total_sleep_min = 0
        later_start_ms = None
        for sess_start, sess_wake, sess_total, sess_deep, sess_light, sess_rem in sessions:
            if later_start_ms is not None:
                gap_hours = (later_start_ms - sess_wake) / 1000 / 3600 if sess_wake else 999
                if gap_hours > 2:
                    break
            if sess_deep or sess_light or sess_rem:
                total_sleep_min += (sess_deep or 0) + (sess_light or 0) + (sess_rem or 0)
            elif sess_total:
                total_sleep_min += sess_total
            later_start_ms = sess_start
        if total_sleep_min > 0:
            data["sleep_duration"] = round(total_sleep_min / 60.0, 2)

        # Current session
        rows = self._fetch_xiaomi_sleep_rows(cursor, """
            SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME, TIMESTAMP,
                   DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION, AWAKE_DURATION
            FROM XIAOMI_SLEEP_TIME_SAMPLE
            WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?
            ORDER BY TIMESTAMP DESC LIMIT 1
        """, (self.device_id, day_ago_ts_ms, now_ms))

        if not rows:
            data["is_awake"] = True
            data["in_sleep_session"] = False
            data["sleep_stage"] = "not_sleep"
            data["sleep_stage_code"] = 0
            data["sleep_stage_hr"] = "awake"
            self.is_sleeping = False
            return

        _total, is_awake_flag, wakeup_raw, sleep_start_ms = rows[0][:4]

        # bool() so a NULL start/wake-up publishes False instead of a skipped None.
        in_sleep_session = bool(
            sleep_start_ms and wakeup_raw and sleep_start_ms <= now_ms < wakeup_raw
        )
        data["in_sleep_session"] = in_sleep_session

        is_awake = self._compute_awake(
            is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms,
            now_ms, avg_recent_hr, resting_hr
        )
        data["is_awake"] = is_awake
        self.is_sleeping = not is_awake

        # Computed up front because the debug log below needs it even when
        # stage_code itself is NULL.
        stage_age_minutes = None
        if stage_timestamp_ms:
            stage_age_minutes = (now_ms - stage_timestamp_ms) / 1000 / 60

        # Primary sleep stage (relaxed recency: 24h)
        stage_names = {0: "not_sleep", 1: "unknown", 2: "deep_sleep",
                       3: "light_sleep", 4: "rem_sleep", 5: "awake"}
        if stage_code is not None and stage_age_minutes is not None:
            if in_sleep_session or stage_age_minutes <= 1440:  # 24h
                data["sleep_stage"] = stage_names.get(stage_code, f"unknown_{stage_code}")
                data["sleep_stage_code"] = stage_code
            else:
                data["sleep_stage"] = "not_sleep"
                data["sleep_stage_code"] = 0
        else:
            data["sleep_stage"] = "not_sleep" if is_awake else "unknown"
            data["sleep_stage_code"] = 0

        # HR-derived sleep stage (always available)
        hr_stage = self._hr_to_sleep_stage(avg_recent_hr, resting_hr)
        data["sleep_stage_hr"] = hr_stage
        data["avg_recent_hr"] = round(avg_recent_hr, 1) if avg_recent_hr else None

        # Debug logging
        stage_info = f"{stage_code}@{stage_age_minutes:.0f}min" if stage_age_minutes is not None else 'none'
        recent_hr_val = f"{avg_recent_hr:.0f}" if avg_recent_hr else 'N/A'
        logger.info(f"Sleep debug: in_session={in_sleep_session}, is_awake={is_awake}, "
                    f"stage={stage_info}, hr_stage={hr_stage}, recent_hr={recent_hr_val}, resting={resting_hr}")

    def query_sensors(self, cursor):
        """Collect all sensor values for self.device_id from the database.

        Every section is best-effort: a failing query is logged and its values
        are simply left out of the result.

        Returns:
            dict mapping sensor id to value (always includes "server_time" and
            "device").
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight = self.get_day_midnight_timestamp()
        now_ms = int(time.time() * 1000)

        # Sleep stage (first) - hybrid/Fossil has no dedicated sleep stage table
        stage_code = None
        stage_timestamp_ms = None
        if self.device_type != 'hybrid':
            try:
                cursor.execute("""
                    SELECT STAGE, TIMESTAMP FROM XIAOMI_SLEEP_STAGE_SAMPLE
                    WHERE DEVICE_ID = ?
                    ORDER BY TIMESTAMP DESC LIMIT 1
                """, (self.device_id,))
                row = cursor.fetchone()
                if row:
                    stage_code, stage_timestamp_ms = row
            except Exception as e:
                logger.debug(f"Sleep stage query failed: {e}")

        # Recent HR (last 5 minutes)
        avg_recent_hr = None
        try:
            self._execute_activity(cursor, """
                SELECT AVG(HEART_RATE) FROM {table}
                WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255
                AND TIMESTAMP >= ?
            """, (self.device_id, now_ts - 300))
            row = cursor.fetchone()
            if row and row[0]:
                avg_recent_hr = row[0]
        except Exception as e:
            logger.debug(f"Recent HR query failed: {e}")

        # Resting HR
        resting_hr = None
        try:
            self._execute_for_device(
                cursor,
                """
                SELECT HR_RESTING FROM XIAOMI_DAILY_SUMMARY_SAMPLE
                WHERE DEVICE_ID = ? AND HR_RESTING > 0
                ORDER BY TIMESTAMP DESC LIMIT 1
                """,
                # Hybrid has no daily summary: average the latest 100 valid
                # samples. The sub-select is required because a LIMIT on the
                # aggregate itself would apply after AVG has seen every row.
                """
                SELECT AVG(HEART_RATE) FROM (
                    SELECT HEART_RATE FROM HYBRID_HRACTIVITY_SAMPLE
                    WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255
                    ORDER BY TIMESTAMP DESC LIMIT 100
                )
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row and row[0]:
                resting_hr = row[0]
        except Exception as e:
            logger.debug(f"Resting HR query failed: {e}")

        # Daily steps
        try:
            if self.device_type == 'hybrid':
                # Debug: Check what data exists
                cursor.execute("SELECT COUNT(*), MIN(TIMESTAMP), MAX(TIMESTAMP) FROM HYBRID_HRACTIVITY_SAMPLE WHERE DEVICE_ID = ?", (self.device_id,))
                debug_row = cursor.fetchone()
                if debug_row:
                    logger.info(f"Hybrid Debug: {debug_row[0]} total samples, timestamp range: {debug_row[1]} to {debug_row[2]}")
                    logger.info(f"Querying steps between {day_start_ts} and {now_ts}")
            data["daily_steps"] = self._sum_steps(cursor, day_start_ts, now_ts)
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

        # Weekly steps (week starts Monday 04:00 local time)
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            data["weekly_steps"] = self._sum_steps(cursor, int(week_start_time.timestamp()), now_ts)
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

        # Battery
        try:
            cursor.execute("SELECT LEVEL FROM BATTERY_LEVEL WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1", (self.device_id,))
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

        # Latest heart rate; its timestamp drives stale-data detection
        try:
            if self.device_type == 'hybrid':
                # Debug: Check for HR data
                cursor.execute("SELECT COUNT(*), MAX(TIMESTAMP) FROM HYBRID_HRACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND HEART_RATE > 0", (self.device_id,))
                debug_row = cursor.fetchone()
                if debug_row:
                    logger.info(f"Hybrid HR Debug: {debug_row[0]} HR samples, latest timestamp: {debug_row[1]}")
            self._execute_activity(
                cursor,
                "SELECT HEART_RATE, TIMESTAMP FROM {table} WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
                # Hybrid/Fossil uses seconds like Xiaomi
                hr_timestamp = row[1]
                if hr_timestamp > self.last_data_timestamp:
                    self.last_data_timestamp = hr_timestamp
            else:
                logger.info("No heart rate data found")
        except Exception as e:
            logger.error(f"Heart rate query failed: {e}")

        # Daily summary
        try:
            if self.device_type == 'xiaomi':
                cursor.execute("SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (self.device_id, day_midnight * 1000))
                row = cursor.fetchone()
                if row:
                    if row[0]:
                        data["hr_resting"] = row[0]
                    if row[1]:
                        data["hr_max"] = row[1]
                    if row[2]:
                        data["hr_avg"] = row[2]
                    if row[3]:
                        data["calories"] = row[3]
            elif self.device_type == 'hybrid':
                # Calculate stats from today's HR samples. The CASE keeps the
                # 0 / 255 placeholder readings out of the HR aggregates while
                # SUM(CALORIES) still covers every row.
                cursor.execute("""
                    SELECT MIN(CASE WHEN HEART_RATE > 0 AND HEART_RATE < 255 THEN HEART_RATE END),
                           MAX(CASE WHEN HEART_RATE > 0 AND HEART_RATE < 255 THEN HEART_RATE END),
                           AVG(CASE WHEN HEART_RATE > 0 AND HEART_RATE < 255 THEN HEART_RATE END),
                           SUM(CALORIES)
                    FROM HYBRID_HRACTIVITY_SAMPLE
                    WHERE DEVICE_ID = ? AND TIMESTAMP >= ?
                """, (self.device_id, day_midnight))
                row = cursor.fetchone()
                if row:
                    if row[0]:
                        data["hr_resting"] = row[0]
                    if row[1]:
                        data["hr_max"] = row[1]
                    if row[2]:
                        data["hr_avg"] = int(row[2])
                    if row[3]:
                        data["calories"] = row[3]
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

        # Enhanced Sleep Data
        try:
            self._query_sleep(cursor, data, now_ms, stage_code, stage_timestamp_ms,
                              avg_recent_hr, resting_hr)
        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

        # Weight
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = round(row[0], 1)
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

        data["server_time"] = datetime.now().astimezone().isoformat()
        data["device"] = self.device_alias
        return data

    def publish_discovery(self):
        """Publish retained Home Assistant MQTT discovery configs for all sensors."""
        device_info = {
            "identifiers": [self.device_name],
            "name": "Fitness Tracker",
            "model": self.device_alias,
            "manufacturer": "Gadgetbridge",
        }
        # (sensor_id, name, unit, icon, state_class, device_class)
        sensors = [
            ("device", "Device", None, "mdi:watch", None, None),
            ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
            ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
            ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
            ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("avg_recent_hr", "Recent Avg HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
            ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
            ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
            ("in_sleep_session", "In Sleep Session", None, "mdi:bed", None, None),
            ("sleep_stage", "Sleep Stage (Band)", None, "mdi:sleep-cycle", None, None),
            ("sleep_stage_code", "Sleep Stage Code", None, "mdi:numeric", "measurement", None),
            ("sleep_stage_hr", "Sleep Stage (HR)", None, "mdi:heart-pulse-outline", None, None),
            ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
            ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
        ]
        for sensor_id, name, unit, icon, state_class, device_class in sensors:
            config = {
                "name": f"Fitness Tracker {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class
            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)
        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish every non-None value of data to its retained state topic."""
        for key, value in data.items():
            if value is not None:  # Skip None values
                topic = f"gadgetbridge/{self.device_name}/{key}"
                self.mqtt_client.publish(topic, str(value), qos=1, retain=True)
        sleep_info = f"sleep={data.get('sleep_duration', 0)}h" if 'sleep_duration' in data else "sleep=N/A"
        awake_info = f"awake={data.get('is_awake', 'N/A')}"
        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%, {sleep_info}, {awake_info}, "
                    f"stage={data.get('sleep_stage', 'N/A')}/{data.get('sleep_stage_hr', 'N/A')}")

    def process_database(self):
        """Read the current database and publish its sensor values.

        Returns True when data was published, False otherwise.
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False
        try:
            # closing() guarantees the connection is released even when a
            # query raises.
            with closing(sqlite3.connect(self.db_path, timeout=10.0)) as conn:
                cursor = conn.cursor()
                self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                if not self.device_id:
                    logger.warning("No fitness device found")
                    return False
                data = self.query_sensors(cursor)
            if data:
                self.publish_data(data)
                return True
        except Exception as e:
            logger.error(f"Database error: {e}")
        return False

    def check_and_recover_connection(self):
        """Force a Bluetooth disconnect when the newest HR sample is too old.

        The threshold is tripled while the wearer is considered asleep.
        """
        if self.last_data_timestamp == 0:
            return
        threshold = STALE_THRESHOLD_SECONDS * 3 if self.is_sleeping else STALE_THRESHOLD_SECONDS
        current_ts = int(time.time())
        time_diff = current_ts - self.last_data_timestamp
        if time_diff > threshold:
            logger.warning(f"Data stale ({time_diff}s > {threshold}s). Triggering recovery...")
            if self.device_mac:
                trigger_bluetooth_reconnect(self.device_mac)
                # Restart the staleness clock so recovery is not re-triggered
                # on every cycle.
                self.last_data_timestamp = current_ts
            else:
                logger.warning("Cannot recover: No device MAC")

    def run(self):
        """Main loop: sync, export, read and publish every configured interval."""
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)
        logger.info(f"Starting Gadgetbridge MQTT Publisher (Enhanced Sleep)")
        logger.info(f"Export directory: {export_dir}, interval: {interval}s")
        os.makedirs(export_dir, exist_ok=True)
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)
        discovery_published = False
        try:
            while self.running:
                current_time = time.time()
                if current_time - self.last_publish_time >= interval:
                    try:
                        self.check_and_recover_connection()
                        logger.info("Triggering Gadgetbridge sync...")
                        trigger_gadgetbridge_sync(self.device_mac)
                        time.sleep(5)
                        self.db_path = find_latest_db(export_dir)
                        if not self.db_path:
                            logger.warning("No database found, retrying...")
                            time.sleep(3)
                            self.db_path = find_latest_db(export_dir)
                        if self.db_path:
                            logger.info(f"Using database: {os.path.basename(self.db_path)}")
                            if not discovery_published:
                                try:
                                    with closing(sqlite3.connect(self.db_path, timeout=5.0)) as conn:
                                        cursor = conn.cursor()
                                        self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                                    if self.device_mac:
                                        logger.info(f"Device MAC: {self.device_mac}")
                                except Exception as e:
                                    logger.warning(f"Could not read device info: {e}")
                                self.publish_discovery()
                                discovery_published = True
                            self.process_database()
                        else:
                            logger.warning(f"No database in {export_dir}")
                    except Exception as e:
                        logger.error(f"Sync cycle failed: {e}")
                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            logger.info("Cleaning up...")
            self.disconnect_mqtt()


def main():
    """Load the configuration and run the publisher until stopped."""
    config = load_config()
    publisher = GadgetbridgeMQTT(config)
    publisher.run()


if __name__ == "__main__":
    main()