Files
Bjorn/Bjorn.py
Fabien POLLY eb20b168a6 Add RLUtils class for managing RL/AI dashboard endpoints
- Implemented methods for fetching AI stats, training history, and recent experiences.
- Added functionality to set operation mode (MANUAL, AUTO, AI) with appropriate handling.
- Included helper methods for querying the database and sending JSON responses.
- Integrated model metadata extraction for visualization purposes.
2026-02-18 22:36:10 +01:00

626 lines
22 KiB
Python

# Bjorn.py
# Main entry point and supervisor for the Bjorn project
# Manages lifecycle of threads, health monitoring, and crash protection.
# OPTIMIZED FOR PI ZERO 2: Low CPU overhead, aggressive RAM management.
import logging
import os
import signal
import subprocess
import sys
import threading
import time
import gc
import tracemalloc
import atexit
from comment import Commentaireia
from display import Display, handle_exit_display
from init_shared import shared_data
from logger import Logger
from orchestrator import Orchestrator
from runtime_state_updater import RuntimeStateUpdater
from webapp import web_thread
# Module-wide logger used by every function and class in this file.
logger = Logger(name="Bjorn.py", level=logging.DEBUG)

# Guards the one-shot shutdown sequence in handle_exit().
_shutdown_lock = threading.Lock()
_shutdown_started = False

# Open file object holding the single-instance lock (None while not held).
_instance_lock_fd = None
_instance_lock_path = "/tmp/bjorn_160226.lock"

# fcntl is unavailable on some platforms; when it is None the instance lock
# helpers skip the flock() calls and only keep the lock file open.
try:
    import fcntl
except ImportError:
    fcntl = None
def _release_instance_lock():
    """Unlock and close the single-instance lock file, if one is held.

    Every failure is swallowed: this runs from atexit and from the shutdown
    path, where raising would only obscure the real exit reason.
    """
    global _instance_lock_fd
    handle = _instance_lock_fd
    if handle is not None:
        try:
            if fcntl is not None:
                # Best-effort explicit unlock before closing the file.
                try:
                    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
                except Exception:
                    pass
            handle.close()
        except Exception:
            pass
        _instance_lock_fd = None
def _acquire_instance_lock() -> bool:
    """Ensure only one Bjorn_160226 process can run at once.

    Returns:
        True when this process may continue: the lock was acquired, was
        already held by this process, or locking is impossible (the lock file
        cannot be opened, or ``fcntl`` is unavailable) and the check fails
        open. False when ``flock`` reports that the lock cannot be taken,
        which is treated as another running instance.
    """
    global _instance_lock_fd
    if _instance_lock_fd is not None:
        return True

    try:
        fd = open(_instance_lock_path, "a+", encoding="utf-8")
    except Exception as exc:
        # Fail open: an unusable lock file must not prevent startup.
        logger.error(f"Unable to open instance lock file {_instance_lock_path}: {exc}")
        return True

    if fcntl is None:
        # No advisory locking available; keep the file open and continue.
        _instance_lock_fd = fd
        return True

    # Step 1: take the lock. Only a failure here means "someone else owns it".
    try:
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        try:
            fd.seek(0)
            owner_pid = fd.read().strip() or "unknown"
        except Exception:
            owner_pid = "unknown"
        logger.critical(f"Another Bjorn instance is already running (pid={owner_pid}).")
        try:
            fd.close()
        except Exception:
            pass
        return False

    # Step 2: record our PID for diagnostics. The lock is already held, so a
    # write failure (e.g. a full filesystem) must not be reported as a
    # competing instance or block startup.
    try:
        fd.seek(0)
        fd.truncate()
        fd.write(str(os.getpid()))
        fd.flush()
    except OSError as exc:
        logger.warning(f"Unable to record PID in {_instance_lock_path}: {exc}")

    _instance_lock_fd = fd
    return True
