#!/usr/bin/env python3
"""
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
"""

import json
import logging
import os
import re
import signal
import sqlite3
import subprocess
import sys
import time
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

# MQTT library - paho-mqtt works well on Termux
try:
    import paho.mqtt.client as mqtt
except ImportError:
    print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
    sys.exit(1)

# --- Configuration ---
# JSON file holding the MQTT settings; read by load_config().
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
# Default directory scanned for Gadgetbridge exports; the "export_dir" config
# key overrides it in GadgetbridgeMQTT.run().
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
# Default number of seconds between sync/publish cycles; the
# "publish_interval" config key overrides it in GadgetbridgeMQTT.run().
PUBLISH_INTERVAL = 300  # 5 minutes

# --- Logging ---
# Log to stdout at INFO level with a timestamped "time - level - message" format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout,
)
# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)


def load_config():
    """Load the MQTT configuration from ``CONFIG_FILE``.

    Returns:
        The parsed JSON content of the config file.

    Exits the process with status 1, after logging an error, when the file
    is missing, cannot be read, or does not contain valid JSON.
    """
    # Open directly instead of checking os.path.exists() first: this avoids
    # the check-then-open race and lets one handler cover every read failure.
    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Config file not found: {CONFIG_FILE}")
        logger.error("Run setup.py first to configure MQTT settings")
        sys.exit(1)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"Could not read config file {CONFIG_FILE}: {e}")
        sys.exit(1)


def send_gadgetbridge_intent(action):
    """Send a broadcast intent to Gadgetbridge through the ``am`` command.

    Args:
        action: Intent action string to broadcast.

    Failures are logged as warnings and never raised, so a failed broadcast
    does not interrupt the caller.
    """
    # An argument list (no shell) keeps ``action`` from being interpreted by
    # a shell and yields a real exit code rather than a raw wait status.
    cmd = [
        "am", "broadcast",
        "-a", action,
        "-p", "nodomain.freeyourgadget.gadgetbridge",
    ]
    logger.info(f"Sending intent: {action}")
    try:
        # The timeout keeps a hung ``am`` process from blocking the caller forever.
        result = subprocess.run(cmd, check=False, timeout=30)
    except (OSError, subprocess.TimeoutExpired) as e:
        logger.warning(f"Intent may have failed ({e})")
        return
    if result.returncode != 0:
        logger.warning(f"Intent may have failed (exit code: {result.returncode})")


def trigger_gadgetbridge_sync():
    """Ask Gadgetbridge to fetch data from the band, then export its database."""
    sync_action = "nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC"
    export_action = "nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT"

    # Step 1: pull fresh data from the band.
    send_gadgetbridge_intent(sync_action)
    # Pause so the sync has time to run before the export is requested.
    time.sleep(5)
    # Step 2: have Gadgetbridge write out its database.
    send_gadgetbridge_intent(export_action)


def find_latest_db(export_dir):
    """Return the most recently modified Gadgetbridge database in a directory.

    A file qualifies when its name ends with ".db" and contains "Gadgetbridge".

    Args:
        export_dir: Directory to scan.

    Returns:
        ``export_dir`` joined with the name of the newest qualifying file, or
        ``None`` when the directory does not exist or holds no qualifying file
        whose modification time can be read.
    """
    try:
        names = os.listdir(export_dir)
    except FileNotFoundError:
        return None

    newest_path = None
    newest_mtime = None
    for name in names:
        if not (name.endswith(".db") and "Gadgetbridge" in name):
            continue
        path = os.path.join(export_dir, name)
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            # The entry vanished between listing and stat (an export may be
            # replacing it) or is a dangling link; skip it rather than fail.
            continue
        # Strict ">" keeps the first entry on ties.
        if newest_mtime is None or mtime > newest_mtime:
            newest_path, newest_mtime = path, mtime
    return newest_path


