#!/usr/bin/env python3 """ Gadgetbridge MQTT Step Counter Integration Extracts sensor data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT """ import os import sqlite3 import json import logging from datetime import datetime, timedelta from typing import Dict, Any import asyncio import aiomqtt import re import time class GadgetbridgeMQTTPublisher: def __init__(self): self.setup_logging() self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db") self.load_config() self.mqtt_client = None self.publish_interval = int(os.getenv("PUBLISH_INTERVAL_SECONDS", "300")) self.max_retries = int(os.getenv("MAX_RETRIES", "5")) self.retry_delay = int(os.getenv("RETRY_DELAY_SECONDS", "30")) # Initialize device_name with fallback - don't fail on DB issues during init try: self.device_name = self.get_device_alias() except Exception as e: self.logger.warning(f"Could not get device alias during init: {e}") self.device_name = "fitness_tracker" # Initialize sensors after device_name is set self.initialize_sensors() def setup_logging(self): """Setup logging configuration (console only)""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(), ], ) self.logger = logging.getLogger(__name__) def load_config(self): """Load MQTT configuration from environment variables""" self.mqtt_config = { "broker": os.getenv("MQTT_BROKER", "localhost"), "port": int(os.getenv("MQTT_PORT", "1883")), "username": os.getenv("MQTT_USERNAME", ""), "password": os.getenv("MQTT_PASSWORD", ""), } def get_day_start_timestamp(self) -> int: """Get the timestamp for the start of the current day (4am)""" now = datetime.now() today = now.date() # Day starts at 4am day_start_time = datetime.combine(today, datetime.min.time()).replace(hour=4) # If current time is before 4am, we're still in "yesterday's" day if now.hour < 4: day_start_time -= timedelta(days=1) return int(day_start_time.timestamp()) def get_day_midnight_timestamp_ms(self) -> int: """Get the timestamp for midnight of the current day in milliseconds. Used for XIAOMI_DAILY_SUMMARY_SAMPLE which stores data at midnight.""" now = datetime.now() today = now.date() # Daily summary is stored at midnight day_midnight = datetime.combine(today, datetime.min.time()) # If current time is before 4am, use yesterday's midnight if now.hour < 4: day_midnight -= timedelta(days=1) return int(day_midnight.timestamp()) * 1000 def initialize_sensors(self): """Initialize sensor definitions after device_name is available""" self.sensors = [ { "name": "Daily Steps", "unique_id": "daily_steps", "state_topic": f"gadgetbridge/{self.device_name}/steps/daily", "unit_of_measurement": "steps", "icon": "mdi:walk", "state_class": "total_increasing", "query": self.query_daily_steps, }, { "name": "Weekly Steps", "unique_id": "weekly_steps", "state_topic": f"gadgetbridge/{self.device_name}/steps/weekly", "unit_of_measurement": "steps", "icon": "mdi:walk", "state_class": "total", "query": self.query_weekly_steps, }, { "name": "Monthly Steps", "unique_id": "monthly_steps", "state_topic": f"gadgetbridge/{self.device_name}/steps/monthly", "unit_of_measurement": "steps", "icon": "mdi:walk", "state_class": "total", "query": self.query_monthly_steps, }, { "name": "Battery Level", "unique_id": "battery_level", "state_topic": f"gadgetbridge/{self.device_name}/battery", "unit_of_measurement": "%", "icon": "mdi:battery", "device_class": "battery", "query": self.query_battery_level, }, { "name": "Weight", "unique_id": "weight", "state_topic": f"gadgetbridge/{self.device_name}/weight", "unit_of_measurement": "kg", "icon": "mdi:scale-bathroom", "state_class": "measurement", "query": self.query_latest_weight, }, { "name": "Latest Heart Rate", "unique_id": "latest_heart_rate", "state_topic": f"gadgetbridge/{self.device_name}/heart_rate", "unit_of_measurement": "bpm", "icon": "mdi:heart-pulse", "state_class": "measurement", "query": self.query_latest_heart_rate, }, { "name": "Resting Heart Rate", "unique_id": "hr_resting", "state_topic": f"gadgetbridge/{self.device_name}/hr_resting", "unit_of_measurement": "bpm", "icon": "mdi:heart-pulse", "state_class": "measurement", "query": self.query_hr_resting, }, { "name": "Max Heart Rate", "unique_id": "hr_max", "state_topic": f"gadgetbridge/{self.device_name}/hr_max", "unit_of_measurement": "bpm", "icon": "mdi:heart-pulse", "state_class": "measurement", "query": self.query_hr_max, }, { "name": "Average Heart Rate", "unique_id": "hr_avg", "state_topic": f"gadgetbridge/{self.device_name}/hr_avg", "unit_of_measurement": "bpm", "icon": "mdi:heart-pulse", "state_class": "measurement", "query": self.query_hr_avg, }, { "name": "Calories", "unique_id": "calories", "state_topic": f"gadgetbridge/{self.device_name}/calories", "unit_of_measurement": "kcal", "icon": "mdi:fire", "state_class": "total_increasing", "query": self.query_calories, }, { "name": "Is Awake", "unique_id": "is_awake", "state_topic": f"gadgetbridge/{self.device_name}/is_awake", "icon": "mdi:power-sleep", "device_class": "enum", "query": self.query_is_awake, }, { "name": "Total Sleep Duration", "unique_id": "total_sleep_duration", "state_topic": f"gadgetbridge/{self.device_name}/total_sleep_duration", "unit_of_measurement": "h", "icon": "mdi:sleep", "state_class": "measurement", "query": self.query_total_sleep_duration, }, { "name": "Server Time", "unique_id": "server_time", "state_topic": f"gadgetbridge/{self.device_name}/server_time", "icon": "mdi:clock-outline", "device_class": "timestamp", "query": self.query_server_time, }, ] async def publish_home_assistant_discovery( self, entity_type: str, entity_id: str, config: Dict ): """Publish Home Assistant MQTT discovery configuration asynchronously""" discovery_topic = ( f"homeassistant/{entity_type}/{self.device_name}_{entity_id}/config" ) try: await self.mqtt_client.publish( discovery_topic, json.dumps(config), qos=1, retain=True ) self.logger.info(f"Published discovery config for {entity_id}") except Exception as e: self.logger.error(f"Failed to publish discovery config: {e}") raise async def setup_home_assistant_entities(self): """Setup Home Assistant entities via MQTT discovery""" device_info = { "identifiers": [self.device_name], "name": f"Gadgetbridge {self.device_name.replace('_', ' ').title()}", "model": "Fitness Tracker", "manufacturer": "Gadgetbridge", } for sensor in self.sensors: config = { "name": f"{self.device_name.replace('_', ' ').title()} {sensor['name']}", "unique_id": f"{self.device_name}_{sensor['unique_id']}", "state_topic": sensor["state_topic"], "device": device_info, } # Add optional fields if present for key in ["unit_of_measurement", "icon", "state_class", "device_class"]: if key in sensor: config[key] = sensor[key] await self.publish_home_assistant_discovery( "sensor", sensor["unique_id"], config ) def query_daily_steps(self, cursor) -> Any: day_start_ts = self.get_day_start_timestamp() now_ts = int(datetime.now().timestamp()) cursor.execute( "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?", (day_start_ts, now_ts), ) return cursor.fetchone()[0] or 0 def query_weekly_steps(self, cursor) -> Any: now = datetime.now() today = now.date() # Week starts on Monday at 4am week_start = today - timedelta(days=today.weekday()) week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4) # If we're before 4am on Monday, the week actually started last Monday if today.weekday() == 0 and now.hour < 4: week_start_time -= timedelta(days=7) week_start_ts = int(week_start_time.timestamp()) now_ts = int(now.timestamp()) cursor.execute( "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?", (week_start_ts, now_ts), ) return cursor.fetchone()[0] or 0 def query_monthly_steps(self, cursor) -> Any: now = datetime.now() today = now.date() # Month starts on 1st at 4am month_start = today.replace(day=1) month_start_time = datetime.combine(month_start, datetime.min.time()).replace(hour=4) # If we're before 4am on the 1st, the month actually started last month's 1st if today.day == 1 and now.hour < 4: # Go back to previous month if month_start.month == 1: month_start_time = month_start_time.replace(year=month_start.year - 1, month=12) else: month_start_time = month_start_time.replace(month=month_start.month - 1) month_start_ts = int(month_start_time.timestamp()) now_ts = int(now.timestamp()) cursor.execute( "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?", (month_start_ts, now_ts), ) return cursor.fetchone()[0] or 0 def query_battery_level(self, cursor) -> Any: cursor.execute( "SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1" ) row = cursor.fetchone() return row[0] if row else None def query_latest_weight(self, cursor) -> Any: cursor.execute( "SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1" ) row = cursor.fetchone() return row[0] if row else None def query_latest_heart_rate(self, cursor) -> Any: cursor.execute( "SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1" ) row = cursor.fetchone() return row[0] if row else None def query_hr_resting(self, cursor) -> Any: day_midnight_ts_ms = self.get_day_midnight_timestamp_ms() cursor.execute( "SELECT HR_RESTING FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (day_midnight_ts_ms,) ) row = cursor.fetchone() return row[0] if row else None def query_hr_max(self, cursor) -> Any: day_midnight_ts_ms = self.get_day_midnight_timestamp_ms() cursor.execute( "SELECT HR_MAX FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (day_midnight_ts_ms,) ) row = cursor.fetchone() return row[0] if row else None def query_hr_avg(self, cursor) -> Any: day_midnight_ts_ms = self.get_day_midnight_timestamp_ms() cursor.execute( "SELECT HR_AVG FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (day_midnight_ts_ms,) ) row = cursor.fetchone() return row[0] if row else None def query_calories(self, cursor) -> Any: day_midnight_ts_ms = self.get_day_midnight_timestamp_ms() cursor.execute( "SELECT CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (day_midnight_ts_ms,) ) row = cursor.fetchone() return row[0] if row else None def query_is_awake(self, cursor) -> Any: cursor.execute("""SELECT TIMESTAMP, IS_AWAKE, WAKEUP_TIME FROM XIAOMI_SLEEP_TIME_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1""") row = cursor.fetchone() # 1. No data at all -> Assume Awake if not row: return True last_ts_epoch = row[0] // 1000 # Convert from milliseconds to seconds is_awake_val = row[1] # This can be 1, 0, or None (NULL) wakeup_time = row[2] # Wakeup time in milliseconds # 2. Check if WAKEUP_TIME is in the past (user has woken up) current_time_ms = int(time.time()) * 1000 if wakeup_time and wakeup_time <= current_time_ms: return True # 3. Timeout Safety: If last sleep data is older than 12 hours, force Awake # This handles "Band Removed" or "Sync Failed" scenarios if (int(time.time()) - last_ts_epoch) > (12 * 3600): return True # 4. Explicit Status Check if is_awake_val == 1: return True # If is_awake_val is 0 or None, the user is likely asleep return False def query_total_sleep_duration(self, cursor) -> Any: # Get sleep from the last 24 hours day_ago_ts = (int(time.time()) - 24 * 3600) * 1000 # 24 hours ago in milliseconds cursor.execute( "SELECT TOTAL_DURATION FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (day_ago_ts,) ) row = cursor.fetchone() # Convert minutes to hours, round to 2 decimals return round(row[0] / 60, 2) if row and row[0] is not None else None def query_server_time(self, cursor) -> Any: """Return current server time in ISO 8601 format for Home Assistant timestamp""" # .astimezone() adds the system's local timezone offset (e.g., +01:00) return datetime.now().astimezone().isoformat() def get_sensor_data(self) -> Dict[str, Any]: """Query all sensors and return their values as a dict""" if not os.path.exists(self.db_path): self.logger.error(f"Database file not found: {self.db_path}") return {} try: conn = sqlite3.connect(self.db_path, timeout=10.0) cursor = conn.cursor() data = {} for sensor in self.sensors: try: data[sensor["unique_id"]] = sensor["query"](cursor) except Exception as e: self.logger.error(f"Error querying {sensor['unique_id']}: {e}") data[sensor["unique_id"]] = None conn.close() return data except Exception as e: self.logger.error(f"Error querying database: {e}") return {} async def publish_sensor_data(self, data: Dict[str, Any]): """Publish all sensor data to MQTT asynchronously""" for sensor in self.sensors: value = data.get(sensor["unique_id"]) if value is not None: try: await self.mqtt_client.publish( sensor["state_topic"], str(value), qos=1 ) except Exception as e: self.logger.error(f"Failed to publish {sensor['unique_id']}: {e}") raise self.logger.info(f"Published sensor data: {data}") async def run_main_loop(self): """Main execution loop with error recovery""" while True: try: self.logger.info("Attempting MQTT connection...") async with aiomqtt.Client( hostname=self.mqtt_config["broker"], port=self.mqtt_config["port"], username=self.mqtt_config["username"] or None, password=self.mqtt_config["password"] or None, ) as client: self.mqtt_client = client self.logger.info("MQTT connection successful") await self.setup_home_assistant_entities() # Publish immediately on startup sensor_data = self.get_sensor_data() await self.publish_sensor_data(sensor_data) self.logger.info(f"Next publish in {self.publish_interval} seconds...") # Main publishing loop while True: await asyncio.sleep(self.publish_interval) sensor_data = self.get_sensor_data() await self.publish_sensor_data(sensor_data) self.logger.info(f"Next publish in {self.publish_interval} seconds...") except Exception as e: self.logger.error(f"Error in main loop: {e}") self.logger.info(f"Restarting main loop in {self.retry_delay} seconds...") await asyncio.sleep(self.retry_delay) async def run(self): """Main execution method (async) - now with proper error recovery""" self.logger.info("Starting Gadgetbridge MQTT Publisher") # Initial database check with wait loop if not os.path.exists(self.db_path): self.logger.error(f"Database file not found: {self.db_path}") self.logger.info("Waiting for database to become available...") while not os.path.exists(self.db_path): await asyncio.sleep(30) self.logger.info("Still waiting for database...") self.logger.info("Database file found, continuing...") # Try to get proper device name now that DB might be available try: new_device_name = self.get_device_alias() if new_device_name != self.device_name: self.logger.info(f"Updating device name from {self.device_name} to {new_device_name}") self.device_name = new_device_name self.initialize_sensors() # Reinitialize with correct device name except Exception as e: self.logger.warning(f"Could not update device alias: {e}, using fallback") # Run main loop with recovery await self.run_main_loop() def get_device_alias(self) -> str: """Fetch ALIAS from DEVICE table for device_name where NAME contains 'band' or 'watch' (case-insensitive)""" if not os.path.exists(self.db_path): self.logger.warning(f"Database file not found: {self.db_path}") return "fitness_tracker" try: conn = sqlite3.connect(self.db_path, timeout=5.0) cursor = conn.cursor() cursor.execute( """ SELECT ALIAS FROM DEVICE WHERE LOWER(NAME) LIKE '%band%' OR LOWER(NAME) LIKE '%watch%' LIMIT 1 """ ) row = cursor.fetchone() conn.close() if row and row[0]: # Sanitize alias for MQTT topics return re.sub(r"\W+", "_", row[0]).lower() else: return "fitness_tracker" except Exception as e: self.logger.error(f"Error fetching device alias: {e}") return "fitness_tracker" # --- Main Entry Point --- if __name__ == "__main__": publisher = GadgetbridgeMQTTPublisher() asyncio.run(publisher.run())