refactor to run on phone

This commit is contained in:
Oliver Großkloß 2025-12-07 15:08:37 +01:00
parent a7d5c190c7
commit 45a6bbfc5e
6 changed files with 728 additions and 729 deletions

164
Readme.md
View File

@ -1,48 +1,146 @@
# GadgetbridgeMqtt for TrueNAS Scale
# Gadgetbridge MQTT for Termux
This is a Gadgetbridge MQTT bridge for TrueNAS Scale, which allows you to connect your Gadgetbridge database to Home Assistant or other MQTT clients.
Publish fitness data from Gadgetbridge to Home Assistant via MQTT directly from your Android phone using Termux.
## Setup
## Features
- copy content [```compose.yaml```](./compose.yaml) your TrueNAS Scale Apps -> Discover Apps -> ⋮ -> Install via YAML
- edit
- mount point for your Gadgetbridge database
- your Timezone
- environment variables for your MQTT broker
- start the app
- 📊 Publishes steps, heart rate, sleep, battery, weight, and more
- 🔄 Auto-syncs with your fitness band every 5 minutes
- 🏠 Home Assistant auto-discovery via MQTT
- <20><> Runs entirely on your phone (no server needed)
- 🚀 Auto-starts on boot
## Editing Sensors
## Quick Setup
- add new mqtt sensor around here [main.py#L90](https://git.olli.info/Oliver/GadgetbridgeMqtt/src/branch/main/main.py#L90)
- add new function for new sensor around here [main.py#L208](https://git.olli.info/Oliver/GadgetbridgeMqtt/src/branch/main/main.py#L208)
### 1. Install Apps from F-Droid
## Automated Gadgetbridge DB Shrink & Sync
- **[Termux](https://f-droid.org/packages/com.termux/)** - Terminal emulator
- **[Termux:Boot](https://f-droid.org/packages/com.termux.boot/)** - Auto-start scripts on boot
- **[Gadgetbridge](https://f-droid.org/packages/nodomain.freeyourgadget.gadgetbridge/)** - Fitness tracker app
### 1. Install Apps (F-Droid)
* **Termux** (Terminal)
* **Termux:Boot** (Auto-start scripts)
### 2. Run Setup in Termux
### 2. Configure Apps
* **Gadgetbridge:** Settings → Auto Export → Location: `/sdcard/Documents/GB_Raw`
* **Nextcloud:** Auto Upload → Custom Folder → Local: `/sdcard/Documents/GB_Sync` → Remote: `/Backup/Gadgetbridge`
* **Termux:Boot:** Open app once to initialize.
Open Termux and run:
### 3. Termux Setup (Run these commands)
```bash
# 1. Setup storage & install python
# Grant storage access
termux-setup-storage
pkg update -y && pkg install -y python wget
# 2. Create script directories
mkdir -p ~/scripts ~/.termux/boot
# Install Python
pkg update -y && pkg install -y python
# 3. Download python script
wget -O ~/scripts/shrinkdb.py https://git.olli.info/Oliver/GadgetbridgeMqtt/raw/branch/main/termux_setup/shrinkdb.py
# 4. Create autostart launcher
printf '#!/data/data/com.termux/files/usr/bin/sh\ntermux-wake-lock\npython ~/scripts/shrinkdb.py &\n' > ~/.termux/boot/start_gb
# Download and run setup
curl -sL https://git.olli.info/Oliver/GadgetbridgeMqtt/raw/branch/main/setup.py | python
```
### 4. Finish
**Reboot your phone.**
The script will now run automatically, shrinking DBs from `GB_Raw` and placing clean versions in `GB_Sync` for Nextcloud.
Or step-by-step:
```bash
termux-setup-storage
pkg update -y && pkg install -y python wget
mkdir -p ~/scripts
wget -O ~/scripts/setup.py https://git.olli.info/Oliver/GadgetbridgeMqtt/raw/branch/main/setup.py
python ~/scripts/setup.py
```
### 3. Configure Gadgetbridge
1. Open **Gadgetbridge** app
2. Go to **Settings → Auto Export**
3. Set **Location** to: `/storage/emulated/0/Documents/GB_Export`
4. Enable export (the script will trigger syncs automatically)
### 4. Enable Autostart
1. Open **Termux:Boot** app once (this enables the autostart feature)
2. **Reboot your phone**
The script will now start automatically on boot!
## Manual Operation
```bash
# Start manually
python ~/scripts/gadgetbridge_mqtt.py
# View logs
tail -f ~/gb_mqtt.log
# Edit config
nano ~/.config/gadgetbridge_mqtt/config.json
```
## Configuration
Config is stored at `~/.config/gadgetbridge_mqtt/config.json`:
```json
{
"mqtt_broker": "192.168.1.100",
"mqtt_port": 1883,
"mqtt_username": "your_username",
"mqtt_password": "your_password",
"export_dir": "/storage/emulated/0/Documents/GB_Export",
"publish_interval": 300
}
```
## Published Sensors
| Sensor | Topic | Unit |
|--------|-------|------|
| Daily Steps | `gadgetbridge/{device}/daily_steps` | steps |
| Weekly Steps | `gadgetbridge/{device}/weekly_steps` | steps |
| Heart Rate | `gadgetbridge/{device}/heart_rate` | bpm |
| Resting HR | `gadgetbridge/{device}/hr_resting` | bpm |
| Max HR | `gadgetbridge/{device}/hr_max` | bpm |
| Average HR | `gadgetbridge/{device}/hr_avg` | bpm |
| Battery | `gadgetbridge/{device}/battery_level` | % |
| Calories | `gadgetbridge/{device}/calories` | kcal |
| Sleep Duration | `gadgetbridge/{device}/sleep_duration` | hours |
| Is Awake | `gadgetbridge/{device}/is_awake` | bool |
| Weight | `gadgetbridge/{device}/weight` | kg |
| Last Update | `gadgetbridge/{device}/server_time` | timestamp |
## Home Assistant
Sensors are automatically discovered via MQTT. They will appear under:
- **Devices**: Gadgetbridge {Your Device Name}
## Troubleshooting
### Script not starting on boot
- Make sure you opened Termux:Boot at least once
- Check if battery optimization is disabled for Termux
- Check logs: `cat ~/gb_mqtt.log`
### No data being published
- Verify Gadgetbridge export directory is correct
- Check if `.db` files exist in export folder
- Ensure MQTT broker is reachable: `ping {broker_ip}`
### Permission issues
- Run `termux-setup-storage` and grant permissions
- Ensure Gadgetbridge has storage permissions
## How It Works
1. Every 5 minutes (configurable), the script:
- Sends `ACTIVITY_SYNC` intent to Gadgetbridge (syncs data from band)
- Sends `TRIGGER_EXPORT` intent (exports database)
- Reads the exported database
- Publishes sensor data via MQTT
2. Home Assistant discovers sensors automatically via MQTT discovery
## Files
- `main.py` - Main MQTT publisher script
- `setup.py` - Interactive setup script
- `~/.config/gadgetbridge_mqtt/config.json` - Configuration
- `~/.termux/boot/start_gb_mqtt` - Autostart script
- `~/gb_mqtt.log` - Log file
## License
MIT

View File

@ -1,33 +0,0 @@
services:
gadgetbridge-mqtt:
image: python:3.11-slim
container_name: gadgetbridge-mqtt
restart: unless-stopped
network_mode: host
working_dir: /app
volumes:
- /mnt/Data/Apps/*****/Gadgetbridge:/data:ro
environment:
- TZ=Europe/Berlin # Get from e.g. https://webbrowsertools.com/timezone/ -> Timezone info Table -> Timezone
- MQTT_BROKER=192.168.***.***
- MQTT_PORT=1883
- MQTT_USERNAME=*****
- MQTT_PASSWORD=*****
- GADGETBRIDGE_DB_PATH=/data/Gadgetbridge.db
- PYTHONUNBUFFERED=1
- PUBLISH_INTERVAL_SECONDS=300
command: >
sh -c "
apt-get update &&
apt-get install -y git &&
git clone https://git.olli.info/Oliver/GadgetbridgeMqtt.git /tmp/repo &&
cp /tmp/repo/main.py /app/ &&
cp /tmp/repo/healthcheck.py /app/ &&
pip install --no-cache-dir aiomqtt &&
python main.py
"
healthcheck:
test: ["CMD", "python", "healthcheck.py"]
interval: 1m
timeout: 10s
retries: 3

View File

@ -1,87 +0,0 @@
#!/usr/bin/env python3
"""
Health check script for Gadgetbridge MQTT integration
"""
import os
import sqlite3
import socket
import sys
import time
def check_database():
"""Check if Gadgetbridge database is accessible"""
db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
if not os.path.exists(db_path):
print(f"Database file not found: {db_path}")
return False
try:
# Use timeout to prevent hanging
conn = sqlite3.connect(db_path, timeout=5.0)
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='XIAOMI_ACTIVITY_SAMPLE'"
)
result = cursor.fetchone()
conn.close()
return result is not None
except Exception as e:
print(f"Database check failed: {e}")
return False
def check_mqtt_connection():
"""Check MQTT broker TCP connectivity (no paho)"""
host = os.getenv("MQTT_BROKER", "localhost")
port = int(os.getenv("MQTT_PORT", "1883"))
try:
with socket.create_connection((host, port), timeout=10):
return True
except Exception as e:
print(f"MQTT connection check failed: {e}")
return False
def check_main_process():
"""Check if main process is likely running by checking for Python processes"""
try:
import psutil
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if proc.info['name'] == 'python' and proc.info['cmdline']:
if 'main.py' in ' '.join(proc.info['cmdline']):
return True
except ImportError:
# psutil not available, skip this check
pass
except Exception as e:
print(f"Process check failed: {e}")
return True # Assume OK if we can't check
def main():
print("Starting health check...")
# Add startup grace period
startup_grace = int(os.getenv("HEALTHCHECK_STARTUP_GRACE", "0"))
if startup_grace > 0:
print(f"Waiting {startup_grace} seconds for startup grace period...")
time.sleep(startup_grace)
db_ok = check_database()
mqtt_ok = check_mqtt_connection()
process_ok = check_main_process()
print(f"Health check results - DB: {db_ok}, MQTT: {mqtt_ok}, Process: {process_ok}")
# Only fail if both DB and MQTT are down (more lenient)
if db_ok or mqtt_ok:
print("Health check passed")
sys.exit(0)
else:
print("Health check failed - both DB and MQTT unavailable")
sys.exit(1)
if __name__ == "__main__":
main()

801
main.py
View File

@ -1,228 +1,264 @@
#!/usr/bin/env python3
"""
Gadgetbridge MQTT Step Counter Integration
Extracts sensor data from Gadgetbridge SQLite database and publishes to Home Assistant via MQTT
Gadgetbridge MQTT Publisher for Termux
Watches for Gadgetbridge exports and publishes sensor data to Home Assistant via MQTT
"""
import os
import sqlite3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
import asyncio
import aiomqtt
import re
import sys
import time
import re
from datetime import datetime, timedelta
from pathlib import Path
class GadgetbridgeMQTTPublisher:
def __init__(self):
self.setup_logging()
self.db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")
self.load_config()
self.mqtt_client = None
self.publish_interval = int(os.getenv("PUBLISH_INTERVAL_SECONDS", "300"))
self.max_retries = int(os.getenv("MAX_RETRIES", "5"))
self.retry_delay = int(os.getenv("RETRY_DELAY_SECONDS", "30"))
# Initialize device_name with fallback - don't fail on DB issues during init
# MQTT library - paho-mqtt works well on Termux
try:
self.device_name = self.get_device_alias()
except Exception as e:
self.logger.warning(f"Could not get device alias during init: {e}")
self.device_name = "fitness_tracker"
import paho.mqtt.client as mqtt
except ImportError:
print("Error: paho-mqtt not installed. Run: pip install paho-mqtt")
sys.exit(1)
# Initialize sensors after device_name is set
self.initialize_sensors()
# --- Configuration ---
CONFIG_FILE = os.path.expanduser("~/.config/gadgetbridge_mqtt/config.json")
GB_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
PUBLISH_INTERVAL = 300 # 5 minutes
def setup_logging(self):
"""Setup logging configuration (console only)"""
# --- Logging ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
],
stream=sys.stdout
)
self.logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
def load_config(self):
"""Load MQTT configuration from environment variables"""
self.mqtt_config = {
"broker": os.getenv("MQTT_BROKER", "localhost"),
"port": int(os.getenv("MQTT_PORT", "1883")),
"username": os.getenv("MQTT_USERNAME", ""),
"password": os.getenv("MQTT_PASSWORD", ""),
}
def get_day_start_timestamp(self) -> int:
"""Get the timestamp for the start of the current day (4am)"""
now = datetime.now()
today = now.date()
def load_config():
"""Load MQTT configuration from config file"""
if not os.path.exists(CONFIG_FILE):
logger.error(f"Config file not found: {CONFIG_FILE}")
logger.error("Run setup.py first to configure MQTT settings")
sys.exit(1)
# Day starts at 4am
day_start_time = datetime.combine(today, datetime.min.time()).replace(hour=4)
with open(CONFIG_FILE, "r") as f:
return json.load(f)
# If current time is before 4am, we're still in "yesterday's" day
if now.hour < 4:
day_start_time -= timedelta(days=1)
return int(day_start_time.timestamp())
def send_gadgetbridge_intent(action):
"""Send broadcast intent to Gadgetbridge via Termux API"""
cmd = f"am broadcast -a {action} -p nodomain.freeyourgadget.gadgetbridge"
logger.info(f"Sending intent: {action}")
result = os.system(cmd)
if result != 0:
logger.warning(f"Intent may have failed (exit code: {result})")
def get_day_midnight_timestamp_ms(self) -> int:
"""Get the timestamp for midnight of the current day in milliseconds.
Used for XIAOMI_DAILY_SUMMARY_SAMPLE which stores data at midnight."""
now = datetime.now()
today = now.date()
# Daily summary is stored at midnight
day_midnight = datetime.combine(today, datetime.min.time())
def trigger_gadgetbridge_sync():
"""Trigger Gadgetbridge to sync data from band and export database"""
# First sync data from the band
send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.ACTIVITY_SYNC")
time.sleep(5) # Give it time to sync
# Then trigger database export
send_gadgetbridge_intent("nodomain.freeyourgadget.gadgetbridge.command.TRIGGER_EXPORT")
# If current time is before 4am, use yesterday's midnight
if now.hour < 4:
day_midnight -= timedelta(days=1)
return int(day_midnight.timestamp()) * 1000
def find_latest_db(export_dir):
"""Find the most recent Gadgetbridge database file in export directory"""
if not os.path.exists(export_dir):
return None
def initialize_sensors(self):
"""Initialize sensor definitions after device_name is available"""
self.sensors = [
{
"name": "Daily Steps",
"unique_id": "daily_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/daily",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total_increasing",
"query": self.query_daily_steps,
},
{
"name": "Weekly Steps",
"unique_id": "weekly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/weekly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total",
"query": self.query_weekly_steps,
},
{
"name": "Monthly Steps",
"unique_id": "monthly_steps",
"state_topic": f"gadgetbridge/{self.device_name}/steps/monthly",
"unit_of_measurement": "steps",
"icon": "mdi:walk",
"state_class": "total",
"query": self.query_monthly_steps,
},
{
"name": "Battery Level",
"unique_id": "battery_level",
"state_topic": f"gadgetbridge/{self.device_name}/battery",
"unit_of_measurement": "%",
"icon": "mdi:battery",
"device_class": "battery",
"query": self.query_battery_level,
},
{
"name": "Weight",
"unique_id": "weight",
"state_topic": f"gadgetbridge/{self.device_name}/weight",
"unit_of_measurement": "kg",
"icon": "mdi:scale-bathroom",
"state_class": "measurement",
"query": self.query_latest_weight,
},
{
"name": "Latest Heart Rate",
"unique_id": "latest_heart_rate",
"state_topic": f"gadgetbridge/{self.device_name}/heart_rate",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"state_class": "measurement",
"query": self.query_latest_heart_rate,
},
{
"name": "Resting Heart Rate",
"unique_id": "hr_resting",
"state_topic": f"gadgetbridge/{self.device_name}/hr_resting",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"state_class": "measurement",
"query": self.query_hr_resting,
},
{
"name": "Max Heart Rate",
"unique_id": "hr_max",
"state_topic": f"gadgetbridge/{self.device_name}/hr_max",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"state_class": "measurement",
"query": self.query_hr_max,
},
{
"name": "Average Heart Rate",
"unique_id": "hr_avg",
"state_topic": f"gadgetbridge/{self.device_name}/hr_avg",
"unit_of_measurement": "bpm",
"icon": "mdi:heart-pulse",
"state_class": "measurement",
"query": self.query_hr_avg,
},
{
"name": "Calories",
"unique_id": "calories",
"state_topic": f"gadgetbridge/{self.device_name}/calories",
"unit_of_measurement": "kcal",
"icon": "mdi:fire",
"state_class": "total_increasing",
"query": self.query_calories,
},
{
"name": "Is Awake",
"unique_id": "is_awake",
"state_topic": f"gadgetbridge/{self.device_name}/is_awake",
"icon": "mdi:power-sleep",
"device_class": "enum",
"query": self.query_is_awake,
},
{
"name": "Total Sleep Duration",
"unique_id": "total_sleep_duration",
"state_topic": f"gadgetbridge/{self.device_name}/total_sleep_duration",
"unit_of_measurement": "h",
"icon": "mdi:sleep",
"state_class": "measurement",
"query": self.query_total_sleep_duration,
},
{
"name": "Server Time",
"unique_id": "server_time",
"state_topic": f"gadgetbridge/{self.device_name}/server_time",
"icon": "mdi:clock-outline",
"device_class": "timestamp",
"query": self.query_server_time,
},
]
db_files = []
for f in os.listdir(export_dir):
if f.endswith(".db") and "Gadgetbridge" in f:
path = os.path.join(export_dir, f)
db_files.append((path, os.path.getmtime(path)))
async def publish_home_assistant_discovery(
self, entity_type: str, entity_id: str, config: Dict
):
"""Publish Home Assistant MQTT discovery configuration asynchronously"""
discovery_topic = (
f"homeassistant/{entity_type}/{self.device_name}_{entity_id}/config"
if not db_files:
return None
# Return the most recently modified database
db_files.sort(key=lambda x: x[1], reverse=True)
return db_files[0][0]
class GadgetbridgeMQTT:
def __init__(self, config):
self.config = config
self.db_path = None
self.device_name = "fitness_tracker"
self.mqtt_client = None
self.last_publish_time = 0
self.last_db_mtime = 0
def connect_mqtt(self):
"""Connect to MQTT broker"""
self.mqtt_client = mqtt.Client()
if self.config.get("mqtt_username") and self.config.get("mqtt_password"):
self.mqtt_client.username_pw_set(
self.config["mqtt_username"],
self.config["mqtt_password"]
)
try:
await self.mqtt_client.publish(
discovery_topic, json.dumps(config), qos=1, retain=True
self.mqtt_client.connect(
self.config["mqtt_broker"],
self.config.get("mqtt_port", 1883),
60
)
self.logger.info(f"Published discovery config for {entity_id}")
self.mqtt_client.loop_start()
logger.info(f"Connected to MQTT broker: {self.config['mqtt_broker']}")
return True
except Exception as e:
self.logger.error(f"Failed to publish discovery config: {e}")
raise
logger.error(f"MQTT connection failed: {e}")
return False
async def setup_home_assistant_entities(self):
"""Setup Home Assistant entities via MQTT discovery"""
def disconnect_mqtt(self):
"""Disconnect from MQTT broker"""
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
def get_device_alias(self, cursor):
"""Get device alias from database"""
try:
cursor.execute("""
SELECT ALIAS FROM DEVICE
WHERE LOWER(NAME) LIKE '%band%' OR LOWER(NAME) LIKE '%watch%'
LIMIT 1
""")
row = cursor.fetchone()
if row and row[0]:
return re.sub(r"\W+", "_", row[0]).lower()
except:
pass
return "fitness_tracker"
def get_day_start_timestamp(self):
"""Get timestamp for start of current day (4am)"""
now = datetime.now()
today = now.date()
day_start = datetime.combine(today, datetime.min.time()).replace(hour=4)
if now.hour < 4:
day_start -= timedelta(days=1)
return int(day_start.timestamp())
def get_day_midnight_timestamp_ms(self):
"""Get midnight timestamp in milliseconds for daily summary queries"""
now = datetime.now()
today = now.date()
midnight = datetime.combine(today, datetime.min.time())
if now.hour < 4:
midnight -= timedelta(days=1)
return int(midnight.timestamp()) * 1000
def query_sensors(self, cursor):
"""Query all sensor data from database"""
data = {}
day_start_ts = self.get_day_start_timestamp()
now_ts = int(datetime.now().timestamp())
day_midnight_ms = self.get_day_midnight_timestamp_ms()
# Daily Steps
try:
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(day_start_ts, now_ts)
)
data["daily_steps"] = cursor.fetchone()[0] or 0
except Exception as e:
logger.debug(f"Daily steps query failed: {e}")
# Weekly Steps
try:
now = datetime.now()
week_start = now.date() - timedelta(days=now.date().weekday())
week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
if now.date().weekday() == 0 and now.hour < 4:
week_start_time -= timedelta(days=7)
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(int(week_start_time.timestamp()), now_ts)
)
data["weekly_steps"] = cursor.fetchone()[0] or 0
except Exception as e:
logger.debug(f"Weekly steps query failed: {e}")
# Battery Level
try:
cursor.execute("SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1")
row = cursor.fetchone()
if row:
data["battery_level"] = row[0]
except Exception as e:
logger.debug(f"Battery query failed: {e}")
# Latest Heart Rate
try:
cursor.execute(
"SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
if row:
data["heart_rate"] = row[0]
except Exception as e:
logger.debug(f"Heart rate query failed: {e}")
# Daily Summary Data (resting HR, max HR, avg HR, calories)
try:
cursor.execute(
"SELECT HR_RESTING, HR_MAX, HR_AVG, CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_midnight_ms,)
)
row = cursor.fetchone()
if row:
if row[0]: data["hr_resting"] = row[0]
if row[1]: data["hr_max"] = row[1]
if row[2]: data["hr_avg"] = row[2]
if row[3]: data["calories"] = row[3]
except Exception as e:
logger.debug(f"Daily summary query failed: {e}")
# Sleep Data
try:
day_ago_ms = (int(time.time()) - 24 * 3600) * 1000
cursor.execute(
"SELECT TOTAL_DURATION, IS_AWAKE, WAKEUP_TIME FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_ago_ms,)
)
row = cursor.fetchone()
if row:
if row[0]:
data["sleep_duration"] = round(row[0] / 60, 2) # Convert to hours
# Determine if awake
is_awake = True
if row[1] == 0:
is_awake = False
if row[2] and row[2] <= int(time.time()) * 1000:
is_awake = True
data["is_awake"] = is_awake
except Exception as e:
logger.debug(f"Sleep query failed: {e}")
# Weight
try:
cursor.execute("SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1")
row = cursor.fetchone()
if row and row[0]:
data["weight"] = row[0]
except Exception as e:
logger.debug(f"Weight query failed: {e}")
# Server time
data["server_time"] = datetime.now().astimezone().isoformat()
return data
def publish_discovery(self):
"""Publish Home Assistant MQTT discovery configs"""
device_info = {
"identifiers": [self.device_name],
"name": f"Gadgetbridge {self.device_name.replace('_', ' ').title()}",
@ -230,314 +266,147 @@ class GadgetbridgeMQTTPublisher:
"manufacturer": "Gadgetbridge",
}
for sensor in self.sensors:
sensors = [
("daily_steps", "Daily Steps", "steps", "mdi:walk", "total_increasing", None),
("weekly_steps", "Weekly Steps", "steps", "mdi:walk", "total", None),
("battery_level", "Battery", "%", "mdi:battery", None, "battery"),
("heart_rate", "Heart Rate", "bpm", "mdi:heart-pulse", "measurement", None),
("hr_resting", "Resting HR", "bpm", "mdi:heart-pulse", "measurement", None),
("hr_max", "Max HR", "bpm", "mdi:heart-pulse", "measurement", None),
("hr_avg", "Average HR", "bpm", "mdi:heart-pulse", "measurement", None),
("calories", "Calories", "kcal", "mdi:fire", "total_increasing", None),
("sleep_duration", "Sleep Duration", "h", "mdi:sleep", "measurement", None),
("is_awake", "Is Awake", None, "mdi:power-sleep", None, None),
("weight", "Weight", "kg", "mdi:scale-bathroom", "measurement", None),
("server_time", "Last Update", None, "mdi:clock-outline", None, "timestamp"),
]
for sensor_id, name, unit, icon, state_class, device_class in sensors:
config = {
"name": f"{self.device_name.replace('_', ' ').title()} {sensor['name']}",
"unique_id": f"{self.device_name}_{sensor['unique_id']}",
"state_topic": sensor["state_topic"],
"name": f"{self.device_name.replace('_', ' ').title()} {name}",
"unique_id": f"{self.device_name}_{sensor_id}",
"state_topic": f"gadgetbridge/{self.device_name}/{sensor_id}",
"device": device_info,
"icon": icon,
}
if unit:
config["unit_of_measurement"] = unit
if state_class:
config["state_class"] = state_class
if device_class:
config["device_class"] = device_class
# Add optional fields if present
for key in ["unit_of_measurement", "icon", "state_class", "device_class"]:
if key in sensor:
config[key] = sensor[key]
topic = f"homeassistant/sensor/{self.device_name}_{sensor_id}/config"
self.mqtt_client.publish(topic, json.dumps(config), qos=1, retain=True)
await self.publish_home_assistant_discovery(
"sensor", sensor["unique_id"], config
)
logger.info("Published Home Assistant discovery configs")
def query_daily_steps(self, cursor) -> Any:
day_start_ts = self.get_day_start_timestamp()
now_ts = int(datetime.now().timestamp())
def publish_data(self, data):
"""Publish sensor data to MQTT"""
for key, value in data.items():
topic = f"gadgetbridge/{self.device_name}/{key}"
self.mqtt_client.publish(topic, str(value), qos=1)
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(day_start_ts, now_ts),
)
return cursor.fetchone()[0] or 0
logger.info(f"Published: steps={data.get('daily_steps', 'N/A')}, "
f"hr={data.get('heart_rate', 'N/A')}, "
f"battery={data.get('battery_level', 'N/A')}%")
def query_weekly_steps(self, cursor) -> Any:
now = datetime.now()
today = now.date()
# Week starts on Monday at 4am
week_start = today - timedelta(days=today.weekday())
week_start_time = datetime.combine(week_start, datetime.min.time()).replace(hour=4)
# If we're before 4am on Monday, the week actually started last Monday
if today.weekday() == 0 and now.hour < 4:
week_start_time -= timedelta(days=7)
week_start_ts = int(week_start_time.timestamp())
now_ts = int(now.timestamp())
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(week_start_ts, now_ts),
)
return cursor.fetchone()[0] or 0
def query_monthly_steps(self, cursor) -> Any:
now = datetime.now()
today = now.date()
# Month starts on 1st at 4am
month_start = today.replace(day=1)
month_start_time = datetime.combine(month_start, datetime.min.time()).replace(hour=4)
# If we're before 4am on the 1st, the month actually started last month's 1st
if today.day == 1 and now.hour < 4:
# Go back to previous month
if month_start.month == 1:
month_start_time = month_start_time.replace(year=month_start.year - 1, month=12)
else:
month_start_time = month_start_time.replace(month=month_start.month - 1)
month_start_ts = int(month_start_time.timestamp())
now_ts = int(now.timestamp())
cursor.execute(
"SELECT SUM(STEPS) FROM XIAOMI_ACTIVITY_SAMPLE WHERE TIMESTAMP >= ? AND TIMESTAMP <= ?",
(month_start_ts, now_ts),
)
return cursor.fetchone()[0] or 0
def query_battery_level(self, cursor) -> Any:
cursor.execute(
"SELECT LEVEL FROM BATTERY_LEVEL ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def query_latest_weight(self, cursor) -> Any:
cursor.execute(
"SELECT WEIGHT_KG FROM MI_SCALE_WEIGHT_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def query_latest_heart_rate(self, cursor) -> Any:
cursor.execute(
"SELECT HEART_RATE FROM XIAOMI_ACTIVITY_SAMPLE WHERE HEART_RATE > 0 AND HEART_RATE < 255 ORDER BY TIMESTAMP DESC LIMIT 1"
)
row = cursor.fetchone()
return row[0] if row else None
def query_hr_resting(self, cursor) -> Any:
day_midnight_ts_ms = self.get_day_midnight_timestamp_ms()
cursor.execute(
"SELECT HR_RESTING FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_midnight_ts_ms,)
)
row = cursor.fetchone()
return row[0] if row else None
def query_hr_max(self, cursor) -> Any:
day_midnight_ts_ms = self.get_day_midnight_timestamp_ms()
cursor.execute(
"SELECT HR_MAX FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_midnight_ts_ms,)
)
row = cursor.fetchone()
return row[0] if row else None
def query_hr_avg(self, cursor) -> Any:
day_midnight_ts_ms = self.get_day_midnight_timestamp_ms()
cursor.execute(
"SELECT HR_AVG FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_midnight_ts_ms,)
)
row = cursor.fetchone()
return row[0] if row else None
def query_calories(self, cursor) -> Any:
day_midnight_ts_ms = self.get_day_midnight_timestamp_ms()
cursor.execute(
"SELECT CALORIES FROM XIAOMI_DAILY_SUMMARY_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_midnight_ts_ms,)
)
row = cursor.fetchone()
return row[0] if row else None
def query_is_awake(self, cursor) -> Any:
cursor.execute("""SELECT TIMESTAMP, IS_AWAKE, WAKEUP_TIME FROM XIAOMI_SLEEP_TIME_SAMPLE ORDER BY TIMESTAMP DESC LIMIT 1""")
row = cursor.fetchone()
# 1. No data at all -> Assume Awake
if not row:
return True
last_ts_epoch = row[0] // 1000 # Convert from milliseconds to seconds
is_awake_val = row[1] # This can be 1, 0, or None (NULL)
wakeup_time = row[2] # Wakeup time in milliseconds
# 2. Check if WAKEUP_TIME is in the past (user has woken up)
current_time_ms = int(time.time()) * 1000
if wakeup_time and wakeup_time <= current_time_ms:
return True
# 3. Timeout Safety: If last sleep data is older than 12 hours, force Awake
# This handles "Band Removed" or "Sync Failed" scenarios
if (int(time.time()) - last_ts_epoch) > (12 * 3600):
return True
# 4. Explicit Status Check
if is_awake_val == 1:
return True
# If is_awake_val is 0 or None, the user is likely asleep
def process_database(self):
"""Read database and publish data"""
if not self.db_path or not os.path.exists(self.db_path):
logger.warning("No database file found")
return False
def query_total_sleep_duration(self, cursor) -> Any:
# Get sleep from the last 24 hours
day_ago_ts = (int(time.time()) - 24 * 3600) * 1000 # 24 hours ago in milliseconds
cursor.execute(
"SELECT TOTAL_DURATION FROM XIAOMI_SLEEP_TIME_SAMPLE WHERE TIMESTAMP >= ? ORDER BY TIMESTAMP DESC LIMIT 1",
(day_ago_ts,)
)
row = cursor.fetchone()
# Convert minutes to hours, round to 2 decimals
return round(row[0] / 60, 2) if row and row[0] is not None else None
def query_server_time(self, cursor) -> Any:
"""Return current server time in ISO 8601 format for Home Assistant timestamp"""
# .astimezone() adds the system's local timezone offset (e.g., +01:00)
return datetime.now().astimezone().isoformat()
def get_sensor_data(self) -> Dict[str, Any]:
"""Query all sensors and return their values as a dict"""
if not os.path.exists(self.db_path):
self.logger.error(f"Database file not found: {self.db_path}")
return {}
try:
conn = sqlite3.connect(self.db_path, timeout=10.0)
cursor = conn.cursor()
data = {}
for sensor in self.sensors:
try:
data[sensor["unique_id"]] = sensor["query"](cursor)
except Exception as e:
self.logger.error(f"Error querying {sensor['unique_id']}: {e}")
data[sensor["unique_id"]] = None
# Get device name
self.device_name = self.get_device_alias(cursor)
# Query all sensors
data = self.query_sensors(cursor)
conn.close()
return data
if data:
self.publish_data(data)
return True
except Exception as e:
self.logger.error(f"Error querying database: {e}")
return {}
logger.error(f"Database error: {e}")
return False
def run(self):
"""Main loop"""
export_dir = self.config.get("export_dir", GB_EXPORT_DIR)
interval = self.config.get("publish_interval", PUBLISH_INTERVAL)
logger.info(f"Starting Gadgetbridge MQTT Publisher")
logger.info(f"Export directory: {export_dir}")
logger.info(f"Publish interval: {interval}s")
# Ensure export directory exists
os.makedirs(export_dir, exist_ok=True)
# Connect to MQTT
if not self.connect_mqtt():
logger.error("Failed to connect to MQTT. Exiting.")
sys.exit(1)
discovery_published = False
async def publish_sensor_data(self, data: Dict[str, Any]):
"""Publish all sensor data to MQTT asynchronously"""
for sensor in self.sensors:
value = data.get(sensor["unique_id"])
if value is not None:
try:
await self.mqtt_client.publish(
sensor["state_topic"], str(value), qos=1
)
except Exception as e:
self.logger.error(f"Failed to publish {sensor['unique_id']}: {e}")
raise
self.logger.info(f"Published sensor data: {data}")
async def run_main_loop(self):
"""Main execution loop with error recovery"""
while True:
try:
self.logger.info("Attempting MQTT connection...")
async with aiomqtt.Client(
hostname=self.mqtt_config["broker"],
port=self.mqtt_config["port"],
username=self.mqtt_config["username"] or None,
password=self.mqtt_config["password"] or None,
) as client:
self.mqtt_client = client
self.logger.info("MQTT connection successful")
current_time = time.time()
await self.setup_home_assistant_entities()
# Check if it's time to trigger sync and publish
if current_time - self.last_publish_time >= interval:
logger.info("Triggering Gadgetbridge sync...")
trigger_gadgetbridge_sync()
# Publish immediately on startup
sensor_data = self.get_sensor_data()
await self.publish_sensor_data(sensor_data)
self.logger.info(f"Next publish in {self.publish_interval} seconds...")
# Wait for export to complete
time.sleep(10)
# Main publishing loop
while True:
await asyncio.sleep(self.publish_interval)
sensor_data = self.get_sensor_data()
await self.publish_sensor_data(sensor_data)
self.logger.info(f"Next publish in {self.publish_interval} seconds...")
# Find latest database
self.db_path = find_latest_db(export_dir)
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
self.logger.info(f"Restarting main loop in {self.retry_delay} seconds...")
await asyncio.sleep(self.retry_delay)
async def run(self):
"""Main execution method (async) - now with proper error recovery"""
self.logger.info("Starting Gadgetbridge MQTT Publisher")
# Initial database check with wait loop
if not os.path.exists(self.db_path):
self.logger.error(f"Database file not found: {self.db_path}")
self.logger.info("Waiting for database to become available...")
while not os.path.exists(self.db_path):
await asyncio.sleep(30)
self.logger.info("Still waiting for database...")
self.logger.info("Database file found, continuing...")
# Try to get proper device name now that DB might be available
try:
new_device_name = self.get_device_alias()
if new_device_name != self.device_name:
self.logger.info(f"Updating device name from {self.device_name} to {new_device_name}")
self.device_name = new_device_name
self.initialize_sensors() # Reinitialize with correct device name
except Exception as e:
self.logger.warning(f"Could not update device alias: {e}, using fallback")
# Run main loop with recovery
await self.run_main_loop()
def get_device_alias(self) -> str:
"""Fetch ALIAS from DEVICE table for device_name where NAME contains 'band' or 'watch' (case-insensitive)"""
if not os.path.exists(self.db_path):
self.logger.warning(f"Database file not found: {self.db_path}")
return "fitness_tracker"
if self.db_path:
logger.info(f"Using database: {os.path.basename(self.db_path)}")
# Publish discovery on first successful read
if not discovery_published:
# Need to read device name first
try:
conn = sqlite3.connect(self.db_path, timeout=5.0)
cursor = conn.cursor()
cursor.execute(
"""
SELECT ALIAS FROM DEVICE
WHERE LOWER(NAME) LIKE '%band%' OR LOWER(NAME) LIKE '%watch%'
LIMIT 1
"""
)
row = cursor.fetchone()
self.device_name = self.get_device_alias(cursor)
conn.close()
except:
pass
self.publish_discovery()
discovery_published = True
if row and row[0]:
# Sanitize alias for MQTT topics
return re.sub(r"\W+", "_", row[0]).lower()
self.process_database()
else:
return "fitness_tracker"
logger.warning(f"No database found in {export_dir}")
except Exception as e:
self.logger.error(f"Error fetching device alias: {e}")
return "fitness_tracker"
self.last_publish_time = current_time
logger.info(f"Next publish in {interval}s...")
time.sleep(10) # Check every 10 seconds
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
self.disconnect_mqtt()
def main():
config = load_config()
publisher = GadgetbridgeMQTT(config)
publisher.run()
# --- Main Entry Point ---
if __name__ == "__main__":
publisher = GadgetbridgeMQTTPublisher()
asyncio.run(publisher.run())
main()

227
setup.py Normal file
View File

@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""
Gadgetbridge MQTT Setup Script for Termux
Configures MQTT connection and creates autostart script
"""
import os
import json
import sys
CONFIG_DIR = os.path.expanduser("~/.config/gadgetbridge_mqtt")
CONFIG_FILE = os.path.join(CONFIG_DIR, "config.json")
SCRIPTS_DIR = os.path.expanduser("~/scripts")
BOOT_DIR = os.path.expanduser("~/.termux/boot")
DEFAULT_EXPORT_DIR = "/storage/emulated/0/Documents/GB_Export"
DEFAULT_INTERVAL = 300
def print_banner():
print("\n" + "=" * 50)
print(" Gadgetbridge MQTT Setup for Termux")
print("=" * 50 + "\n")
def get_input(prompt, default=None, required=True):
"""Get user input with optional default value"""
if default:
prompt = f"{prompt} [{default}]: "
else:
prompt = f"{prompt}: "
value = input(prompt).strip()
if not value and default:
return default
if not value and required:
print("This field is required!")
return get_input(prompt.replace(f" [{default}]", "").replace(": ", ""), default, required)
return value
def setup_mqtt():
"""Configure MQTT settings"""
print("=== MQTT Configuration ===\n")
config = {}
config["mqtt_broker"] = get_input("MQTT Broker IP/hostname")
config["mqtt_port"] = int(get_input("MQTT Port", "1883"))
config["mqtt_username"] = get_input("MQTT Username (leave empty if none)", "", required=False)
config["mqtt_password"] = get_input("MQTT Password (leave empty if none)", "", required=False)
print("\n=== Gadgetbridge Settings ===\n")
config["export_dir"] = get_input("Gadgetbridge Export Directory", DEFAULT_EXPORT_DIR)
config["publish_interval"] = int(get_input("Publish Interval (seconds)", str(DEFAULT_INTERVAL)))
return config
def save_config(config):
"""Save configuration to file"""
os.makedirs(CONFIG_DIR, exist_ok=True)
with open(CONFIG_FILE, "w") as f:
json.dump(config, f, indent=2)
print(f"\n✓ Config saved to: {CONFIG_FILE}")
def setup_directories():
"""Create necessary directories"""
os.makedirs(SCRIPTS_DIR, exist_ok=True)
os.makedirs(BOOT_DIR, exist_ok=True)
print("✓ Directories created")
def download_main_script():
"""Download or copy main.py to scripts directory"""
main_script = os.path.join(SCRIPTS_DIR, "gadgetbridge_mqtt.py")
# Check if we're running from the repo
script_dir = os.path.dirname(os.path.abspath(__file__))
local_main = os.path.join(script_dir, "main.py")
if os.path.exists(local_main):
# Copy from local
with open(local_main, "r") as src:
content = src.read()
with open(main_script, "w") as dst:
dst.write(content)
print(f"✓ Copied main script to: {main_script}")
else:
# Download from repo
try:
import urllib.request
url = "https://git.olli.info/Oliver/GadgetbridgeMqtt/raw/branch/main/main.py"
urllib.request.urlretrieve(url, main_script)
print(f"✓ Downloaded main script to: {main_script}")
except Exception as e:
print(f"✗ Failed to download main script: {e}")
print(" Please manually copy main.py to ~/scripts/gadgetbridge_mqtt.py")
return False
return True
def create_autostart():
"""Create Termux:Boot autostart script"""
boot_script = os.path.join(BOOT_DIR, "start_gb_mqtt")
content = """#!/data/data/com.termux/files/usr/bin/sh
# Gadgetbridge MQTT Autostart Script
# Prevent device from sleeping
termux-wake-lock
# Wait for system to fully boot
sleep 30
# Start the MQTT publisher in background
cd ~
python ~/scripts/gadgetbridge_mqtt.py >> ~/gb_mqtt.log 2>&1 &
"""
with open(boot_script, "w") as f:
f.write(content)
os.chmod(boot_script, 0o755)
print(f"✓ Autostart script created: {boot_script}")
def install_dependencies():
"""Install required Python packages"""
print("\n=== Installing Dependencies ===\n")
try:
os.system("pip install paho-mqtt")
print("✓ paho-mqtt installed")
except Exception as e:
print(f"✗ Failed to install paho-mqtt: {e}")
print(" Run manually: pip install paho-mqtt")
def print_gadgetbridge_instructions(export_dir):
"""Print Gadgetbridge configuration instructions"""
print("\n" + "=" * 50)
print(" Gadgetbridge Configuration")
print("=" * 50)
print(f"""
1. Open Gadgetbridge app
2. Go to Settings Auto Export
- Enable "Auto Export"
- Set Location to: {export_dir}
- Set interval as desired (or leave for manual trigger)
3. The script will automatically trigger:
- ACTIVITY_SYNC (fetch data from band)
- TRIGGER_EXPORT (export database)
4. Grant Termux permissions:
- Run: termux-setup-storage
- Allow storage access when prompted
""")
def print_final_instructions():
"""Print final setup instructions"""
print("\n" + "=" * 50)
print(" Setup Complete!")
print("=" * 50)
print("""
To start manually:
python ~/scripts/gadgetbridge_mqtt.py
To view logs:
tail -f ~/gb_mqtt.log
The script will auto-start on boot via Termux:Boot.
Make sure to:
1. Open Termux:Boot app once to enable autostart
2. Configure Gadgetbridge export location (see above)
3. Reboot phone to test autostart
""")
def main():
print_banner()
# Check if config exists
if os.path.exists(CONFIG_FILE):
print(f"Existing config found: {CONFIG_FILE}")
overwrite = input("Overwrite? (y/N): ").strip().lower()
if overwrite != "y":
print("Setup cancelled.")
sys.exit(0)
# Setup MQTT
config = setup_mqtt()
# Save config
save_config(config)
# Setup directories
setup_directories()
# Install dependencies
install_dependencies()
# Download/copy main script
download_main_script()
# Create autostart
create_autostart()
# Print instructions
print_gadgetbridge_instructions(config["export_dir"])
print_final_instructions()
if __name__ == "__main__":
main()

View File

@ -1,75 +0,0 @@
import sqlite3, os, time, logging, sys
# --- Config ---
WATCH_DIR = "/storage/emulated/0/Documents/GB_Raw"
SYNC_DIR = "/storage/emulated/0/Documents/GB_Sync"
MAX_DAYS = 90
KEEP_TABLES = [
"USER", "DEVICE", "DEVICE_ATTRIBUTES", "BATTERY_LEVEL",
"XIAOMI_ACTIVITY_SAMPLE", "XIAOMI_SLEEP_TIME_SAMPLE",
"XIAOMI_DAILY_SUMMARY_SAMPLE", "MI_BAND_ACTIVITY_SAMPLE"
]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', stream=sys.stdout)
def shrink_db(src_path, dst_path):
if os.path.getsize(src_path) < 1024: return
tmp_path = dst_path + ".tmp"
if os.path.exists(tmp_path): os.remove(tmp_path)
try:
con = sqlite3.connect(tmp_path)
con.execute("PRAGMA journal_mode=OFF")
con.execute("PRAGMA synchronous=0")
con.execute(f"ATTACH DATABASE '{src_path}' AS src")
tables = [r[0] for r in con.execute("SELECT name FROM src.sqlite_master WHERE type='table'").fetchall()]
for tbl in KEEP_TABLES:
if tbl not in tables: continue
# Copy Schema
sql = con.execute(f"SELECT sql FROM src.sqlite_master WHERE name='{tbl}'").fetchone()[0]
con.execute(sql)
# Copy Data
if tbl == "XIAOMI_ACTIVITY_SAMPLE" and MAX_DAYS > 0:
cutoff = int(time.time()) - (MAX_DAYS * 86400)
con.execute(f"INSERT INTO {tbl} SELECT * FROM src.{tbl} WHERE TIMESTAMP >= {cutoff}")
else:
con.execute(f"INSERT INTO {tbl} SELECT * FROM src.{tbl}")
con.commit()
con.close()
os.rename(tmp_path, dst_path)
logging.info(f"Exported: {os.path.basename(dst_path)} ({os.path.getsize(dst_path)>>10} KB)")
except Exception as e:
logging.error(f"Failed {src_path}: {e}")
if os.path.exists(tmp_path): os.remove(tmp_path)
def main():
os.makedirs(SYNC_DIR, exist_ok=True)
logging.info(f"Watching {WATCH_DIR}")
processed_files = {}
while True:
try:
for f in os.listdir(WATCH_DIR):
if "Gadgetbridge" in f and not f.endswith((".wal", ".shm", ".tmp")):
path = os.path.join(WATCH_DIR, f)
mtime = os.path.getmtime(path)
if path not in processed_files or processed_files[path] != mtime:
if (time.time() - mtime) > 5:
logging.info(f"Processing {f}...")
out_name = f"GB_Small_{int(time.time())}.db"
shrink_db(path, os.path.join(SYNC_DIR, out_name))
processed_files[path] = mtime
time.sleep(10)
except KeyboardInterrupt: break
except Exception as e: logging.error(e)
if __name__ == "__main__":
main()
EOF