class HealthMonitor(threading.Thread):
    """Daemon thread that periodically logs a one-line runtime health summary.

    Each cycle logs the thread count, resident memory, pending queue depth and
    the display failure/reinit counters. When tracemalloc is tracing, a top-N
    allocation report is additionally logged at a configurable interval.
    """

    def __init__(self, shared_data_, interval_s: int = 60):
        """Create the monitor.

        Args:
            shared_data_: Shared state object; this class reads its ``db``,
                ``health_lock``, ``display_runtime_metrics`` and ``config``.
            interval_s: Seconds between health reports (clamped to >= 10).
        """
        super().__init__(daemon=True, name="HealthMonitor")
        self.shared_data = shared_data_
        self.interval_s = max(10, int(interval_s))
        self._stop_event = threading.Event()
        self._tm_prev_snapshot = None
        self._tm_last_report = 0.0

    def stop(self):
        """Ask the monitor loop to exit at its next wake-up."""
        self._stop_event.set()

    def _fd_count(self) -> int:
        """Return the number of entries in /proc/self/fd, or -1 on failure."""
        try:
            return len(os.listdir("/proc/self/fd"))
        except Exception:
            return -1

    def _rss_kb(self) -> int:
        """Return the VmRSS value from /proc/self/status, or -1 if unavailable."""
        try:
            with open("/proc/self/status", "r", encoding="utf-8") as fh:
                for line in fh:
                    if line.startswith("VmRSS:"):
                        parts = line.split()
                        if len(parts) >= 2:
                            return int(parts[1])
        except Exception:
            pass
        return -1

    def _queue_counts(self):
        """Return ``(pending, running, scheduled)`` row counts from action_queue.

        Each count is -1 when the query fails or returns no row.
        """
        pending = running = scheduled = -1
        try:
            # Using query_one safe method from database
            row = self.shared_data.db.query_one(
                """
                SELECT
                SUM(CASE WHEN status='pending' THEN 1 ELSE 0 END) AS pending,
                SUM(CASE WHEN status='running' THEN 1 ELSE 0 END) AS running,
                SUM(CASE WHEN status='scheduled' THEN 1 ELSE 0 END) AS scheduled
                FROM action_queue
                """
            )
            if row:
                # SUM() over an empty table yields NULL/None, hence "or 0".
                pending = int(row.get("pending") or 0)
                running = int(row.get("running") or 0)
                scheduled = int(row.get("scheduled") or 0)
        except Exception as exc:
            logger.error_throttled(
                f"Health monitor queue count query failed: {exc}",
                key="health_queue_counts",
                interval_s=120,
            )
        return pending, running, scheduled

    def _log_health(self):
        """Collect the reported metrics and emit the single health log line."""
        thread_count = len(threading.enumerate())
        rss_kb = self._rss_kb()
        pending, _running, _scheduled = self._queue_counts()
        # Copy under the lock so the metrics are read as one consistent snapshot.
        with self.shared_data.health_lock:
            display_metrics = dict(getattr(self.shared_data, "display_runtime_metrics", {}) or {})
        epd_failures = int(display_metrics.get("failed_updates", 0))
        epd_reinit = int(display_metrics.get("reinit_attempts", 0))
        logger.info(
            "health "
            f"thread_count={thread_count} "
            f"rss_kb={rss_kb} "
            f"queue_pending={pending} "
            f"epd_failures={epd_failures} "
            f"epd_reinit={epd_reinit} "
        )

    def _report_tracemalloc(self):
        """Log a top-N tracemalloc report when tracing is on and the interval elapsed.

        The first report lists absolute statistics; later ones list the diff
        against the previous snapshot. Failures are logged, never raised.
        """
        try:
            if not tracemalloc.is_tracing():
                return
            now = time.monotonic()
            tm_interval = float(self.shared_data.config.get("tracemalloc_report_interval_s", 300) or 300)
            if tm_interval <= 0 or (now - self._tm_last_report) < tm_interval:
                return
            self._tm_last_report = now
            top_n = int(self.shared_data.config.get("tracemalloc_top_n", 10) or 10)
            top_n = max(3, min(top_n, 25))
            snap = tracemalloc.take_snapshot()
            if self._tm_prev_snapshot is not None:
                stats = snap.compare_to(self._tm_prev_snapshot, "lineno")[:top_n]
                header = f"mem_top (tracemalloc diff, top_n={top_n})"
            else:
                stats = snap.statistics("lineno")[:top_n]
                header = f"mem_top (tracemalloc, top_n={top_n})"
            logger.info(header)
            for st in stats:
                logger.info(f"mem_top {st}")
            self._tm_prev_snapshot = snap
        except Exception as exc:
            logger.error_throttled(
                f"Health monitor tracemalloc failure: {exc}",
                key="health_tracemalloc_error",
                interval_s=300,
            )

    def run(self):
        """Report health every ``interval_s`` seconds until ``stop()`` is called."""
        # Event.wait() returns True once stop() is called, which ends the loop.
        while not self._stop_event.wait(self.interval_s):
            try:
                self._log_health()
                # Optional: only active if tracing was enabled via
                # PYTHONTRACEMALLOC or tracemalloc.start().
                self._report_tracemalloc()
            except Exception as exc:
                logger.error_throttled(
                    f"Health monitor loop failure: {exc}",
                    key="health_loop_error",
                    interval_s=120,
                )
