# Bjorn.py
# Main entry point and supervisor for the Bjorn project
# Manages lifecycle of threads, health monitoring, and crash protection.
# OPTIMIZED FOR PI ZERO 2: Low CPU overhead, aggressive RAM management.

import logging
import os
import signal
import subprocess
import sys
import threading
import time
import gc
import tracemalloc
import atexit

from comment import Commentaireia
from display import Display, handle_exit_display
from init_shared import shared_data
from logger import Logger
from orchestrator import Orchestrator
from runtime_state_updater import RuntimeStateUpdater
from webapp import web_thread

logger = Logger(name="Bjorn.py", level=logging.DEBUG)

# Guards the one-shot shutdown sequence in handle_exit(). This is an RLock, not
# a Lock, because handle_exit() is also installed as the SIGINT/SIGTERM handler.
# Python runs signal handlers on the main thread between bytecodes. If a signal
# arrives while the main thread itself holds this lock (the non-signal shutdown
# path), a plain Lock would deadlock the process.
_shutdown_lock = threading.RLock()
_shutdown_started = False

# Open file object that holds the single-instance flock (None when not held).
_instance_lock_fd = None
_instance_lock_path = "/tmp/bjorn_160226.lock"

try:
    import fcntl
except Exception:
    fcntl = None  # fcntl unavailable: instance locking degrades to a no-op


def _release_instance_lock():
    """Release and close the single-instance lock file, if it is held.

    Safe to call more than once: it is registered with atexit and also called
    from handle_exit(). All failures are ignored because this only runs while
    the process is going down.
    """
    global _instance_lock_fd
    if _instance_lock_fd is None:
        return
    try:
        if fcntl is not None:
            try:
                fcntl.flock(_instance_lock_fd.fileno(), fcntl.LOCK_UN)
            except Exception:
                pass
        _instance_lock_fd.close()
    except Exception:
        pass
    _instance_lock_fd = None


def _acquire_instance_lock() -> bool:
    """Ensure only one Bjorn_160226 process can run at once.

    Takes a non-blocking exclusive flock on ``_instance_lock_path`` and
    records this process's PID in the file.

    Returns:
        False only when another process already holds the lock. Returns True
        when the lock was acquired. Also returns True, as a best effort, when
        the lock file cannot be opened or fcntl is unavailable.
    """
    global _instance_lock_fd
    if _instance_lock_fd is not None:
        return True
    try:
        fd = open(_instance_lock_path, "a+", encoding="utf-8")
    except Exception as exc:
        logger.error(f"Unable to open instance lock file {_instance_lock_path}: {exc}")
        return True
    if fcntl is None:
        _instance_lock_fd = fd
        return True

    try:
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Lock is held elsewhere: read the owner's PID for the log message.
        try:
            fd.seek(0)
            owner_pid = fd.read().strip() or "unknown"
        except Exception:
            owner_pid = "unknown"
        logger.critical(f"Another Bjorn instance is already running (pid={owner_pid}).")
        try:
            fd.close()
        except Exception:
            pass
        return False

    # The lock is ours from here on. The PID is informational only, so a write
    # failure (e.g. a full /tmp) must not be reported as a competing instance
    # and must not block startup.
    try:
        fd.seek(0)
        fd.truncate()
        fd.write(str(os.getpid()))
        fd.flush()
    except OSError as exc:
        logger.warning(f"Unable to record PID in instance lock file {_instance_lock_path}: {exc}")
    _instance_lock_fd = fd
    return True


