#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux

Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
"""

import contextlib
import json
import logging
import os
import re
import shlex
import signal
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path

# MQTT library - paho-mqtt works well on Termux.
# A missing library is recorded as ``mqtt = None`` instead of exiting here, so
# that merely importing this module never terminates the interpreter; main()
# and connect_mqtt() report the problem when MQTT is actually needed.
try:
    import paho.mqtt.client as mqtt
except ImportError:
    mqtt = None

# --- Configuration ---
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
PUBLISH_INTERVAL = 300  # 5 minutes
STALE_THRESHOLD_SECONDS = 1200  # 20 minutes (Force reconnect if no new data for this long)
GADGETBRIDGE_PACKAGE = "nodomain.freeyourgadget.gadgetbridge"
INTENT_TIMEOUT_SECONDS = 30  # Upper bound for a single `am broadcast` call

# --- Logging ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)


def load_config():
    """Load MQTT configuration from the JSON config file.

    Returns:
        The parsed JSON content of CONFIG_FILE.

    Exits the process with status 1 (after logging an error) when the config
    file does not exist.
    """
    if not os.path.exists(CONFIG_FILE):
        logger.error(f"Config file not found: {CONFIG_FILE}")
        logger.error("Run setup.py first to configure MQTT settings")
        sys.exit(1)

    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def build_intent_command(action, extra_args=""):
    """Build the argv list for an `am broadcast` call aimed at Gadgetbridge.

    Args:
        action: Intent action string passed to ``-a``.
        extra_args: Additional `am` arguments. Either a shell-style string
            (split with shlex, so quoted values stay one argument) or a
            sequence of already separated arguments.

    Returns:
        List of command arguments, suitable for subprocess without a shell.

    Raises:
        ValueError: If a string ``extra_args`` has unbalanced quotes.
    """
    if isinstance(extra_args, str):
        extras = shlex.split(extra_args)
    else:
        extras = [str(arg) for arg in (extra_args or ())]
    return ["am", "broadcast", "-a", action, *extras, "-p", GADGETBRIDGE_PACKAGE]


def send_gadgetbridge_intent(action, extra_args=""):
    """Send broadcast intent to Gadgetbridge via the `am` command.

    The command is executed as an argument list (no shell), so values such as
    a device address read from the database cannot be interpreted as shell
    syntax.

    Args:
        action: Intent action string.
        extra_args: Extra `am` arguments, as a string or a sequence
            (see build_intent_command).

    Returns:
        True if the command exited with status 0, False otherwise (including
        when the command could not be started or timed out).
    """
    logger.info(f"Sending intent: {action}")
    try:
        cmd = build_intent_command(action, extra_args)
        result = subprocess.run(cmd, check=False, timeout=INTENT_TIMEOUT_SECONDS)
    except (OSError, ValueError, subprocess.TimeoutExpired) as e:
        logger.warning(f"Intent could not be sent: {e}")
        return False

    if result.returncode != 0:
        logger.warning(f"Intent may have failed (exit code: {result.returncode})")
    return result.returncode == 0


def trigger_bluetooth_connect(device_mac):
    """Force Gadgetbridge to connect to the device via Bluetooth.

    This fixes the 'disconnected' state that can occur when the band loses
    connection during sleep or when the phone is idle.

    Args:
        device_mac: Device address sent as EXTRA_DEVICE_ADDRESS.

    Returns:
        True if the intent was sent successfully, False if no address was
        given or the broadcast failed.
    """
    if not device_mac:
        logger.warning("No device MAC address configured, skipping Bluetooth connect")
        return False

    action = "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_CONNECT"
    return send_gadgetbridge_intent(action, ["-e", "EXTRA_DEVICE_ADDRESS", device_mac])


def trigger_bluetooth_reconnect(device_mac):
    """Force a full disconnect cycle.

    Used when data is stale to clear 'zombie' connection states.
    Note: We only Disconnect here. The subsequent normal sync cycle
    will handle the Connect, effectively completing the reset.

    Args:
        device_mac: Device address sent as EXTRA_DEVICE_ADDRESS.

    Returns:
        False if no address was given; otherwise True after the disconnect
        intent was issued and the 15 second wait elapsed (the result of the
        broadcast itself is not checked).
    """
    if not device_mac:
        logger.warning("No device MAC address available for reconnection")
        return False

    logger.warning("Initiating Force Disconnect (Zombie Recovery)...")

    # 1. Force Disconnect
    send_gadgetbridge_intent(
        "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_DISCONNECT",
        ["-e", "EXTRA_DEVICE_ADDRESS", device_mac],
    )

    # Wait for the stack to clear
    logger.info("Waiting 15s for Bluetooth stack to clear...")
    time.sleep(15)

    return True


def trigger_gadgetbridge_sync(device_mac=None):
    """Trigger Gadgetbridge to sync data from band and export database.

    If device_mac is provided, a Bluetooth connect intent is sent first.
    Fixed waits of 10 seconds follow the connect and the sync intents.

    Args:
        device_mac: Optional device address used for the connect step.
    """
    # Step 1: Force Bluetooth connection (fixes disconnected state)
    if device_mac:
        trigger_bluetooth_connect(device_mac)
        time.sleep(10)  # Wait for Bluetooth connection to establish

    # Step 2: Sync activity data from band to phone
    send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC")
    time.sleep(10)  # Wait for sync to complete

    # Step 3: Trigger database export
    send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT")


def find_latest_db(export_dir):
    """Find the most recent Gadgetbridge database file in export directory.

    A file qualifies when its name ends with ".db" and contains
    "Gadgetbridge".

    Args:
        export_dir: Directory to scan (not recursive).

    Returns:
        Path of the qualifying file with the newest modification time, or
        None when the directory cannot be listed or holds no such file.
    """
    try:
        names = os.listdir(export_dir)
    except OSError:
        return None

    candidates = []
    for name in names:
        if not (name.endswith(".db") and "Gadgetbridge" in name):
            continue
        path = os.path.join(export_dir, name)
        try:
            candidates.append((path, os.path.getmtime(path)))
        except OSError:
            # File disappeared (e.g. export being rewritten) between the
            # directory listing and the stat call; ignore it.
            continue

    if not candidates:
        return None

    # Return the most recently modified database
    return max(candidates, key=lambda item: item[1])[0]


class GadgetbridgeMQTT:
    """Reads exported Gadgetbridge databases and publishes values over MQTT."""

    def __init__(self, config):
        """Store the configuration and install SIGTERM/SIGINT handlers.

        Args:
            config: Mapping with MQTT settings (keys read elsewhere in this
                class: mqtt_broker, mqtt_port, mqtt_username, mqtt_password,
                export_dir, publish_interval).
        """
        self.config = config
        self.db_path = None
        self.device_name = "fitness_tracker"  # MQTT identifier (always fitness_tracker)
        self.device_alias = "Unknown"  # Actual device name for display
        self.device_id = None  # Track device ID for filtering queries
        self.device_mac = None  # Track device MAC for Bluetooth reconnection
        self.mqtt_client = None
        self.last_publish_time = 0
        self.last_db_mtime = 0
        self.running = True

        # Track data freshness for watchdog
        self.last_data_timestamp = 0
        # Time (epoch seconds) of the last forced disconnect, for the cooldown
        self.last_recovery_time = 0

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully by stopping the main loop."""
        logger.info(f"Received signal {signum}. Shutting down...")
        self.running = False

    @staticmethod
    def _compute_awake(is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms, now_ms,
                       avg_recent_hr=None, resting_hr=None):
        """Decide awake state using most reliable signals first.

        Priority:
        1. If past wakeup time -> awake (session has ended)
        2. If a recent sleep stage (within 2 hours) shows awake (code 5) -> awake
        3. If a recent sleep stage shows sleep (2=deep, 3=light, 4=REM) -> sleeping
        4. If within sleep session but no usable recent stage -> sleeping
        5. HR fallback: average recent HR below (resting_hr + 10), or below 65
           when no resting HR is known, -> sleeping
        6. Default: awake

        Args:
            is_awake_flag: Accepted for interface compatibility; this
                function does not currently consult it.
            wakeup_raw: Session wakeup time in milliseconds, or None.
            stage_code: Latest sleep stage code, or None.
            stage_timestamp_ms: Timestamp of that stage in milliseconds, or None.
            now_ms: Current time in milliseconds.
            avg_recent_hr: Average recent heart rate, or None.
            resting_hr: Resting heart rate, or None.

        Returns:
            True when considered awake, False when considered sleeping.

        Stage codes for Xiaomi: 2=deep, 3=light, 4=REM, 5=awake
        """
        # Priority 1: If we're past the wakeup time, the session is over -> awake
        if wakeup_raw is not None and wakeup_raw <= now_ms:
            return True

        # Within a sleep session (wakeup time in the future)
        in_session = wakeup_raw is not None and wakeup_raw > now_ms

        # Check if stage data is recent (within 2 hours)
        recent_stage = (
            stage_timestamp_ms is not None
            and now_ms - stage_timestamp_ms <= 2 * 60 * 60 * 1000
        )

        if recent_stage and stage_code is not None:
            if stage_code == 5:  # AWAKE stage
                return True
            if stage_code in (2, 3, 4):  # Deep(2), Light(3), REM(4)
                return False

        # If within session but no recent stage data -> assume sleeping
        if in_session:
            return False

        # Use HR as fallback indicator (outside sessions)
        hr_threshold = (resting_hr + 10) if resting_hr else 65
        if avg_recent_hr is not None and avg_recent_hr < hr_threshold:
            return False

        # Default to awake
        return True

    def connect_mqtt(self):
        """Connect to MQTT broker and start the network loop.

        Returns:
            True if connect() did not raise, False when paho-mqtt is missing
            or the connection attempt raised.
        """
        if mqtt is None:
            logger.error("paho-mqtt not available; cannot publish")
            return False

        # Use callback API version 2 to avoid deprecation warning
        try:
            self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        except (AttributeError, TypeError):
            # Fallback for older paho-mqtt versions
            self.mqtt_client = mqtt.Client()

        if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
            self.mqtt_client.username_pw_set(
                self.config["mqtt_username"],
                self.config["mqtt_password"],
            )

        try:
            self.mqtt_client.connect(
                self.config["mqtt_broker"],
                self.config.get("mqtt_port", 1883),
                60,
            )
            self.mqtt_client.loop_start()
            # Wait for connection to establish
            # NOTE(review): success is logged after a fixed wait; the broker's
            # acknowledgement is not inspected here.
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the network loop and disconnect from the MQTT broker."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    def get_device_info(self, cursor):
        """Get device ID, alias, and MAC from the database.

        Selects, among DEVICE rows whose NAME contains "band" or "watch"
        (case-insensitive), the row with the highest _id.

        Args:
            cursor: Open SQLite cursor on a Gadgetbridge export.

        Returns:
            Tuple (device_id, device_alias, device_mac); (None, "Unknown",
            None) when no row matches or the query fails.
        """
        try:
            # Highest _id first - presumably the most recently added device,
            # so an old band is not picked over the current one.
            cursor.execute("""
                SELECT d._id, d.ALIAS, d.NAME, d.IDENTIFIER
                FROM DEVICE d
                WHERE (LOWER(d.NAME) LIKE '%band%' OR LOWER(d.NAME) LIKE '%watch%')
                ORDER BY d._id DESC
                LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                device_id = row[0]
                # Get actual device name for display
                device_alias = row[1] if row[1] else row[2]  # Use ALIAS, fallback to NAME
                device_mac = row[3]  # MAC address / IDENTIFIER
                logger.info(f"Selected device: ID={device_id}, Name={device_alias}, MAC={device_mac}")
                return device_id, device_alias, device_mac
        except Exception as e:
            logger.error(f"Error getting device info: {e}")
        return None, "Unknown", None

    def get_day_start_timestamp(self):
        """Get timestamp (seconds) for start of current day (4am).

        Before 04:00 local time the previous day's 04:00 is returned.
        """
        now = datetime.now()
        today = now.date()
        day_start = datetime.combine(today, datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp(self):
        """Get midnight timestamp in seconds for daily summary queries.

        Before 04:00 local time the previous day's midnight is returned.
        """
        now = datetime.now()
        today = now.date()
        midnight = datetime.combine(today, datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp())

    def query_sensors(self, cursor):
        """Query all sensor data from database - filtered by device_id.

        Each query is best-effort: a failing query is logged at debug level
        and its keys are simply absent from the result.

        Args:
            cursor: Open SQLite cursor on a Gadgetbridge export.

        Returns:
            Dict of sensor key -> value; always contains "server_time" and
            "device".

        Side effects:
            Updates self.last_data_timestamp with the timestamp of the latest
            valid heart-rate sample (used by the watchdog).
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight = self.get_day_midnight_timestamp()
        now_ms = int(time.time() * 1000)

        # Query sleep stage FIRST (needed for is_awake calculation)
        stage_code = None
        stage_timestamp_ms = None
        try:
            cursor.execute(
                """
                SELECT STAGE, TIMESTAMP
                FROM XIAOMI_SLEEP_STAGE_SAMPLE
                WHERE DEVICE_ID = ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                stage_code, stage_timestamp_ms = row
        except Exception as e:
            logger.debug(f"Sleep stage query failed: {e}")

        # Query average recent HR (last 10 minutes) for sleep detection fallback
        avg_recent_hr = None
        try:
            cursor.execute(
                """
                SELECT AVG(HEART_RATE)
                FROM XIAOMI_ACTIVITY_SAMPLE
                WHERE DEVICE_ID = ?
                AND HEART_RATE > 0 AND HEART_RATE < 255
                AND TIMESTAMP >= ?
                """,
                (self.device_id, now_ts - 600),  # Last 10 minutes
            )
            row = cursor.fetchone()
            if row and row[0]:
                avg_recent_hr = row[0]
        except Exception as e:
            logger.debug(f"Recent HR query failed: {e}")

        # Query resting HR for dynamic sleep threshold
        resting_hr = None
        try:
            cursor.execute(
                """
                SELECT HR_RESTING
                FROM XIAOMI_DAILY_SUMMARY_SAMPLE
                WHERE DEVICE_ID = ? AND HR_RESTING > 0
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row and row[0]:
                resting_hr = row[0]
        except Exception as e:
            logger.debug(f"Resting HR query failed: {e}")

        # Daily Steps (filtered by device)
        try:
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, day_start_ts, now_ts),
            )
            data["daily_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

        # Weekly Steps (filtered by device); the week starts Monday 04:00
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, int(week_start_time.timestamp()), now_ts),
            )
            data["weekly_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

        # Battery Level (filtered by device)
        try:
            cursor.execute(
                "SELECT LEVEL FROM BATTERY_LEVEL WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

        # Latest Heart Rate (filtered by device); TIMESTAMP feeds the watchdog
        try:
            cursor.execute(
                "SELECT HEART_RATE, TIMESTAMP FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
                self.last_data_timestamp = row[1]
        except Exception as e:
            logger.debug(f"Heart rate query failed: {e}")

        # Daily Summary Data (filtered by device)
        try:
            cursor.execute(
                "SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id, day_midnight * 1000),  # Convert to milliseconds
            )
            row = cursor.fetchone()
            if row:
                if row[0]:
                    data["hr_resting"] = row[0]
                if row[1]:
                    data["hr_max"] = row[1]
                if row[2]:
                    data["hr_avg"] = row[2]
                if row[3]:
                    data["calories"] = row[3]
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

        # Sleep Data (filtered by device)
        try:
            # SLEEP DURATION: Sum consecutive sleep sessions until there's a 2h+ gap
            day_ago_ts_ms = (int(time.time()) - 24 * 3600) * 1000

            cursor.execute(
                """
                SELECT TIMESTAMP, WAKEUP_TIME, TOTAL_DURATION,
                       DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION
                FROM XIAOMI_SLEEP_TIME_SAMPLE
                WHERE DEVICE_ID = ? AND TIMESTAMP >= ?
                ORDER BY TIMESTAMP DESC
                """,
                (self.device_id, day_ago_ts_ms),
            )
            sessions = cursor.fetchall()

            total_sleep_min = 0
            # Start of the session processed in the previous iteration, i.e.
            # the chronologically *later* one (rows are newest first).
            later_session_start_ms = None
            for sess_start, sess_wake, sess_total, sess_deep, sess_light, sess_rem in sessions:
                if later_session_start_ms is not None:
                    # Gap between this session's wakeup and the later session's start
                    gap_hours = (later_session_start_ms - sess_wake) / 1000 / 3600 if sess_wake else 999
                    if gap_hours > 2:
                        break

                # Prefer the per-stage durations; fall back to the total
                if sess_deep or sess_light or sess_rem:
                    sess_min = (sess_deep or 0) + (sess_light or 0) + (sess_rem or 0)
                elif sess_total:
                    sess_min = sess_total
                else:
                    sess_min = 0

                total_sleep_min += sess_min
                later_session_start_ms = sess_start

            if total_sleep_min > 0:
                data["sleep_duration"] = round(total_sleep_min / 60.0, 2)

            # IS_AWAKE / SLEEP_STAGE: Use the MOST RECENT session (within 24h)
            cursor.execute(
                """
                SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME, TIMESTAMP,
                       DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION, AWAKE_DURATION
                FROM XIAOMI_SLEEP_TIME_SAMPLE
                WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id, day_ago_ts_ms, now_ms),
            )
            row = cursor.fetchone()

            if row:
                # Only the flag, wakeup time and start time are used below
                is_awake_flag, wakeup_raw, sleep_start_ms = row[1], row[2], row[3]

                in_sleep_session = (
                    sleep_start_ms is not None
                    and wakeup_raw is not None
                    and sleep_start_ms <= now_ms < wakeup_raw
                )
                data["in_sleep_session"] = in_sleep_session

                is_awake = self._compute_awake(
                    is_awake_flag=is_awake_flag,
                    wakeup_raw=wakeup_raw,
                    stage_code=stage_code,
                    stage_timestamp_ms=stage_timestamp_ms,
                    now_ms=now_ms,
                    avg_recent_hr=avg_recent_hr,
                    resting_hr=resting_hr,
                )
                data["is_awake"] = is_awake

                stage_names = {
                    0: "not_sleep",
                    1: "unknown",
                    2: "deep_sleep",
                    3: "light_sleep",
                    4: "rem_sleep",
                    5: "awake",
                }

                if stage_code is not None and stage_timestamp_ms is not None:
                    stage_age_minutes = (now_ms - stage_timestamp_ms) / 1000 / 60
                    # A stage is reported while in a session or if it is at most 2h old
                    if in_sleep_session or stage_age_minutes <= 120:
                        data["sleep_stage"] = stage_names.get(stage_code, f"unknown_{stage_code}")
                        data["sleep_stage_code"] = stage_code
                    else:
                        data["sleep_stage"] = "not_sleep"
                        data["sleep_stage_code"] = 0
                else:
                    data["sleep_stage"] = "not_sleep" if is_awake else "unknown"
                    data["sleep_stage_code"] = 0
            else:
                data["is_awake"] = True
                data["in_sleep_session"] = False
                data["sleep_stage"] = "not_sleep"
                data["sleep_stage_code"] = 0

        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

        # Weight (not device-specific, from scale)
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = round(row[0], 1)  # Round to 1 decimal
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

        # Server time
        data["server_time"] = datetime.now().astimezone().isoformat()

        # Device name (actual band name)
        data["device"] = self.device_alias

        return data

    def publish_discovery(self):
        """Publish Home Assistant MQTT discovery configs (retained, QoS 1)."""
        device_info = {
            "identifiers": [self.device_name],
            "name": "Fitness Tracker",
            "model": self.device_alias,
            "manufacturer": "Gadgetbridge",
        }

        # (sensor_id, name, unit, icon, state_class, device_class)
        sensors = [
            ("device", "Device", None, "mdi:watch", None, None),
            ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
            ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
            ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
            ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
            ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
            ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
            ("in_sleep_session", "In Sleep Session", None, "mdi:bed", None, None),
            ("sleep_stage", "Sleep Stage", None, "mdi:sleep-cycle", None, None),
            ("sleep_stage_code", "Sleep Stage Code", None, "mdi:numeric", "measurement", None),
            ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
            ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
        ]

        for sensor_id, name, unit, icon, state_class, device_class in sensors:
            config = {
                "name": f"Fitness Tracker {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class

            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)

        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish sensor data to MQTT, one topic per key.

        Args:
            data: Dict of sensor key -> value; values are sent as str(value).
        """
        for key, value in data.items():
            topic = f"gadgetbridge/{self.device_name}/{key}"
            # Use retain=True so HA gets values on restart
            self.mqtt_client.publish(topic, str(value), qos=1, retain=True)

        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, "
                    f"hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%")

    def process_database(self):
        """Read database and publish data.

        Returns:
            True when the database was read (and any data published), False
            when no database/device is available or an error occurred.
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False

        try:
            # closing() guarantees the connection is released even if a query raises
            with contextlib.closing(sqlite3.connect(self.db_path, timeout=10.0)) as conn:
                cursor = conn.cursor()

                # Get device ID, name, and MAC
                self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                if not self.device_id:
                    logger.warning("No fitness device found in database")
                    return False

                # Query all sensors
                data = self.query_sensors(cursor)

            if data:
                self.publish_data(data)
            return True

        except Exception as e:
            logger.error(f"Database error: {e}")
            return False

    def check_and_recover_connection(self):
        """WATCHDOG: force a Bluetooth disconnect when data is stale (>20 mins).

        A recovery is attempted at most once per STALE_THRESHOLD_SECONDS.
        The cooldown is tracked separately from last_data_timestamp because
        query_sensors() overwrites that value with the (still stale) sample
        timestamp on every database read.
        """
        if self.last_data_timestamp == 0:
            return  # No data seen yet, skip check

        current_ts = int(time.time())
        time_diff = current_ts - self.last_data_timestamp

        if time_diff <= STALE_THRESHOLD_SECONDS:
            return

        if current_ts - self.last_recovery_time < STALE_THRESHOLD_SECONDS:
            logger.info(f"Data stale ({time_diff}s) but recovery was attempted recently; waiting")
            return

        logger.warning(f"Data stale ({time_diff}s > {STALE_THRESHOLD_SECONDS}s). Triggering recovery...")

        if not self.device_mac:
            logger.warning("Cannot recover: No device MAC available")
            return

        # Disconnect and wait; the following sync cycle performs the connect
        trigger_bluetooth_reconnect(self.device_mac)
        self.last_recovery_time = current_ts

    def run(self):
        """Main loop: periodically sync, locate the newest export and publish.

        Exits the process with status 1 when the MQTT connection cannot be
        established. The MQTT client is always disconnected on the way out.
        """
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)

        logger.info("Starting Gadgetbridge MQTT Publisher")
        logger.info(f"Export directory: {export_dir}")
        logger.info(f"Publish interval: {interval}s")

        # Ensure export directory exists
        os.makedirs(export_dir, exist_ok=True)

        # Connect to MQTT
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)

        discovery_published = False

        try:
            while self.running:
                current_time = time.time()

                # Check if it's time to trigger sync and publish
                if current_time - self.last_publish_time >= interval:
                    try:
                        # 1. WATCHDOG
                        self.check_and_recover_connection()

                        logger.info("Triggering Gadgetbridge sync...")
                        # 2. SYNC (this handles the CONNECT logic)
                        # If watchdog ran, this provides the "Reconnect" part of the cycle
                        trigger_gadgetbridge_sync(self.device_mac)

                        # Wait for export to complete
                        time.sleep(5)

                        # Find latest database with retry
                        self.db_path = find_latest_db(export_dir)
                        if not self.db_path:
                            logger.warning("No database found, retrying in 3s...")
                            time.sleep(3)
                            self.db_path = find_latest_db(export_dir)

                        if self.db_path:
                            logger.info(f"Using database: {os.path.basename(self.db_path)}")

                            # Publish discovery on first successful read
                            if not discovery_published:
                                # Need to read device info first
                                try:
                                    with contextlib.closing(
                                        sqlite3.connect(self.db_path, timeout=5.0)
                                    ) as conn:
                                        cursor = conn.cursor()
                                        (self.device_id, self.device_alias,
                                         self.device_mac) = self.get_device_info(cursor)
                                    if self.device_mac:
                                        logger.info(f"Device MAC for reconnection: {self.device_mac}")
                                except Exception as e:
                                    logger.warning(f"Could not read device info: {e}")

                                self.publish_discovery()
                                discovery_published = True

                            self.process_database()
                        else:
                            logger.warning(f"No database found in {export_dir}")

                    except Exception as e:
                        logger.error(f"Sync cycle failed: {e}")

                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")

                time.sleep(10)  # Check every 10 seconds

        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            logger.info("Cleaning up...")
            self.disconnect_mqtt()


def main():
    """Script entry point: verify dependencies, load config and run."""
    if mqtt is None:
        print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
        sys.exit(1)

    config = load_config()
    publisher = GadgetbridgeMQTT(config)
    publisher.run()


if __name__ == "__main__":
    main()