class Bjorn:
    """Main class for Bjorn. Manages orchestration lifecycle.

    Starts the Orchestrator thread when a network interface has an IPv4
    address and the operation mode is not MANUAL, and stops it when the mode
    switches to MANUAL.
    """

    def __init__(self, shared_data_):
        """Store the shared state object and initialise supervisor bookkeeping."""
        self.shared_data = shared_data_
        self.commentaire_ia = Commentaireia()
        self.orchestrator_thread = None
        self.orchestrator = None
        self.network_connected = False
        self.wifi_connected = False
        self.previous_network_connected = None
        self._orch_lock = threading.Lock()
        # Throttling timestamps are taken from time.monotonic() so a wall-clock
        # step cannot stall or reset them. -inf guarantees that the very first
        # check/log is never throttled.
        self._last_net_check = float("-inf")
        self._last_orch_stop_attempt = float("-inf")

    def run(self):
        """Main loop for Bjorn. Waits for network and starts/stops Orchestrator based on mode."""
        if hasattr(self.shared_data, "startup_delay") and self.shared_data.startup_delay > 0:
            logger.info(f"Waiting for startup delay: {self.shared_data.startup_delay} seconds")
            time.sleep(self.shared_data.startup_delay)

        backoff_s = 1.0
        while not self.shared_data.should_exit:
            try:
                # Manual mode must stop orchestration so the user keeps full control.
                if self.shared_data.operation_mode == "MANUAL":
                    # Avoid spamming stop requests if already stopped.
                    if self.orchestrator_thread is not None and self.orchestrator_thread.is_alive():
                        self.stop_orchestrator()
                else:
                    self.check_and_start_orchestrator()
                time.sleep(5)
                backoff_s = 1.0  # Reset backoff on success
            except Exception as exc:
                logger.error(f"Bjorn main loop error: {exc}")
                logger.error_throttled(
                    "Bjorn main loop entering backoff due to repeated errors",
                    key="bjorn_main_loop_backoff",
                    interval_s=60,
                )
                # Exponential backoff, capped at 30 seconds.
                time.sleep(backoff_s)
                backoff_s = min(backoff_s * 2.0, 30.0)

    def check_and_start_orchestrator(self):
        """Start the Orchestrator if the mode allows it and the network is up."""
        if self.shared_data.operation_mode == "MANUAL":
            return
        if self.is_network_connected():
            self.wifi_connected = True
            if self.orchestrator_thread is None or not self.orchestrator_thread.is_alive():
                self.start_orchestrator()
        else:
            self.wifi_connected = False
            logger.info_throttled(
                "Waiting for network connection to start Orchestrator...",
                key="bjorn_wait_network",
                interval_s=30,
            )

    def start_orchestrator(self):
        """Create and start the Orchestrator thread unless offline or already running."""
        with self._orch_lock:
            # Re-check network inside lock
            if not self.network_connected:
                return
            if self.orchestrator_thread is not None and self.orchestrator_thread.is_alive():
                logger.debug("Orchestrator thread is already running.")
                return
            logger.info("Starting Orchestrator thread...")
            self.shared_data.orchestrator_should_exit = False
            self.orchestrator = Orchestrator()
            self.orchestrator_thread = threading.Thread(
                target=self.orchestrator.run,
                daemon=True,
                name="OrchestratorMain",
            )
            self.orchestrator_thread.start()
            logger.info("Orchestrator thread started.")

    def stop_orchestrator(self):
        """Signal the Orchestrator thread to exit and wait up to 10 seconds for it.

        Also forces the operation mode to MANUAL. If the thread does not stop
        within the timeout, the references are kept so a later call can retry.
        """
        with self._orch_lock:
            thread = self.orchestrator_thread
            if thread is None or not thread.is_alive():
                self.orchestrator_thread = None
                self.orchestrator = None
                return

            # Keep MANUAL sticky so supervisor does not auto-restart orchestration.
            try:
                self.shared_data.operation_mode = "MANUAL"
            except Exception:
                pass

            # Log the stop request at most once every 10 seconds.
            now = time.monotonic()
            if now - self._last_orch_stop_attempt >= 10.0:
                logger.info("Stop requested: stopping Orchestrator")
                self._last_orch_stop_attempt = now

            self.shared_data.orchestrator_should_exit = True
            self.shared_data.queue_event.set()  # Wake up thread
            thread.join(timeout=10.0)
            if thread.is_alive():
                logger.warning_throttled(
                    "Orchestrator thread did not stop gracefully",
                    key="orch_stop_not_graceful",
                    interval_s=20,
                )
                return

            self.orchestrator_thread = None
            self.orchestrator = None
            self.shared_data.bjorn_orch_status = "IDLE"
            self.shared_data.bjorn_status_text2 = ""

    @staticmethod
    def _interface_has_ip(interface_name):
        """Return True if *interface_name* exists and has an IPv4 address."""
        try:
            # OPTIMIZATION: Check /sys/class/net first to avoid spawning a
            # subprocess if the interface doesn't exist.
            if not os.path.exists(f"/sys/class/net/{interface_name}"):
                return False
            result = subprocess.run(
                ["ip", "-4", "addr", "show", interface_name],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=2,
            )
            if result.returncode != 0:
                return False
            return "inet " in result.stdout
        except Exception:
            # Missing `ip` binary, timeout, etc. all count as "no address".
            return False

    def is_network_connected(self):
        """Checks for network connectivity with throttling and low-CPU checks.

        Returns:
            True when eth0 or wlan0 has an IPv4 address. The probe runs at
            most once every 10 seconds; in between, the cached result is
            returned.
        """
        now = time.monotonic()
        # Throttling: Do not scan more than once every 10 seconds
        if now - self._last_net_check < 10:
            return self.network_connected
        self._last_net_check = now

        eth_connected = self._interface_has_ip("eth0")
        wifi_connected = self._interface_has_ip("wlan0")
        self.network_connected = eth_connected or wifi_connected

        # Log only on state transitions.
        if self.network_connected != self.previous_network_connected:
            if self.network_connected:
                logger.info(f"Network status changed: Connected (eth0={eth_connected}, wlan0={wifi_connected})")
            else:
                logger.warning("Network status changed: Connection lost")
            self.previous_network_connected = self.network_connected
        return self.network_connected

    @staticmethod
    def start_display(old_display=None):
        """Start a new Display on a daemon thread.

        Args:
            old_display: Previous Display instance, if any; its controller is
                stopped first (errors ignored).

        Returns:
            Tuple ``(display_thread, display)``.
        """
        # Ensure the previous Display's controller is fully stopped to release frames
        if old_display is not None:
            try:
                old_display.display_controller.stop(timeout=3.0)
            except Exception:
                pass
        display = Display(shared_data)
        display_thread = threading.Thread(
            target=display.run,
            daemon=True,
            name="DisplayMain",
        )
        display_thread.start()
        return display_thread, display