class HealthMonitor(threading.Thread):
    """Periodic runtime health logger (threads/fd/rss/queue/epd metrics)."""

    def __init__(self, shared_data_, interval_s: int = 60):
        super().__init__(daemon=True, name="HealthMonitor")
        self.shared_data = shared_data_
        # Never log more often than every 10 s, whatever the config says.
        self.interval_s = max(10, int(interval_s))
        self._stop_event = threading.Event()
        self._tm_prev_snapshot = None
        self._tm_last_report = 0.0

    def stop(self):
        """Ask the monitor loop to exit at its next wake-up (returns immediately)."""
        self._stop_event.set()

    def _fd_count(self) -> int:
        """Return the number of open file descriptors, or -1 if /proc is unreadable."""
        try:
            return len(os.listdir("/proc/self/fd"))
        except Exception:
            return -1

    def _rss_kb(self) -> int:
        """Return resident set size in kB from /proc/self/status, or -1 if unavailable."""
        try:
            with open("/proc/self/status", "r", encoding="utf-8") as fh:
                for line in fh:
                    if line.startswith("VmRSS:"):
                        parts = line.split()
                        if len(parts) >= 2:
                            return int(parts[1])
        except Exception:
            pass
        return -1

    def _queue_counts(self):
        """Return (pending, running, scheduled) counts from the action_queue table.

        Each value is -1 when the query fails; failures are logged throttled.
        """
        pending = running = scheduled = -1
        try:
            # Using query_one safe method from database
            row = self.shared_data.db.query_one(
                """
                SELECT
                    SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending,
                    SUM(CASE WHEN status='running' THEN 1 ELSE 0 END) AS running,
                    SUM(CASE WHEN status='scheduled' THEN 1 ELSE 0 END) AS scheduled
                FROM action_queue
                """
            )
            if row:
                pending = int(row.get("pending") or 0)
                running = int(row.get("running") or 0)
                scheduled = int(row.get("scheduled") or 0)
        except Exception as exc:
            logger.error_throttled(
                f"Health monitor queue count query failed: {exc}",
                key="health_queue_counts",
                interval_s=120,
            )
        return pending, running, scheduled

    def _report_tracemalloc(self):
        """Log the top allocation sites, but only if tracemalloc is tracing.

        Tracing is enabled externally (PYTHONTRACEMALLOC or tracemalloc.start()).
        Reports at most once per ``tracemalloc_report_interval_s`` seconds
        (config, default 300). After the first report, each report is a diff
        against the previous snapshot. Failures are logged throttled and
        never propagate.
        """
        try:
            if not tracemalloc.is_tracing():
                return
            now = time.monotonic()
            tm_interval = float(self.shared_data.config.get("tracemalloc_report_interval_s", 300) or 300)
            if tm_interval <= 0 or (now - self._tm_last_report) < tm_interval:
                return
            self._tm_last_report = now
            top_n = int(self.shared_data.config.get("tracemalloc_top_n", 10) or 10)
            top_n = max(3, min(top_n, 25))
            snap = tracemalloc.take_snapshot()
            if self._tm_prev_snapshot is not None:
                stats = snap.compare_to(self._tm_prev_snapshot, "lineno")[:top_n]
                logger.info(f"mem_top (tracemalloc diff, top_n={top_n})")
            else:
                stats = snap.statistics("lineno")[:top_n]
                logger.info(f"mem_top (tracemalloc, top_n={top_n})")
            for st in stats:
                logger.info(f"mem_top {st}")
            self._tm_prev_snapshot = snap
        except Exception as exc:
            logger.error_throttled(
                f"Health monitor tracemalloc failure: {exc}",
                key="health_tracemalloc_error",
                interval_s=300,
            )

    def run(self):
        """Log one health line every ``interval_s`` seconds until stop() is called."""
        # Event.wait() doubles as an interruptible sleep. It returns True, which
        # ends the loop, as soon as stop() sets the event.
        while not self._stop_event.wait(self.interval_s):
            try:
                threads = threading.enumerate()
                thread_count = len(threads)
                top_threads = ",".join(t.name for t in threads[:8])
                fd_count = self._fd_count()
                rss_kb = self._rss_kb()
                pending, running, scheduled = self._queue_counts()

                # Lock to safely read shared metrics without race conditions;
                # take a private copy so the lock is held only briefly.
                with self.shared_data.health_lock:
                    display_metrics = dict(getattr(self.shared_data, "display_runtime_metrics", {}) or {})

                epd_enabled = int(display_metrics.get("epd_enabled", 0))
                epd_failures = int(display_metrics.get("failed_updates", 0))
                epd_reinit = int(display_metrics.get("reinit_attempts", 0))
                epd_headless = int(display_metrics.get("headless", 0))
                epd_last_success = display_metrics.get("last_success_epoch", 0)

                # Every collected metric is reported; previously several were
                # computed and then silently dropped.
                logger.info(
                    "health "
                    f"thread_count={thread_count} "
                    f"fd_count={fd_count} "
                    f"rss_kb={rss_kb} "
                    f"queue_pending={pending} "
                    f"queue_running={running} "
                    f"queue_scheduled={scheduled} "
                    f"epd_enabled={epd_enabled} "
                    f"epd_headless={epd_headless} "
                    f"epd_failures={epd_failures} "
                    f"epd_reinit={epd_reinit} "
                    f"epd_last_success={epd_last_success} "
                    f"threads_top={top_threads}"
                )

                self._report_tracemalloc()
            except Exception as exc:
                logger.error_throttled(
                    f"Health monitor loop failure: {exc}",
                    key="health_loop_error",
                    interval_s=120,
                )


