383 lines
14 KiB
Python
383 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Gadgetbridge MQTT Step Counter Integration
|
|
Extracts steps data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
import json
|
|
import time
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
import asyncio
|
|
import aiomqtt
|
|
import re
|
|
|
|
|
|
class GadgetbridgeMQTTPublisher:
    """Publish step counts from a Gadgetbridge SQLite export to MQTT.

    Configuration is read from environment variables:

    * ``GADGETBRIDGE_DB_PATH`` - path of the SQLite database file.
    * ``DEVICE_NAME`` - name used in MQTT topics and entity ids.
    * ``PUBLISH_INTERVAL_SECONDS`` - delay between two publish cycles.
    * ``MQTT_BROKER`` / ``MQTT_PORT`` / ``MQTT_USERNAME`` / ``MQTT_PASSWORD``.
    """

    def __init__(self):
        self.setup_logging()
        self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
        raw_name = os.getenv("DEVICE_NAME", "fitness_tracker")
        # Sanitize device_name: lowercase, replace spaces and non-word chars with _
        self.device_name = re.sub(r"\W+", "_", raw_name).lower()
        self.load_config()
        # Set by run() once the broker connection is established.
        self.mqtt_client = None
        self.publish_interval = int(os.getenv("PUBLISH_INTERVAL_SECONDS", "300"))

    def setup_logging(self):
        """Configure root logging to the console and, when possible, a file.

        If the log directory cannot be created or the log file cannot be
        opened, logging falls back to console-only instead of raising.
        """
        log_file = "/app/logs/gadgetbridge_mqtt.log"
        handlers = [logging.StreamHandler()]
        file_error = None
        try:
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            handlers.insert(0, logging.FileHandler(log_file))
        except OSError as e:
            # Remember the failure; it is reported once logging is configured.
            file_error = e

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=handlers,
        )
        self.logger = logging.getLogger(__name__)
        if file_error is not None:
            self.logger.warning(
                f"File logging disabled, cannot open {log_file}: {file_error}"
            )

    def load_config(self):
        """Load MQTT configuration from environment variables"""
        self.mqtt_config = {
            "broker": os.getenv("MQTT_BROKER", "localhost"),
            "port": int(os.getenv("MQTT_PORT", "1883")),
            "username": os.getenv("MQTT_USERNAME", ""),
            "password": os.getenv("MQTT_PASSWORD", ""),
        }

    async def publish_home_assistant_discovery(
        self, entity_type: str, entity_id: str, config: Dict
    ):
        """Publish one retained Home Assistant MQTT discovery message.

        Args:
            entity_type: Discovery component segment of the topic (e.g. "sensor").
            entity_id: Suffix appended to the device name in the topic.
            config: Discovery payload; serialized to JSON before publishing.

        Publish failures are logged and not re-raised.
        """
        discovery_topic = (
            f"homeassistant/{entity_type}/{self.device_name}_{entity_id}/config"
        )
        try:
            await self.mqtt_client.publish(
                discovery_topic, json.dumps(config), qos=1, retain=True
            )
            self.logger.info(f"Published discovery config for {entity_id}")
        except Exception as e:
            self.logger.error(f"Failed to publish discovery config: {e}")

    async def setup_home_assistant_entities(self):
        """Announce the daily/weekly/monthly step sensors and the last-sync sensor."""
        display_name = self.device_name.replace("_", " ").title()
        base_topic = f"gadgetbridge/{self.device_name}"
        device_info = {
            "identifiers": [self.device_name],
            "name": f"Gadgetbridge {display_name}",
            "model": "Fitness Tracker",
            "manufacturer": "Gadgetbridge",
        }

        # The three step sensors differ only in period and state class.
        entities = {}
        for period, state_class in (
            ("daily", "total_increasing"),
            ("weekly", "total"),
            ("monthly", "total"),
        ):
            entities[f"{period}_steps"] = {
                "name": f"{display_name} {period.title()} Steps",
                "unique_id": f"{self.device_name}_{period}_steps",
                "state_topic": f"{base_topic}/steps/{period}",
                "unit_of_measurement": "steps",
                "icon": "mdi:walk",
                "device": device_info,
                "state_class": state_class,
            }

        entities["last_sync"] = {
            "name": f"{display_name} Last Sync",
            "unique_id": f"{self.device_name}_last_sync",
            "state_topic": f"{base_topic}/last_sync",
            "icon": "mdi:sync",
            "device": device_info,
            "device_class": "timestamp",
        }

        for entity_id, config in entities.items():
            await self.publish_home_assistant_discovery("sensor", entity_id, config)

    def get_steps_data(self) -> Dict:
        """Read step totals and the latest sample time from the database.

        Returns:
            A dict with ``daily_steps``, ``weekly_steps``, ``monthly_steps``
            (ints, 0 when there are no matching rows) and ``last_sync`` (an
            ISO 8601 string with UTC offset, or None when the table is empty).
            An empty dict is returned when the database file is missing or a
            query fails; the error is logged.
        """
        if not os.path.exists(self.db_path):
            self.logger.error(f"Database file not found: {self.db_path}")
            return {}

        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            def sum_steps(since, until=None):
                """Sum STEPS for samples with TIMESTAMP in [since, until]."""
                if until is None:
                    cursor.execute(
                        "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE "
                        "WHERE TIMESTAMP >= ?",
                        (since,),
                    )
                else:
                    cursor.execute(
                        "SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE "
                        "WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
                        (since, until),
                    )
                # SUM() yields NULL when no rows match.
                return cursor.fetchone()[0] or 0

            def day_start_ts(day):
                """Unix timestamp of local midnight at the start of `day`."""
                return int(datetime.combine(day, datetime.min.time()).timestamp())

            today = datetime.now().date()
            week_start = today - timedelta(days=today.weekday())  # Monday
            month_start = today.replace(day=1)
            today_end = int(datetime.combine(today, datetime.max.time()).timestamp())

            daily_steps = sum_steps(day_start_ts(today), today_end)
            weekly_steps = sum_steps(day_start_ts(week_start))
            monthly_steps = sum_steps(day_start_ts(month_start))

            cursor.execute("SELECT MAX(TIMESTAMP) FROM XIAOMI_ACTIVITY_SAMPLE")
            last_sync_ts = cursor.fetchone()[0]
            # The sensor is announced with device_class "timestamp", so attach
            # the local UTC offset instead of emitting a naive ISO string.
            last_sync = (
                datetime.fromtimestamp(last_sync_ts).astimezone().isoformat()
                if last_sync_ts
                else None
            )

            return {
                "daily_steps": daily_steps,
                "weekly_steps": weekly_steps,
                "monthly_steps": monthly_steps,
                "last_sync": last_sync,
            }
        except Exception as e:
            self.logger.error(f"Error querying database: {e}")
            return {}
        finally:
            # Close the connection on every path, including query failures.
            if conn is not None:
                conn.close()

    async def publish_steps_data(self, data: Dict):
        """Publish the values returned by get_steps_data() to their state topics.

        Does nothing when `data` is empty. ``last_sync`` is only published
        when it has a value. Publish failures are logged and not re-raised.
        """
        if not data:
            return
        base_topic = f"gadgetbridge/{self.device_name}"
        try:
            for period in ("daily", "weekly", "monthly"):
                await self.mqtt_client.publish(
                    f"{base_topic}/steps/{period}",
                    str(data[f"{period}_steps"]),
                    qos=1,
                )
            if data["last_sync"]:
                await self.mqtt_client.publish(
                    f"{base_topic}/last_sync", data["last_sync"], qos=1
                )
            self.logger.info(
                f"Published steps data: Daily={data['daily_steps']}, Weekly={data['weekly_steps']}, Monthly={data['monthly_steps']}"
            )
        except Exception as e:
            self.logger.error(f"Failed to publish steps data: {e}")

    async def run(self):
        """Connect to the broker, announce the entities, then publish forever.

        Each cycle reads the database, publishes the result if any, and
        sleeps for ``publish_interval`` seconds.
        """
        self.logger.info("Starting Gadgetbridge MQTT Publisher")
        try:
            async with aiomqtt.Client(
                hostname=self.mqtt_config["broker"],
                port=self.mqtt_config["port"],
                # Empty strings mean "no credentials configured".
                username=self.mqtt_config["username"] or None,
                password=self.mqtt_config["password"] or None,
            ) as client:
                self.mqtt_client = client
                await self.setup_home_assistant_entities()
                while True:
                    steps_data = self.get_steps_data()
                    if steps_data:
                        await self.publish_steps_data(steps_data)
                    self.logger.info(
                        f"Sleeping for {self.publish_interval} seconds before next publish..."
                    )
                    await asyncio.sleep(self.publish_interval)
        except Exception as e:
            # NOTE: this handler also receives any error that escapes the
            # publish loop above, not only connection failures.
            self.logger.error(f"Failed to connect to MQTT broker: {e}")
