#!/usr/bin/env python3
"""
Health check script for Gadgetbridge MQTT integration
"""

import contextlib
import os
import sqlite3
import socket
import sys
import time


def check_database():
    """Check if Gadgetbridge database is accessible.

    The path is read from ``GADGETBRIDGE_DB_PATH`` (default
    ``/data/Gadgetbridge.db``).

    Returns:
        True when the file exists, can be opened, and contains a table named
        ``XIAOMI_ACTIVITY_SAMPLE``; False otherwise. Failures are printed,
        never raised.
    """
    db_path = os.getenv("GADGETBRIDGE_DB_PATH", "/data/Gadgetbridge.db")

    # sqlite3.connect() creates a missing file, so test for existence first
    # instead of letting the health check leave an empty database behind.
    if not os.path.exists(db_path):
        print(f"Database file not found: {db_path}")
        return False

    try:
        # Use timeout to prevent hanging. closing() guarantees the connection
        # is released even when the query itself raises.
        with contextlib.closing(sqlite3.connect(db_path, timeout=5.0)) as conn:
            row = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='XIAOMI_ACTIVITY_SAMPLE'"
            ).fetchone()
        return row is not None
    except Exception as e:
        # Health-check boundary: report any failure as "unhealthy".
        print(f"Database check failed: {e}")
        return False


def check_mqtt_connection():
    """Check MQTT broker TCP connectivity (no paho).

    Host and port come from ``MQTT_BROKER`` (default ``localhost``) and
    ``MQTT_PORT`` (default ``1883``). Only a TCP connection is attempted; no
    MQTT handshake is performed.

    Returns:
        True when the TCP connection succeeds within 10 seconds; False on any
        error, including a non-numeric ``MQTT_PORT``.
    """
    host = os.getenv("MQTT_BROKER", "localhost")
    try:
        # Parsed inside the try so a malformed port is reported as a failed
        # check instead of crashing the script with a traceback.
        port = int(os.getenv("MQTT_PORT", "1883"))
        with socket.create_connection((host, port), timeout=10):
            return True
    except Exception as e:
        print(f"MQTT connection check failed: {e}")
        return False


def check_main_process():
    """Check if main process is likely running by checking for Python processes.

    Looks for a process whose name starts with ``python`` and whose command
    line mentions ``main.py``.

    Returns:
        True when such a process is found, or when the check cannot be
        performed (psutil missing or the scan raised). False when the scan
        completed and found no matching process.
    """
    try:
        import psutil
    except ImportError:
        # psutil not available, skip this check
        return True  # Assume OK if we can't check

    try:
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            # name/cmdline may be None when the process could not be inspected.
            name = proc.info['name'] or ''
            cmdline = proc.info['cmdline']
            # Prefix match so "python3" / "python3.11" are recognised as well.
            if name.startswith('python') and cmdline:
                if 'main.py' in ' '.join(cmdline):
                    return True
    except Exception as e:
        print(f"Process check failed: {e}")
        return True  # Assume OK if we can't check

    # The scan ran to completion and nothing matched.
    return False


def _read_startup_grace():
    """Return HEALTHCHECK_STARTUP_GRACE as an int, or 0 if it is not a valid integer."""
    raw = os.getenv("HEALTHCHECK_STARTUP_GRACE", "0")
    try:
        return int(raw)
    except ValueError:
        print(f"Ignoring invalid HEALTHCHECK_STARTUP_GRACE: {raw!r}")
        return 0


def main():
    """Run all checks, print a summary, and exit with the health status.

    Exits with status 0 when the database or the MQTT broker is reachable,
    and with status 1 when both are unavailable. The process check result is
    reported but does not influence the exit status.
    """
    print("Starting health check...")

    # Add startup grace period
    startup_grace = _read_startup_grace()
    if startup_grace > 0:
        print(f"Waiting {startup_grace} seconds for startup grace period...")
        time.sleep(startup_grace)

    db_ok = check_database()
    mqtt_ok = check_mqtt_connection()
    process_ok = check_main_process()

    print(f"Health check results - DB: {db_ok}, MQTT: {mqtt_ok}, Process: {process_ok}")

    # Only fail if both DB and MQTT are down (more lenient)
    if db_ok or mqtt_ok:
        print("Health check passed")
        sys.exit(0)
    else:
        print("Health check failed - both DB and MQTT unavailable")
        sys.exit(1)


if __name__ == "__main__":
    main()