#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
valkyrie_scout.py — Web surface scout (Pi Zero friendly, orchestrator compatible).

What it does:
- Probes a small set of common web paths on a target (ip, port).
- Extracts high-signal indicators from responses (auth type, login form hints,
  missing security headers, error/debug strings). No exploitation, no bruteforce.
- Writes results into DB table `webenum` (tool='valkyrie_scout') so the UI can browse findings.
- Updates EPD fields: bjorn_orch_status, bjorn_status_text2, comment_params, bjorn_progress.
"""

import json
import logging
import re
import ssl
import time
from http.client import HTTPConnection, HTTPSConnection, RemoteDisconnected
from typing import Dict, List, Optional, Tuple

from logger import Logger
from actions.bruteforce_common import ProgressTracker

logger = Logger(name="valkyrie_scout.py", level=logging.DEBUG)

# -------------------- Action metadata (AST-friendly) --------------------
# NOTE(review): "AST-friendly" suggests these are read by parsing the file
# rather than importing it — keep them as plain literal assignments; confirm
# against the orchestrator before restructuring.
b_class = "ValkyrieScout"
b_module = "valkyrie_scout"
b_status = "ValkyrieScout"
b_port = 80
b_parent = None
b_service = '["http","https"]'  # JSON-encoded list kept as a string
b_trigger = "on_web_service"
b_priority = 50
b_action = "normal"
b_cooldown = 1800
b_rate_limit = "8/86400"
b_enabled = 0  # keep disabled by default; enable via Actions UI/DB when ready.

# Small default list to keep the action cheap on Pi Zero.
DEFAULT_PATHS = [
    "/",
    "/robots.txt",
    "/login",
    "/signin",
    "/auth",
    "/admin",
    "/administrator",
    "/wp-login.php",
    "/user/login",
]

# Keep patterns minimal and high-signal.
SQLI_ERRORS = [ "error in your sql syntax", "mysql_fetch", "unclosed quotation mark", "ora-", "postgresql", "sqlite error", ] LFI_HINTS = [ "include(", "require(", "include_once(", "require_once(", ] DEBUG_HINTS = [ "stack trace", "traceback", "exception", "fatal error", "notice:", "warning:", "debug", ] def _scheme_for_port(port: int) -> str: https_ports = {443, 8443, 9443, 10443, 9444, 5000, 5001, 7080, 9080} return "https" if int(port) in https_ports else "http" def _first_hostname_from_row(row: Dict) -> str: try: hn = (row.get("Hostname") or row.get("hostname") or row.get("hostnames") or "").strip() if ";" in hn: hn = hn.split(";", 1)[0].strip() return hn except Exception: return "" def _lower_headers(headers: Dict[str, str]) -> Dict[str, str]: out = {} for k, v in (headers or {}).items(): if not k: continue out[str(k).lower()] = str(v) return out def _detect_signals(status: int, headers: Dict[str, str], body_snippet: str) -> Dict[str, object]: h = _lower_headers(headers) www = h.get("www-authenticate", "") set_cookie = h.get("set-cookie", "") auth_type = None if status == 401 and "basic" in www.lower(): auth_type = "basic" elif status == 401 and "digest" in www.lower(): auth_type = "digest" snippet = (body_snippet or "").lower() has_form = "