class GadgetbridgeMQTT:
    """Publish values read from a Gadgetbridge database export to MQTT.

    ``run()`` is the entry point: it periodically triggers a Gadgetbridge
    sync/export, reads the newest exported SQLite database and publishes the
    values, plus Home Assistant discovery configs, over MQTT.
    """

    # Home Assistant discovery table, one tuple per published value:
    # (sensor_id, display name, unit, icon, state_class, device_class).
    # "sleep_stage" carries a text value, so it must not declare a state
    # class: Home Assistant only accepts numeric states for "measurement".
    DISCOVERY_SENSORS = (
        ("device", "Device", None, "mdi:watch", None, None),
        ("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
        ("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
        ("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
        ("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
        ("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
        ("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
        ("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
        ("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
        ("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
        ("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
        ("sleep_stage", "Sleep Stage", None, "mdi:sleep-cycle", None, None),
        ("sleep_stage_code", "Sleep Stage Code", None, "mdi:numeric", "measurement", None),
        ("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
        ("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
    )

    # Sleep stage codes; the original author attributes them to Gadgetbridge's
    # SleepDetailsParser.java (0: NOT_SLEEP, 1: N/A, 2: DEEP_SLEEP,
    # 3: LIGHT_SLEEP, 4: REM_SLEEP, 5: AWAKE).
    SLEEP_STAGE_NAMES = {
        0: "not_sleep",
        1: "unknown",
        2: "deep_sleep",
        3: "light_sleep",
        4: "rem_sleep",
        5: "awake",
    }

    def __init__(self, config):
        """Store the configuration and install SIGTERM/SIGINT handlers.

        Args:
            config: Mapping of settings; this class reads "mqtt_broker",
                "mqtt_port", "mqtt_username", "mqtt_password", "export_dir"
                and "publish_interval".
        """
        self.config = config
        self.db_path = None
        self.device_name = "fitness_tracker"  # MQTT identifier (always fitness_tracker)
        self.device_alias = "Unknown"  # Actual device name for display
        self.device_id = None  # Track device ID for filtering queries
        self.mqtt_client = None
        self.last_publish_time = 0
        self.last_db_mtime = 0
        self.running = True
        # Set once discovery configs were published with a known device.
        self._discovery_published = False

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful shutdown by clearing the flag run() loops on."""
        logger.info(f"Received signal {signum}. Shutting down...")
        self.running = False

    def connect_mqtt(self):
        """Create the MQTT client, connect to the broker and start its loop.

        Returns:
            True when connect() and loop_start() completed without raising,
            False (after logging the error) otherwise.
        """
        # Use callback API version 2 to avoid deprecation warning
        try:
            self.mqtt_client = mqtt.Client(
                callback_api_version=mqtt.CallbackAPIVersion.VERSION2
            )
        except (AttributeError, TypeError):
            # Fallback for older paho-mqtt versions
            self.mqtt_client = mqtt.Client()

        username = self.config.get("mqtt_username")
        password = self.config.get("mqtt_password")
        # Credentials are only applied when both are configured.
        if username and password:
            self.mqtt_client.username_pw_set(username, password)

        try:
            self.mqtt_client.connect(
                self.config["mqtt_broker"],
                self.config.get("mqtt_port", 1883),
                60,
            )
            self.mqtt_client.loop_start()
            # Wait for connection to establish
            time.sleep(1)
            logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
            return True
        except Exception as e:
            logger.error(f"MQTT connection failed: {e}")
            return False

    def disconnect_mqtt(self):
        """Stop the client's network loop and disconnect, if a client exists."""
        if self.mqtt_client:
            self.mqtt_client.loop_stop()
            self.mqtt_client.disconnect()

    def get_device_info(self, cursor):
        """Look up the device whose data should be published.

        Selects, from the DEVICE table, the row with the highest ``_id`` whose
        NAME contains "band" or "watch" (case-insensitive).

        Args:
            cursor: Cursor on the Gadgetbridge database.

        Returns:
            ``(device_id, display_name)`` where the display name is ALIAS when
            set and NAME otherwise, or ``(None, "Unknown")`` when no row
            matches or the query fails (the failure is logged).
        """
        try:
            cursor.execute("""
                SELECT d._id, d.ALIAS, d.NAME FROM DEVICE d
                WHERE (LOWER(d.NAME) LIKE '%band%' OR LOWER(d.NAME) LIKE '%watch%')
                ORDER BY d._id DESC
                LIMIT 1
            """)
            row = cursor.fetchone()
            if row:
                device_id = row[0]
                # Use ALIAS, fallback to NAME
                device_alias = row[1] if row[1] else row[2]
                logger.info(f"Selected device: ID={device_id}, Name={device_alias}")
                return device_id, device_alias
        except Exception as e:
            logger.error(f"Error getting device info: {e}")
        return None, "Unknown"

    def get_day_start_timestamp(self):
        """Return the epoch seconds of the current day's start (04:00 local).

        Before 04:00 the current day is considered to have started at 04:00
        on the previous calendar date.
        """
        now = datetime.now()
        day_start = datetime.combine(now.date(), datetime.min.time()).replace(hour=4)
        if now.hour < 4:
            day_start -= timedelta(days=1)
        return int(day_start.timestamp())

    def get_day_midnight_timestamp(self):
        """Return the epoch seconds of local midnight for daily summary queries.

        Before 04:00 this is the previous calendar date's midnight, matching
        the day boundary used by get_day_start_timestamp().
        """
        now = datetime.now()
        midnight = datetime.combine(now.date(), datetime.min.time())
        if now.hour < 4:
            midnight -= timedelta(days=1)
        return int(midnight.timestamp())

    def query_sensors(self, cursor):
        """Collect every publishable value from the database.

        Each query runs in its own try block so that a missing table or column
        only drops that value (the failure is logged at debug level). Queries
        other than weight are filtered by ``self.device_id``.

        Args:
            cursor: Cursor on the Gadgetbridge database.

        Returns:
            Dict keyed by sensor id. "server_time" and "device" are always
            present; other keys appear only when their query yielded a value.
        """
        data = {}
        day_start_ts = self.get_day_start_timestamp()
        now_ts = int(datetime.now().timestamp())
        day_midnight = self.get_day_midnight_timestamp()

        # Daily Steps (filtered by device)
        try:
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, day_start_ts, now_ts),
            )
            data["daily_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Daily steps query failed: {e}")

        # Weekly Steps (filtered by device); the week starts Monday 04:00.
        try:
            now = datetime.now()
            week_start = now.date() - timedelta(days=now.date().weekday())
            week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
            # Monday before 04:00 still belongs to the previous week.
            if now.date().weekday() == 0 and now.hour < 4:
                week_start_time -= timedelta(days=7)
            cursor.execute(
                "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? AND TIMESTAMP <= ?",
                (self.device_id, int(week_start_time.timestamp()), now_ts),
            )
            data["weekly_steps"] = cursor.fetchone()[0] or 0
        except Exception as e:
            logger.debug(f"Weekly steps query failed: {e}")

        # Battery Level (filtered by device)
        try:
            cursor.execute(
                "SELECT LEVEL FROM BATTERY_LEVEL WHERE DEVICE_ID = ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["battery_level"] = row[0]
        except Exception as e:
            logger.debug(f"Battery query failed: {e}")

        # Latest Heart Rate (filtered by device); values of 0 and 255 and
        # above are excluded by the query.
        try:
            cursor.execute(
                "SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE DEVICE_ID = ? AND HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                data["heart_rate"] = row[0]
        except Exception as e:
            logger.debug(f"Heart rate query failed: {e}")

        # Daily Summary Data (filtered by device)
        # Note: XIAOMI_DAILY_SUMMARY_SAMPLE uses MILLISECONDS timestamps
        try:
            cursor.execute(
                "SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE DEVICE_ID = ? AND TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
                (self.device_id, day_midnight * 1000),  # Convert to milliseconds
            )
            row = cursor.fetchone()
            if row:
                # Falsy values (NULL or 0) are not published.
                summary_keys = ("hr_resting", "hr_max", "hr_avg", "calories")
                for key, value in zip(summary_keys, row):
                    if value:
                        data[key] = value
        except Exception as e:
            logger.debug(f"Daily summary query failed: {e}")

        # Sleep Data (filtered by device)
        # Note: XIAOMI_SLEEP_TIME_SAMPLE uses MILLISECONDS timestamps
        try:
            day_ago_ts_ms = (int(time.time()) - 24 * 3600) * 1000  # 24h ago in milliseconds
            now_ms = int(time.time()) * 1000  # Current time in milliseconds

            cursor.execute(
                """
                SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME
                FROM XIAOMI_SLEEP_TIME_SAMPLE
                WHERE DEVICE_ID = ? AND TIMESTAMP >= ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id, day_ago_ts_ms),
            )
            row = cursor.fetchone()
            if row:
                total_duration, is_awake_flag, wakeup_raw = row

                # Convert duration to hours
                # NOTE(review): assumes TOTAL_DURATION is in minutes - confirm.
                if total_duration is not None:
                    data["sleep_duration"] = round(total_duration / 60.0, 2)

                # The user counts as awake when IS_AWAKE is explicitly 1, or
                # when WAKEUP_TIME exists and is not in the future; otherwise
                # the user is assumed to be still sleeping.
                data["is_awake"] = is_awake_flag == 1 or (
                    wakeup_raw is not None and wakeup_raw <= now_ms
                )
        except Exception as e:
            logger.debug(f"Sleep query failed: {e}")

        # Sleep Stage Data (current stage)
        # Note: XIAOMI_SLEEP_STAGE_SAMPLE uses MILLISECONDS timestamps
        try:
            cursor.execute(
                """
                SELECT STAGE, TIMESTAMP
                FROM XIAOMI_SLEEP_STAGE_SAMPLE
                WHERE DEVICE_ID = ?
                ORDER BY TIMESTAMP DESC
                LIMIT 1
                """,
                (self.device_id,),
            )
            row = cursor.fetchone()
            if row:
                stage_code = row[0]
                # Unmapped codes are published as "unknown_<code>".
                data["sleep_stage"] = self.SLEEP_STAGE_NAMES.get(
                    stage_code, f"unknown_{stage_code}"
                )
                data["sleep_stage_code"] = stage_code
        except Exception as e:
            logger.debug(f"Sleep stage query failed: {e}")

        # Weight (not device-specific, from scale)
        try:
            cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
            row = cursor.fetchone()
            if row and row[0]:
                data["weight"] = round(row[0], 1)  # Round to 1 decimal
        except Exception as e:
            logger.debug(f"Weight query failed: {e}")

        # Server time (local time with UTC offset, ISO 8601)
        data["server_time"] = datetime.now().astimezone().isoformat()

        # Device name (actual band name)
        data["device"] = self.device_alias

        return data

    def publish_discovery(self):
        """Publish a retained Home Assistant MQTT discovery config per sensor.

        One config is published for every entry of ``DISCOVERY_SENSORS`` to
        ``homeassistant/sensor/<device>_<sensor>/config``; each points at the
        state topic that publish_data() writes to.
        """
        device_info = {
            "identifiers": [self.device_name],
            "name": "Fitness Tracker",
            "model": self.device_alias,
            "manufacturer": "Gadgetbridge",
        }

        for sensor_id, name, unit, icon, state_class, device_class in self.DISCOVERY_SENSORS:
            config = {
                "name": f"Fitness Tracker {name}",
                "unique_id": f"{self.device_name}_{sensor_id}",
                "state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
                "device": device_info,
                "icon": icon,
            }
            # Optional fields are only included when the table defines them.
            if unit:
                config["unit_of_measurement"] = unit
            if state_class:
                config["state_class"] = state_class
            if device_class:
                config["device_class"] = device_class

            topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
            self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)

        logger.info("Published Home Assistant discovery configs")

    def publish_data(self, data):
        """Publish each entry of ``data`` to ``gadgetbridge/<device>/<key>``.

        Args:
            data: Dict of sensor id to value; values are sent as ``str(value)``.
        """
        for key, value in data.items():
            topic = f"gadgetbridge/{self.device_name}/{key}"
            # Use retain=True so HA gets values on restart
            self.mqtt_client.publish(topic, str(value), qos=1, retain=True)

        logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, "
                    f"hr={data.get('heart_rate', 'N/A')}, "
                    f"battery={data.get('battery_level', 'N/A')}%")

    def process_database(self):
        """Read the database at ``self.db_path`` and publish its values.

        Returns:
            True when data was published; False when the database is missing,
            no device was found, or any error occurred (errors are logged).
        """
        if not self.db_path or not os.path.exists(self.db_path):
            logger.warning("No database file found")
            return False

        try:
            # closing() guarantees the connection is released on every path,
            # including exceptions and the early return below.
            with closing(sqlite3.connect(self.db_path, timeout=10.0)) as conn:
                cursor = conn.cursor()

                # Get device ID and actual name
                self.device_id, self.device_alias = self.get_device_info(cursor)
                if not self.device_id:
                    logger.warning("No fitness device found in database")
                    return False

                # Query all sensors
                data = self.query_sensors(cursor)

            if data:
                self.publish_data(data)
                return True
        except Exception as e:
            logger.error(f"Database error: {e}")

        return False

    def _publish_discovery_once(self):
        """Publish discovery configs the first time a device can be read.

        Discovery is deferred while no device is found so the retained configs
        are not published with the placeholder model name "Unknown".
        """
        if self._discovery_published:
            return
        # Need to read device info first
        try:
            with closing(sqlite3.connect(self.db_path, timeout=5.0)) as conn:
                self.device_id, self.device_alias = self.get_device_info(conn.cursor())
        except Exception as e:
            logger.warning(f"Could not read device info: {e}")
            return
        if not self.device_id:
            return  # Try again on the next cycle.
        self.publish_discovery()
        self._discovery_published = True

    def _sync_and_publish(self, export_dir):
        """Run one cycle: trigger sync/export, locate the database, publish."""
        logger.info("Triggering Gadgetbridge sync...")
        trigger_gadgetbridge_sync()

        # Wait for export to complete
        time.sleep(10)

        # Find latest database with retry
        self.db_path = find_latest_db(export_dir)
        if not self.db_path:
            logger.warning("No database found, retrying in 3s...")
            time.sleep(3)
            self.db_path = find_latest_db(export_dir)

        if not self.db_path:
            logger.warning(f"No database found in {export_dir}")
            return

        logger.info(f"Using database: {os.path.basename(self.db_path)}")
        self._publish_discovery_once()
        self.process_database()

    def run(self):
        """Main loop: sync, export and publish every ``publish_interval`` seconds.

        Exits the process with status 1 when the MQTT connection cannot be
        established. Otherwise loops until a shutdown signal clears
        ``self.running``, then disconnects from the broker.
        """
        export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
        interval = self.config.get("publish_interval", PUBLISH_INTERVAL)

        logger.info("Starting Gadgetbridge MQTT Publisher")
        logger.info(f"Export directory: {export_dir}")
        logger.info(f"Publish interval: {interval}s")

        # Ensure export directory exists
        os.makedirs(export_dir, exist_ok=True)

        # Connect to MQTT
        if not self.connect_mqtt():
            logger.error("Failed to connect to MQTT. Exiting.")
            sys.exit(1)

        try:
            while self.running:
                current_time = time.time()

                # Check if it's time to trigger sync and publish
                if current_time - self.last_publish_time >= interval:
                    try:
                        self._sync_and_publish(export_dir)
                    except Exception as e:
                        # One failed cycle must not stop the service.
                        logger.error(f"Sync cycle failed: {e}")

                    self.last_publish_time = current_time
                    logger.info(f"Next publish in {interval}s...")

                time.sleep(10)  # Check every 10 seconds

        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            logger.info("Cleaning up...")
            self.disconnect_mqtt()


def main():
    """Entry point: load the configuration and run the publisher until shutdown."""
    settings = load_config()
    GadgetbridgeMQTT(settings).run()


if __name__ == "__main__":
    main()