Fix commit error

This commit is contained in:
Fabien POLLY
2024-11-13 00:14:11 +01:00
parent d0d771e6cb
commit 779a0b3101

View File

@@ -40,6 +40,18 @@ class Display:
self.screen_reversed = self.shared_data.screen_reversed self.screen_reversed = self.shared_data.screen_reversed
self.web_screen_reversed = self.shared_data.web_screen_reversed self.web_screen_reversed = self.shared_data.web_screen_reversed
# Define frise positions for different display types
self.frise_positions = {
"epd2in7": {
"x": 50,
"y": 160
},
"default": { # Default position for other display types
"x": 0,
"y": 160
}
}
try: try:
self.epd_helper = self.shared_data.epd_helper self.epd_helper = self.shared_data.epd_helper
self.epd_helper.init_partial_update() self.epd_helper.init_partial_update()
@@ -63,6 +75,15 @@ class Display:
self.scale_factor_x = self.shared_data.scale_factor_x self.scale_factor_x = self.shared_data.scale_factor_x
self.scale_factor_y = self.shared_data.scale_factor_y self.scale_factor_y = self.shared_data.scale_factor_y
def get_frise_position(self):
"""Get the frise position based on the display type."""
display_type = self.config.get("epd_type", "default")
position = self.frise_positions.get(display_type, self.frise_positions["default"])
return (
int(position["x"] * self.scale_factor_x),
int(position["y"] * self.scale_factor_y)
)
def schedule_update_shared_data(self): def schedule_update_shared_data(self):
"""Periodically update the shared data with the latest system information.""" """Periodically update the shared data with the latest system information."""
while not self.shared_data.display_should_exit: while not self.shared_data.display_should_exit:
@@ -87,6 +108,7 @@ class Display:
time.sleep(random.uniform(self.shared_data.image_display_delaymin, self.shared_data.image_display_delaymax)) time.sleep(random.uniform(self.shared_data.image_display_delaymin, self.shared_data.image_display_delaymax))
except Exception as e: except Exception as e:
logger.error(f"An error occurred in update_main_image: {e}") logger.error(f"An error occurred in update_main_image: {e}")
def get_open_files(self): def get_open_files(self):
"""Get the number of open FD files on the system.""" """Get the number of open FD files on the system."""
try: try:
@@ -102,17 +124,15 @@ class Display:
with self.semaphore: with self.semaphore:
try: try:
if not os.path.exists(self.shared_data.vuln_summary_file): if not os.path.exists(self.shared_data.vuln_summary_file):
# Create the file with the necessary columns if it does not exist
df = pd.DataFrame(columns=["IP", "Hostname", "MAC Address", "Port", "Vulnerabilities"]) df = pd.DataFrame(columns=["IP", "Hostname", "MAC Address", "Port", "Vulnerabilities"])
df.to_csv(self.shared_data.vuln_summary_file, index=False) df.to_csv(self.shared_data.vuln_summary_file, index=False)
self.shared_data.vulnnbr = 0 self.shared_data.vulnnbr = 0
logger.info("Vulnerability summary file created.") logger.info("Vulnerability summary file created.")
else: else:
# Load the netkbfile to check for "Alive" IPs
if os.path.exists(self.shared_data.netkbfile): if os.path.exists(self.shared_data.netkbfile):
with open(self.shared_data.netkbfile, 'r') as file: with open(self.shared_data.netkbfile, 'r') as file:
netkb_df = pd.read_csv(file) netkb_df = pd.read_csv(file)
alive_macs = set(netkb_df[(netkb_df["Alive"] == 1) & (netkb_df["MAC Address"] != "STANDALONE")]["MAC Address"]) # Get all alive MACs based on the 'Alive' column and ignore "STANDALONE" alive_macs = set(netkb_df[(netkb_df["Alive"] == 1) & (netkb_df["MAC Address"] != "STANDALONE")]["MAC Address"])
else: else:
alive_macs = set() alive_macs = set()
@@ -122,19 +142,17 @@ class Display:
for index, row in df.iterrows(): for index, row in df.iterrows():
mac_address = row["MAC Address"] mac_address = row["MAC Address"]
if mac_address in alive_macs and mac_address != "STANDALONE": # Ignore "STANDALONE" MAC addresses if mac_address in alive_macs and mac_address != "STANDALONE":
vulnerabilities = row["Vulnerabilities"] vulnerabilities = row["Vulnerabilities"]
if pd.isna(vulnerabilities) or not isinstance(vulnerabilities, str): # Check if vulnerabilities is NaN or not a string if pd.isna(vulnerabilities) or not isinstance(vulnerabilities, str):
# logger.debug(f"No valid vulnerabilities for MAC Address: {mac_address}")
continue continue
if vulnerabilities and isinstance(vulnerabilities, str): if vulnerabilities and isinstance(vulnerabilities, str):
all_vulnerabilities.update(vulnerabilities.split("; ")) # Add the vulnerabilities to the set all_vulnerabilities.update(vulnerabilities.split("; "))
self.shared_data.vulnnbr = len(all_vulnerabilities) self.shared_data.vulnnbr = len(all_vulnerabilities)
logger.debug(f"Updated vulnerabilities count: {self.shared_data.vulnnbr}") logger.debug(f"Updated vulnerabilities count: {self.shared_data.vulnnbr}")
# Update the livestatusfile
if os.path.exists(self.shared_data.livestatusfile): if os.path.exists(self.shared_data.livestatusfile):
with open(self.shared_data.livestatusfile, 'r+') as livestatus_file: with open(self.shared_data.livestatusfile, 'r+') as livestatus_file:
livestatus_df = pd.read_csv(livestatus_file) livestatus_df = pd.read_csv(livestatus_file)
@@ -146,50 +164,43 @@ class Display:
except Exception as e: except Exception as e:
logger.error(f"An error occurred in update_vuln_count: {e}") logger.error(f"An error occurred in update_vuln_count: {e}")
def update_shared_data(self): def update_shared_data(self):
"""Update the shared data with the latest system information.""" """Update the shared data with the latest system information."""
with self.semaphore: with self.semaphore:
"""
Update shared data from CSV files live_status.csv and cracked_passwords.csv.
"""
try: try:
with open(self.shared_data.livestatusfile, 'r') as file: with open(self.shared_data.livestatusfile, 'r') as file:
livestatus_df = pd.read_csv(file) livestatus_df = pd.read_csv(file)
self.shared_data.portnbr = livestatus_df['Total Open Ports'].iloc[0] # Get the total number of open ports self.shared_data.portnbr = livestatus_df['Total Open Ports'].iloc[0]
self.shared_data.targetnbr = livestatus_df['Alive Hosts Count'].iloc[0] # Get the total number of alive hosts self.shared_data.targetnbr = livestatus_df['Alive Hosts Count'].iloc[0]
self.shared_data.networkkbnbr = livestatus_df['All Known Hosts Count'].iloc[0] # Get the total number of known hosts self.shared_data.networkkbnbr = livestatus_df['All Known Hosts Count'].iloc[0]
self.shared_data.vulnnbr = livestatus_df['Vulnerabilities Count'].iloc[0] # Get the total number of vulnerable ports self.shared_data.vulnnbr = livestatus_df['Vulnerabilities Count'].iloc[0]
crackedpw_files = glob.glob(f"{self.shared_data.crackedpwddir}/*.csv") # Get all CSV files in the cracked password directory crackedpw_files = glob.glob(f"{self.shared_data.crackedpwddir}/*.csv")
total_passwords = 0 total_passwords = 0
for file in crackedpw_files: for file in crackedpw_files:
with open(file, 'r') as f: with open(file, 'r') as f:
total_passwords += len(pd.read_csv(f, usecols=[0])) total_passwords += len(pd.read_csv(f, usecols=[0]))
self.shared_data.crednbr = total_passwords # Set the total number of cracked passwords to shared data self.shared_data.crednbr = total_passwords
total_data = sum([len(files) for r, d, files in os.walk(self.shared_data.datastolendir)]) # Get the total number of data files in the data store directory total_data = sum([len(files) for r, d, files in os.walk(self.shared_data.datastolendir)])
self.shared_data.datanbr = total_data self.shared_data.datanbr = total_data
total_zombies = sum([len(files) for r, d, files in os.walk(self.shared_data.zombiesdir)]) # Get the total number of zombies in the zombies directory total_zombies = sum([len(files) for r, d, files in os.walk(self.shared_data.zombiesdir)])
self.shared_data.zombiesnbr = total_zombies self.shared_data.zombiesnbr = total_zombies
total_attacks = sum([len(files) for r, d, files in os.walk(self.shared_data.actions_dir) if not r.endswith("__pycache__")]) - 2 total_attacks = sum([len(files) for r, d, files in os.walk(self.shared_data.actions_dir) if not r.endswith("__pycache__")]) - 2
self.shared_data.attacksnbr = total_attacks self.shared_data.attacksnbr = total_attacks
self.shared_data.update_stats() self.shared_data.update_stats()
# Update Bluetooth, WiFi, and PAN connection status
self.shared_data.manual_mode = self.is_manual_mode() self.shared_data.manual_mode = self.is_manual_mode()
if self.shared_data.manual_mode: if self.shared_data.manual_mode:
self.manual_mode_txt = "M" self.manual_mode_txt = "M"
else: else:
self.manual_mode_txt = "A" self.manual_mode_txt = "A"
# # self.shared_data.bluetooth_active = self.is_bluetooth_connected()
self.shared_data.wifi_connected = self.is_wifi_connected() self.shared_data.wifi_connected = self.is_wifi_connected()
self.shared_data.usb_active = self.is_usb_connected() self.shared_data.usb_active = self.is_usb_connected()
########self.shared_data.pan_connected = self.is_interface_connected('pan0') or self.is_interface_connected('usb0')
self.get_open_files() self.get_open_files()
except (FileNotFoundError, pd.errors.EmptyDataError) as e: except (FileNotFoundError, pd.errors.EmptyDataError) as e:
@@ -198,11 +209,11 @@ class Display:
logger.error(f"Error updating shared data: {e}") logger.error(f"Error updating shared data: {e}")
def display_comment(self, status): def display_comment(self, status):
""" Display the comment based on the status of the BjornOrch. """ """Display the comment based on the status of the BjornOrch."""
comment = self.commentaire_ia.get_commentaire(status) # Get the comment from Commentaireia comment = self.commentaire_ia.get_commentaire(status)
if comment: if comment:
self.shared_data.bjornsay = comment # Set the comment to shared data self.shared_data.bjornsay = comment
self.shared_data.bjornstatustext = self.shared_data.bjornorch_status # Set the status to shared data self.shared_data.bjornstatustext = self.shared_data.bjornorch_status
else: else:
pass pass
@@ -222,16 +233,14 @@ class Display:
# # # return False # # # return False
def is_wifi_connected(self): def is_wifi_connected(self):
""" """Check if WiFi is connected by checking the current SSID."""
Check if WiFi is connected by checking the current SSID.
"""
try: try:
result = subprocess.Popen(['iwgetid', '-r'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.Popen(['iwgetid', '-r'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
ssid, error = result.communicate() ssid, error = result.communicate()
if result.returncode != 0: if result.returncode != 0:
logger.error(f"Error executing 'iwgetid -r': {error}") logger.error(f"Error executing 'iwgetid -r': {error}")
return False return False
return bool(ssid.strip()) # Return True if connected to a WiFi network return bool(ssid.strip())
except Exception as e: except Exception as e:
logger.error(f"Error checking WiFi status: {e}") logger.error(f"Error checking WiFi status: {e}")
return False return False
@@ -241,10 +250,7 @@ class Display:
return self.shared_data.manual_mode return self.shared_data.manual_mode
def is_interface_connected(self, interface): def is_interface_connected(self, interface):
""" """Check if any device is connected to the specified interface."""
Check if any device is connected to the specified interface (pan0 or usb0)
by checking the output of 'ip neigh show dev <interface>'.
"""
try: try:
result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', interface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', interface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = result.communicate() output, error = result.communicate()
@@ -257,9 +263,7 @@ class Display:
return False return False
def is_usb_connected(self): def is_usb_connected(self):
""" """Check if any device is connected to the USB interface."""
Check if any device is connected to the USB (usb0) interface by checking the output of 'ip neigh show dev usb0'.
"""
try: try:
result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', 'usb0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', 'usb0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = result.communicate() output, error = result.communicate()
@@ -272,19 +276,18 @@ class Display:
return False return False
def run(self): def run(self):
""" """Main loop for updating the EPD display with shared data."""
Main loop for updating the EPD display with shared data.
"""
self.manual_mode_txt = "" self.manual_mode_txt = ""
while not self.shared_data.display_should_exit: while not self.shared_data.display_should_exit:
try: try:
self.epd_helper.init_partial_update() self.epd_helper.init_partial_update()
self.display_comment(self.shared_data.bjornorch_status) # Display the comment self.display_comment(self.shared_data.bjornorch_status)
image = Image.new('1', (self.shared_data.width, self.shared_data.height)) image = Image.new('1', (self.shared_data.width, self.shared_data.height))
draw = ImageDraw.Draw(image) draw = ImageDraw.Draw(image)
draw.rectangle((0, 0, self.shared_data.width, self.shared_data.height), fill=255) draw.rectangle((0, 0, self.shared_data.width, self.shared_data.height), fill=255)
draw.text((int(37 * self.scale_factor_x), int(5 * self.scale_factor_y)), "BJORN", font=self.shared_data.font_viking, fill=0) draw.text((int(37 * self.scale_factor_x), int(5 * self.scale_factor_y)), "BJORN", font=self.shared_data.font_viking, fill=0)
draw.text((int(110 * self.scale_factor_x), int(170 * self.scale_factor_y)), self.manual_mode_txt, font=self.shared_data.font_arial14, fill=0) draw.text((int(110 * self.scale_factor_x), int(170 * self.scale_factor_y)), self.manual_mode_txt, font=self.shared_data.font_arial14, fill=0)
if self.shared_data.wifi_connected: if self.shared_data.wifi_connected:
image.paste(self.shared_data.wifi, (int(3 * self.scale_factor_x), int(3 * self.scale_factor_y))) image.paste(self.shared_data.wifi, (int(3 * self.scale_factor_x), int(3 * self.scale_factor_y)))
# # # if self.shared_data.bluetooth_active: # # # if self.shared_data.bluetooth_active:
@@ -316,11 +319,9 @@ class Display:
draw.text((int(35 * self.scale_factor_x), int(65 * self.scale_factor_y)), self.shared_data.bjornstatustext, font=self.shared_data.font_arial9, fill=0) draw.text((int(35 * self.scale_factor_x), int(65 * self.scale_factor_y)), self.shared_data.bjornstatustext, font=self.shared_data.font_arial9, fill=0)
draw.text((int(35 * self.scale_factor_x), int(75 * self.scale_factor_y)), self.shared_data.bjornstatustext2, font=self.shared_data.font_arial9, fill=0) draw.text((int(35 * self.scale_factor_x), int(75 * self.scale_factor_y)), self.shared_data.bjornstatustext2, font=self.shared_data.font_arial9, fill=0)
#Fix the frise for the 2in7 display # Get frise position based on display type
if self.config["epd_type"] == "epd2in7": frise_x, frise_y = self.get_frise_position()
image.paste(self.shared_data.frise, (int(50 * self.scale_factor_x), int(160 * self.scale_factor_y))) image.paste(self.shared_data.frise, (frise_x, frise_y))
image.paste(self.shared_data.frise, (int(0 * self.scale_factor_x), int(160 * self.scale_factor_y)))
draw.rectangle((1, 1, self.shared_data.width - 1, self.shared_data.height - 1), outline=0) draw.rectangle((1, 1, self.shared_data.width - 1, self.shared_data.height - 1), outline=0)
draw.line((1, 20, self.shared_data.width - 1, 20), fill=0) draw.line((1, 20, self.shared_data.width - 1, 20), fill=0)
@@ -336,12 +337,12 @@ class Display:
logger.error("Main image not found in shared_data.") logger.error("Main image not found in shared_data.")
for line in lines: for line in lines:
draw.text((int(4 * self.scale_factor_x), y_text), line, font=self.shared_data.font_arialbold, fill=0) # Display the comment draw.text((int(4 * self.scale_factor_x), y_text), line, font=self.shared_data.font_arialbold, fill=0)
y_text += (self.shared_data.font_arialbold.getbbox(line)[3] - self.shared_data.font_arialbold.getbbox(line)[1]) + 3 # Calculate the height of the text depending on the font size, 3 means the space between lines y_text += (self.shared_data.font_arialbold.getbbox(line)[3] - self.shared_data.font_arialbold.getbbox(line)[1]) + 3
if self.screen_reversed: if self.screen_reversed:
image = image.transpose(Image.ROTATE_180) image = image.transpose(Image.ROTATE_180)
self.epd_helper.display_partial(image) self.epd_helper.display_partial(image)
self.epd_helper.display_partial(image) self.epd_helper.display_partial(image)
@@ -354,12 +355,12 @@ class Display:
time.sleep(self.shared_data.screen_delay) time.sleep(self.shared_data.screen_delay)
except Exception as e: except Exception as e:
logger.error(f"An exception occurred: {e}") logger.error(f"An error occurred: {e}")
def handle_exit_display(signum, frame, display_thread): def handle_exit_display(signum, frame, display_thread):
"""Handle the exit signal and close the display.""" """Handle the exit signal and close the display."""
global should_exit global should_exit
shared_data.display_should_exit = True shared_data.display_should_exit = True
logger.info("Exit signal received. Waiting for the main loop to finish...") logger.info("Exit signal received. Waiting for the main loop to finish...")
try: try:
if main_loop and main_loop.epd: if main_loop and main_loop.epd:
@@ -369,7 +370,7 @@ def handle_exit_display(signum, frame, display_thread):
logger.error(f"Error while closing the display: {e}") logger.error(f"Error while closing the display: {e}")
display_thread.join() display_thread.join()
logger.info("Main loop finished. Clean exit.") logger.info("Main loop finished. Clean exit.")
sys.exit(0) # Used sys.exit(0) instead of exit(0) sys.exit(0)
# Declare main_loop globally # Declare main_loop globally
main_loop = None main_loop = None
@@ -387,4 +388,4 @@ if __name__ == "__main__":
except Exception as e: except Exception as e:
logger.error(f"An exception occurred during program execution: {e}") logger.error(f"An exception occurred during program execution: {e}")
handle_exit_display(signal.SIGINT, None, display_thread) handle_exit_display(signal.SIGINT, None, display_thread)
sys.exit(1) # Used sys.exit(1) instead of exit(1) sys.exit(1)