def _request_shutdown():
    """Raise every exit flag on shared_data, then set queue_event."""
    exit_flags = (
        "should_exit",
        "orchestrator_should_exit",
        "display_should_exit",
        "webapp_should_exit",
    )
    for flag_name in exit_flags:
        setattr(shared_data, flag_name, True)
    # Setting the event lets a thread blocked on the queue notice the flags.
    shared_data.queue_event.set()
def _run_shutdown_step(label, action):
    """Run one best-effort shutdown step; log (never raise) on failure."""
    try:
        action()
    except Exception as exc:
        logger.warning(f"Shutdown step '{label}' failed: {exc}")


def handle_exit(
    sig,
    frame,
    display_thread,
    bjorn_thread,
    web_thread_obj,
    health_thread=None,
    runtime_state_thread=None,
    from_signal=False,
):
    """Run the shutdown sequence exactly once.

    Args:
        sig: Signal number (or equivalent) that triggered the shutdown.
        frame: Frame passed by the signal machinery; forwarded to the display
            exit handler.
        display_thread: Display thread to stop and join.
        bjorn_thread: Bjorn main-loop thread to join.
        web_thread_obj: Web server thread; its ``shutdown()`` is called if present.
        health_thread: Optional thread exposing ``stop()``.
        runtime_state_thread: Optional thread exposing ``stop()``.
        from_signal: True when called from a signal handler. In that case the
            function ends with ``sys.exit(0)``, and a second invocation while
            shutdown is already in progress force-exits with status 130.
    """
    global _shutdown_started
    # Hold the lock only for the flag check so re-entrant calls stay cheap.
    with _shutdown_lock:
        if _shutdown_started:
            if from_signal:
                logger.warning("Forcing exit (SIGINT/SIGTERM received twice)")
                os._exit(130)
            return
        _shutdown_started = True

    logger.info(f"Shutdown signal received: {sig}")
    _request_shutdown()

    # 1. Stop Display (handles EPD cleanup)
    _run_shutdown_step("display", lambda: handle_exit_display(sig, frame, display_thread))

    # 2. Stop Health Monitor, 2b. Stop Runtime State Updater
    for label, worker in (
        ("health monitor", health_thread),
        ("runtime state updater", runtime_state_thread),
    ):
        if worker and hasattr(worker, "stop"):
            _run_shutdown_step(label, worker.stop)

    # 3. Stop Web Server
    if web_thread_obj and hasattr(web_thread_obj, "shutdown"):
        _run_shutdown_step("web server", web_thread_obj.shutdown)

    # 4. Join all threads (bounded wait so shutdown cannot hang forever)
    for thread in (display_thread, bjorn_thread, web_thread_obj, health_thread, runtime_state_thread):
        try:
            if thread and thread.is_alive():
                thread.join(timeout=5.0)
        except Exception as exc:
            logger.warning(f"Shutdown step 'join {getattr(thread, 'name', thread)}' failed: {exc}")

    # 5. Close Database (Prevent corruption)
    try:
        if hasattr(shared_data, "db") and hasattr(shared_data.db, "close"):
            shared_data.db.close()
    except Exception as exc:
        logger.error(f"Database shutdown error: {exc}")

    logger.info("Bjorn stopped. Clean exit.")
    _release_instance_lock()
    if from_signal:
        sys.exit(0)