|
|
|
|
|
|
def get_all_device_names(db_path):
    """Return the distinct device names stored in the database.

    Names are read from the ``DEVICE`` table (the table that
    ``print_device_table`` also reads) or, if only a ``DEVICES`` table
    exists, from that one. When neither exists, the function falls back to a
    ``DEVICE_NAME`` column on ``MI_BAND_ACTIVITY_SAMPLE`` if present.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        A list of names; empty when the file is missing, no source of names
        is found, or a query fails (the error is printed).
    """
    if not os.path.exists(db_path):
        print(f"Database file not found: {db_path}")
        return []

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # ORDER BY makes 'DEVICE' win when both spellings are present.
        cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name IN ('DEVICE', 'DEVICES') "
            "ORDER BY name"
        )
        found = cursor.fetchone()
        if found:
            # found[0] is one of the two literals above, so it is safe to
            # interpolate as an identifier.
            cursor.execute(f"SELECT DISTINCT NAME FROM {found[0]}")
            return [row[0] for row in cursor.fetchall()]

        # See if MI_BAND_ACTIVITY_SAMPLE has a device column
        cursor.execute("PRAGMA table_info(MI_BAND_ACTIVITY_SAMPLE)")
        columns = [row[1] for row in cursor.fetchall()]
        if "DEVICE_NAME" not in columns:
            return []
        cursor.execute("SELECT DISTINCT DEVICE_NAME FROM MI_BAND_ACTIVITY_SAMPLE")
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        print(f"Error querying database: {e}")
        return []
    finally:
        # Close the connection on every path, including query failures.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def print_db_context(db_path):
    """Print diagnostic information about the database file.

    Prints the path, the file size, the list of tables with their row
    counts, and up to three sample rows of ``MI_BAND_ACTIVITY_SAMPLE`` when
    that table exists. Errors are printed, never raised.

    Args:
        db_path: Path of the SQLite database file.
    """
    print(f"\nDatabase path: {db_path}")
    if not os.path.exists(db_path):
        print("Database file not found.")
        return

    size_mb = os.path.getsize(db_path) / (1024 * 1024)
    print(f"Database size: {size_mb:.2f} MB")

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # List tables
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = [row[0] for row in cursor.fetchall()]
        print(f"Tables in database: {tables}")

        # Row counts for key tables
        for table in tables:
            try:
                # Quote the identifier so names containing spaces, keywords
                # or punctuation can still be counted.
                quoted = '"' + table.replace('"', '""') + '"'
                cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
                count = cursor.fetchone()[0]
                print(f" {table}: {count} rows")
            except Exception as e:
                print(f" {table}: error counting rows ({e})")

        # Show sample from MI_BAND_ACTIVITY_SAMPLE
        if "MI_BAND_ACTIVITY_SAMPLE" in tables:
            cursor.execute("SELECT * FROM MI_BAND_ACTIVITY_SAMPLE LIMIT 3")
            rows = cursor.fetchall()
            print("Sample rows from MI_BAND_ACTIVITY_SAMPLE:")
            for row in rows:
                print(" ", row)
    except Exception as e:
        print(f"Error reading database: {e}")
    finally:
        # Close the connection on every path, including query failures.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