class Bjorn:
    """Main class for Bjorn. Manages orchestration lifecycle."""

    def __init__(self, shared_data_):
        self.shared_data = shared_data_
        self.commentaire_ia = Commentaireia()
        self.orchestrator_thread = None
        self.orchestrator = None
        self.network_connected = False
        self.wifi_connected = False
        self.previous_network_connected = None
        self._orch_lock = threading.Lock()
        self._last_net_check = 0  # Throttling for network scan
        self._last_orch_stop_attempt = 0.0

    def run(self):
        """Main loop for Bjorn. Waits for network and starts/stops Orchestrator based on mode."""
        if hasattr(self.shared_data, "startup_delay") and self.shared_data.startup_delay > 0:
            logger.info(f"Waiting for startup delay: {self.shared_data.startup_delay} seconds")
            time.sleep(self.shared_data.startup_delay)

        backoff_s = 1.0
        while not self.shared_data.should_exit:
            try:
                # Manual mode must stop orchestration so the user keeps full control.
                if self.shared_data.operation_mode == "MANUAL":
                    # Avoid spamming stop requests if already stopped.
                    if self.orchestrator_thread is not None and self.orchestrator_thread.is_alive():
                        self.stop_orchestrator()
                else:
                    self.check_and_start_orchestrator()
                time.sleep(5)
                backoff_s = 1.0  # Reset backoff on success
            except Exception as exc:
                logger.error(f"Bjorn main loop error: {exc}")
                logger.error_throttled(
                    "Bjorn main loop entering backoff due to repeated errors",
                    key="bjorn_main_loop_backoff",
                    interval_s=60,
                )
                # Exponential backoff, capped at 30 s.
                time.sleep(backoff_s)
                backoff_s = min(backoff_s * 2.0, 30.0)

    def check_and_start_orchestrator(self):
        """Start the Orchestrator when not in MANUAL mode and a network link is up."""
        if self.shared_data.operation_mode == "MANUAL":
            return
        if self.is_network_connected():
            self.wifi_connected = True
            if self.orchestrator_thread is None or not self.orchestrator_thread.is_alive():
                self.start_orchestrator()
        else:
            self.wifi_connected = False
            logger.info_throttled(
                "Waiting for network connection to start Orchestrator...",
                key="bjorn_wait_network",
                interval_s=30,
            )

    def start_orchestrator(self):
        """Spawn a fresh Orchestrator on a daemon thread unless one is already alive."""
        with self._orch_lock:
            # Re-check network inside lock
            if not self.network_connected:
                return
            if self.orchestrator_thread is not None and self.orchestrator_thread.is_alive():
                logger.debug("Orchestrator thread is already running.")
                return

            logger.info("Starting Orchestrator thread...")
            self.shared_data.orchestrator_should_exit = False
            self.orchestrator = Orchestrator()
            self.orchestrator_thread = threading.Thread(
                target=self.orchestrator.run,
                daemon=True,
                name="OrchestratorMain",
            )
            self.orchestrator_thread.start()
            logger.info("Orchestrator thread started.")

    def stop_orchestrator(self):
        """Signal the Orchestrator thread to exit and wait up to 10 s for it.

        If the thread is still alive after the wait, the references are kept so
        a later call can retry. Otherwise the state is cleared and the status is
        reset to IDLE.
        """
        with self._orch_lock:
            thread = self.orchestrator_thread
            if thread is None or not thread.is_alive():
                self.orchestrator_thread = None
                self.orchestrator = None
                return

            # Keep MANUAL sticky so supervisor does not auto-restart orchestration.
            try:
                self.shared_data.operation_mode = "MANUAL"
            except Exception:
                pass

            # Throttle only the log line; the stop request itself is always sent.
            now = time.time()
            if now - self._last_orch_stop_attempt >= 10.0:
                logger.info("Stop requested: stopping Orchestrator")
                self._last_orch_stop_attempt = now

            self.shared_data.orchestrator_should_exit = True
            self.shared_data.queue_event.set()  # Wake up thread
            thread.join(timeout=10.0)

            if thread.is_alive():
                logger.warning_throttled(
                    "Orchestrator thread did not stop gracefully",
                    key="orch_stop_not_graceful",
                    interval_s=20,
                )
                return

            self.orchestrator_thread = None
            self.orchestrator = None
            self.shared_data.bjorn_orch_status = "IDLE"
            self.shared_data.bjorn_status_text2 = ""

    def is_network_connected(self):
        """Checks for network connectivity with throttling and low-CPU checks.

        Probes eth0 and wlan0 for an IPv4 address at most once every 10 s.
        Between probes it returns the cached result.
        """
        now = time.time()
        # Throttling: Do not scan more than once every 10 seconds
        if now - self._last_net_check < 10:
            return self.network_connected
        self._last_net_check = now

        def interface_has_ip(interface_name):
            try:
                # OPTIMIZATION: Check /sys/class/net first to avoid spawning subprocess if interface doesn't exist
                if not os.path.exists(f"/sys/class/net/{interface_name}"):
                    return False

                # Check for IP address
                result = subprocess.run(
                    ["ip", "-4", "addr", "show", interface_name],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    timeout=2,
                )
                if result.returncode != 0:
                    return False
                return "inet " in result.stdout
            except Exception:
                return False

        eth_connected = interface_has_ip("eth0")
        wifi_connected = interface_has_ip("wlan0")

        self.network_connected = eth_connected or wifi_connected

        if self.network_connected != self.previous_network_connected:
            if self.network_connected:
                logger.info(f"Network status changed: Connected (eth0={eth_connected}, wlan0={wifi_connected})")
            else:
                logger.warning("Network status changed: Connection lost")
            self.previous_network_connected = self.network_connected

        return self.network_connected

    @staticmethod
    def start_display(old_display=None):
        """Start a new Display on a daemon thread and return (thread, display).

        The controller of *old_display*, if given, is stopped first.
        """
        # Ensure the previous Display's controller is fully stopped to release frames
        if old_display is not None:
            try:
                old_display.display_controller.stop(timeout=3.0)
            except Exception:
                pass

        display = Display(shared_data)
        display_thread = threading.Thread(
            target=display.run,
            daemon=True,
            name="DisplayMain",
        )
        display_thread.start()
        return display_thread, display


