mirror of
https://github.com/infinition/Bjorn.git
synced 2025-12-06 06:11:46 +00:00
590 lines
26 KiB
Python
590 lines
26 KiB
Python
#scanning.py
|
|
# This script performs a network scan to identify live hosts, their MAC addresses, and open ports.
|
|
# The results are saved to CSV files and displayed using Rich for enhanced visualization.
|
|
|
|
import os
|
|
import threading
|
|
import csv
|
|
import pandas as pd
|
|
import socket
|
|
import netifaces
|
|
import time
|
|
import glob
|
|
import logging
|
|
from datetime import datetime
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
from rich.progress import Progress
|
|
from getmac import get_mac_address as gma
|
|
from shared import SharedData
|
|
from logger import Logger
|
|
import ipaddress
|
|
import nmap
|
|
|
|
logger = Logger(name="scanning.py", level=logging.DEBUG)
|
|
|
|
b_class = "NetworkScanner"
|
|
b_module = "scanning"
|
|
b_status = "network_scanner"
|
|
b_port = None
|
|
b_parent = None
|
|
b_priority = 1
|
|
|
|
class NetworkScanner:
|
|
"""
|
|
This class handles the entire network scanning process.
|
|
"""
|
|
def __init__(self, shared_data):
|
|
self.shared_data = shared_data
|
|
self.logger = logger
|
|
self.displaying_csv = shared_data.displaying_csv
|
|
self.blacklistcheck = shared_data.blacklistcheck
|
|
self.mac_scan_blacklist = shared_data.mac_scan_blacklist
|
|
self.ip_scan_blacklist = shared_data.ip_scan_blacklist
|
|
self.console = Console()
|
|
self.lock = threading.Lock()
|
|
self.currentdir = shared_data.currentdir
|
|
self.semaphore = threading.Semaphore(200) # Limit the number of active threads to 20
|
|
self.nm = nmap.PortScanner() # Initialize nmap.PortScanner()
|
|
self.running = False
|
|
|
|
def check_if_csv_scan_file_exists(self, csv_scan_file, csv_result_file, netkbfile):
|
|
"""
|
|
Checks and prepares the necessary CSV files for the scan.
|
|
"""
|
|
with self.lock:
|
|
try:
|
|
if not os.path.exists(os.path.dirname(csv_scan_file)):
|
|
os.makedirs(os.path.dirname(csv_scan_file))
|
|
if not os.path.exists(os.path.dirname(netkbfile)):
|
|
os.makedirs(os.path.dirname(netkbfile))
|
|
if os.path.exists(csv_scan_file):
|
|
os.remove(csv_scan_file)
|
|
if os.path.exists(csv_result_file):
|
|
os.remove(csv_result_file)
|
|
if not os.path.exists(netkbfile):
|
|
with open(netkbfile, 'w', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(['MAC Address', 'IPs', 'Hostnames', 'Alive', 'Ports'])
|
|
except Exception as e:
|
|
self.logger.error(f"Error in check_if_csv_scan_file_exists: {e}")
|
|
|
|
def get_current_timestamp(self):
|
|
"""
|
|
Returns the current timestamp in a specific format.
|
|
"""
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
def ip_key(self, ip):
|
|
"""
|
|
Converts an IP address to a tuple of integers for sorting.
|
|
"""
|
|
if ip == "STANDALONE":
|
|
return (0, 0, 0, 0)
|
|
try:
|
|
return tuple(map(int, ip.split('.')))
|
|
except ValueError as e:
|
|
self.logger.error(f"Error in ip_key: {e}")
|
|
return (0, 0, 0, 0)
|
|
|
|
def sort_and_write_csv(self, csv_scan_file):
|
|
"""
|
|
Sorts the CSV file based on IP addresses and writes the sorted content back to the file.
|
|
"""
|
|
with self.lock:
|
|
try:
|
|
with open(csv_scan_file, 'r') as file:
|
|
lines = file.readlines()
|
|
sorted_lines = [lines[0]] + sorted(lines[1:], key=lambda x: self.ip_key(x.split(',')[0]))
|
|
with open(csv_scan_file, 'w') as file:
|
|
file.writelines(sorted_lines)
|
|
except Exception as e:
|
|
self.logger.error(f"Error in sort_and_write_csv: {e}")
|
|
|
|
class GetIpFromCsv:
|
|
"""
|
|
Helper class to retrieve IP addresses, hostnames, and MAC addresses from a CSV file.
|
|
"""
|
|
def __init__(self, outer_instance, csv_scan_file):
|
|
self.outer_instance = outer_instance
|
|
self.csv_scan_file = csv_scan_file
|
|
self.ip_list = []
|
|
self.hostname_list = []
|
|
self.mac_list = []
|
|
self.get_ip_from_csv()
|
|
|
|
def get_ip_from_csv(self):
|
|
"""
|
|
Reads IP addresses, hostnames, and MAC addresses from the CSV file.
|
|
"""
|
|
with self.outer_instance.lock:
|
|
try:
|
|
with open(self.csv_scan_file, 'r') as csv_scan_file:
|
|
csv_reader = csv.reader(csv_scan_file)
|
|
next(csv_reader)
|
|
for row in csv_reader:
|
|
if row[0] == "STANDALONE" or row[1] == "STANDALONE" or row[2] == "STANDALONE":
|
|
continue
|
|
if not self.outer_instance.blacklistcheck or (row[2] not in self.outer_instance.mac_scan_blacklist and row[0] not in self.outer_instance.ip_scan_blacklist):
|
|
self.ip_list.append(row[0])
|
|
self.hostname_list.append(row[1])
|
|
self.mac_list.append(row[2])
|
|
except Exception as e:
|
|
self.outer_instance.logger.error(f"Error in get_ip_from_csv: {e}")
|
|
|
|
def update_netkb(self, netkbfile, netkb_data, alive_macs):
|
|
"""
|
|
Updates the net knowledge base (netkb) file with the scan results.
|
|
"""
|
|
with self.lock:
|
|
try:
|
|
netkb_entries = {}
|
|
existing_action_columns = []
|
|
|
|
# Read existing CSV file
|
|
if os.path.exists(netkbfile):
|
|
with open(netkbfile, 'r') as file:
|
|
reader = csv.DictReader(file)
|
|
existing_headers = reader.fieldnames
|
|
existing_action_columns = [header for header in existing_headers if header not in ["MAC Address", "IPs", "Hostnames", "Alive", "Ports"]]
|
|
for row in reader:
|
|
mac = row["MAC Address"]
|
|
ips = row["IPs"].split(';')
|
|
hostnames = row["Hostnames"].split(';')
|
|
alive = row["Alive"]
|
|
ports = row["Ports"].split(';')
|
|
netkb_entries[mac] = {
|
|
'IPs': set(ips) if ips[0] else set(),
|
|
'Hostnames': set(hostnames) if hostnames[0] else set(),
|
|
'Alive': alive,
|
|
'Ports': set(ports) if ports[0] else set()
|
|
}
|
|
for action in existing_action_columns:
|
|
netkb_entries[mac][action] = row.get(action, "")
|
|
|
|
ip_to_mac = {} # Dictionary to track IP to MAC associations
|
|
|
|
for data in netkb_data:
|
|
mac, ip, hostname, ports = data
|
|
if not mac or mac == "STANDALONE" or ip == "STANDALONE" or hostname == "STANDALONE":
|
|
continue
|
|
|
|
# Check if MAC address is "00:00:00:00:00:00"
|
|
if mac == "00:00:00:00:00:00":
|
|
continue
|
|
|
|
if self.blacklistcheck and (mac in self.mac_scan_blacklist or ip in self.ip_scan_blacklist):
|
|
continue
|
|
|
|
# Check if IP is already associated with a different MAC
|
|
if ip in ip_to_mac and ip_to_mac[ip] != mac:
|
|
# Mark the old MAC as not alive
|
|
old_mac = ip_to_mac[ip]
|
|
if old_mac in netkb_entries:
|
|
netkb_entries[old_mac]['Alive'] = '0'
|
|
|
|
# Update or create entry for the new MAC
|
|
ip_to_mac[ip] = mac
|
|
if mac in netkb_entries:
|
|
netkb_entries[mac]['IPs'].add(ip)
|
|
netkb_entries[mac]['Hostnames'].add(hostname)
|
|
netkb_entries[mac]['Alive'] = '1'
|
|
netkb_entries[mac]['Ports'].update(map(str, ports))
|
|
else:
|
|
netkb_entries[mac] = {
|
|
'IPs': {ip},
|
|
'Hostnames': {hostname},
|
|
'Alive': '1',
|
|
'Ports': set(map(str, ports))
|
|
}
|
|
for action in existing_action_columns:
|
|
netkb_entries[mac][action] = ""
|
|
|
|
# Update all existing entries to mark missing hosts as not alive
|
|
for mac in netkb_entries:
|
|
if mac not in alive_macs:
|
|
netkb_entries[mac]['Alive'] = '0'
|
|
|
|
# Remove entries with multiple IP addresses for a single MAC address
|
|
netkb_entries = {mac: data for mac, data in netkb_entries.items() if len(data['IPs']) == 1}
|
|
|
|
sorted_netkb_entries = sorted(netkb_entries.items(), key=lambda x: self.ip_key(sorted(x[1]['IPs'])[0]))
|
|
|
|
with open(netkbfile, 'w', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(existing_headers) # Use existing headers
|
|
for mac, data in sorted_netkb_entries:
|
|
row = [
|
|
mac,
|
|
';'.join(sorted(data['IPs'], key=self.ip_key)),
|
|
';'.join(sorted(data['Hostnames'])),
|
|
data['Alive'],
|
|
';'.join(sorted(data['Ports'], key=int))
|
|
]
|
|
row.extend(data.get(action, "") for action in existing_action_columns)
|
|
writer.writerow(row)
|
|
except Exception as e:
|
|
self.logger.error(f"Error in update_netkb: {e}")
|
|
|
|
def display_csv(self, file_path):
|
|
"""
|
|
Displays the contents of the specified CSV file using Rich for enhanced visualization.
|
|
"""
|
|
with self.lock:
|
|
try:
|
|
table = Table(title=f"Contents of {file_path}", show_lines=True)
|
|
with open(file_path, 'r') as file:
|
|
reader = csv.reader(file)
|
|
headers = next(reader)
|
|
for header in headers:
|
|
table.add_column(header, style="cyan", no_wrap=True)
|
|
for row in reader:
|
|
formatted_row = [Text(cell, style="green bold") if cell else Text("", style="on red") for cell in row]
|
|
table.add_row(*formatted_row)
|
|
self.console.print(table)
|
|
except Exception as e:
|
|
self.logger.error(f"Error in display_csv: {e}")
|
|
|
|
def get_network(self):
|
|
"""
|
|
Retrieves the network information including the default gateway and subnet.
|
|
"""
|
|
try:
|
|
gws = netifaces.gateways()
|
|
default_gateway = gws['default'][netifaces.AF_INET][1]
|
|
iface = netifaces.ifaddresses(default_gateway)[netifaces.AF_INET][0]
|
|
ip_address = iface['addr']
|
|
netmask = iface['netmask']
|
|
cidr = sum([bin(int(x)).count('1') for x in netmask.split('.')])
|
|
network = ipaddress.IPv4Network(f"{ip_address}/{cidr}", strict=False)
|
|
self.logger.info(f"Network: {network}")
|
|
return network
|
|
except Exception as e:
|
|
self.logger.error(f"Error in get_network: {e}")
|
|
|
|
def get_mac_address(self, ip, hostname):
|
|
"""
|
|
Retrieves the MAC address for the given IP address and hostname.
|
|
"""
|
|
try:
|
|
mac = None
|
|
retries = 5
|
|
while not mac and retries > 0:
|
|
mac = gma(ip=ip)
|
|
if not mac:
|
|
time.sleep(2) # Attendre 2 secondes avant de réessayer
|
|
retries -= 1
|
|
if not mac:
|
|
mac = f"{ip}_{hostname}" if hostname else f"{ip}_NoHostname"
|
|
return mac
|
|
except Exception as e:
|
|
self.logger.error(f"Error in get_mac_address: {e}")
|
|
return None
|
|
|
|
class PortScanner:
|
|
"""
|
|
Helper class to perform port scanning on a target IP.
|
|
"""
|
|
def __init__(self, outer_instance, target, open_ports, portstart, portend, extra_ports):
|
|
self.outer_instance = outer_instance
|
|
self.logger = logger
|
|
self.target = target
|
|
self.open_ports = open_ports
|
|
self.portstart = portstart
|
|
self.portend = portend
|
|
self.extra_ports = extra_ports
|
|
|
|
def scan(self, port):
|
|
"""
|
|
Scans a specific port on the target IP.
|
|
"""
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(2)
|
|
try:
|
|
con = s.connect((self.target, port))
|
|
self.open_ports[self.target].append(port)
|
|
con.close()
|
|
except:
|
|
pass
|
|
finally:
|
|
s.close() # Ensure the socket is closed
|
|
|
|
def start(self):
|
|
"""
|
|
Starts the port scanning process for the specified range and extra ports.
|
|
"""
|
|
try:
|
|
for port in range(self.portstart, self.portend):
|
|
t = threading.Thread(target=self.scan_with_semaphore, args=(port,))
|
|
t.start()
|
|
for port in self.extra_ports:
|
|
t = threading.Thread(target=self.scan_with_semaphore, args=(port,))
|
|
t.start()
|
|
except Exception as e:
|
|
self.logger.info(f"Maximum threads defined in the semaphore reached: {e}")
|
|
|
|
def scan_with_semaphore(self, port):
|
|
"""
|
|
Scans a port using a semaphore to limit concurrent threads.
|
|
"""
|
|
with self.outer_instance.semaphore:
|
|
self.scan(port)
|
|
|
|
class ScanPorts:
|
|
"""
|
|
Helper class to manage the overall port scanning process for a network.
|
|
"""
|
|
def __init__(self, outer_instance, network, portstart, portend, extra_ports):
|
|
self.outer_instance = outer_instance
|
|
self.logger = logger
|
|
self.progress = 0
|
|
self.network = network
|
|
self.portstart = portstart
|
|
self.portend = portend
|
|
self.extra_ports = extra_ports
|
|
self.currentdir = outer_instance.currentdir
|
|
self.scan_results_dir = outer_instance.shared_data.scan_results_dir
|
|
self.timestamp = outer_instance.get_current_timestamp()
|
|
self.csv_scan_file = os.path.join(self.scan_results_dir, f'scan_{network.network_address}_{self.timestamp}.csv')
|
|
self.csv_result_file = os.path.join(self.scan_results_dir, f'result_{network.network_address}_{self.timestamp}.csv')
|
|
self.netkbfile = outer_instance.shared_data.netkbfile
|
|
self.ip_data = None
|
|
self.open_ports = {}
|
|
self.all_ports = []
|
|
self.ip_hostname_list = []
|
|
|
|
def scan_network_and_write_to_csv(self):
|
|
"""
|
|
Scans the network and writes the results to a CSV file.
|
|
"""
|
|
self.outer_instance.check_if_csv_scan_file_exists(self.csv_scan_file, self.csv_result_file, self.netkbfile)
|
|
with self.outer_instance.lock:
|
|
try:
|
|
with open(self.csv_scan_file, 'a', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(['IP', 'Hostname', 'MAC Address'])
|
|
except Exception as e:
|
|
self.outer_instance.logger.error(f"Error in scan_network_and_write_to_csv (initial write): {e}")
|
|
|
|
# Use nmap to scan for live hosts
|
|
self.outer_instance.nm.scan(hosts=str(self.network), arguments='-sn')
|
|
for host in self.outer_instance.nm.all_hosts():
|
|
t = threading.Thread(target=self.scan_host, args=(host,))
|
|
t.start()
|
|
|
|
time.sleep(5)
|
|
self.outer_instance.sort_and_write_csv(self.csv_scan_file)
|
|
|
|
def scan_host(self, ip):
|
|
"""
|
|
Scans a specific host to check if it is alive and retrieves its hostname and MAC address.
|
|
"""
|
|
if self.outer_instance.blacklistcheck and ip in self.outer_instance.ip_scan_blacklist:
|
|
return
|
|
try:
|
|
hostname = self.outer_instance.nm[ip].hostname() if self.outer_instance.nm[ip].hostname() else ''
|
|
mac = self.outer_instance.get_mac_address(ip, hostname)
|
|
if not self.outer_instance.blacklistcheck or mac not in self.outer_instance.mac_scan_blacklist:
|
|
with self.outer_instance.lock:
|
|
with open(self.csv_scan_file, 'a', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow([ip, hostname, mac])
|
|
self.ip_hostname_list.append((ip, hostname, mac))
|
|
except Exception as e:
|
|
self.outer_instance.logger.error(f"Error getting MAC address or writing to file for IP {ip}: {e}")
|
|
self.progress += 1
|
|
time.sleep(0.1) # Adding a small delay to avoid overwhelming the network
|
|
|
|
def get_progress(self):
|
|
"""
|
|
Returns the progress of the scanning process.
|
|
"""
|
|
return (self.progress / self.total_ips) * 100
|
|
|
|
def start(self):
|
|
"""
|
|
Starts the network and port scanning process.
|
|
"""
|
|
self.scan_network_and_write_to_csv()
|
|
time.sleep(7)
|
|
self.ip_data = self.outer_instance.GetIpFromCsv(self.outer_instance, self.csv_scan_file)
|
|
self.open_ports = {ip: [] for ip in self.ip_data.ip_list}
|
|
with Progress() as progress:
|
|
task = progress.add_task("[cyan]Scanning IPs...", total=len(self.ip_data.ip_list))
|
|
for ip in self.ip_data.ip_list:
|
|
progress.update(task, advance=1)
|
|
port_scanner = self.outer_instance.PortScanner(self.outer_instance, ip, self.open_ports, self.portstart, self.portend, self.extra_ports)
|
|
port_scanner.start()
|
|
|
|
self.all_ports = sorted(list(set(port for ports in self.open_ports.values() for port in ports)))
|
|
alive_ips = set(self.ip_data.ip_list)
|
|
return self.ip_data, self.open_ports, self.all_ports, self.csv_result_file, self.netkbfile, alive_ips
|
|
|
|
class LiveStatusUpdater:
|
|
"""
|
|
Helper class to update the live status of hosts and clean up scan results.
|
|
"""
|
|
def __init__(self, source_csv_path, output_csv_path):
|
|
self.logger = logger
|
|
self.source_csv_path = source_csv_path
|
|
self.output_csv_path = output_csv_path
|
|
|
|
def read_csv(self):
|
|
"""
|
|
Reads the source CSV file into a DataFrame.
|
|
"""
|
|
try:
|
|
self.df = pd.read_csv(self.source_csv_path)
|
|
except Exception as e:
|
|
self.logger.error(f"Error in read_csv: {e}")
|
|
|
|
def calculate_open_ports(self):
|
|
"""
|
|
Calculates the total number of open ports for alive hosts.
|
|
"""
|
|
try:
|
|
alive_df = self.df[self.df['Alive'] == 1].copy()
|
|
alive_df.loc[:, 'Ports'] = alive_df['Ports'].fillna('')
|
|
alive_df.loc[:, 'Port Count'] = alive_df['Ports'].apply(lambda x: len(x.split(';')) if x else 0)
|
|
self.total_open_ports = alive_df['Port Count'].sum()
|
|
except Exception as e:
|
|
self.logger.error(f"Error in calculate_open_ports: {e}")
|
|
|
|
def calculate_hosts_counts(self):
|
|
"""
|
|
Calculates the total and alive host counts.
|
|
"""
|
|
try:
|
|
# self.all_known_hosts_count = self.df.shape[0]
|
|
self.all_known_hosts_count = self.df[self.df['MAC Address'] != 'STANDALONE'].shape[0]
|
|
self.alive_hosts_count = self.df[self.df['Alive'] == 1].shape[0]
|
|
except Exception as e:
|
|
self.logger.error(f"Error in calculate_hosts_counts: {e}")
|
|
|
|
def save_results(self):
|
|
"""
|
|
Saves the calculated results to the output CSV file.
|
|
"""
|
|
try:
|
|
if os.path.exists(self.output_csv_path):
|
|
results_df = pd.read_csv(self.output_csv_path)
|
|
results_df.loc[0, 'Total Open Ports'] = self.total_open_ports
|
|
results_df.loc[0, 'Alive Hosts Count'] = self.alive_hosts_count
|
|
results_df.loc[0, 'All Known Hosts Count'] = self.all_known_hosts_count
|
|
results_df.to_csv(self.output_csv_path, index=False)
|
|
else:
|
|
self.logger.error(f"File {self.output_csv_path} does not exist.")
|
|
except Exception as e:
|
|
self.logger.error(f"Error in save_results: {e}")
|
|
|
|
def update_livestatus(self):
|
|
"""
|
|
Updates the live status of hosts and saves the results.
|
|
"""
|
|
try:
|
|
self.read_csv()
|
|
self.calculate_open_ports()
|
|
self.calculate_hosts_counts()
|
|
self.save_results()
|
|
self.logger.info("Livestatus updated")
|
|
self.logger.info(f"Results saved to {self.output_csv_path}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error in update_livestatus: {e}")
|
|
|
|
def clean_scan_results(self, scan_results_dir):
|
|
"""
|
|
Cleans up old scan result files, keeping only the most recent ones.
|
|
"""
|
|
try:
|
|
files = glob.glob(scan_results_dir + '/*')
|
|
files.sort(key=os.path.getmtime)
|
|
for file in files[:-20]:
|
|
os.remove(file)
|
|
self.logger.info("Scan results cleaned up")
|
|
except Exception as e:
|
|
self.logger.error(f"Error in clean_scan_results: {e}")
|
|
|
|
def scan(self):
|
|
"""
|
|
Initiates the network scan, updates the netkb file, and displays the results.
|
|
"""
|
|
try:
|
|
self.shared_data.bjornorch_status = "NetworkScanner"
|
|
self.logger.info(f"Starting Network Scanner")
|
|
network = self.get_network()
|
|
self.shared_data.bjornstatustext2 = str(network)
|
|
portstart = self.shared_data.portstart
|
|
portend = self.shared_data.portend
|
|
extra_ports = self.shared_data.portlist
|
|
scanner = self.ScanPorts(self, network, portstart, portend, extra_ports)
|
|
ip_data, open_ports, all_ports, csv_result_file, netkbfile, alive_ips = scanner.start()
|
|
|
|
alive_macs = set(ip_data.mac_list)
|
|
|
|
table = Table(title="Scan Results", show_lines=True)
|
|
table.add_column("IP", style="cyan", no_wrap=True)
|
|
table.add_column("Hostname", style="cyan", no_wrap=True)
|
|
table.add_column("Alive", style="cyan", no_wrap=True)
|
|
table.add_column("MAC Address", style="cyan", no_wrap=True)
|
|
for port in all_ports:
|
|
table.add_column(f"{port}", style="green")
|
|
|
|
netkb_data = []
|
|
for ip, ports, hostname, mac in zip(ip_data.ip_list, open_ports.values(), ip_data.hostname_list, ip_data.mac_list):
|
|
if self.blacklistcheck and (mac in self.mac_scan_blacklist or ip in self.ip_scan_blacklist):
|
|
continue
|
|
alive = '1' if mac in alive_macs else '0'
|
|
row = [ip, hostname, alive, mac] + [Text(str(port), style="green bold") if port in ports else Text("", style="on red") for port in all_ports]
|
|
table.add_row(*row)
|
|
netkb_data.append([mac, ip, hostname, ports])
|
|
|
|
with self.lock:
|
|
with open(csv_result_file, 'w', newline='') as file:
|
|
writer = csv.writer(file)
|
|
writer.writerow(["IP", "Hostname", "Alive", "MAC Address"] + [str(port) for port in all_ports])
|
|
for ip, ports, hostname, mac in zip(ip_data.ip_list, open_ports.values(), ip_data.hostname_list, ip_data.mac_list):
|
|
if self.blacklistcheck and (mac in self.mac_scan_blacklist or ip in self.ip_scan_blacklist):
|
|
continue
|
|
alive = '1' if mac in alive_macs else '0'
|
|
writer.writerow([ip, hostname, alive, mac] + [str(port) if port in ports else '' for port in all_ports])
|
|
|
|
self.update_netkb(netkbfile, netkb_data, alive_macs)
|
|
|
|
if self.displaying_csv:
|
|
self.display_csv(csv_result_file)
|
|
|
|
source_csv_path = self.shared_data.netkbfile
|
|
output_csv_path = self.shared_data.livestatusfile
|
|
|
|
updater = self.LiveStatusUpdater(source_csv_path, output_csv_path)
|
|
updater.update_livestatus()
|
|
updater.clean_scan_results(self.shared_data.scan_results_dir)
|
|
except Exception as e:
|
|
self.logger.error(f"Error in scan: {e}")
|
|
|
|
def start(self):
|
|
"""
|
|
Starts the scanner in a separate thread.
|
|
"""
|
|
if not self.running:
|
|
self.running = True
|
|
self.thread = threading.Thread(target=self.scan)
|
|
self.thread.start()
|
|
logger.info("NetworkScanner started.")
|
|
|
|
def stop(self):
|
|
"""
|
|
Stops the scanner.
|
|
"""
|
|
if self.running:
|
|
self.running = False
|
|
if self.thread.is_alive():
|
|
self.thread.join()
|
|
logger.info("NetworkScanner stopped.")
|
|
|
|
if __name__ == "__main__":
|
|
shared_data = SharedData()
|
|
scanner = NetworkScanner(shared_data)
|
|
scanner.scan()
|