def _install_thread_excepthook():
    """Install a threading.excepthook that logs uncaught thread exceptions."""

    def _hook(args):
        # args.thread may be None, so never dereference it unguarded.
        thread_name = getattr(args.thread, "name", "<unknown>")
        logger.error(f"Unhandled thread exception: {thread_name} - {args.exc_type.__name__}: {args.exc_value}")
        # We don't force shutdown here to avoid killing the app on minor thread glitches,
        # unless it's critical. The Crash Shield will handle restarts.

    threading.excepthook = _hook
if __name__ == "__main__":
    # Refuse to start when another instance holds the lock.
    if not _acquire_instance_lock():
        sys.exit(1)
    atexit.register(_release_instance_lock)
    _install_thread_excepthook()

    display_thread = None
    display_instance = None
    bjorn_thread = None
    health_thread = None
    runtime_state_thread = None
    # Interval bookkeeping uses the monotonic clock so wall-clock steps
    # cannot distort the GC period or the restart window.
    last_gc_time = time.monotonic()

    try:
        logger.info("Bjorn Startup: Loading config...")
        shared_data.load_config()

        logger.info("Starting Runtime State Updater...")
        runtime_state_thread = RuntimeStateUpdater(shared_data)
        runtime_state_thread.start()

        logger.info("Starting Display...")
        shared_data.display_should_exit = False
        display_thread, display_instance = Bjorn.start_display()

        logger.info("Starting Bjorn Core...")
        bjorn = Bjorn(shared_data)
        shared_data.bjorn_instance = bjorn
        bjorn_thread = threading.Thread(target=bjorn.run, daemon=True, name="BjornMain")
        bjorn_thread.start()

        if shared_data.config.get("websrv", False):
            logger.info("Starting Web Server...")
            if not web_thread.is_alive():
                web_thread.start()

        health_interval = int(shared_data.config.get("health_log_interval", 60))
        health_thread = HealthMonitor(shared_data, interval_s=health_interval)
        health_thread.start()

        # Signal Handlers
        def exit_handler(signum, frame):
            """Forward SIGINT/SIGTERM to handle_exit with the current thread objects."""
            # The thread variables are module globals looked up at call time,
            # so threads restarted by the Crash Shield are picked up here.
            handle_exit(
                signum,
                frame,
                display_thread,
                bjorn_thread,
                web_thread,
                health_thread,
                runtime_state_thread,
                True,
            )

        signal.signal(signal.SIGINT, exit_handler)
        signal.signal(signal.SIGTERM, exit_handler)

        # --- SUPERVISOR LOOP (Crash Shield) ---
        restart_times = []
        max_restarts = 5
        restart_window_s = 300

        def _restart_allowed(now_ts):
            """Record a restart at *now_ts*; return True while the shared budget allows it."""
            # Drop restarts that fell out of the sliding window, then count this one.
            restart_times[:] = [t for t in restart_times if (now_ts - t) <= restart_window_s]
            restart_times.append(now_ts)
            return len(restart_times) <= max_restarts

        logger.info("Bjorn Supervisor running.")
        while not shared_data.should_exit:
            time.sleep(2)  # CPU Friendly polling
            now = time.monotonic()

            # --- OPTIMIZATION: Periodic Garbage Collection ---
            # Forces cleanup of circular references and free RAM every 2 mins
            if now - last_gc_time > 120:
                gc.collect()
                last_gc_time = now
                logger.debug("System: Forced Garbage Collection executed.")

            # --- CRASH SHIELD: Bjorn Thread ---
            if bjorn_thread and not bjorn_thread.is_alive() and not shared_data.should_exit:
                if _restart_allowed(now):
                    logger.warning("Crash Shield: Restarting Bjorn Main Thread")
                    bjorn_thread = threading.Thread(target=bjorn.run, daemon=True, name="BjornMain")
                    bjorn_thread.start()
                else:
                    logger.critical("Crash Shield: Bjorn exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

            # --- CRASH SHIELD: Display Thread ---
            if display_thread and not display_thread.is_alive() and not shared_data.should_exit:
                if _restart_allowed(now):
                    logger.warning("Crash Shield: Restarting Display Thread")
                    display_thread, display_instance = Bjorn.start_display(old_display=display_instance)
                else:
                    logger.critical("Crash Shield: Display exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

            # --- CRASH SHIELD: Runtime State Updater ---
            if runtime_state_thread and not runtime_state_thread.is_alive() and not shared_data.should_exit:
                if _restart_allowed(now):
                    logger.warning("Crash Shield: Restarting Runtime State Updater")
                    runtime_state_thread = RuntimeStateUpdater(shared_data)
                    runtime_state_thread.start()
                else:
                    logger.critical("Crash Shield: Runtime State Updater exceeded restart budget. Shutting down.")
                    _request_shutdown()
                    break

        # Exit cleanup
        if health_thread:
            health_thread.stop()
        if runtime_state_thread:
            runtime_state_thread.stop()
        handle_exit(
            signal.SIGTERM,
            None,
            display_thread,
            bjorn_thread,
            web_thread,
            health_thread,
            runtime_state_thread,
            False,
        )
    except Exception as exc:
        logger.critical(f"Critical bootstrap failure: {exc}")
        _request_shutdown()
        # Try to clean up anyway
        try:
            handle_exit(
                signal.SIGTERM,
                None,
                display_thread,
                bjorn_thread,
                web_thread,
                health_thread,
                runtime_state_thread,
                False,
            )
        except Exception as cleanup_exc:
            # Never let a cleanup failure mask the original error; a bare
            # except here would also swallow SystemExit/KeyboardInterrupt.
            logger.error(f"Cleanup after bootstrap failure also failed: {cleanup_exc}")
        sys.exit(1)