def _request_shutdown():
    """Signals all threads to stop."""
    shared_data.should_exit = True
    shared_data.orchestrator_should_exit = True
    shared_data.display_should_exit = True
    shared_data.webapp_should_exit = True
    shared_data.queue_event.set()


def handle_exit(
    sig,
    frame,
    display_thread,
    bjorn_thread,
    web_thread_obj,
    health_thread=None,
    runtime_state_thread=None,
    from_signal=False,
):
    """Run the orderly shutdown sequence exactly once.

    Used both as the SIGINT/SIGTERM handler (``from_signal=True``) and as the
    normal exit path. A second signal during shutdown forces an immediate
    ``os._exit(130)``. When invoked from a signal, the function ends by raising
    SystemExit(0).
    """
    global _shutdown_started
    with _shutdown_lock:
        if _shutdown_started:
            if from_signal:
                logger.warning("Forcing exit (SIGINT/SIGTERM received twice)")
                os._exit(130)
            return
        _shutdown_started = True

    logger.info(f"Shutdown signal received: {sig}")
    _request_shutdown()

    # 1. Stop Display (handles EPD cleanup)
    try:
        handle_exit_display(sig, frame, display_thread)
    except Exception:
        pass

    # 2. Stop Health Monitor
    try:
        if health_thread and hasattr(health_thread, "stop"):
            health_thread.stop()
    except Exception:
        pass

    # 2b. Stop Runtime State Updater
    try:
        if runtime_state_thread and hasattr(runtime_state_thread, "stop"):
            runtime_state_thread.stop()
    except Exception:
        pass

    # 3. Stop Web Server
    try:
        if web_thread_obj and hasattr(web_thread_obj, "shutdown"):
            web_thread_obj.shutdown()
    except Exception:
        pass

    # 4. Join all threads
    for thread in (display_thread, bjorn_thread, web_thread_obj, health_thread, runtime_state_thread):
        try:
            if thread and thread.is_alive():
                thread.join(timeout=5.0)
        except Exception:
            pass

    # 5. Close Database (Prevent corruption)
    try:
        if hasattr(shared_data, "db") and hasattr(shared_data.db, "close"):
            shared_data.db.close()
    except Exception as exc:
        logger.error(f"Database shutdown error: {exc}")

    logger.info("Bjorn stopped. Clean exit.")
    _release_instance_lock()
    if from_signal:
        sys.exit(0)


