This commit is contained in:
Oliver Großkloß 2025-07-16 13:55:20 +02:00
commit 7f36bc3207
5 changed files with 531 additions and 0 deletions

16
Readme.md Normal file
View File

@ -0,0 +1,16 @@
# GadgetbridgeMqtt for TrueNAS Scale
This is a Gadgetbridge MQTT bridge for TrueNAS Scale, which allows you to connect your Gadgetbridge database to Home Assistant or other MQTT clients.
## Setup
- edit ```compose.yaml``` and set
- mount points for your data
- mount point for your Gadgetbridge database
- your Timezone
- environment variables for your MQTT broker
- your DEVICE_NAME. (Can be skipped first time: If you start the app with "unknown" as DEVICE_NAME, you will see all devices from the Gadgetbridge Database in your App Log.)
- include it in your TrueNAS Scale Custom Apps e.g. ```include: [/mnt/Data/Apps/GadgetbridgeMqtt/Config/compose.yaml]```
- start the app
- find your DEVICE_NAME in the App Log and set it in the compose.yaml, then restart the app
- (AI told me to ignore the cron setup error)

28
compose.yaml Normal file
View File

@ -0,0 +1,28 @@
services:
gadgetbridge-mqtt:
image: python:3.11-slim
container_name: gadgetbridge-mqtt
restart: unless-stopped
network_mode: host
working_dir: /app
volumes:
- /mnt/Data/Apps/GadgetbridgeMqtt/App:/app
- /mnt/Data/Apps/GadgetbridgeMqtt/Logs:/app/logs
- /mnt/Data/Apps/Nextcloud10/data/oliver/files/Backups/Android/Apps/Gadgetbridge/GadgetbridgeOld.db:/data/Gadgetbridge.db:ro
environment:
- TZ=Europe/Berlin
- MQTT_BROKER=192.168.***.***
- MQTT_PORT=1883
- MQTT_USERNAME=*****
- MQTT_PASSWORD=*****
- GADGETBRIDGE_DB_PATH=/data/Gadgetbridge.db
- DEVICE_NAME="unknown" # Set your device name here, or leave as "unknown" to see all devices in the logs
- PYTHONUNBUFFERED=1
command: >
sh -c "
apt-get update &&
apt-get install -y cron &&
pip install --no-cache-dir -r requirements.txt &&
python setup.py &&
cron -f
"