def print_device_table(db_path):
    """Print every row of the ``DEVICE`` table.

    Args:
        db_path: Path of the SQLite database file.

    Nothing is raised: a missing file or a failing query is reported on
    standard output.
    """
    # sqlite3.connect() creates an empty database file when the path does
    # not exist, so check first to avoid leaving a stray file behind.
    if not os.path.exists(db_path):
        print(f"Database file not found: {db_path}")
        return

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM DEVICE")
        rows = cursor.fetchall()
        print("DEVICE table contents:")
        for row in rows:
            print(row)
    except Exception as e:
        print(f"Error reading DEVICE table: {e}")
    finally:
        # Close the connection on every path, including query failures.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
# --- Main Entry Point ---
if __name__ == "__main__":
    # Use the same default path as GadgetbridgeMQTTPublisher so the
    # diagnostics below inspect the file the publisher will actually read.
    db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
    print_db_context(db_path)  # For debugging and context
    print_device_table(db_path)

    device_name = os.getenv("DEVICE_NAME", "").strip()
    if not device_name or device_name.lower() == "unknown":
        # Device name is not set or is explicitly 'unknown'
        print(
            "Device name is not set. Attempting to list all available device names from the database...\n"
        )
        device_names = get_all_device_names(db_path)
        if device_names:
            print("Available device names in this database:")
            for n in device_names:
                print(" -", n)
        else:
            print("No device names could be found in the database.")
        print(
            "\nWaiting 10 minutes (600 seconds) before terminating to allow user review..."
        )
        time.sleep(600)
        print("Terminating script.")
        # SystemExit works even where the interactive `exit` helper from
        # the `site` module is not installed.
        raise SystemExit(0)

    # continuous publisher only runs if device_name is present and valid
    publisher = GadgetbridgeMQTTPublisher()
    asyncio.run(publisher.run())
|