GadgetbridgeMqtt/main.py

639 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
Updated with improved sleep detection and HR-derived sleep stage
"""
import os
import sqlite3
import json
import logging
import sys
import time
import re
import signal
from datetime import datetime, timedelta
from pathlib import Path
# MQTT library - paho-mqtt works well on Termux
try:
import paho.mqtt.client as mqtt
except ImportError:
if "PYTEST_CURRENT_TEST" in os.environ:
mqtt = None
else:
print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
sys.exit(1)
# --- Configuration ---
# Path of the JSON settings file read by load_config().
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
# Default export directory; run() uses it when the config has no "export_dir" key.
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
# Default number of seconds between publish cycles; run() uses it when the
# config has no "publish_interval" key.
PUBLISH_INTERVAL = 300 # 5 minutes
# Maximum age, in seconds, of the newest heart-rate sample before
# check_and_recover_connection() forces a Bluetooth disconnect. The limit is
# tripled while the wearer is considered asleep.
STALE_THRESHOLD_SECONDS = 1200 # 20 minutes (Force reconnect if no new data for this long)
# --- Logging ---
# Send INFO and above to stdout with a timestamped format.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stdout
)
# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)
def load_config():
    """Read the MQTT settings from the JSON config file.

    Returns:
        The parsed JSON document stored at CONFIG_FILE.

    Logs an error and exits the process with status 1 when the config file
    does not exist.
    """
    if os.path.exists(CONFIG_FILE):
        with open(CONFIG_FILE, "r") as config_handle:
            settings = json.load(config_handle)
        return settings

    # Missing config: tell the user how to create it, then stop.
    logger.error(f"Config file not found: {CONFIG_FILE}")
    logger.error("Run setup.py first to configure MQTT settings")
    sys.exit(1)
def send_gadgetbridge_intent(action, extra_args=""):
    """Send a broadcast intent to Gadgetbridge with the `am` command.

    Args:
        action: Intent action name passed to `am broadcast -a`.
        extra_args: Additional `am broadcast` arguments as one shell-style
            string (for example "-e KEY 'value'"). It is tokenized with
            shlex, so quoting works as it would on a command line.

    Returns:
        True when `am` exited with status 0. False when it exited with any
        other status, could not be started, or extra_args could not be
        tokenized.
    """
    # Imported locally so this function does not depend on the file header.
    import shlex
    import subprocess

    logger.info(f"Sending intent: {action}")
    try:
        command = [
            "am", "broadcast", "-a", action,
            *shlex.split(extra_args or ""),
            "-p", "nodomain.freeyourgadget.gadgetbridge",
        ]
        # An argument list (no shell) keeps values such as a device address
        # read from the database from being interpreted as shell syntax, and
        # returncode is the real exit status rather than a raw wait status.
        exit_code = subprocess.run(command, check=False).returncode
    except (OSError, ValueError) as e:
        # OSError: `am` missing or not executable. ValueError: unbalanced quotes.
        logger.warning(f"Intent could not be sent: {e}")
        return False

    if exit_code != 0:
        logger.warning(f"Intent may have failed (exit code: {exit_code})")
    return exit_code == 0
def trigger_bluetooth_connect(device_mac):
    """Ask Gadgetbridge to open a Bluetooth connection to the given device.

    Args:
        device_mac: Device address placed in the EXTRA_DEVICE_ADDRESS extra.

    Returns:
        False, without sending anything, when device_mac is empty; otherwise
        the result of send_gadgetbridge_intent.
    """
    if device_mac:
        connect_action = "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_CONNECT"
        address_extra = f"-e EXTRA_DEVICE_ADDRESS '{device_mac}'"
        return send_gadgetbridge_intent(connect_action, address_extra)

    logger.warning("No device MAC address configured, skipping Bluetooth connect")
    return False
def trigger_bluetooth_reconnect(device_mac):
    """Send a Bluetooth disconnect intent and pause so the stack can settle.

    Only the disconnect is issued here, followed by a 15-second sleep; this
    function does not send a connect intent itself.

    Args:
        device_mac: Device address placed in the EXTRA_DEVICE_ADDRESS extra.

    Returns:
        False when device_mac is empty, True otherwise. The result of the
        disconnect intent is not checked.
    """
    if not device_mac:
        logger.warning("No device MAC address available for reconnection")
        return False

    logger.warning("Initiating Force Disconnect (Zombie Recovery)...")
    disconnect_action = "nodomain.freeyourgadget.gadgetbridge.BLUETOOTH_DISCONNECT"
    address_extra = f"-e EXTRA_DEVICE_ADDRESS '{device_mac}'"
    send_gadgetbridge_intent(disconnect_action, address_extra)

    logger.info("Waiting 15s for Bluetooth stack to clear...")
    time.sleep(15)
    return True
def trigger_gadgetbridge_sync(device_mac=None):
    """Ask Gadgetbridge to sync activity data and then export its database.

    When device_mac is given, a Bluetooth connect is requested first and the
    function waits 10 seconds before continuing. The sync and export intents
    are sent 10 seconds apart. Nothing is returned and the intent results
    are not checked.
    """
    sync_action = "nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC"
    export_action = "nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT"

    if device_mac:
        trigger_bluetooth_connect(device_mac)
        time.sleep(10)

    send_gadgetbridge_intent(sync_action)
    time.sleep(10)
    send_gadgetbridge_intent(export_action)
def find_latest_db(export_dir):
    """Return the path of the most recently modified Gadgetbridge export.

    A file qualifies when its name ends with ".db" and contains
    "Gadgetbridge".

    Args:
        export_dir: Directory to scan.

    Returns:
        The full path of the qualifying file with the newest modification
        time, or None when export_dir is empty, missing, not a readable
        directory, or holds no qualifying file.
    """
    if not export_dir:
        return None
    try:
        names = os.listdir(export_dir)
    except OSError:
        # Missing path, a regular file, or a directory we cannot read.
        return None

    newest_path = None
    newest_mtime = None
    for name in names:
        if not (name.endswith(".db") and "Gadgetbridge" in name):
            continue
        path = os.path.join(export_dir, name)
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            # The file can disappear between the listing and the stat (for
            # example while an export is being rewritten); skip it.
            continue
        # Strict comparison keeps the first entry on ties, matching a stable
        # descending sort.
        if newest_mtime is None or mtime > newest_mtime:
            newest_path, newest_mtime = path, mtime
    return newest_path
class GadgetbridgeMQTT:
    """Read sensor values from a Gadgetbridge SQLite export and publish them over MQTT.

    The instance keeps the selected device (id, alias, address), the MQTT
    client, and bookkeeping used by the main loop (last publish time, newest
    heart-rate sample timestamp, and whether the wearer is considered asleep).
    """

    # Names published for the STAGE codes read from XIAOMI_SLEEP_STAGE_SAMPLE.
    _STAGE_NAMES = {
        0: "not_sleep",
        1: "unknown",
        2: "deep_sleep",
        3: "light_sleep",
        4: "rem_sleep",
        5: "awake",
    }

    def __init__(self, config):
        """Store the configuration and install SIGTERM/SIGINT handlers.

        Args:
            config: Mapping with the MQTT settings (see connect_mqtt and run
                for the keys that are read).
        """
        self.config = config
        self.db_path = None
        # Used in MQTT topics and Home Assistant unique ids.
        self.device_name = "fitness_tracker"
        self.device_alias = "Unknown"
        self.device_id = None
        self.device_mac = None
        self.mqtt_client = None
        self.last_publish_time = 0
        self.last_db_mtime = 0
        self.running = True
        # Timestamp of the newest heart-rate sample seen; 0 means "none yet".
        self.last_data_timestamp = 0
        self.is_sleeping = False
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Ask the main loop to stop after the current iteration."""
        logger.info(f"Received signal {signum}. Shutting down...")
        self.running = False

    @staticmethod
    def _compute_awake(is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms, now_ms, avg_recent_hr=None, resting_hr=None):
        """Decide whether the wearer is awake.

        Rules, applied in order:
          1. A wakeup time at or before now_ms means awake.
          2. A wakeup time after now_ms means a session is active: awake only
             if a stage sample from the last 2 hours has code 5; otherwise
             asleep.
          3. Without a wakeup time, a stage sample from the last 30 minutes
             decides: code 5 is awake, codes 2-4 are asleep.
          4. Otherwise a recent average heart rate below the threshold
             (resting_hr + 5, or 60 when resting_hr is missing) means asleep.
          5. Otherwise awake.

        Args:
            is_awake_flag: Accepted for interface compatibility; not used.
            wakeup_raw: Wakeup time in the same unit as now_ms, or None.
            stage_code: Latest sleep stage code, or None.
            stage_timestamp_ms: Timestamp of that stage sample, or None.
            now_ms: Current time in milliseconds.
            avg_recent_hr: Recent average heart rate, or None.
            resting_hr: Resting heart rate, or None.

        Returns:
            True when awake, False when asleep.
        """
        # 1. Past wakeup -> definitely awake
        if wakeup_raw is not None and wakeup_raw <= now_ms:
            return True

        # 2. Active sleep session: bias heavily to sleeping
        if wakeup_raw is not None and wakeup_raw > now_ms:
            recent_stage = (
                stage_timestamp_ms is not None
                and now_ms - stage_timestamp_ms <= 2 * 60 * 60 * 1000
            )
            # Only a recent AWAKE stage overrides the session; a missing or
            # old stage is treated as sleeping.
            return bool(recent_stage and stage_code == 5)

        # 3. Not in a session: use stages with a 30 minute recency window
        recent_stage = (
            stage_timestamp_ms is not None
            and now_ms - stage_timestamp_ms <= 30 * 60 * 1000
        )
        if recent_stage:
            if stage_code == 5:
                return True
            if stage_code in (2, 3, 4):
                return False

        # 4. Heart-rate fallback
        if avg_recent_hr is not None:
            hr_threshold = resting_hr + 5 if resting_hr else 60
            if avg_recent_hr < hr_threshold:
                return False

        # 5. Default: awake
        return True

    @staticmethod
    def _hr_to_sleep_stage(avg_hr, resting_hr):
        """Derive a sleep stage name from the ratio avg_hr / resting_hr.

        Returns "unknown" when either value is missing or zero; otherwise
        "deep_sleep" (< 0.85), "light_sleep" (< 1.1), "rem_sleep" (< 1.3) or
        "awake".
        """
        if not avg_hr or not resting_hr:
            return "unknown"
        ratio = avg_hr / resting_hr
        if ratio < 0.85:
            return "deep_sleep"
        if ratio < 1.1:
            return "light_sleep"
        if ratio < 1.3:
            return "rem_sleep"
        return "awake"

    @staticmethod
    def _in_sleep_session(sleep_start_ms, wakeup_raw, now_ms):
        """Return True only when sleep_start_ms <= now_ms < wakeup_raw.

        Always returns a real bool (never None or 0), so the value is
        published even when the start or wakeup time is missing.
        """
        if not sleep_start_ms or not wakeup_raw:
            return False
        return sleep_start_ms <= now_ms < wakeup_raw

    @staticmethod
    def _format_sleep_debug(in_sleep_session, is_awake, stage_code, stage_age_minutes, hr_stage, avg_recent_hr, resting_hr):
        """Build the sleep debug log line, tolerating missing values.

        The stage part is "none" when there is no stage code or age, and the
        recent heart rate is "none" when it is missing.
        """
        if stage_code is not None and stage_age_minutes is not None:
            stage_desc = f"{stage_code}@{stage_age_minutes:.0f}min"
        else:
            stage_desc = "none"
        hr_desc = f"{avg_recent_hr:.0f}" if avg_recent_hr is not None else "none"
        return (
            f"Sleep debug: in_session={in_sleep_session}, is_awake={is_awake}, "
            f"stage={stage_desc}, hr_stage={hr_stage}, "
            f"recent_hr={hr_desc}, resting={resting_hr}"
        )

    def connect_mqtt(self):
        """Create the MQTT client, connect to the broker and start its loop.

        Reads "mqtt_broker", optional "mqtt_port" (default 1883) and optional
        "mqtt_username"/"mqtt_password" from the config.

        Returns:
            True on success, False when paho-mqtt is unavailable or the
            connection attempt raises.
        """
        if mqtt is None:
            logger.error("paho-mqtt not available")
            return False
        try:
            self.mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        except (AttributeError, TypeError):
            # Fall back to the constructor without callback_api_version.
            self.mqtt_client = mqtt.Client()
        if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
            self.mqtt_client.username_pw_set(
                self.config["mqtt_username"], self.config["mqtt_password"]
            )
        try:
            self.mqtt_client.connect(self.config["mqtt_broker"], self.config.get("mqtt_port", 1883), 60)
            self.mqtt_client.loop_start()
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the MQTT network loop and disconnect, if a client exists."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    def get_device_info(self, cursor):
        """Pick the newest DEVICE row whose name contains "band" or "watch".

        Returns:
            (device_id, alias_or_name, identifier) for the selected row, or
            (None, "Unknown", None) when no row matches or the query fails.
        """
        try:
            cursor.execute("""
                SELECT d._id, d.ALIAS, d.NAME, d.IDENTIFIER FROM DEVICE d
                WHERE (LOWER(d.NAME) LIKE '%band%' OR LOWER(d.NAME) LIKE '%watch%')
                ORDER BY d._id DESC LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                device_id = row[0]
                # Prefer the alias; fall back to the device name.
                device_alias = row[1] if row[1] else row[2]
                device_mac = row[3]
                logger.info(f"Selected device: ID={device_id}, Name={device_alias}, MAC={device_mac}")
                return device_id, device_alias, device_mac
        except Exception as e:
            logger.error(f"Error getting device info: {e}")
        return None, "Unknown", None

    def get_day_start_timestamp(self):
        """Return the epoch seconds of the current day's 04:00 local boundary.

        Before 04:00 the boundary of the previous day is returned.
        """
        now = datetime.now()
        day_start = datetime.combine(now.date(), datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp(self):
        """Return the epoch seconds of local midnight for the current day.

        Before 04:00 the previous day's midnight is returned, matching the
        04:00 day boundary used by get_day_start_timestamp.
        """
        now = datetime.now()
        midnight = datetime.combine(now.date(), datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp())

    def query_sensors(self, cursor):
        """Collect all sensor values for self.device_id from the database.

        Every query is best-effort: a failing query is logged at debug level
        and its values are simply left out of the result.

        Returns:
            Dict mapping sensor key to value. "server_time" and "device" are
            always present.
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight = self.get_day_midnight_timestamp()
        now_ms = int(time.time() * 1000)

        # Latest sleep stage reported by the band
        stage_code = None
        stage_timestamp_ms = None
        try:
            cursor.execute("""
                SELECT STAGE, TIMESTAMP FROM XIAOMI_SLEEP_STAGE_SAMPLE
                WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1
            """, (self.device_id,))
            row = cursor.fetchone()
            if row:
                stage_code, stage_timestamp_ms = row
        except Exception as e:
            logger.debug(f"Sleep stage query failed: {e}")

        # Average heart rate over the last 5 minutes
        avg_recent_hr = None
        try:
            cursor.execute("""
                SELECT AVG(HEART_RATE) FROM XIAOMI_ACTIVITY_SAMPLE
                WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255
                AND TIMESTAMP >= ?
            """, (self.device_id, now_ts - 300))
            row = cursor.fetchone()
            if row and row[0]:
                avg_recent_hr = row[0]
        except Exception as e:
            logger.debug(f"Recent HR query failed: {e}")

        # Most recent resting heart rate
        resting_hr = None
        try:
            cursor.execute("""
                SELECT HR_RESTING FROM XIAOMI_DAILY_SUMMARY_SAMPLE
                WHERE DEVICE_ID = ? AND HR_RESTING > 0 ORDER BY TIMESTAMP DESC LIMIT 1
            """, (self.device_id,))
            row = cursor.fetchone()
            if row and row[0]:
                resting_hr = row[0]
        except Exception as e:
            logger.debug(f"Resting HR query failed: {e}")

        # Steps since the 04:00 day boundary
        try:
            cursor.execute("SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?", (self.device_id, day_start_ts, now_ts))
            data["daily_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

        # Steps since Monday 04:00 (the previous Monday before 04:00 on a Monday)
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            cursor.execute("SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?", (self.device_id, int(week_start_time.timestamp()), now_ts))
            data["weekly_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

        # Battery level
        try:
            cursor.execute("SELECT LEVEL FROM BATTERY_LEVEL WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1", (self.device_id,))
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

        # Latest heart rate; its timestamp feeds the stale-data check
        try:
            cursor.execute("SELECT HEART_RATE, TIMESTAMP FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1", (self.device_id,))
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
                if row[1] > self.last_data_timestamp:
                    self.last_data_timestamp = row[1]
        except Exception as e:
            logger.debug(f"Heart rate query failed: {e}")

        # Daily summary (this table is queried with a millisecond timestamp)
        try:
            cursor.execute("SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1", (self.device_id, day_midnight * 1000))
            row = cursor.fetchone()
            if row:
                if row[0]:
                    data["hr_resting"] = row[0]
                if row[1]:
                    data["hr_max"] = row[1]
                if row[2]:
                    data["hr_avg"] = row[2]
                if row[3]:
                    data["calories"] = row[3]
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

        # Sleep data
        try:
            day_ago_ts_ms = (int(time.time()) - 24 * 3600) * 1000
            cursor.execute("""
                SELECT TIMESTAMP, WAKEUP_TIME, TOTAL_DURATION,
                DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION
                FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ?
                ORDER BY TIMESTAMP DESC
            """, (self.device_id, day_ago_ts_ms))
            sessions = cursor.fetchall()

            # Walk sessions newest first and add up those that belong
            # together: stop at the first session that ended more than 2 hours
            # before the following (newer) one started.
            total_sleep_min = 0
            newer_session_start_ms = None
            for sess_start, sess_wake, sess_total, sess_deep, sess_light, sess_rem in sessions:
                if newer_session_start_ms is not None:
                    # A session without a wakeup time cannot be chained.
                    gap_hours = (newer_session_start_ms - sess_wake) / 1000 / 3600 if sess_wake else 999
                    if gap_hours > 2:
                        break
                sess_min = 0
                if sess_deep or sess_light or sess_rem:
                    sess_min = (sess_deep or 0) + (sess_light or 0) + (sess_rem or 0)
                elif sess_total:
                    sess_min = sess_total
                total_sleep_min += sess_min
                newer_session_start_ms = sess_start
            if total_sleep_min > 0:
                data["sleep_duration"] = round(total_sleep_min / 60.0, 2)

            # Most recent session that has already started
            cursor.execute("""
                SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME, TIMESTAMP,
                DEEP_SLEEP_DURATION, LIGHT_SLEEP_DURATION, REM_SLEEP_DURATION, AWAKE_DURATION
                FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?
                ORDER BY TIMESTAMP DESC LIMIT 1
            """, (self.device_id, day_ago_ts_ms, now_ms))
            row = cursor.fetchone()
            if row:
                is_awake_flag, wakeup_raw, sleep_start_ms = row[1], row[2], row[3]
                # Must be a real bool: publish_data skips None, which would
                # leave a stale retained value on the broker.
                in_sleep_session = self._in_sleep_session(sleep_start_ms, wakeup_raw, now_ms)
                data["in_sleep_session"] = in_sleep_session
                is_awake = self._compute_awake(
                    is_awake_flag, wakeup_raw, stage_code, stage_timestamp_ms, now_ms,
                    avg_recent_hr, resting_hr
                )
                data["is_awake"] = is_awake
                self.is_sleeping = not is_awake

                # Band-reported stage, accepted during a session or when the
                # sample is at most 24 hours old
                stage_age_minutes = None
                if stage_code is not None and stage_timestamp_ms:
                    stage_age_minutes = (now_ms - stage_timestamp_ms) / 1000 / 60
                    if in_sleep_session or stage_age_minutes <= 1440:
                        data["sleep_stage"] = self._STAGE_NAMES.get(stage_code, f"unknown_{stage_code}")
                        data["sleep_stage_code"] = stage_code
                    else:
                        data["sleep_stage"] = "not_sleep"
                        data["sleep_stage_code"] = 0
                else:
                    data["sleep_stage"] = "not_sleep" if is_awake else "unknown"
                    data["sleep_stage_code"] = 0

                # Heart-rate-derived stage
                hr_stage = self._hr_to_sleep_stage(avg_recent_hr, resting_hr)
                data["sleep_stage_hr"] = hr_stage
                data["avg_recent_hr"] = round(avg_recent_hr, 1) if avg_recent_hr else None

                logger.info(self._format_sleep_debug(
                    in_sleep_session, is_awake, stage_code, stage_age_minutes,
                    hr_stage, avg_recent_hr, resting_hr
                ))
            else:
                # No session in the last 24 hours: report awake defaults.
                data["is_awake"] = True
                data["in_sleep_session"] = False
                data["sleep_stage"] = "not_sleep"
                data["sleep_stage_code"] = 0
                data["sleep_stage_hr"] = "awake"
                self.is_sleeping = False
        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

        # Weight (not filtered by device)
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = round(row[0], 1)
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

        data["server_time"] = datetime.now().astimezone().isoformat()
        data["device"] = self.device_alias
        return data

    def publish_discovery(self):
        """Publish retained Home Assistant MQTT discovery configs for every sensor."""
        device_info = {
            "identifiers": [self.device_name],
            "name": "Fitness Tracker",
            "model": self.device_alias,
            "manufacturer": "Gadgetbridge",
        }
        # (sensor_id, name, unit, icon, state_class, device_class)
        sensors = [
            ("device", "Device", None, "mdi:watch", None, None),
            ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
            ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
            ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
            ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("avg_recent_hr", "Recent Avg HR", "bpm", "mdi:heart-pulse", "measurement", None),
            ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
            ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
            ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
            ("in_sleep_session", "In Sleep Session", None, "mdi:bed", None, None),
            ("sleep_stage", "Sleep Stage (Band)", None, "mdi:sleep-cycle", None, None),
            ("sleep_stage_code", "Sleep Stage Code", None, "mdi:numeric", "measurement", None),
            ("sleep_stage_hr", "Sleep Stage (HR)", None, "mdi:heart-pulse-outline", None, None),
            ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
            ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
        ]
        for sensor_id, name, unit, icon, state_class, device_class in sensors:
            config = {
                "name": f"Fitness Tracker {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            # Optional fields are only included when set.
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class
            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)
        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish each non-None value as a retained message and log a summary.

        Args:
            data: Dict of sensor key to value, as returned by query_sensors.
        """
        for key, value in data.items():
            if value is not None:  # Skip None values
                topic = f"gadgetbridge/{self.device_name}/{key}"
                self.mqtt_client.publish(topic, str(value), qos=1, retain=True)
        sleep_info = f"sleep={data.get('sleep_duration', 0)}h" if 'sleep_duration' in data else "sleep=N/A"
        awake_info = f"awake={data.get('is_awake', 'N/A')}"
        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%, {sleep_info}, {awake_info}, "
                    f"stage={data.get('sleep_stage', 'N/A')}/{data.get('sleep_stage_hr', 'N/A')}")

    def process_database(self):
        """Open self.db_path, read the sensors and publish them.

        Returns:
            True when data was published; False when the database is missing,
            no device was found, or an error occurred.
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False
        try:
            conn = sqlite3.connect(self.db_path, timeout=10.0)
            try:
                cursor = conn.cursor()
                self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                if not self.device_id:
                    logger.warning("No fitness device found")
                    return False
                data = self.query_sensors(cursor)
            finally:
                # Close the connection on every path, including errors.
                conn.close()
            if data:
                self.publish_data(data)
                return True
        except Exception as e:
            logger.error(f"Database error: {e}")
        return False

    def check_and_recover_connection(self):
        """Force a Bluetooth disconnect when the newest sample is too old.

        Does nothing until a heart-rate sample has been seen. The allowed age
        is STALE_THRESHOLD_SECONDS, tripled while self.is_sleeping is set.
        """
        if self.last_data_timestamp == 0:
            return
        threshold = STALE_THRESHOLD_SECONDS * 3 if self.is_sleeping else STALE_THRESHOLD_SECONDS
        current_ts = int(time.time())
        time_diff = current_ts - self.last_data_timestamp
        if time_diff > threshold:
            logger.warning(f"Data stale ({time_diff}s > {threshold}s). Triggering recovery...")
            if self.device_mac:
                trigger_bluetooth_reconnect(self.device_mac)
                # Restart the clock so recovery is not retriggered on every cycle.
                self.last_data_timestamp = current_ts
            else:
                logger.warning("Cannot recover: No device MAC")

    def run(self):
        """Run the sync/publish loop until a shutdown signal is received.

        Reads optional "export_dir" and "publish_interval" from the config.
        Exits the process with status 1 when the MQTT connection fails.
        """
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)
        logger.info("Starting Gadgetbridge MQTT Publisher (Enhanced Sleep)")
        logger.info(f"Export directory: {export_dir}, interval: {interval}s")
        os.makedirs(export_dir, exist_ok=True)
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)
        discovery_published = False
        try:
            while self.running:
                current_time = time.time()
                if current_time - self.last_publish_time >= interval:
                    try:
                        self.check_and_recover_connection()
                        logger.info("Triggering Gadgetbridge sync...")
                        trigger_gadgetbridge_sync(self.device_mac)
                        time.sleep(5)
                        self.db_path = find_latest_db(export_dir)
                        if not self.db_path:
                            logger.warning("No database found, retrying...")
                            time.sleep(3)
                            self.db_path = find_latest_db(export_dir)
                        if self.db_path:
                            logger.info(f"Using database: {os.path.basename(self.db_path)}")
                            if not discovery_published:
                                # Read the device first so the discovery
                                # payload carries the real model name.
                                try:
                                    conn = sqlite3.connect(self.db_path, timeout=5.0)
                                    try:
                                        cursor = conn.cursor()
                                        self.device_id, self.device_alias, self.device_mac = self.get_device_info(cursor)
                                    finally:
                                        conn.close()
                                    if self.device_mac:
                                        logger.info(f"Device MAC: {self.device_mac}")
                                except Exception as e:
                                    logger.warning(f"Could not read device info: {e}")
                                self.publish_discovery()
                                discovery_published = True
                            self.process_database()
                        else:
                            logger.warning(f"No database in {export_dir}")
                    except Exception as e:
                        logger.error(f"Sync cycle failed: {e}")
                    # The interval is measured from the start of the cycle.
                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")
                time.sleep(10)
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            logger.info("Cleaning up...")
            self.disconnect_mqtt()
def main():
    """Load the configuration, build the publisher and run it until it stops."""
    GadgetbridgeMQTT(load_config()).run()


if __name__ == "__main__":
    main()