#!/usr/bin/env python3
"""
Gadgetbridge MQTT Step Counter Integration
Extracts steps data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
"""

import asyncio
import json
import logging
import os
import re
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import aiomqtt

# Single source of truth for the database location, used by both the publisher
# class and the diagnostic entry point so they can never disagree.
DEFAULT_DB_PATH = "/data/Gadgetbridge.db"
LOG_FILE_PATH = "/app/logs/gadgetbridge_mqtt.log"
# How long the script stays alive after listing device names, so the output
# can be read before the process terminates.
REVIEW_DELAY_SECONDS = 600


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier."""
    return '"' + name.replace('"', '""') + '"'


class GadgetbridgeMQTTPublisher:
    """Periodically read step/battery data from SQLite and publish it via MQTT.

    Configuration is taken from environment variables:
    ``GADGETBRIDGE_DB_PATH``, ``DEVICE_NAME``, ``PUBLISH_INTERVAL_SECONDS``,
    ``MQTT_BROKER``, ``MQTT_PORT``, ``MQTT_USERNAME`` and ``MQTT_PASSWORD``.
    """

    def __init__(self):
        self.setup_logging()
        self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", DEFAULT_DB_PATH)
        raw_name = os.getenv("DEVICE_NAME", "fitness_tracker")
        # Sanitize device_name: lowercase, replace spaces and non-word chars
        # with "_" because the name is embedded in MQTT topics and unique ids.
        self.device_name = re.sub(r"\W+", "_", raw_name).lower()
        self.load_config()
        # Set by run() once the broker connection is established.
        self.mqtt_client = None
        self.publish_interval = int(os.getenv("PUBLISH_INTERVAL_SECONDS", "300"))

    def setup_logging(self):
        """Configure logging to the console and, when possible, to a log file.

        If the log directory cannot be created or the file cannot be opened,
        logging falls back to the console only instead of aborting start-up.
        """
        handlers = [logging.StreamHandler()]
        file_error = None
        try:
            os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True)
            handlers.insert(0, logging.FileHandler(LOG_FILE_PATH))
        except OSError as e:
            file_error = e

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=handlers,
        )
        self.logger = logging.getLogger(__name__)
        if file_error is not None:
            self.logger.warning(
                f"File logging disabled, cannot open {LOG_FILE_PATH}: {file_error}"
            )

    def load_config(self):
        """Load MQTT configuration from environment variables"""
        self.mqtt_config = {
            "broker": os.getenv("MQTT_BROKER", "localhost"),
            "port": int(os.getenv("MQTT_PORT", "1883")),
            "username": os.getenv("MQTT_USERNAME", ""),
            "password": os.getenv("MQTT_PASSWORD", ""),
        }

    async def publish_home_assistant_discovery(
        self, entity_type: str, entity_id: str, config: Dict
    ):
        """Publish one retained Home Assistant MQTT discovery message.

        Args:
            entity_type: Discovery component segment of the topic (e.g. "sensor").
            entity_id: Entity suffix appended to the device name in the topic.
            config: Discovery payload; serialized to JSON before publishing.

        Failures are logged and swallowed so one bad entity does not stop the
        remaining ones from being announced.
        """
        discovery_topic = (
            f"homeassistant/{entity_type}/{self.device_name}_{entity_id}/config"
        )
        try:
            await self.mqtt_client.publish(
                discovery_topic, json.dumps(config), qos=1, retain=True
            )
            self.logger.info(f"Published discovery config for {entity_id}")
        except Exception as e:
            self.logger.error(f"Failed to publish discovery config: {e}")

    async def setup_home_assistant_entities(self):
        """Announce all sensors of this device via Home Assistant MQTT discovery."""
        display_name = self.device_name.replace("_", " ").title()
        base_topic = f"gadgetbridge/{self.device_name}"

        device_info = {
            "identifiers": [self.device_name],
            "name": f"Gadgetbridge {display_name}",
            "model": "Fitness Tracker",
            "manufacturer": "Gadgetbridge",
        }

        # Keyed by entity id; insertion order is the publish order.
        sensors = {
            "daily_steps": {
                "name": f"{display_name} Daily Steps",
                "unique_id": f"{self.device_name}_daily_steps",
                "state_topic": f"{base_topic}/steps/daily",
                "unit_of_measurement": "steps",
                "icon": "mdi:walk",
                "device": device_info,
                "state_class": "total_increasing",
            },
            "weekly_steps": {
                "name": f"{display_name} Weekly Steps",
                "unique_id": f"{self.device_name}_weekly_steps",
                "state_topic": f"{base_topic}/steps/weekly",
                "unit_of_measurement": "steps",
                "icon": "mdi:walk",
                "device": device_info,
                "state_class": "total",
            },
            "monthly_steps": {
                "name": f"{display_name} Monthly Steps",
                "unique_id": f"{self.device_name}_monthly_steps",
                "state_topic": f"{base_topic}/steps/monthly",
                "unit_of_measurement": "steps",
                "icon": "mdi:walk",
                "device": device_info,
                "state_class": "total",
            },
            "last_sync": {
                "name": f"{display_name} Last Sync",
                "unique_id": f"{self.device_name}_last_sync",
                "state_topic": f"{base_topic}/last_sync",
                "icon": "mdi:sync",
                "device": device_info,
                "device_class": "timestamp",
            },
            "battery_level": {
                "name": f"{display_name} Battery Level",
                "unique_id": f"{self.device_name}_battery_level",
                "state_topic": f"{base_topic}/battery",
                "unit_of_measurement": "%",
                "icon": "mdi:battery",
                "device": device_info,
                "device_class": "battery",
            },
        }

        for entity_id, config in sensors.items():
            await self.publish_home_assistant_discovery("sensor", entity_id, config)

    def get_steps_data(self) -> Dict:
        """Extract steps data from Gadgetbridge database.

        Returns:
            A dict with ``daily_steps``, ``weekly_steps``, ``monthly_steps``
            (ints, 0 when no rows match) and ``last_sync`` (timezone-aware
            ISO 8601 string in UTC, or ``None`` when the table is empty).
            An empty dict is returned when the database file is missing or
            any query fails; the error is logged.
        """
        if not os.path.exists(self.db_path):
            self.logger.error(f"Database file not found: {self.db_path}")
            return {}

        # Period boundaries are computed in local time and converted to Unix
        # timestamps, which is what the TIMESTAMP column is compared against.
        today = datetime.now().date()
        week_start = today - timedelta(days=today.weekday())
        month_start = today.replace(day=1)

        def start_of(day) -> int:
            return int(datetime.combine(day, datetime.min.time()).timestamp())

        today_start = start_of(today)
        today_end = int(datetime.combine(today, datetime.max.time()).timestamp())
        week_start_ts = start_of(week_start)
        month_start_ts = start_of(month_start)

        try:
            # closing() guarantees the connection is released even when a
            # query raises (e.g. the table does not exist).
            with closing(sqlite3.connect(self.db_path)) as conn:
                cursor = conn.cursor()

                cursor.execute(
                    """
                    SELECT SUM(STEPS) as daily_steps
                    FROM XIAOMI_ACTIVITY_SAMPLE
                    WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?
                    """,
                    (today_start, today_end),
                )
                # SUM() over zero rows yields NULL, hence the "or 0".
                daily_steps = cursor.fetchone()[0] or 0

                cursor.execute(
                    """
                    SELECT SUM(STEPS) as weekly_steps
                    FROM XIAOMI_ACTIVITY_SAMPLE
                    WHERE TIMESTAMP >= ?
                    """,
                    (week_start_ts,),
                )
                weekly_steps = cursor.fetchone()[0] or 0

                cursor.execute(
                    """
                    SELECT SUM(STEPS) as monthly_steps
                    FROM XIAOMI_ACTIVITY_SAMPLE
                    WHERE TIMESTAMP >= ?
                    """,
                    (month_start_ts,),
                )
                monthly_steps = cursor.fetchone()[0] or 0

                cursor.execute(
                    """
                    SELECT MAX(TIMESTAMP) as last_sync
                    FROM XIAOMI_ACTIVITY_SAMPLE
                    """
                )
                last_sync_ts = cursor.fetchone()[0]

            # The sensor is announced with device_class "timestamp", so the
            # published value must carry an explicit UTC offset.
            last_sync = (
                datetime.fromtimestamp(last_sync_ts, tz=timezone.utc).isoformat()
                if last_sync_ts
                else None
            )

            return {
                "daily_steps": daily_steps,
                "weekly_steps": weekly_steps,
                "monthly_steps": monthly_steps,
                "last_sync": last_sync,
            }
        except Exception as e:
            # Best effort: the publish loop must survive a transient DB error.
            self.logger.error(f"Error querying database: {e}")
            return {}

    async def publish_steps_data(self, data: Dict):
        """Publish step counts, last sync time and battery level to MQTT.

        Args:
            data: Result of :meth:`get_steps_data`; nothing is published when
                it is empty.

        Publish failures are logged and swallowed.
        """
        if not data:
            return

        base_topic = f"gadgetbridge/{self.device_name}"
        step_topics = (
            (f"{base_topic}/steps/daily", "daily_steps"),
            (f"{base_topic}/steps/weekly", "weekly_steps"),
            (f"{base_topic}/steps/monthly", "monthly_steps"),
        )

        try:
            for topic, key in step_topics:
                await self.mqtt_client.publish(topic, str(data[key]), qos=1)

            if data["last_sync"]:
                await self.mqtt_client.publish(
                    f"{base_topic}/last_sync", data["last_sync"], qos=1
                )

            # Battery is read here rather than in get_steps_data because it
            # lives in a different table and may legitimately be absent.
            battery_level = self.get_battery_level()
            if battery_level is not None:
                await self.mqtt_client.publish(
                    f"{base_topic}/battery", str(battery_level), qos=1
                )

            self.logger.info(
                f"Published steps data: Daily={data['daily_steps']}, Weekly={data['weekly_steps']}, Monthly={data['monthly_steps']}, Battery={battery_level}"
            )
        except Exception as e:
            self.logger.error(f"Failed to publish steps/battery data: {e}")

    async def run(self):
        """Connect to the broker, announce the entities, then publish forever.

        Data is published immediately on start-up and then once every
        ``publish_interval`` seconds. Any exception raised by the connection
        is logged and ends the method.
        """
        self.logger.info("Starting Gadgetbridge MQTT Publisher")
        try:
            async with aiomqtt.Client(
                hostname=self.mqtt_config["broker"],
                port=self.mqtt_config["port"],
                # Empty credentials mean "anonymous"; the client expects None.
                username=self.mqtt_config["username"] or None,
                password=self.mqtt_config["password"] or None,
            ) as client:
                self.mqtt_client = client
                await self.setup_home_assistant_entities()

                while True:
                    steps_data = self.get_steps_data()
                    if steps_data:
                        await self.publish_steps_data(steps_data)
                    self.logger.info(
                        f"Sleeping for {self.publish_interval} seconds before next publish..."
                    )
                    await asyncio.sleep(self.publish_interval)
        except Exception as e:
            self.logger.error(f"Failed to connect to MQTT broker: {e}")

    def get_battery_level(self) -> Optional[int]:
        """Get the latest battery level from BATTERY_LEVEL table.

        Returns:
            The LEVEL value of the row with the highest TIMESTAMP, or ``None``
            when the database file is missing, the table is empty or the
            query fails (the failure is logged).
        """
        # sqlite3.connect() would create an empty file at a missing path.
        if not os.path.exists(self.db_path):
            self.logger.error(f"Database file not found: {self.db_path}")
            return None
        try:
            with closing(sqlite3.connect(self.db_path)) as conn:
                row = conn.execute(
                    "SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1"
                ).fetchone()
            return row[0] if row else None
        except Exception as e:
            self.logger.error(f"Error querying battery level: {e}")
            return None


def get_all_device_names(db_path):
    """Returns a list of all unique device names from the database.

    Names are read from the ``DEVICE`` table (the same table
    ``print_device_table`` dumps), or from a ``DEVICES`` table if only that
    one exists. Without either, a ``DEVICE_NAME`` column on
    ``MI_BAND_ACTIVITY_SAMPLE`` is used when present. Returns an empty list
    when the file is missing or a query fails; the problem is printed.
    """
    if not os.path.exists(db_path):
        print(f"Database file not found: {db_path}")
        return []
    try:
        with closing(sqlite3.connect(db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name IN ('DEVICE', 'DEVICES')"
            )
            device_tables = {row[0] for row in cursor.fetchall()}
            if device_tables:
                table = "DEVICE" if "DEVICE" in device_tables else "DEVICES"
                cursor.execute(f"SELECT DISTINCT NAME FROM {table}")
                return [row[0] for row in cursor.fetchall()]

            # No device table: see if MI_BAND_ACTIVITY_SAMPLE has a device
            # column. table_info on a missing table simply yields no rows.
            cursor.execute("PRAGMA table_info(MI_BAND_ACTIVITY_SAMPLE)")
            columns = [row[1] for row in cursor.fetchall()]
            if "DEVICE_NAME" not in columns:
                return []
            cursor.execute("SELECT DISTINCT DEVICE_NAME FROM MI_BAND_ACTIVITY_SAMPLE")
            return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        print(f"Error querying database: {e}")
        return []


def print_db_context(db_path):
    """Print size, table list, per-table row counts and a few sample rows.

    Purely diagnostic: every problem is printed instead of raised.
    """
    print(f"\nDatabase path: {db_path}")
    if not os.path.exists(db_path):
        print("Database file not found.")
        return

    size_mb = os.path.getsize(db_path) / (1024 * 1024)
    print(f"Database size: {size_mb:.2f} MB")
    try:
        with closing(sqlite3.connect(db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            print(f"Tables in database: {tables}")

            for table in tables:
                try:
                    # Table names cannot be bound as parameters, so quote them.
                    cursor.execute(
                        f"SELECT COUNT(*) FROM {_quote_identifier(table)}"
                    )
                    count = cursor.fetchone()[0]
                    print(f"  {table}: {count} rows")
                except Exception as e:
                    print(f"  {table}: error counting rows ({e})")

            if "MI_BAND_ACTIVITY_SAMPLE" in tables:
                cursor.execute("SELECT * FROM MI_BAND_ACTIVITY_SAMPLE LIMIT 3")
                print("Sample rows from MI_BAND_ACTIVITY_SAMPLE:")
                for row in cursor.fetchall():
                    print("   ", row)
    except Exception as e:
        print(f"Error reading database: {e}")


def print_device_table(db_path):
    """Print every row of the DEVICE table; problems are printed, not raised."""
    # Guard first: sqlite3.connect() would otherwise create an empty database
    # file at a missing path, which later passes the publisher's exists check.
    if not os.path.exists(db_path):
        print(f"Error reading DEVICE table: database file not found: {db_path}")
        return
    try:
        with closing(sqlite3.connect(db_path)) as conn:
            rows = conn.execute("SELECT * FROM DEVICE").fetchall()
        print("DEVICE table contents:")
        for row in rows:
            print(row)
    except Exception as e:
        print(f"Error reading DEVICE table: {e}")


def main():
    """Print database diagnostics, then run the publisher if a device is set.

    When ``DEVICE_NAME`` is unset or "unknown", the available device names are
    listed and the process exits with status 0 after a review delay instead of
    publishing.
    """
    db_path = os.getenv("GADGETBRIDGE_DB_PATH", DEFAULT_DB_PATH)
    print_db_context(db_path)  # For debugging and context
    print_device_table(db_path)

    device_name = os.getenv("DEVICE_NAME", "").strip()
    if not device_name or device_name.lower() == "unknown":
        print(
            "Device name is not set. Attempting to list all available device names from the database...\n"
        )
        device_names = get_all_device_names(db_path)
        if device_names:
            print("Available device names in this database:")
            for n in device_names:
                print("  -", n)
        else:
            print("No device names could be found in the database.")
        print(
            "\nWaiting 10 minutes (600 seconds) before terminating to allow user review..."
        )
        time.sleep(REVIEW_DELAY_SECONDS)
        print("Terminating script.")
        sys.exit(0)

    # Continuous publisher only runs if device_name is present and valid.
    publisher = GadgetbridgeMQTTPublisher()
    asyncio.run(publisher.run())


# --- Main Entry Point ---
if __name__ == "__main__":
    main()