Add LLM configuration and MCP server management UI and backend functionality

- Implemented a new SPA page for LLM Bridge and MCP Server settings in `llm-config.js`.
- Added functionality for managing LLM and MCP configurations, including toggling, saving settings, and testing connections.
- Created HTTP endpoints in `llm_utils.py` for handling LLM chat, status checks, and MCP server configuration.
- Integrated model fetching from LaRuche and Ollama backends.
- Enhanced error handling and logging for better debugging and user feedback.
This commit is contained in:
infinition
2026-03-16 20:33:22 +01:00
parent aac77a3e76
commit b759ab6d4b
41 changed files with 9991 additions and 397 deletions

View File

@@ -2,18 +2,21 @@
"""
Network utilities for WiFi/network operations.
Handles WiFi scanning, connection, known networks management.
Compatible with both legacy NM keyfiles and Trixie netplan.
"""
from __future__ import annotations
import json
import subprocess
import logging
import re
import os
import glob
import re
from typing import Any, Dict, Optional, List
import logging
from logger import Logger
logger = Logger(name="network_utils.py", level=logging.DEBUG)
class NetworkUtils:
"""Utilities for network and WiFi management."""
@@ -21,94 +24,107 @@ class NetworkUtils:
self.logger = logger
self.shared_data = shared_data
def get_known_wifi(self, handler):
"""List known WiFi networks with priorities."""
try:
result = subprocess.run(
['nmcli', '-t', '-f', 'NAME,TYPE,AUTOCONNECT-PRIORITY', 'connection', 'show'],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout = result.stdout
self.logger.debug(f"nmcli connection show output:\n{stdout}")
# ── helpers ───────────────────────────────────────────────────────
known_networks = []
lines = stdout.strip().split('\n')
for line in lines:
@staticmethod
def _run(cmd: list[str], **kw) -> subprocess.CompletedProcess:
"""Run a command, returning CompletedProcess."""
return subprocess.run(
cmd, check=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True, **kw,
)
@staticmethod
def _json_response(handler, code: int, payload: dict):
handler.send_response(code)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps(payload).encode('utf-8'))
# ── known networks ───────────────────────────────────────────────
def get_known_wifi(self, handler):
"""List known WiFi networks with priorities.
Uses nmcli terse output. On Trixie, netplan-generated profiles
(named ``netplan-wlan0-*``) appear alongside user-created NM
profiles — both are returned.
"""
try:
result = self._run(
['nmcli', '-t', '-f', 'NAME,TYPE,AUTOCONNECT-PRIORITY', 'connection', 'show']
)
self.logger.debug(f"nmcli connection show output:\n{result.stdout}")
known_networks: list[dict] = []
for line in result.stdout.strip().splitlines():
if not line.strip():
continue
parts = line.split(':')
# nmcli -t uses ':' as delimiter — SSIDs with ':' are
# escaped by nmcli (backslash-colon), so split from
# the right to be safe: last field = priority,
# second-to-last = type, rest = name.
parts = line.rsplit(':', 2)
if len(parts) == 3:
name, conn_type, priority = parts
name, conn_type, priority_str = parts
elif len(parts) == 2:
name, conn_type = parts
priority = '0'
self.logger.warning(f"Missing priority for connection {name}. Assigning priority 0.")
priority_str = '0'
else:
self.logger.warning(f"Unexpected line format: {line}")
continue
if conn_type.lower() in ['802-11-wireless', 'wireless', 'wifi']:
try:
priority_int = int(priority) if priority.isdigit() else 0
except ValueError:
priority_int = 0
self.logger.warning(f"Non-numeric priority for {name}. Assigning priority 0.")
known_networks.append({
'ssid': name,
'priority': priority_int
})
# Unescape nmcli backslash-colon
name = name.replace('\\:', ':')
if conn_type.strip().lower() not in (
'802-11-wireless', 'wireless', 'wifi',
):
continue
try:
priority_int = int(priority_str.strip())
except (ValueError, AttributeError):
priority_int = 0
known_networks.append({
'ssid': name.strip(),
'priority': priority_int,
})
self.logger.debug(f"Extracted known networks: {known_networks}")
known_networks.sort(key=lambda x: x['priority'], reverse=True)
self._json_response(handler, 200, {"known_networks": known_networks})
handler.send_response(200)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"known_networks": known_networks}).encode('utf-8'))
except subprocess.CalledProcessError as e:
self.logger.error(f"Error getting known Wi-Fi networks: {e.stderr.strip()}")
handler.send_response(500)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"error": e.stderr.strip()}).encode('utf-8'))
self._json_response(handler, 500, {"error": e.stderr.strip()})
except Exception as e:
self.logger.error(f"Error getting known Wi-Fi networks: {e}")
handler.send_response(500)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"error": str(e)}).encode('utf-8'))
self._json_response(handler, 500, {"error": str(e)})
def delete_known_wifi(self, data):
"""Delete a known WiFi connection."""
ssid = None
ssid = data.get('ssid')
try:
ssid = data['ssid']
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'delete', ssid],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if not ssid:
return {"status": "error", "message": "Missing SSID"}
self._run(['sudo', 'nmcli', 'connection', 'delete', ssid])
self.logger.info(f"Deleted Wi-Fi connection: {ssid}")
return {"status": "success", "message": f"Network {ssid} deleted"}
except subprocess.CalledProcessError as e:
error_message = f"Error deleting Wi-Fi connection {ssid if ssid else 'unknown'}: {e.stderr.strip()}"
self.logger.error(error_message)
self.logger.error(f"Error deleting Wi-Fi connection {ssid}: {e.stderr.strip()}")
return {"status": "error", "message": e.stderr.strip()}
except Exception as e:
error_message = f"Unexpected error deleting Wi-Fi connection {ssid if ssid else 'unknown'}: {e}"
self.logger.error(error_message)
self.logger.error(f"Unexpected error deleting Wi-Fi connection {ssid}: {e}")
return {"status": "error", "message": str(e)}
def connect_known_wifi(self, data):
"""Connect to a known WiFi network."""
ssid = data.get('ssid', '')
try:
ssid = data['ssid']
if not self.validate_network_configuration(ssid):
raise Exception(f"Invalid or non-existent configuration for network '{ssid}'.")
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'up', ssid],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if not self.check_connection_exists(ssid):
return {"status": "error", "message": f"Network '{ssid}' not found in saved connections."}
self._run(['sudo', 'nmcli', 'connection', 'up', ssid])
self.logger.info(f"Connected to known Wi-Fi network: {ssid}")
return {"status": "success", "message": f"Connected to {ssid}"}
except subprocess.CalledProcessError as e:
@@ -119,14 +135,20 @@ class NetworkUtils:
return {"status": "error", "message": str(e)}
def update_wifi_priority(self, data):
"""Update WiFi connection priority."""
"""Update WiFi connection priority.
Works for both NM-native and netplan-generated profiles.
For netplan profiles (prefixed ``netplan-``), nmcli modify
writes a persistent override into
/etc/NetworkManager/system-connections/.
"""
ssid = data.get('ssid', '')
try:
ssid = data['ssid']
priority = int(data['priority'])
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'modify', ssid, 'connection.autoconnect-priority', str(priority)],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
self._run([
'sudo', 'nmcli', 'connection', 'modify', ssid,
'connection.autoconnect-priority', str(priority),
])
self.logger.info(f"Priority updated for {ssid} to {priority}")
return {"status": "success", "message": "Priority updated"}
except subprocess.CalledProcessError as e:
@@ -136,95 +158,122 @@ class NetworkUtils:
self.logger.error(f"Unexpected error updating Wi-Fi priority: {e}")
return {"status": "error", "message": str(e)}
# ── scanning ─────────────────────────────────────────────────────
def scan_wifi(self, handler):
"""Scan for available WiFi networks."""
"""Scan for available WiFi networks.
Uses ``nmcli -t`` (terse) output for reliable parsing.
Signal is returned as a percentage 0-100.
"""
try:
result = subprocess.run(
['sudo', 'nmcli', 'device', 'wifi', 'list'],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
# Trigger a rescan first (best-effort)
subprocess.run(
['sudo', 'nmcli', 'device', 'wifi', 'rescan'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
)
stdout = result.stdout
networks = self.parse_scan_result(stdout)
self.logger.info(f"Found {len(networks)} networks")
result = self._run([
'sudo', 'nmcli', '-t', '-f',
'SSID,SIGNAL,SECURITY,IN-USE',
'device', 'wifi', 'list',
])
networks = self._parse_terse_scan(result.stdout)
current_ssid = self.get_current_ssid()
self.logger.info(f"Current SSID: {current_ssid}")
self.logger.info(f"Found {len(networks)} networks, current={current_ssid}")
handler.send_response(200)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"networks": networks, "current_ssid": current_ssid}).encode('utf-8'))
self._json_response(handler, 200, {
"networks": networks,
"current_ssid": current_ssid,
})
except subprocess.CalledProcessError as e:
self.logger.error(f"Error scanning Wi-Fi networks: {e.stderr.strip()}")
handler.send_response(500)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"error": e.stderr.strip()}).encode('utf-8'))
self._json_response(handler, 500, {"error": e.stderr.strip()})
def parse_scan_result(self, scan_output):
"""Parse nmcli scan output."""
networks = []
lines = scan_output.split('\n')
headers = []
for idx, line in enumerate(lines):
if line.startswith("IN-USE"):
headers = re.split(r'\s{2,}', line)
@staticmethod
def _parse_terse_scan(output: str) -> list[dict]:
"""Parse ``nmcli -t -f SSID,SIGNAL,SECURITY,IN-USE device wifi list``.
Terse output uses ':' as separator. SSIDs containing ':'
are escaped by nmcli as ``\\:``.
Returns a deduplicated list sorted by signal descending.
"""
seen: dict[str, dict] = {}
for line in output.strip().splitlines():
if not line.strip():
continue
if headers and line.strip():
fields = re.split(r'\s{2,}', line)
if len(fields) >= len(headers):
network = dict(zip(headers, fields))
ssid = network.get('SSID', '')
signal_level = int(network.get('SIGNAL', '0'))
security = network.get('SECURITY', '')
networks.append({
'ssid': ssid,
'signal_level': signal_level,
'security': security
})
return networks
def get_current_ssid(self):
# Split from the right: IN-USE (last), SECURITY, SIGNAL, rest=SSID
# IN-USE is '*' or '' — always one char field at the end
parts = line.rsplit(':', 3)
if len(parts) < 4:
continue
raw_ssid, signal_str, security, in_use = parts
# Unescape nmcli backslash-colon in SSID
ssid = raw_ssid.replace('\\:', ':').strip()
if not ssid:
continue
try:
signal = int(signal_str.strip())
except (ValueError, AttributeError):
signal = 0
# Normalize security string
security = security.strip()
if not security or security == '--':
security = 'Open'
# Keep the strongest signal per SSID
if ssid not in seen or signal > seen[ssid]['signal']:
seen[ssid] = {
'ssid': ssid,
'signal': signal,
'security': security,
'in_use': in_use.strip() == '*',
}
result = sorted(seen.values(), key=lambda n: n['signal'], reverse=True)
return result
def get_current_ssid(self) -> Optional[str]:
"""Get currently connected SSID."""
try:
result = subprocess.run(
['nmcli', '-t', '-f', 'active,ssid', 'dev', 'wifi'],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
lines = result.stdout.strip().split('\n')
for line in lines:
active, ssid = line.split(':', 1)
if active == 'yes':
return ssid
result = self._run(['nmcli', '-t', '-f', 'active,ssid', 'dev', 'wifi'])
for line in result.stdout.strip().splitlines():
parts = line.split(':', 1)
if len(parts) == 2 and parts[0] == 'yes':
return parts[1]
return None
except subprocess.CalledProcessError as e:
self.logger.error(f"Error getting current SSID: {e.stderr.strip()}")
return None
def connect_wifi(self, data):
"""Connect to WiFi network (new or existing)."""
try:
ssid = data['ssid']
password = data.get('password', '')
# ── connect ──────────────────────────────────────────────────────
def connect_wifi(self, data):
"""Connect to WiFi network (new or existing).
On Trixie, ``nmcli device wifi connect`` creates a persistent
NM keyfile in /etc/NetworkManager/system-connections/,
which survives reboots even when netplan manages the initial
Wi-Fi profile.
"""
ssid = data.get('ssid', '')
password = data.get('password', '')
try:
if self.check_connection_exists(ssid):
result = subprocess.run(
['sudo', 'nmcli', 'connection', 'up', ssid],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
return {"status": "success", "message": f"Connected to {ssid}"}
else:
if password:
result = subprocess.run(
['sudo', 'nmcli', 'device', 'wifi', 'connect', ssid, 'password', password],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
else:
result = subprocess.run(
['sudo', 'nmcli', 'device', 'wifi', 'connect', ssid],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
self._run(['sudo', 'nmcli', 'connection', 'up', ssid])
return {"status": "success", "message": f"Connected to {ssid}"}
cmd = ['sudo', 'nmcli', 'device', 'wifi', 'connect', ssid]
if password:
cmd += ['password', password]
self._run(cmd)
return {"status": "success", "message": f"Connected to {ssid}"}
except subprocess.CalledProcessError as e:
self.logger.error(f"Error connecting to network {ssid}: {e.stderr.strip()}")
return {"status": "error", "message": e.stderr.strip()}
@@ -232,127 +281,216 @@ class NetworkUtils:
self.logger.error(f"Error in connect_wifi: {e}")
return {"status": "error", "message": str(e)}
def check_connection_exists(self, ssid):
"""Check if a WiFi connection already exists."""
def check_connection_exists(self, ssid: str) -> bool:
"""Check if a WiFi connection profile exists (exact match)."""
try:
result = subprocess.run(
['nmcli', '-t', '-f', 'NAME', 'connection', 'show'],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
connections = result.stdout.strip().split('\n')
return ssid in connections
except subprocess.CalledProcessError as e:
self.logger.error(f"Error checking existing connections: {e.stderr.strip()}")
result = self._run(['nmcli', '-t', '-f', 'NAME', 'connection', 'show'])
for name in result.stdout.strip().splitlines():
# nmcli escapes ':' in names with backslash
if name.replace('\\:', ':').strip() == ssid:
return True
return False
except subprocess.CalledProcessError:
return False
def validate_network_configuration(self, ssid):
"""Validate network configuration in NetworkManager."""
try:
result = subprocess.run(
['nmcli', '-t', '-f', 'NAME,UUID,TYPE,AUTOCONNECT', 'connection', 'show'],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
connections = result.stdout.strip().split('\n')
for conn in connections:
if ssid in conn:
self.logger.info(f"Network {ssid} validated in NetworkManager.")
return True
self.logger.warning(f"Network {ssid} not found in NetworkManager.")
return False
except Exception as e:
self.logger.error(f"Error validating network {ssid}: {e}")
return False
def validate_network_configuration(self, ssid: str) -> bool:
"""Validate that a WiFi connection profile exists (exact match)."""
return self.check_connection_exists(ssid)
# ── potfile import ───────────────────────────────────────────────
def import_potfiles(self, data=None):
"""Import WiFi credentials from .pot/.potfile files."""
"""Import WiFi credentials from .pot/.potfile files.
Creates NM connection profiles via nmcli — these are stored
in /etc/NetworkManager/system-connections/ and persist across
reboots on both legacy and Trixie builds.
"""
try:
potfiles_folder = self.shared_data.potfiles_dir
import glob
potfile_paths = glob.glob(f"{potfiles_folder}/*.pot") + glob.glob(f"{potfiles_folder}/*.potfile")
potfile_paths = (
glob.glob(f"{potfiles_folder}/*.pot")
+ glob.glob(f"{potfiles_folder}/*.potfile")
)
networks_added = []
networks_added: list[str] = []
networks_skipped: list[str] = []
networks_failed: list[str] = []
DEFAULT_PRIORITY = 5
for potfile_path in potfile_paths:
with open(potfile_path, 'r') as potfile:
for line in potfile:
line = line.strip()
if ':' not in line:
self.logger.warning(f"Ignoring malformed line in {potfile_path}: {line}")
continue
if line.startswith('$WPAPSK$') and '#' in line:
try:
ssid_hash_part, password = line.split(':', 1)
ssid = ssid_hash_part.split('#')[0].replace('$WPAPSK$', '')
except ValueError:
self.logger.warning(f"Failed to parse WPAPSK line in {potfile_path}: {line}")
try:
with open(potfile_path, 'r', errors='replace') as potfile:
for line in potfile:
line = line.strip()
if not line or ':' not in line:
continue
elif len(line.split(':')) == 4:
try:
_, _, ssid, password = line.split(':')
except ValueError:
self.logger.warning(f"Failed to parse custom line in {potfile_path}: {line}")
ssid, password = self._parse_potfile_line(line)
if not ssid or not password:
continue
else:
self.logger.warning(f"Unknown format in {potfile_path}: {line}")
continue
if ssid and password:
if not self.check_connection_exists(ssid):
try:
subprocess.run(
['sudo', 'nmcli', 'connection', 'add', 'type', 'wifi',
'con-name', ssid, 'ifname', '*', 'ssid', ssid,
'wifi-sec.key-mgmt', 'wpa-psk', 'wifi-sec.psk', password,
'connection.autoconnect', 'yes',
'connection.autoconnect-priority', str(DEFAULT_PRIORITY)],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
networks_added.append(ssid)
self.logger.info(f"Imported network {ssid} from {potfile_path}")
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to add network {ssid}: {e.stderr.strip()}")
else:
self.logger.info(f"Network {ssid} already exists. Skipping.")
else:
self.logger.warning(f"Incomplete data in {potfile_path}: {line}")
if self.check_connection_exists(ssid):
networks_skipped.append(ssid)
continue
return {"status": "success", "networks_added": networks_added}
try:
self._run([
'sudo', 'nmcli', 'connection', 'add',
'type', 'wifi',
'con-name', ssid,
'ifname', '*',
'ssid', ssid,
'wifi-sec.key-mgmt', 'wpa-psk',
'wifi-sec.psk', password,
'connection.autoconnect', 'yes',
'connection.autoconnect-priority', str(DEFAULT_PRIORITY),
])
networks_added.append(ssid)
self.logger.info(f"Imported network {ssid} from {potfile_path}")
except subprocess.CalledProcessError as e:
networks_failed.append(ssid)
self.logger.error(f"Failed to add network {ssid}: {e.stderr.strip()}")
except OSError as e:
self.logger.error(f"Failed to read potfile {potfile_path}: {e}")
return {
"status": "success",
"networks_added": networks_added,
"imported": len(networks_added),
"skipped": len(networks_skipped),
"failed": len(networks_failed),
}
except Exception as e:
self.logger.error(f"Unexpected error importing potfiles: {e}")
return {"status": "error", "message": str(e)}
@staticmethod
def _parse_potfile_line(line: str) -> tuple[str, str]:
"""Parse a single potfile line, returning (ssid, password) or ('', '')."""
# Format 1: $WPAPSK$SSID#hash:password
if line.startswith('$WPAPSK$') and '#' in line:
try:
ssid_hash_part, password = line.split(':', 1)
ssid = ssid_hash_part.split('#')[0].replace('$WPAPSK$', '')
return ssid.strip(), password.strip()
except ValueError:
return '', ''
# Format 2: MAC:MAC:SSID:password (4 colon-separated fields)
parts = line.split(':')
if len(parts) == 4:
return parts[2].strip(), parts[3].strip()
# Format 3: SSID:password (2 colon-separated fields)
if len(parts) == 2:
return parts[0].strip(), parts[1].strip()
return '', ''
# ── preconfigured file management (legacy compat) ────────────────
def delete_preconfigured_file(self, handler):
"""Delete the legacy preconfigured.nmconnection file.
On Trixie this file typically does not exist (Wi-Fi is managed
by netplan). The endpoint returns 200/success even if the file
is missing to avoid breaking the frontend.
"""
path = '/etc/NetworkManager/system-connections/preconfigured.nmconnection'
try:
os.remove('/etc/NetworkManager/system-connections/preconfigured.nmconnection')
handler.send_response(200)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"status": "success"}).encode('utf-8'))
except FileNotFoundError:
handler.send_response(404)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"status": "error", "message": "Fichier introuvable"}).encode('utf-8'))
if os.path.exists(path):
os.remove(path)
self.logger.info("Deleted preconfigured.nmconnection")
else:
self.logger.info("preconfigured.nmconnection not found (Trixie/netplan — this is normal)")
self._json_response(handler, 200, {"status": "success"})
except Exception as e:
handler.send_response(500)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
self.logger.error(f"Error deleting preconfigured file: {e}")
self._json_response(handler, 500, {"status": "error", "message": str(e)})
def create_preconfigured_file(self, handler):
"""Create a preconfigured.nmconnection file (legacy compat).
On Trixie this is a no-op: Wi-Fi is managed by netplan.
Returns success regardless to avoid breaking the frontend.
"""
self.logger.warning("create_preconfigured_file called — no-op on Trixie/netplan builds")
self._json_response(handler, 200, {
"status": "success",
"message": "No action needed on netplan-managed builds",
})
# ── potfile upload ────────────────────────────────────────────────
def upload_potfile(self, handler):
"""Upload a .pot/.potfile file to the potfiles directory.
Accepts multipart/form-data with a 'potfile' field.
Saves to shared_data.potfiles_dir.
Manual multipart parsing — no cgi module (removed in Python 3.13).
"""
try:
with open('/etc/NetworkManager/system-connections/preconfigured.nmconnection', 'w') as f:
f.write('Exemple de contenu') # Ajoutez le contenu par défaut
handler.send_response(200)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"status": "success"}).encode('utf-8'))
content_type = handler.headers.get("Content-Type", "")
if "multipart/form-data" not in content_type:
self._json_response(handler, 400, {
"status": "error",
"message": "Content-Type must be multipart/form-data",
})
return
boundary = content_type.split("=")[1].encode()
content_length = int(handler.headers.get("Content-Length", 0))
body = handler.rfile.read(content_length)
parts = body.split(b"--" + boundary)
filename = None
file_data = None
for part in parts:
if b"Content-Disposition" not in part:
continue
if b'name="potfile"' not in part:
continue
if b"filename=" not in part:
continue
headers_raw, data = part.split(b"\r\n\r\n", 1)
headers_str = headers_raw.decode(errors="replace")
match = re.search(r'filename="(.+?)"', headers_str)
if match:
filename = os.path.basename(match.group(1))
# Strip trailing boundary markers
file_data = data.rstrip(b"\r\n--").rstrip(b"\r\n")
break
if not filename or file_data is None:
self._json_response(handler, 400, {
"status": "error",
"message": "No potfile provided",
})
return
# Sanitise filename
safe_name = "".join(
c for c in filename if c.isalnum() or c in ".-_"
) or "uploaded.potfile"
dest_dir = self.shared_data.potfiles_dir
os.makedirs(dest_dir, exist_ok=True)
dest_path = os.path.join(dest_dir, safe_name)
with open(dest_path, "wb") as f:
f.write(file_data)
self.logger.info(f"Uploaded potfile: {safe_name} ({len(file_data)} bytes)")
self._json_response(handler, 200, {
"status": "success",
"filename": safe_name,
})
except Exception as e:
handler.send_response(500)
handler.send_header("Content-type", "application/json")
handler.end_headers()
handler.wfile.write(json.dumps({"status": "error", "message": str(e)}).encode('utf-8'))
self.logger.error(f"Error uploading potfile: {e}")
self._json_response(handler, 500, {
"status": "error",
"message": str(e),
})