def _install_thread_excepthook():
    """Route uncaught worker-thread exceptions to the Bjorn logger."""

    def _hook(args):
        # Match the default hook: a thread that exits via SystemExit is not an error.
        if issubclass(args.exc_type, SystemExit):
            return
        # threading.excepthook documents that args.thread may be None.
        thread_name = args.thread.name if args.thread is not None else "<unknown>"
        logger.error(f"Unhandled thread exception: {thread_name} - {args.exc_type.__name__}: {args.exc_value}")
        # We don't force shutdown here to avoid killing the app on minor thread glitches,
        # unless it's critical. The Crash Shield will handle restarts.

    threading.excepthook = _hook


if __name__ == "__main__":
    if not _acquire_instance_lock():
        sys.exit(1)
    atexit.register(_release_instance_lock)

    _install_thread_excepthook()

    display_thread = None
    display_instance = None
    bjorn_thread = None
    health_thread = None
    runtime_state_thread = None
    last_gc_time = time.time()

    def _on_signal(sig, frame):
        # The thread names are module globals looked up at call time. The
        # handler therefore sees whichever threads the Crash Shield restarted
        # most recently, and None for any not yet started.
        handle_exit(
            sig,
            frame,
            display_thread,
            bjorn_thread,
            web_thread,
            health_thread,
            runtime_state_thread,
            True,
        )

    try:
        # Signal Handlers: install them before any worker starts, so that a
        # SIGINT/SIGTERM during startup still runs the orderly shutdown
        # (DB close, lock release) rather than the default handling.
        signal.signal(signal.SIGINT, _on_signal)
        signal.signal(signal.SIGTERM, _on_signal)

        logger.info("Bjorn Startup: Loading config...")
        shared_data.load_config()

        logger.info("Starting Runtime State Updater...")
        runtime_state_thread = RuntimeStateUpdater(shared_data)
        runtime_state_thread.start()

        logger.info("Starting Display...")
        shared_data.display_should_exit = False
        display_thread, display_instance = Bjorn.start_display()

        logger.info("Starting Bjorn Core...")
        bjorn = Bjorn(shared_data)
        shared_data.bjorn_instance = bjorn
        bjorn_thread = threading.Thread(target=bjorn.run, daemon=True, name="BjornMain")
        bjorn_thread.start()

        if shared_data.config.get("websrv", False):
            logger.info("Starting Web Server...")
            if not web_thread.is_alive():
                web_thread.start()

        health_interval = int(shared_data.config.get("health_log_interval", 60))
        health_thread = HealthMonitor(shared_data, interval_s=health_interval)
        health_thread.start()

        # --- SUPERVISOR LOOP (Crash Shield) ---
        # All watched threads share a single restart budget: at most
        # max_restarts restarts within any restart_window_s seconds.
        restart_times = []
        max_restarts = 5
        restart_window_s = 300

        logger.info("Bjorn Supervisor running.")

        while not shared_data.should_exit:
            time.sleep(2)  # CPU Friendly polling
            now = time.time()

            # --- OPTIMIZATION: Periodic Garbage Collection ---
            # Forces cleanup of circular references and free RAM every 2 mins
            if now - last_gc_time > 120:
                gc.collect()
                last_gc_time = now
                logger.debug("System: Forced Garbage Collection executed.")

            # --- CRASH SHIELD: Bjorn Thread ---
            if bjorn_thread and not bjorn_thread.is_alive() and not shared_data.should_exit:
                restart_times = [t for t in restart_times if (now - t) <= restart_window_s]
                restart_times.append(now)
                if len(restart_times) <= max_restarts:
                    logger.warning("Crash Shield: Restarting Bjorn Main Thread")
                    bjorn_thread = threading.Thread(target=bjorn.run, daemon=True, name="BjornMain")
                    bjorn_thread.start()
                else:
                    logger.critical("Crash Shield: Bjorn exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

            # --- CRASH SHIELD: Display Thread ---
            if display_thread and not display_thread.is_alive() and not shared_data.should_exit:
                restart_times = [t for t in restart_times if (now - t) <= restart_window_s]
                restart_times.append(now)
                if len(restart_times) <= max_restarts:
                    logger.warning("Crash Shield: Restarting Display Thread")
                    display_thread, display_instance = Bjorn.start_display(old_display=display_instance)
                else:
                    logger.critical("Crash Shield: Display exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

            # --- CRASH SHIELD: Runtime State Updater ---
            if runtime_state_thread and not runtime_state_thread.is_alive() and not shared_data.should_exit:
                restart_times = [t for t in restart_times if (now - t) <= restart_window_s]
                restart_times.append(now)
                if len(restart_times) <= max_restarts:
                    logger.warning("Crash Shield: Restarting Runtime State Updater")
                    runtime_state_thread = RuntimeStateUpdater(shared_data)
                    runtime_state_thread.start()
                else:
                    logger.critical("Crash Shield: Runtime State Updater exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

        # Exit cleanup
        if health_thread:
            health_thread.stop()
        if runtime_state_thread:
            runtime_state_thread.stop()

        handle_exit(
            signal.SIGTERM,
            None,
            display_thread,
            bjorn_thread,
            web_thread,
            health_thread,
            runtime_state_thread,
            False,
        )

    except Exception as exc:
        logger.critical(f"Critical bootstrap failure: {exc}")
        _request_shutdown()
        # Try to clean up anyway
        try:
            handle_exit(
                signal.SIGTERM,
                None,
                display_thread,
                bjorn_thread,
                web_thread,
                health_thread,
                runtime_state_thread,
                False,
            )
        except Exception as cleanup_exc:
            # Still exit with failure below, but make the cleanup error visible.
            logger.error(f"Cleanup after bootstrap failure failed: {cleanup_exc}")
        sys.exit(1)