GadgetbridgeMqtt/main.py
2025-12-17 13:34:26 +00:00

707 lines
29 KiB
Python

#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
"""
import json
import logging
import os
import re
import shlex
import signal
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path

# MQTT library - paho-mqtt works well on Termux
try:
    import paho.mqtt.client as mqtt
except ImportError:
    if "PYTEST_CURRENT_TEST" in os.environ:
        mqtt = None
    else:
        print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
        sys.exit(1)
# --- Configuration ---
# Location of the JSON settings file (read by load_config).
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
# Default directory that is scanned for exported Gadgetbridge databases.
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
# Default number of seconds between two sync/publish cycles.
PUBLISH_INTERVAL = 5 * 60

# --- Logging ---
# Everything is logged to stdout at INFO level and above.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def load_config():
    """Load the MQTT configuration from ``CONFIG_FILE``.

    Returns:
        dict: The parsed JSON object.

    Exits the process with status 1 (after logging the reason) when the file
    is missing, cannot be read, is not valid JSON, or does not contain a JSON
    object at the top level.
    """
    try:
        # Open directly instead of checking for existence first: this avoids
        # the check-then-open race and reports every I/O failure uniformly.
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.error(f"Config file not found: {CONFIG_FILE}")
        logger.error("Run setup.py first to configure MQTT settings")
        sys.exit(1)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
        logger.error(f"Could not read config file {CONFIG_FILE}: {e}")
        sys.exit(1)

    if not isinstance(config, dict):
        # The rest of the program accesses the settings with config.get(...).
        logger.error(f"Config file {CONFIG_FILE} must contain a JSON object")
        sys.exit(1)
    return config
def send_gadgetbridge_intent(action, extra_args=""):
    """Broadcast an Android intent to Gadgetbridge using the ``am`` command.

    Args:
        action: Intent action name passed to ``am broadcast -a``.
        extra_args: Additional ``am`` arguments as one shell-style string
            (for example ``"-e KEY 'value'"``). It is tokenised with
            ``shlex.split`` and never interpreted by a shell.

    Returns:
        bool: True when ``am`` exited with status 0, False otherwise
        (including when the command could not be started).
    """
    logger.info(f"Sending intent: {action}")
    try:
        # An argument list (no shell) keeps metacharacters in the action or
        # in the extras from being executed as shell syntax.
        command = ["am", "broadcast", "-a", action]
        command += shlex.split(extra_args or "")
        command += ["-p", "nodomain.freeyourgadget.gadgetbridge"]
        result = subprocess.run(command, check=False).returncode
    except (OSError, ValueError) as e:
        # OSError: `am` missing or not executable; ValueError: unbalanced
        # quotes in extra_args.
        logger.warning(f"Intent could not be sent: {e}")
        return False
    if result != 0:
        logger.warning(f"Intent may have failed (exit code: {result})")
    return result == 0
def trigger_bluetooth_connect(device_mac):
    """Ask Gadgetbridge to connect to the device over Bluetooth.

    This fixes the 'disconnected' state that can occur when the band
    loses connection during sleep or when the phone is idle.

    Args:
        device_mac: Address of the device to connect to; a falsy value skips
            the request.

    Returns:
        bool: False when no address was given, otherwise the result of
        ``send_gadgetbridge_intent``.
    """
    if not device_mac:
        logger.warning("No device MAC address configured, skipping Bluetooth connect")
        return False
    action = "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_CONNECT"
    # The address comes from the exported database, so quote it properly
    # instead of wrapping it in hand-written single quotes.
    extra = f"-e EXTRA_DEVICE_ADDRESS {shlex.quote(str(device_mac))}"
    return send_gadgetbridge_intent(action, extra)
def trigger_gadgetbridge_sync(device_mac=None):
    """Run the reconnect -> activity sync -> database export sequence.

    Args:
        device_mac: Optional device address. When given, a Bluetooth
            connection is requested before the sync is started.

    Each intent is followed by a fixed pause so Gadgetbridge has time to act
    before the next step is requested; nothing is returned.
    """
    pause_seconds = 10
    sync_action = "nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC"
    export_action = "nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT"

    # Optional first step: recover from a disconnected band.
    if device_mac:
        trigger_bluetooth_connect(device_mac)
        time.sleep(pause_seconds)

    # Pull activity data from the band onto the phone.
    send_gadgetbridge_intent(sync_action)
    time.sleep(pause_seconds)

    # Finally ask for a fresh database export.
    send_gadgetbridge_intent(export_action)
def find_latest_db(export_dir):
    """Find the most recently modified Gadgetbridge database export.

    Args:
        export_dir: Directory to scan (not recursive).

    Returns:
        The path (``export_dir`` joined with the file name) of the newest
        entry whose name ends in ``.db`` and contains ``Gadgetbridge``, or
        None when the directory is missing, is not a directory, or holds no
        matching entry.
    """
    latest_path = None
    latest_mtime = None
    try:
        with os.scandir(export_dir) as entries:
            for entry in entries:
                name = entry.name
                if not (name.endswith(".db") and "Gadgetbridge" in name):
                    continue
                try:
                    mtime = entry.stat().st_mtime
                except OSError:
                    # The export may be replaced while we scan; skip entries
                    # that disappear instead of failing the whole lookup.
                    continue
                if latest_mtime is None or mtime > latest_mtime:
                    latest_path, latest_mtime = entry.path, mtime
    except (FileNotFoundError, NotADirectoryError):
        return None
    return latest_path
class GadgetbridgeMQTT:
    """Read an exported Gadgetbridge database and publish its values via MQTT.

    An instance owns the MQTT client, remembers which device was selected
    from the database and drives the periodic sync/export/publish loop in
    ``run``.
    """

    # Labels published for the STAGE codes of XIAOMI_SLEEP_STAGE_SAMPLE.
    # Xiaomi codes: 2=deep, 3=light, 4=REM, 5=awake.
    _STAGE_NAMES = {
        0: "not_sleep",
        1: "unknown",
        2: "deep_sleep",
        3: "light_sleep",
        4: "rem_sleep",
        5: "awake",
    }

    def __init__(self, config):
        """Store the configuration and install shutdown signal handlers.

        Args:
            config: Mapping of settings. Keys read by this class are
                ``mqtt_broker``, ``mqtt_port``, ``mqtt_username``,
                ``mqtt_password``, ``export_dir`` and ``publish_interval``.
        """
        self.config = config
        self.db_path = None
        self.device_name = "fitness_tracker"  # MQTT identifier (always fitness_tracker)
        self.device_alias = "Unknown"  # Actual device name for display
        self.device_id = None  # Track device ID for filtering queries
        self.device_mac = None  # Track device MAC for Bluetooth reconnection
        self.mqtt_client = None
        self.last_publish_time = 0
        self.last_db_mtime = 0
        self.running = True
        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals by asking the main loop to stop."""
        logger.info(f"Received signal {signum}. Shutting down...")
        self.running = False

    @staticmethod
    def _compute_awake(is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms,
                       now_ms, avg_recent_hr=None, resting_hr=None):
        """Decide awake state using the most reliable signals first.

        Priority:
            1. Past the wakeup time -> awake (session ended)
            2. ``is_awake_flag`` equals 1 -> awake
            3. Recent sleep stage is awake (code 5) -> awake
            4. Recent sleep stage is deep/light/REM (2/3/4) -> sleeping
            5. Wakeup time still in the future -> sleeping
            6. Average recent HR below the threshold -> sleeping
            7. Otherwise -> awake

        Args:
            is_awake_flag: IS_AWAKE value of the sleep session row.
            wakeup_raw: Session wakeup time, compared directly with
                ``now_ms``; may be None.
            stage_code: Latest sleep stage code; may be None.
            stage_timestamp_ms: Timestamp of that stage, compared directly
                with ``now_ms``; may be None.
            now_ms: Current time in milliseconds.
            avg_recent_hr: Average recent heart rate, or None.
            resting_hr: Resting heart rate, or None/0 when unknown.

        Returns:
            bool: True when the wearer is considered awake.
        """
        # If we're past the wakeup time, the session is over -> awake
        if wakeup_raw is not None and wakeup_raw <= now_ms:
            return True
        if is_awake_flag == 1:
            return True
        # Stage data only counts when it is at most two hours old.
        recent_stage = (
            stage_timestamp_ms is not None
            and now_ms - stage_timestamp_ms <= 2 * 60 * 60 * 1000
        )
        if recent_stage and stage_code is not None:
            if stage_code == 5:  # AWAKE stage
                return True
            if stage_code in (2, 3, 4):  # Deep(2), Light(3), REM(4)
                return False
        # If within a sleep session (wakeup time is in the future), likely sleeping
        if wakeup_raw is not None and wakeup_raw > now_ms:
            return False
        # HR fallback: resting HR + 10 as threshold, or 65 without a resting HR.
        hr_threshold = (resting_hr + 10) if resting_hr else 65
        if avg_recent_hr is not None and avg_recent_hr < hr_threshold:
            return False
        return True

    def connect_mqtt(self):
        """Create the MQTT client, connect and start its network loop.

        Returns:
            bool: True when ``connect`` and ``loop_start`` did not raise,
            False when paho-mqtt is unavailable or connecting failed. Success
            is reported after a fixed one-second wait; the connection result
            itself is not inspected.
        """
        if mqtt is None:
            logger.error("paho-mqtt not available; cannot publish")
            return False
        # Use callback API version 2 to avoid deprecation warning
        try:
            self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        except (AttributeError, TypeError):
            # Fallback for older paho-mqtt versions
            self.mqtt_client = mqtt.Client()
        if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
            self.mqtt_client.username_pw_set(
                self.config["mqtt_username"],
                self.config["mqtt_password"],
            )
        try:
            self.mqtt_client.connect(
                self.config["mqtt_broker"],
                self.config.get("mqtt_port", 1883),
                60,
            )
            self.mqtt_client.loop_start()
            # Wait for connection to establish
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the network loop and disconnect, if a client was created."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    def get_device_info(self, cursor):
        """Select the fitness device from the DEVICE table.

        The row with the highest ``_id`` among devices whose NAME contains
        "band" or "watch" (case-insensitive) is chosen.

        Args:
            cursor: Open cursor on the exported database.

        Returns:
            tuple: ``(device_id, device_alias, device_mac)``; the alias falls
            back to NAME when ALIAS is empty. ``(None, "Unknown", None)`` is
            returned when nothing matches or the query fails.
        """
        try:
            cursor.execute("""
                SELECT d._id, d.ALIAS, d.NAME, d.IDENTIFIER FROM DEVICE d
                WHERE (LOWER(d.NAME) LIKE '%band%' OR LOWER(d.NAME) LIKE '%watch%')
                ORDER BY d._id DESC
                LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                device_id = row[0]
                # Get actual device name for display
                device_alias = row[1] if row[1] else row[2]  # Use ALIAS, fallback to NAME
                device_mac = row[3]  # MAC address / IDENTIFIER
                logger.info(f"Selected device: ID={device_id}, Name={device_alias}, MAC={device_mac}")
                return device_id, device_alias, device_mac
        except Exception as e:
            logger.error(f"Error getting device info: {e}")
        return None, "Unknown", None

    def get_day_start_timestamp(self):
        """Return the Unix timestamp (seconds) of the current day's 4am start.

        Before 4am local time the previous day's 4am is returned.
        """
        now = datetime.now()
        today = now.date()
        day_start = datetime.combine(today, datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp(self):
        """Return the midnight timestamp (seconds) used for daily summaries.

        Before 4am local time the previous day's midnight is returned.
        """
        now = datetime.now()
        today = now.date()
        midnight = datetime.combine(today, datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp())

    def query_sensors(self, cursor):
        """Query all sensor values for ``self.device_id``.

        Every query is isolated: a failing query is logged at debug level and
        its keys are simply left out of the result.

        Args:
            cursor: Open cursor on the exported database.

        Returns:
            dict: Sensor name -> value. ``server_time`` and ``device`` are
            always present.
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight = self.get_day_midnight_timestamp()
        now_ms = int(time.time() * 1000)

        # Inputs of the awake/asleep decision are gathered first.
        stage_code, stage_timestamp_ms = self._query_latest_sleep_stage(cursor)
        avg_recent_hr = self._query_recent_avg_hr(cursor, now_ts - 600)  # Last 10 minutes
        resting_hr = self._query_latest_resting_hr(cursor)

        self._add_daily_steps(cursor, data, day_start_ts, now_ts)
        self._add_weekly_steps(cursor, data, now_ts)
        self._add_battery_level(cursor, data)
        self._add_heart_rate(cursor, data)
        self._add_daily_summary(cursor, data, day_midnight)
        self._add_sleep(cursor, data, now_ms, stage_code, stage_timestamp_ms,
                        avg_recent_hr, resting_hr)
        self._add_weight(cursor, data)

        # Server time
        data["server_time"] = datetime.now().astimezone().isoformat()
        # Device name (actual band name)
        data["device"] = self.device_alias
        return data

    def _query_latest_sleep_stage(self, cursor):
        """Return ``(stage, timestamp)`` of the newest sleep stage sample, or ``(None, None)``."""
        stage_code = None
        stage_timestamp_ms = None
        try:
            cursor.execute(
                """
                SELECT STAGE, TIMESTAMP
                FROM XIAOMI_SLEEP_STAGE_SAMPLE
                WHERE DEVICE_ID = ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                stage_code, stage_timestamp_ms = row
        except Exception as e:
            logger.debug(f"Sleep stage query failed: {e}")
        return stage_code, stage_timestamp_ms

    def _query_recent_avg_hr(self, cursor, since_ts):
        """Return the average valid heart rate since ``since_ts`` (seconds), or None."""
        avg_recent_hr = None
        try:
            cursor.execute(
                """
                SELECT AVG(HEART_RATE)
                FROM XIAOMI_ACTIVITY_SAMPLE
                WHERE DEVICE_ID = ?
                AND HEART_RATE > 0 AND HEART_RATE < 255
                AND TIMESTAMP >= ?
                """,
                (self.device_id, since_ts),
            )
            row = cursor.fetchone()
            if row and row[0]:
                avg_recent_hr = row[0]
        except Exception as e:
            logger.debug(f"Recent HR query failed: {e}")
        return avg_recent_hr

    def _query_latest_resting_hr(self, cursor):
        """Return the newest positive resting heart rate, or None."""
        resting_hr = None
        try:
            cursor.execute(
                """
                SELECT HR_RESTING
                FROM XIAOMI_DAILY_SUMMARY_SAMPLE
                WHERE DEVICE_ID = ? AND HR_RESTING > 0
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row and row[0]:
                resting_hr = row[0]
        except Exception as e:
            logger.debug(f"Resting HR query failed: {e}")
        return resting_hr

    def _add_daily_steps(self, cursor, data, day_start_ts, now_ts):
        """Store the step total since the 4am day start as ``daily_steps``."""
        try:
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, day_start_ts, now_ts),
            )
            data["daily_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

    def _add_weekly_steps(self, cursor, data, now_ts):
        """Store the step total since Monday 4am as ``weekly_steps``."""
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            # Monday before 4am still belongs to the previous week.
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, int(week_start_time.timestamp()), now_ts),
            )
            data["weekly_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

    def _add_battery_level(self, cursor, data):
        """Store the newest battery level as ``battery_level`` when one exists."""
        try:
            cursor.execute(
                "SELECT LEVEL FROM BATTERY_LEVEL WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

    def _add_heart_rate(self, cursor, data):
        """Store the newest valid heart rate as ``heart_rate`` when one exists."""
        try:
            cursor.execute(
                "SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
        except Exception as e:
            logger.debug(f"Heart rate query failed: {e}")

    def _add_daily_summary(self, cursor, data, day_midnight):
        """Store today's resting/max/average HR and calories when non-zero.

        Note: XIAOMI_DAILY_SUMMARY_SAMPLE uses MILLISECONDS timestamps, so
        ``day_midnight`` (seconds) is converted before comparing.
        """
        try:
            cursor.execute(
                "SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id, day_midnight * 1000),
            )
            row = cursor.fetchone()
            if row:
                if row[0]:
                    data["hr_resting"] = row[0]
                if row[1]:
                    data["hr_max"] = row[1]
                if row[2]:
                    data["hr_avg"] = row[2]
                if row[3]:
                    data["calories"] = row[3]
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

    def _estimate_sleep_minutes(self, cursor, total_duration, sleep_start_ms,
                                wakeup_raw, deep_dur, light_dur, rem_dur):
        """Estimate the length of a sleep session.

        Strategies, in order: sum of deep/light/REM durations; time span
        covered by stage samples inside the session (if longer than 30);
        TOTAL_DURATION; 90% of the session length when that length is between
        60 and 840 minutes. The duration columns are presumably minutes (the
        caller divides the result by 60 to get hours) - TODO confirm against
        the Gadgetbridge schema.

        Returns:
            The estimate, or 0 when no strategy produced a value.
        """
        actual_sleep_min = 0
        # Strategy 1: Use breakdown sum if available (most accurate)
        if deep_dur or light_dur or rem_dur:
            actual_sleep_min = (deep_dur or 0) + (light_dur or 0) + (rem_dur or 0)
        # Strategy 2: Use the time span between the first and last stage
        # sample of the session; works even when TOTAL_DURATION is incomplete.
        if actual_sleep_min == 0 and sleep_start_ms and wakeup_raw:
            try:
                cursor.execute(
                    """
                    SELECT MIN(TIMESTAMP), MAX(TIMESTAMP)
                    FROM XIAOMI_SLEEP_STAGE_SAMPLE
                    WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?
                    """,
                    (self.device_id, sleep_start_ms, wakeup_raw),
                )
                stage_row = cursor.fetchone()
                if stage_row and stage_row[0] and stage_row[1]:
                    stage_span_min = (stage_row[1] - stage_row[0]) / 1000 / 60
                    if stage_span_min > 30:  # At least 30 min of stage data
                        actual_sleep_min = stage_span_min
            except Exception as e:
                logger.debug(f"Stage duration calculation failed: {e}")
        # Strategy 3: Use TOTAL_DURATION if available
        if actual_sleep_min == 0 and total_duration:
            actual_sleep_min = total_duration
        # Strategy 4: Calculate from session time range (least accurate)
        if actual_sleep_min < 30 and sleep_start_ms and wakeup_raw:
            session_minutes = (wakeup_raw - sleep_start_ms) / 1000 / 60
            # Only use if session looks reasonable (1-14 hours)
            if 60 <= session_minutes <= 840:
                # Estimate ~10% awake time
                actual_sleep_min = session_minutes * 0.9
        return actual_sleep_min

    def _add_sleep(self, cursor, data, now_ms, stage_code, stage_timestamp_ms,
                   avg_recent_hr, resting_hr):
        """Store sleep duration, session state, awake flag and sleep stage.

        Uses the newest sleep session that started within the last 24 hours.
        Note: XIAOMI_SLEEP_TIME_SAMPLE uses MILLISECONDS timestamps.
        """
        try:
            day_ago_ts_ms = (int(time.time()) - 24 * 3600) * 1000  # 24h ago in milliseconds
            cursor.execute(
                """
                SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME, TIMESTAMP,
                DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION, AWAKE_DURATION
                FROM XIAOMI_SLEEP_TIME_SAMPLE
                WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id, day_ago_ts_ms, now_ms),  # Only sessions that have started
            )
            row = cursor.fetchone()
            if not row:
                # No sleep data - user is awake
                data["is_awake"] = True
                data["in_sleep_session"] = False
                data["sleep_stage"] = "not_sleep"
                data["sleep_stage_code"] = 0
                return

            (total_duration, is_awake_flag, wakeup_raw, sleep_start_ms,
             deep_dur, light_dur, rem_dur, _awake_dur) = row

            actual_sleep_min = self._estimate_sleep_minutes(
                cursor, total_duration, sleep_start_ms, wakeup_raw,
                deep_dur, light_dur, rem_dur,
            )
            if actual_sleep_min > 0:
                data["sleep_duration"] = round(actual_sleep_min / 60.0, 2)

            # User is in a session if: sleep_start <= now < wakeup_time
            in_sleep_session = (
                sleep_start_ms is not None
                and wakeup_raw is not None
                and sleep_start_ms <= now_ms < wakeup_raw
            )
            data["in_sleep_session"] = in_sleep_session

            is_awake = self._compute_awake(
                is_awake_flag=is_awake_flag,
                wakeup_raw=wakeup_raw,
                stage_code=stage_code,
                stage_timestamp_ms=stage_timestamp_ms,
                now_ms=now_ms,
                avg_recent_hr=avg_recent_hr,
                resting_hr=resting_hr,
            )
            data["is_awake"] = is_awake

            if stage_code is not None and stage_timestamp_ms is not None:
                stage_age_minutes = (now_ms - stage_timestamp_ms) / 1000 / 60
                # Report stage if in sleep session or stage is recent (within 2 hours)
                if in_sleep_session or stage_age_minutes <= 120:
                    data["sleep_stage"] = self._STAGE_NAMES.get(stage_code, f"unknown_{stage_code}")
                    data["sleep_stage_code"] = stage_code
                else:
                    # Not in sleep session and stage is stale
                    data["sleep_stage"] = "not_sleep"
                    data["sleep_stage_code"] = 0
            else:
                # No stage data
                data["sleep_stage"] = "not_sleep" if is_awake else "unknown"
                data["sleep_stage_code"] = 0
        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

    def _add_weight(self, cursor, data):
        """Store the newest scale weight (not device-specific) as ``weight``."""
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = round(row[0], 1)  # Round to 1 decimal
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

    def publish_discovery(self):
        """Publish retained Home Assistant MQTT discovery configs for every sensor."""
        device_info = {
            "identifiers": [self.device_name],
            "name": "Fitness Tracker",
            "model": self.device_alias,
            "manufacturer": "Gadgetbridge",
        }
        # (sensor_id, name, unit, icon, state_class, device_class)
        sensors = [
            ("device", "Device", None, "mdi:watch", None, None),
            ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
            ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
            ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
            ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
            ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
            ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
            ("in_sleep_session", "In Sleep Session", None, "mdi:bed", None, None),
            ("sleep_stage", "Sleep Stage", None, "mdi:sleep-cycle", None, None),
            ("sleep_stage_code", "Sleep Stage Code", None, "mdi:numeric", "measurement", None),
            ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
            ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
        ]
        for sensor_id, name, unit, icon, state_class, device_class in sensors:
            config = {
                "name": f"Fitness Tracker {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class
            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)
        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish each entry of ``data`` to its own retained state topic."""
        for key, value in data.items():
            topic = f"gadgetbridge/{self.device_name}/{key}"
            # Use retain=True so HA gets values on restart
            self.mqtt_client.publish(topic, str(value), qos=1, retain=True)
        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, "
                    f"hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%")

    def process_database(self):
        """Read ``self.db_path`` and publish its sensor values.

        Returns:
            bool: True when the database was read and the data published,
            False when there is no database, no matching device, or an error
            occurred.
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False
        try:
            conn = sqlite3.connect(self.db_path, timeout=10.0)
            try:
                cursor = conn.cursor()
                # Get device ID, name, and MAC
                self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                if not self.device_id:
                    logger.warning("No fitness device found in database")
                    return False
                # Query all sensors
                data = self.query_sensors(cursor)
            finally:
                # Close on every path, including when a query raises.
                conn.close()
            if data:
                self.publish_data(data)
            return True
        except Exception as e:
            logger.error(f"Database error: {e}")
            return False

    def _try_publish_discovery(self):
        """Read the device info and publish discovery once a device is known.

        Returns:
            bool: True when discovery was published. False when no device
            could be identified yet, so the caller retries on the next cycle
            instead of leaving a retained config with model "Unknown".
        """
        try:
            conn = sqlite3.connect(self.db_path, timeout=5.0)
            try:
                self.device_id, self.device_alias, self.device_mac = self.get_device_info(conn.cursor())
            finally:
                conn.close()
            if self.device_mac:
                logger.info(f"Device MAC for reconnection: {self.device_mac}")
        except Exception as e:
            logger.warning(f"Could not read device info: {e}")
        if not self.device_id:
            logger.warning("Device not identified yet; deferring Home Assistant discovery")
            return False
        self.publish_discovery()
        return True

    def run(self):
        """Main loop: sync, export, publish every ``publish_interval`` seconds.

        Exits the process with status 1 when the MQTT connection cannot be
        set up. The loop ends when a shutdown signal clears ``self.running``;
        the MQTT client is always disconnected on the way out.
        """
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)
        logger.info("Starting Gadgetbridge MQTT Publisher")
        logger.info(f"Export directory: {export_dir}")
        logger.info(f"Publish interval: {interval}s")
        # Ensure export directory exists
        os.makedirs(export_dir, exist_ok=True)
        # Connect to MQTT
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)
        discovery_published = False
        try:
            while self.running:
                current_time = time.time()
                # Check if it's time to trigger sync and publish
                if current_time - self.last_publish_time >= interval:
                    try:
                        logger.info("Triggering Gadgetbridge sync...")
                        # Use device MAC for Bluetooth reconnection if available
                        trigger_gadgetbridge_sync(self.device_mac)
                        # Wait for export to complete
                        time.sleep(5)
                        # Find latest database with retry
                        self.db_path = find_latest_db(export_dir)
                        if not self.db_path:
                            logger.warning("No database found, retrying in 3s...")
                            time.sleep(3)
                            self.db_path = find_latest_db(export_dir)
                        if self.db_path:
                            logger.info(f"Using database: {os.path.basename(self.db_path)}")
                            # Publish discovery on first successful read
                            if not discovery_published:
                                discovery_published = self._try_publish_discovery()
                            self.process_database()
                        else:
                            logger.warning(f"No database found in {export_dir}")
                    except Exception as e:
                        logger.error(f"Sync cycle failed: {e}")
                    # The next cycle is scheduled from the start of this one,
                    # whether or not it succeeded.
                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")
                time.sleep(10)  # Check every 10 seconds
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            logger.info("Cleaning up...")
            self.disconnect_mqtt()
def main():
    """Entry point: load the configuration and run the publisher loop."""
    GadgetbridgeMQTT(load_config()).run()


# Only start the publisher when executed as a script, not on import.
if __name__ == "__main__":
    main()