GadgetbridgeMqtt/main.py

421 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
"""
import json
import logging
import os
import re
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path

# MQTT library - paho-mqtt works well on Termux
try:
    import paho.mqtt.client as mqtt
except ImportError:
    print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
    sys.exit(1)
# --- Configuration ---
# JSON file holding the MQTT settings; load_config() tells the user to run
# setup.py when it is missing.
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
# Default directory scanned for Gadgetbridge database exports.
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
# Default number of seconds between sync/publish cycles (5 minutes).
PUBLISH_INTERVAL = 5 * 60

# --- Logging ---
# Timestamped records at INFO level and above, written to stdout.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def load_config(config_file=None):
    """Load the MQTT configuration from a JSON file.

    Args:
        config_file: Optional path to the JSON configuration. When omitted,
            the module-level ``CONFIG_FILE`` path is used.

    Returns:
        The parsed JSON content of the file.

    Logs an error and exits the process with status 1 when the file does not
    exist or cannot be decoded as JSON.
    """
    path = CONFIG_FILE if config_file is None else config_file
    try:
        # Open directly instead of checking os.path.exists() first, so there
        # is no window between the check and the read.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Config file not found: {path}")
        logger.error("Run setup.py first to configure MQTT settings")
    except ValueError as e:
        # Covers json.JSONDecodeError and UnicodeDecodeError, both of which
        # are ValueError subclasses.
        logger.error(f"Config file is not valid JSON: {path} ({e})")
    sys.exit(1)
def send_gadgetbridge_intent(action):
    """Send a broadcast intent to the Gadgetbridge app with ``am broadcast``.

    Args:
        action: Intent action string passed to ``am broadcast -a``.

    The call is best-effort: a non-zero exit code or a failure to launch
    ``am`` is logged as a warning and never raised to the caller.
    """
    # Pass an argument list rather than a shell string so the action is never
    # interpreted by a shell.
    cmd = [
        "am", "broadcast",
        "-a", action,
        "-p", "nodomain.freeyourgadget.gadgetbridge",
    ]
    logger.info(f"Sending intent: {action}")
    try:
        result = subprocess.run(cmd, check=False)
    except OSError as e:
        # e.g. the `am` executable is not available on this system.
        logger.warning(f"Intent may have failed (could not run am: {e})")
        return
    # returncode is the real exit code; os.system() returned an encoded wait
    # status, which made the logged "exit code" misleading.
    if result.returncode != 0:
        logger.warning(f"Intent may have failed (exit code: {result.returncode})")
def trigger_gadgetbridge_sync():
    """Ask Gadgetbridge to sync activity data, then to export its database.

    The two intents are sent in that order with a fixed 5 second pause in
    between, so the sync has some time to run before the export is requested.
    """
    sync_action = "nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC"
    export_action = "nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT"

    # Step 1: fetch fresh data from the band.
    send_gadgetbridge_intent(sync_action)
    time.sleep(5)  # Give it time to sync
    # Step 2: request the database export.
    send_gadgetbridge_intent(export_action)
def find_latest_db(export_dir):
    """Return the path of the most recently modified Gadgetbridge export.

    A file qualifies when its name ends with ``.db`` and contains
    ``Gadgetbridge``.

    Args:
        export_dir: Directory to scan.

    Returns:
        The full path of the qualifying file with the newest modification
        time, or ``None`` when ``export_dir`` is empty/unset, cannot be
        listed (missing, not a directory, unreadable), or holds no
        qualifying file.
    """
    if not export_dir:
        return None
    try:
        names = os.listdir(export_dir)
    except OSError:
        # Missing path, a regular file, or a permission problem.
        return None

    latest_path = None
    latest_mtime = None
    for name in names:
        if not (name.endswith(".db") and "Gadgetbridge" in name):
            continue
        path = os.path.join(export_dir, name)
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            # The export was removed or replaced while we were scanning.
            continue
        # Strict comparison keeps the first-listed file on ties, matching the
        # previous stable-sort behaviour.
        if latest_mtime is None or mtime > latest_mtime:
            latest_path, latest_mtime = path, mtime
    return latest_path
class GadgetbridgeMQTT:
    """Read Gadgetbridge database exports and publish the values over MQTT.

    The ``config`` mapping is read for these keys: ``mqtt_broker`` (required),
    ``mqtt_port`` (default 1883), ``mqtt_username`` / ``mqtt_password``
    (optional, used only when both are set), ``export_dir`` and
    ``publish_interval`` (optional, falling back to the module defaults).
    """

    def __init__(self, config):
        self.config = config
        self.db_path = None  # path of the export currently being read
        self.device_name = "fitness_tracker"  # replaced by the DB alias when found
        self.mqtt_client = None
        self.last_publish_time = 0  # time.time() of the last sync/publish cycle
        self.last_db_mtime = 0

    def connect_mqtt(self):
        """Create the MQTT client, connect to the broker and start its loop.

        Returns:
            True when the connect call and loop start did not raise, False
            otherwise (the error is logged).
        """
        # Use callback API version 2 to avoid deprecation warning
        try:
            self.mqtt_client = mqtt.Client(
                callback_api_version=mqtt.CallbackAPIVersion.VERSION2
            )
        except (AttributeError, TypeError):
            # Fallback for older paho-mqtt versions
            self.mqtt_client = mqtt.Client()

        if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
            self.mqtt_client.username_pw_set(
                self.config["mqtt_username"],
                self.config["mqtt_password"],
            )

        try:
            self.mqtt_client.connect(
                self.config["mqtt_broker"],
                self.config.get("mqtt_port", 1883),
                60,
            )
            self.mqtt_client.loop_start()
            # Wait for connection to establish
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the network loop and disconnect, if a client was created."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    def get_device_alias(self, cursor):
        """Return a topic-safe device name derived from the DEVICE table.

        Looks up the ALIAS of the first device whose NAME contains "band" or
        "watch" (case-insensitive), replaces every run of non-word characters
        with ``_`` and lower-cases the result.

        Returns:
            The sanitized alias, or ``"fitness_tracker"`` when no alias is
            found or the query fails.
        """
        try:
            cursor.execute("""
                SELECT ALIAS FROM DEVICE
                WHERE LOWER(NAME) LIKE '%band%' OR LOWER(NAME) LIKE '%watch%'
                LIMIT 1
            """)
            row = cursor.fetchone()
            if row and row[0]:
                return re.sub(r"\W+", "_", row[0]).lower()
        except Exception as e:
            # Best-effort lookup: fall back to the default name, but do not
            # swallow KeyboardInterrupt/SystemExit as a bare except would.
            logger.debug(f"Device alias query failed: {e}")
        return "fitness_tracker"

    def get_day_start_timestamp(self):
        """Return the Unix timestamp (seconds) of the current day's start.

        A day starts at 04:00 local time; before 04:00 the previous day's
        04:00 is returned.
        """
        now = datetime.now()
        day_start = datetime.combine(now.date(), datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp_ms(self):
        """Return local midnight in milliseconds for daily summary queries.

        Uses today's midnight, or yesterday's midnight when the local time is
        before 04:00.
        """
        now = datetime.now()
        midnight = datetime.combine(now.date(), datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp()) * 1000

    def query_sensors(self, cursor):
        """Collect all sensor values from the database.

        Each sensor group is queried independently; a failing query is logged
        at debug level and its keys are simply left out of the result.

        Returns:
            A dict with any of the keys ``daily_steps``, ``weekly_steps``,
            ``battery_level``, ``heart_rate``, ``hr_resting``, ``hr_max``,
            ``hr_avg``, ``calories``, ``sleep_duration``, ``is_awake``,
            ``weight``, plus ``server_time`` which is always present.
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight_ms = self.get_day_midnight_timestamp_ms()

        # Daily Steps
        try:
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (day_start_ts, now_ts),
            )
            data["daily_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

        # Weekly Steps (week starts Monday 04:00; Monday before 04:00 still
        # belongs to the previous week)
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (int(week_start_time.timestamp()), now_ts),
            )
            data["weekly_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

        # Battery Level
        try:
            cursor.execute("SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

        # Latest Heart Rate (values of 0 and >= 255 are filtered out)
        try:
            cursor.execute(
                "SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1"
            )
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
        except Exception as e:
            logger.debug(f"Heart rate query failed: {e}")

        # Daily Summary Data (resting HR, max HR, avg HR, calories)
        try:
            cursor.execute(
                "SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (day_midnight_ms,),
            )
            row = cursor.fetchone()
            if row:
                summary_keys = ("hr_resting", "hr_max", "hr_avg", "calories")
                for key, value in zip(summary_keys, row):
                    # Falsy values (NULL or 0) are not published.
                    if value:
                        data[key] = value
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

        # Sleep Data (latest sample from the last 24 hours)
        try:
            day_ago_ms = (int(time.time()) - 24 * 3600) * 1000
            cursor.execute(
                "SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (day_ago_ms,),
            )
            row = cursor.fetchone()
            if row:
                if row[0]:
                    data["sleep_duration"] = round(row[0] / 60, 2)  # Convert to hours
                # Determine if awake: asleep only when IS_AWAKE is 0 and the
                # recorded wake-up time has not passed yet.
                is_awake = True
                if row[1] == 0:
                    is_awake = False
                if row[2] and row[2] <= int(time.time()) * 1000:
                    is_awake = True
                data["is_awake"] = is_awake
        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

        # Weight
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = row[0]
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

        # Server time
        data["server_time"] = datetime.now().astimezone().isoformat()
        return data

    def publish_discovery(self):
        """Publish retained Home Assistant MQTT discovery configs.

        One config is published per sensor under
        ``homeassistant/sensor/<device>_<sensor>/config``, each pointing at
        the state topic ``gadgetbridge/<device>/<sensor>``.
        """
        device_info = {
            "identifiers": [self.device_name],
            "name": f"Gadgetbridge {self.device_name.replace('_', ' ').title()}",
            "model": "Fitness Tracker",
            "manufacturer": "Gadgetbridge",
        }
        # (sensor_id, name, unit, icon, state_class, device_class)
        sensors = [
            ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
            ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
            ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
            ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
            ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
            ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
            ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
            ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
        ]
        for sensor_id, name, unit, icon, state_class, device_class in sensors:
            config = {
                "name": f"{self.device_name.replace('_', ' ').title()} {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            # Optional attributes are only included when set for the sensor.
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class
            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)
        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish every key/value of ``data`` to its own retained topic."""
        for key, value in data.items():
            topic = f"gadgetbridge/{self.device_name}/{key}"
            # Use retain=True so HA gets values on restart
            self.mqtt_client.publish(topic, str(value), qos=1, retain=True)
        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, "
                    f"hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%")

    def process_database(self):
        """Read the database at ``self.db_path`` and publish its sensor data.

        Returns:
            True when data was read and published, False when the database
            file is missing or an error occurred (the error is logged).
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False
        try:
            conn = sqlite3.connect(self.db_path, timeout=10.0)
            try:
                cursor = conn.cursor()
                # Get device name
                self.device_name = self.get_device_alias(cursor)
                # Query all sensors
                data = self.query_sensors(cursor)
            finally:
                # Always release the connection, even if a query raised.
                conn.close()
            if data:
                self.publish_data(data)
                return True
        except Exception as e:
            logger.error(f"Database error: {e}")
        return False

    def _read_device_name(self):
        """Best-effort update of ``self.device_name`` from ``self.db_path``."""
        try:
            conn = sqlite3.connect(self.db_path, timeout=5.0)
            try:
                self.device_name = self.get_device_alias(conn.cursor())
            finally:
                conn.close()
        except Exception as e:
            # Keep the current device name when the database cannot be opened.
            logger.debug(f"Could not read device name: {e}")

    def run(self):
        """Run the sync/publish loop until interrupted.

        Every ``publish_interval`` seconds: trigger a Gadgetbridge sync and
        export, pick the newest export in ``export_dir`` and publish its
        data. Discovery configs are published once, before the first data
        publish. Exits the process with status 1 when the broker connection
        cannot be set up; disconnects from MQTT on the way out.
        """
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)
        logger.info("Starting Gadgetbridge MQTT Publisher")
        logger.info(f"Export directory: {export_dir}")
        logger.info(f"Publish interval: {interval}s")

        # Ensure export directory exists
        os.makedirs(export_dir, exist_ok=True)

        # Connect to MQTT
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)

        discovery_published = False
        try:
            while True:
                current_time = time.time()
                # Check if it's time to trigger sync and publish
                if current_time - self.last_publish_time >= interval:
                    logger.info("Triggering Gadgetbridge sync...")
                    trigger_gadgetbridge_sync()
                    # Wait for export to complete
                    time.sleep(10)
                    # Find latest database
                    self.db_path = find_latest_db(export_dir)
                    if self.db_path:
                        logger.info(f"Using database: {os.path.basename(self.db_path)}")
                        # Publish discovery on first successful read
                        if not discovery_published:
                            # Need to read device name first, because the
                            # discovery topics embed it.
                            self._read_device_name()
                            self.publish_discovery()
                            discovery_published = True
                        self.process_database()
                    else:
                        logger.warning(f"No database found in {export_dir}")
                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")
                time.sleep(10)  # Check every 10 seconds
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        finally:
            self.disconnect_mqtt()
def main():
    """Script entry point: load the configuration and run the publisher."""
    GadgetbridgeMQTT(load_config()).run()


if __name__ == "__main__":
    main()