remove device name

This commit is contained in:
Oliver Großkloß 2025-07-17 13:07:16 +02:00
parent bcf905b33b
commit 7ba2db2a6f

482
main.py
View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
Gadgetbridge MQTT Step Counter Integration
Extracts steps data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
Extracts sensor data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
"""
import os
@ -10,7 +10,7 @@ import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any
import asyncio
import aiomqtt
import re
@ -21,13 +21,75 @@ class GadgetbridgeMQTTPublisher:
self.setup_logging()
self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
raw_name = os.getenv("DEVICE_NAME", "fitness_tracker")
# Sanitize device_name: lowercase, replace spaces and non-word chars with _
self.device_name = re.sub(r"\W+", "_", raw_name).lower()
self.load_config()
self.mqtt_client = None
self.publish_interval = int(
os.getenv("PUBLISH_INTERVAL_SECONDS", "300")
) # <-- Add this
self.publish_interval = int(os.getenv("PUBLISH_INTERVAL_SECONDS", "300"))
# Define sensors here for easy extension
self.sensors = [
{
"name": "Daily Steps",
"unique_id": "daily_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/daily",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total_increasing",
"query": self.query_daily_steps,
},
{
"name": "Weekly Steps",
"unique_id": "weekly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/weekly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total",
"query": self.query_weekly_steps,
},
{
"name": "Monthly Steps",
"unique_id": "monthly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/monthly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total",
"query": self.query_monthly_steps,
},
{
"name": "Last Sync",
"unique_id": "last_sync",
"state_topic": f"gadgetbridge/{self.device_name}/last_sync",
"icon": "mdi:sync",
"device_class": "timestamp",
"query": self.query_last_sync,
},
{
"name": "Battery Level",
"unique_id": "battery_level",
"state_topic": f"gadgetbridge/{self.device_name}/battery",
"unit_of_measurement": "%",
"icon": "mdi:battery",
"device_class": "battery",
"query": self.query_battery_level,
},
{
"name": "Weight",
"unique_id": "weight",
"state_topic": f"gadgetbridge/{self.device_name}/weight",
"unit_of_measurement": "kg",
"icon": "mdi:scale-bathroom",
"state_class": "measurement",
"query": self.query_latest_weight,
},
{
"name": "Latest Heart Rate",
"unique_id": "latest_heart_rate",
"state_topic": f"gadgetbridge/{self.device_name}/heart_rate",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"state_class": "measurement",
"query": self.query_latest_heart_rate,
},
]
def setup_logging(self):
"""Setup logging configuration"""
@ -73,221 +135,116 @@ class GadgetbridgeMQTTPublisher:
"model": "Fitness Tracker",
"manufacturer": "Gadgetbridge",
}
# Daily steps sensor
steps_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Daily Steps",
"unique_id": f"{self.device_name}_daily_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/daily",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
for sensor in self.sensors:
config = {
"name": f"{self.device_name.replace('_', ' ').title()} {sensor['name']}",
"unique_id": f"{self.device_name}_{sensor['unique_id']}",
"state_topic": sensor["state_topic"],
"device": device_info,
"state_class": "total_increasing",
}
# Weekly steps sensor
weekly_steps_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Weekly Steps",
"unique_id": f"{self.device_name}_weekly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/weekly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"device": device_info,
"state_class": "total",
}
# Monthly steps sensor
monthly_steps_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Monthly Steps",
"unique_id": f"{self.device_name}_monthly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/monthly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"device": device_info,
"state_class": "total",
}
# Last sync sensor
last_sync_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Last Sync",
"unique_id": f"{self.device_name}_last_sync",
"state_topic": f"gadgetbridge/{self.device_name}/last_sync",
"icon": "mdi:sync",
"device": device_info,
"device_class": "timestamp",
}
# Battery level sensor
battery_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Battery Level",
"unique_id": f"{self.device_name}_battery_level",
"state_topic": f"gadgetbridge/{self.device_name}/battery",
"unit_of_measurement": "%",
"icon": "mdi:battery",
"device": device_info,
"device_class": "battery",
}
# Weight sensor
weight_config = {
"name": f"{self.device_name.replace('_', ' ').title()} Weight",
"unique_id": f"{self.device_name}_weight",
"state_topic": f"gadgetbridge/{self.device_name}/weight",
"unit_of_measurement": "kg",
"icon": "mdi:scale-bathroom",
"device": device_info,
"state_class": "measurement",
}
# Add optional fields if present
for key in ["unit_of_measurement", "icon", "state_class", "device_class"]:
if key in sensor:
config[key] = sensor[key]
await self.publish_home_assistant_discovery(
"sensor", "daily_steps", steps_config
"sensor", sensor["unique_id"], config
)
await self.publish_home_assistant_discovery(
"sensor", "weekly_steps", weekly_steps_config
)
await self.publish_home_assistant_discovery(
"sensor", "monthly_steps", monthly_steps_config
)
await self.publish_home_assistant_discovery(
"sensor", "last_sync", last_sync_config
)
await self.publish_home_assistant_discovery(
"sensor", "battery_level", battery_config
)
await self.publish_home_assistant_discovery("sensor", "weight", weight_config)
def get_steps_data(self) -> Dict:
"""Extract steps data from Gadgetbridge database"""
if not os.path.exists(self.db_path):
self.logger.error(f"Database file not found: {self.db_path}")
return {}
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get today's date
def query_daily_steps(self, cursor) -> Any:
today = datetime.now().date()
week_start = today - timedelta(days=today.weekday())
month_start = today.replace(day=1)
# Convert to Unix timestamps
today_start = int(datetime.combine(today, datetime.min.time()).timestamp())
today_end = int(datetime.combine(today, datetime.max.time()).timestamp())
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(today_start, today_end),
)
return cursor.fetchone()[0] or 0
def query_weekly_steps(self, cursor) -> Any:
today = datetime.now().date()
week_start = today - timedelta(days=today.weekday())
week_start_ts = int(
datetime.combine(week_start, datetime.min.time()).timestamp()
)
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ?",
(week_start_ts,),
)
return cursor.fetchone()[0] or 0
def query_monthly_steps(self, cursor) -> Any:
today = datetime.now().date()
month_start = today.replace(day=1)
month_start_ts = int(
datetime.combine(month_start, datetime.min.time()).timestamp()
)
# Query daily steps
cursor.execute(
"""
SELECT SUM(STEPS) as daily_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?
""",
(today_start, today_end),
)
daily_steps = cursor.fetchone()[0] or 0
# Query weekly steps
cursor.execute(
"""
SELECT SUM(STEPS) as weekly_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ?
""",
(week_start_ts,),
)
weekly_steps = cursor.fetchone()[0] or 0
# Query monthly steps
cursor.execute(
"""
SELECT SUM(STEPS) as monthly_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ?
""",
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ?",
(month_start_ts,),
)
return cursor.fetchone()[0] or 0
monthly_steps = cursor.fetchone()[0] or 0
# Get last sync timestamp
cursor.execute(
"""
SELECT MAX(TIMESTAMP) as last_sync
FROM XIAOMI_ACTIVITY_SAMPLE
"""
)
def query_last_sync(self, cursor) -> Any:
cursor.execute("SELECT MAX(TIMESTAMP) FROM XIAOMI_ACTIVITY_SAMPLE")
last_sync_ts = cursor.fetchone()[0]
last_sync = (
datetime.fromtimestamp(last_sync_ts).isoformat()
if last_sync_ts
else None
return (
datetime.fromtimestamp(last_sync_ts).isoformat() if last_sync_ts else None
)
def query_battery_level(self, cursor) -> Any:
cursor.execute(
"SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def query_latest_weight(self, cursor) -> Any:
cursor.execute(
"SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def query_latest_heart_rate(self, cursor) -> Any:
cursor.execute(
"SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def get_sensor_data(self) -> Dict[str, Any]:
"""Query all sensors and return their values as a dict"""
if not os.path.exists(self.db_path):
self.logger.error(f"Database file not found: {self.db_path}")
return {}
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
data = {}
for sensor in self.sensors:
try:
data[sensor["unique_id"]] = sensor["query"](cursor)
except Exception as e:
self.logger.error(f"Error querying {sensor['unique_id']}: {e}")
data[sensor["unique_id"]] = None
conn.close()
return {
"daily_steps": daily_steps,
"weekly_steps": weekly_steps,
"monthly_steps": monthly_steps,
"last_sync": last_sync,
}
return data
except Exception as e:
self.logger.error(f"Error querying database: {e}")
return {}
async def publish_steps_data(self, data: Dict):
"""Publish steps data to MQTT asynchronously"""
if not data:
return
topics = {
"daily": f"gadgetbridge/{self.device_name}/steps/daily",
"weekly": f"gadgetbridge/{self.device_name}/steps/weekly",
"monthly": f"gadgetbridge/{self.device_name}/steps/monthly",
"last_sync": f"gadgetbridge/{self.device_name}/last_sync",
"battery": f"gadgetbridge/{self.device_name}/battery",
"weight": f"gadgetbridge/{self.device_name}/weight",
}
async def publish_sensor_data(self, data: Dict[str, Any]):
"""Publish all sensor data to MQTT asynchronously"""
for sensor in self.sensors:
value = data.get(sensor["unique_id"])
if value is not None:
try:
await self.mqtt_client.publish(
topics["daily"], str(data["daily_steps"]), qos=1
)
await self.mqtt_client.publish(
topics["weekly"], str(data["weekly_steps"]), qos=1
)
await self.mqtt_client.publish(
topics["monthly"], str(data["monthly_steps"]), qos=1
)
if data["last_sync"]:
await self.mqtt_client.publish(
topics["last_sync"], data["last_sync"], qos=1
)
# Publish battery level
battery_level = self.get_battery_level()
if battery_level is not None:
await self.mqtt_client.publish(
topics["battery"], str(battery_level), qos=1
)
# Publish latest weight
latest_weight = self.get_latest_weight()
if latest_weight is not None:
await self.mqtt_client.publish(
topics["weight"], str(latest_weight), qos=1
)
self.logger.info(
f"Published steps data: Daily={data['daily_steps']}, Weekly={data['weekly_steps']}, Monthly={data['monthly_steps']}, Battery={battery_level}"
sensor["state_topic"], str(value), qos=1
)
except Exception as e:
self.logger.error(f"Failed to publish steps/battery data: {e}")
self.logger.error(f"Failed to publish {sensor['unique_id']}: {e}")
self.logger.info(f"Published sensor data: {data}")
async def run(self):
"""Main execution method (async)"""
@ -302,158 +259,23 @@ class GadgetbridgeMQTTPublisher:
self.mqtt_client = client
await self.setup_home_assistant_entities()
# Publish immediately on startup
steps_data = self.get_steps_data()
if steps_data:
await self.publish_steps_data(steps_data)
sensor_data = self.get_sensor_data()
await self.publish_sensor_data(sensor_data)
self.logger.info(
f"Sleeping for {self.publish_interval} seconds before next publish..."
)
while True:
await asyncio.sleep(self.publish_interval)
steps_data = self.get_steps_data()
if steps_data:
await self.publish_steps_data(steps_data)
sensor_data = self.get_sensor_data()
await self.publish_sensor_data(sensor_data)
self.logger.info(
f"Sleeping for {self.publish_interval} seconds before next publish..."
)
except Exception as e:
self.logger.error(f"Failed to connect to MQTT broker: {e}")
def get_battery_level(self) -> Optional[int]:
"""Get the latest battery level from BATTERY_LEVEL table"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
conn.close()
return row[0] if row else None
except Exception as e:
self.logger.error(f"Error querying battery level: {e}")
return None
def get_latest_weight(self) -> Optional[float]:
"""Get the latest weight in kg from MI_SCALE_WEIGHT_SAMPLE table"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
conn.close()
return row[0] if row else None
except Exception as e:
self.logger.error(f"Error querying latest weight: {e}")
return None
def get_all_device_names(db_path):
"""Returns a list of all unique device names from the database."""
if not os.path.exists(db_path):
print(f"Database file not found: {db_path}")
return []
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Try to find a devices table. If not, look for a device column in samples.
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='DEVICES'"
)
if cursor.fetchone():
cursor.execute("SELECT DISTINCT NAME FROM DEVICES")
names = [row[0] for row in cursor.fetchall()]
else:
# See if MI_BAND_ACTIVITY_SAMPLE has a device column
cursor.execute("PRAGMA table_info(MI_BAND_ACTIVITY_SAMPLE)")
columns = [row[1] for row in cursor.fetchall()]
names = []
if "DEVICE_NAME" in columns:
cursor.execute(
"SELECT DISTINCT DEVICE_NAME FROM MI_BAND_ACTIVITY_SAMPLE"
)
names = [row[0] for row in cursor.fetchall()]
conn.close()
return names
except Exception as e:
print(f"Error querying database: {e}")
return []
def print_db_context(db_path):
print(f"\nDatabase path: {db_path}")
if os.path.exists(db_path):
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.2f} MB")
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# List tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
print(f"Tables in database: {tables}")
# Row counts for key tables
for table in tables:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" {table}: {count} rows")
except Exception as e:
print(f" {table}: error counting rows ({e})")
# Show sample from MI_BAND_ACTIVITY_SAMPLE
if "MI_BAND_ACTIVITY_SAMPLE" in tables:
cursor.execute("SELECT * FROM MI_BAND_ACTIVITY_SAMPLE LIMIT 3")
rows = cursor.fetchall()
print("Sample rows from MI_BAND_ACTIVITY_SAMPLE:")
for row in rows:
print(" ", row)
conn.close()
except Exception as e:
print(f"Error reading database: {e}")
else:
print("Database file not found.")
def print_device_table(db_path):
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM DEVICE")
rows = cursor.fetchall()
print("DEVICE table contents:")
for row in rows:
print(row)
conn.close()
except Exception as e:
print(f"Error reading DEVICE table: {e}")
# --- Main Entry Point ---
if __name__ == "__main__":
db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge")
print_db_context(db_path) # For debugging and context
print_device_table(db_path)
device_name = os.getenv("DEVICE_NAME", "").strip()
if not device_name or device_name.lower() == "unknown":
# Device name is not set or is explicitly 'unknown'
print(
"Device name is not set. Attempting to list all available device names from the database...\n"
)
device_names = get_all_device_names(db_path)
if device_names:
print("Available device names in this database:")
for n in device_names:
print(" -", n)
else:
print("No device names could be found in the database.")
print(
"\nWaiting 10 minutes (600 seconds) before terminating to allow user review..."
)
time.sleep(600)
print("Terminating script.")
exit(0)
# continuous publisher only runs if device_name is present and valid
publisher = GadgetbridgeMQTTPublisher()
asyncio.run(publisher.run())