66
healthcheck.py Normal file
View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""
Health check script for Gadgetbridge MQTT integration
"""
import os
import sqlite3
import paho.mqtt.client as mqtt
from datetime import datetime, timedelta
def check_database():
"""Check if Gadgetbridge database is accessible"""
db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge")
if not os.path.exists(db_path):
return False
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='MI_BAND_ACTIVITY_SAMPLE'"
)
result = cursor.fetchone()
conn.close()
return result is not None
except:
return False
def check_mqtt_connection():
"""Check MQTT broker connectivity"""
try:
client = mqtt.Client("healthcheck")
if os.getenv("MQTT_USERNAME"):
client.username_pw_set(
os.getenv("MQTT_USERNAME"), os.getenv("MQTT_PASSWORD")
)
client.connect(
os.getenv("MQTT_BROKER", "localhost"),
int(os.getenv("MQTT_PORT", "1883")),
10,
)
client.disconnect()
return True
except:
return False
def main():
"""Main health check"""
db_ok = check_database()
mqtt_ok = check_mqtt_connection()
if db_ok and mqtt_ok:
print("Health check passed")
exit(0)
else:
print(f"Health check failed - DB: {db_ok}, MQTT: {mqtt_ok}")
exit(1)
if __name__ == "__main__":
main()

375
main.py Normal file
View File

@ -0,0 +1,375 @@
#!/usr/bin/env python3
"""
Gadgetbridge MQTT Step Counter Integration
Extracts steps data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
"""
import os
import sqlite3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import asyncio
import aiomqtt
import re
class GadgetbridgeMQTTPublisher:
def __init__(self):
self.setup_logging()
self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
raw_name = os.getenv("DEVICE_NAME", "fitness_tracker")
# Sanitize device_name: lowercase, replace spaces and non-word chars with _
self.device_name = re.sub(r"\W+", "_", raw_name).lower()
self.load_config()
self.mqtt_client = None
def setup_logging(self):
"""Setup logging configuration"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("/app/logs/gadgetbridge_mqtt.log"),
logging.StreamHandler(),
],
)
self.logger = logging.getLogger(__name__)
def load_config(self):
"""Load MQTT configuration from environment variables"""
self.mqtt_config = {
"broker": os.getenv("MQTT_BROKER", "localhost"),
"port": int(os.getenv("MQTT_PORT", "1883")),
"username": os.getenv("MQTT_USERNAME", ""),
"password": os.getenv("MQTT_PASSWORD", ""),
}
async def publish_home_assistant_discovery(
self, entity_type: str, entity_id: str, config: Dict
):
"""Publish Home Assistant MQTT discovery configuration asynchronously"""
discovery_topic = (
f"homeassistant/{entity_type}/{self.device_name}_{entity_id}/config"
)
try:
await self.mqtt_client.publish(
discovery_topic, json.dumps(config), qos=1, retain=True
)
self.logger.info(f"Published discovery config for {entity_id}")
except Exception as e:
self.logger.error(f"Failed to publish discovery config: {e}")
async def setup_home_assistant_entities(self):
"""Setup Home Assistant entities via MQTT discovery"""
device_info = {
"identifiers": [self.device_name],
"name": f"Gadgetbridge {self.device_name.title()}",
"model": "Fitness Tracker",
"manufacturer": "Gadgetbridge",
}
# Daily steps sensor
steps_config = {
"name": f"{self.device_name.title()} Daily Steps",
"unique_id": f"{self.device_name}_daily_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/daily",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"device": device_info,
"state_class": "total_increasing",
}
# Weekly steps sensor
weekly_steps_config = {
"name": f"{self.device_name.title()} Weekly Steps",
"unique_id": f"{self.device_name}_weekly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/weekly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"device": device_info,
"state_class": "total",
}
# Monthly steps sensor
monthly_steps_config = {
"name": f"{self.device_name.title()} Monthly Steps",
"unique_id": f"{self.device_name}_monthly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/monthly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"device": device_info,
"state_class": "total",
}
# Last sync sensor
last_sync_config = {
"name": f"{self.device_name.title()} Last Sync",
"unique_id": f"{self.device_name}_last_sync",
"state_topic": f"gadgetbridge/{self.device_name}/last_sync",
"icon": "mdi:sync",
"device": device_info,
"device_class": "timestamp",
}
await self.publish_home_assistant_discovery(
"sensor", "daily_steps", steps_config
)
await self.publish_home_assistant_discovery(
"sensor", "weekly_steps", weekly_steps_config
)
await self.publish_home_assistant_discovery(
"sensor", "monthly_steps", monthly_steps_config
)
await self.publish_home_assistant_discovery(
"sensor", "last_sync", last_sync_config
)
def get_steps_data(self) -> Dict:
"""Extract steps data from Gadgetbridge database"""
if not os.path.exists(self.db_path):
self.logger.error(f"Database file not found: {self.db_path}")
return {}
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Get today's date
today = datetime.now().date()
week_start = today - timedelta(days=today.weekday())
month_start = today.replace(day=1)
# Convert to Unix timestamps
today_start = int(datetime.combine(today, datetime.min.time()).timestamp())
today_end = int(datetime.combine(today, datetime.max.time()).timestamp())
week_start_ts = int(
datetime.combine(week_start, datetime.min.time()).timestamp()
)
month_start_ts = int(
datetime.combine(month_start, datetime.min.time()).timestamp()
)
# Query daily steps
cursor.execute(
"""
SELECT SUM(STEPS) as daily_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?
""",
(today_start, today_end),
)
daily_steps = cursor.fetchone()[0] or 0
# Query weekly steps
cursor.execute(
"""
SELECT SUM(STEPS) as weekly_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ?
""",
(week_start_ts,),
)
weekly_steps = cursor.fetchone()[0] or 0
# Query monthly steps
cursor.execute(
"""
SELECT SUM(STEPS) as monthly_steps
FROM XIAOMI_ACTIVITY_SAMPLE
WHERE TIMESTAMP >= ?
""",
(month_start_ts,),
)
monthly_steps = cursor.fetchone()[0] or 0
# Get last sync timestamp
cursor.execute(
"""
SELECT MAX(TIMESTAMP) as last_sync
FROM XIAOMI_ACTIVITY_SAMPLE
"""
)
last_sync_ts = cursor.fetchone()[0]
last_sync = (
datetime.fromtimestamp(last_sync_ts).isoformat()
if last_sync_ts
else None
)
conn.close()
return {
"daily_steps": daily_steps,
"weekly_steps": weekly_steps,
"monthly_steps": monthly_steps,
"last_sync": last_sync,
}
except Exception as e:
self.logger.error(f"Error querying database: {e}")
return {}
async def publish_steps_data(self, data: Dict):
"""Publish steps data to MQTT asynchronously"""
if not data:
return
topics = {
"daily": f"gadgetbridge/{self.device_name}/steps/daily",
"weekly": f"gadgetbridge/{self.device_name}/steps/weekly",
"monthly": f"gadgetbridge/{self.device_name}/steps/monthly",
"last_sync": f"gadgetbridge/{self.device_name}/last_sync",
}
try:
await self.mqtt_client.publish(
topics["daily"], str(data["daily_steps"]), qos=1
)
await self.mqtt_client.publish(
topics["weekly"], str(data["weekly_steps"]), qos=1
)
await self.mqtt_client.publish(
topics["monthly"], str(data["monthly_steps"]), qos=1
)
if data["last_sync"]:
await self.mqtt_client.publish(
topics["last_sync"], data["last_sync"], qos=1
)
self.logger.info(
f"Published steps data: Daily={data['daily_steps']}, Weekly={data['weekly_steps']}, Monthly={data['monthly_steps']}"
)
except Exception as e:
self.logger.error(f"Failed to publish steps data: {e}")
async def run(self):
"""Main execution method (async)"""
self.logger.info("Starting Gadgetbridge MQTT Publisher")
try:
async with aiomqtt.Client(
hostname=self.mqtt_config["broker"],
port=self.mqtt_config["port"],
username=self.mqtt_config["username"] or None,
password=self.mqtt_config["password"] or None,
) as client:
self.mqtt_client = client
await self.setup_home_assistant_entities()
steps_data = self.get_steps_data()
if steps_data:
await self.publish_steps_data(steps_data)
self.logger.info("Gadgetbridge MQTT Publisher completed")
except Exception as e:
self.logger.error(f"Failed to connect to MQTT broker: {e}")
def get_all_device_names(db_path):
"""Returns a list of all unique device names from the database."""
if not os.path.exists(db_path):
print(f"Database file not found: {db_path}")
return []
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Try to find a devices table. If not, look for a device column in samples.
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='DEVICES'"
)
if cursor.fetchone():
cursor.execute("SELECT DISTINCT NAME FROM DEVICES")
names = [row[0] for row in cursor.fetchall()]
else:
# See if MI_BAND_ACTIVITY_SAMPLE has a device column
cursor.execute("PRAGMA table_info(MI_BAND_ACTIVITY_SAMPLE)")
columns = [row[1] for row in cursor.fetchall()]
names = []
if "DEVICE_NAME" in columns:
cursor.execute(
"SELECT DISTINCT DEVICE_NAME FROM MI_BAND_ACTIVITY_SAMPLE"
)
names = [row[0] for row in cursor.fetchall()]
conn.close()
return names
except Exception as e:
print(f"Error querying database: {e}")
return []
def print_db_context(db_path):
print(f"\nDatabase path: {db_path}")
if os.path.exists(db_path):
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.2f} MB")
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# List tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
print(f"Tables in database: {tables}")
# Row counts for key tables
for table in tables:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" {table}: {count} rows")
except Exception as e:
print(f" {table}: error counting rows ({e})")
# Show sample from MI_BAND_ACTIVITY_SAMPLE
if "MI_BAND_ACTIVITY_SAMPLE" in tables:
cursor.execute("SELECT * FROM MI_BAND_ACTIVITY_SAMPLE LIMIT 3")
rows = cursor.fetchall()
print("Sample rows from MI_BAND_ACTIVITY_SAMPLE:")
for row in rows:
print(" ", row)
conn.close()
except Exception as e:
print(f"Error reading database: {e}")
else:
print("Database file not found.")
def print_device_table(db_path):
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM DEVICE")
rows = cursor.fetchall()
print("DEVICE table contents:")
for row in rows:
print(row)
conn.close()
except Exception as e:
print(f"Error reading DEVICE table: {e}")
# --- Main Entry Point ---
if __name__ == "__main__":
db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge")
print_db_context(db_path) # For debugging and context
print_device_table(db_path)
device_name = os.getenv("DEVICE_NAME", "").strip()
if not device_name or device_name.lower() == "unknown":
# Device name is not set or is explicitly 'unknown'
print(
"Device name is not set. Attempting to list all available device names from the database...\n"
)
device_names = get_all_device_names(db_path)
if device_names:
print("Available device names in this database:")
for n in device_names:
print(" -", n)
else:
print("No device names could be found in the database.")
print(
"\nWaiting 10 minutes (600 seconds) before terminating to allow user review..."
)
time.sleep(600)
print("Terminating script.")
exit(0)
# Original code follows, only runs if device_name is present and valid
publisher = GadgetbridgeMQTTPublisher()
asyncio.run(publisher.run())

46
setup.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""
Setup script for Gadgetbridge MQTT integration
"""
import os
import subprocess
import logging
def setup_cron_job():
"""Setup cron job for periodic execution"""
cron_schedule = "*/15 * * * *" # Every 15 minutes
cron_command = (
f"cd /app && /usr/local/bin/python main.py >> /app/logs/cron.log 2>&1"
)
# Create cron job
with open("/tmp/gadgetbridge_cron", "w") as f:
f.write(f"{cron_schedule} {cron_command}\n")
# Install cron job
subprocess.run(["crontab", "/tmp/gadgetbridge_cron"], check=True)
# Start cron service
subprocess.run(["service", "cron", "start"], check=True)
print("Cron job setup completed - running every 15 minutes")
def create_directories():
"""Create necessary directories"""
os.makedirs("/app/logs", exist_ok=True)
print("Created log directory")
def run_initial_setup():
"""Run initial discovery setup"""
subprocess.run(["/usr/local/bin/python", "/app/main.py"], check=True)
print("Initial MQTT discovery setup completed")
if __name__ == "__main__":
create_directories()
setup_cron_job()
run_initial_setup()