diff --git a/display.py b/display.py index 3967299..2f78a85 100644 --- a/display.py +++ b/display.py @@ -40,6 +40,18 @@ class Display: self.screen_reversed = self.shared_data.screen_reversed self.web_screen_reversed = self.shared_data.web_screen_reversed + # Define frise positions for different display types + self.frise_positions = { + "epd2in7": { + "x": 50, + "y": 160 + }, + "default": { # Default position for other display types + "x": 0, + "y": 160 + } + } + try: self.epd_helper = self.shared_data.epd_helper self.epd_helper.init_partial_update() @@ -63,6 +75,15 @@ class Display: self.scale_factor_x = self.shared_data.scale_factor_x self.scale_factor_y = self.shared_data.scale_factor_y + def get_frise_position(self): + """Get the frise position based on the display type.""" + display_type = self.config.get("epd_type", "default") + position = self.frise_positions.get(display_type, self.frise_positions["default"]) + return ( + int(position["x"] * self.scale_factor_x), + int(position["y"] * self.scale_factor_y) + ) + def schedule_update_shared_data(self): """Periodically update the shared data with the latest system information.""" while not self.shared_data.display_should_exit: @@ -87,6 +108,7 @@ class Display: time.sleep(random.uniform(self.shared_data.image_display_delaymin, self.shared_data.image_display_delaymax)) except Exception as e: logger.error(f"An error occurred in update_main_image: {e}") + def get_open_files(self): """Get the number of open FD files on the system.""" try: @@ -102,17 +124,15 @@ class Display: with self.semaphore: try: if not os.path.exists(self.shared_data.vuln_summary_file): - # Create the file with the necessary columns if it does not exist df = pd.DataFrame(columns=["IP", "Hostname", "MAC Address", "Port", "Vulnerabilities"]) df.to_csv(self.shared_data.vuln_summary_file, index=False) self.shared_data.vulnnbr = 0 logger.info("Vulnerability summary file created.") else: - # Load the netkbfile to check for "Alive" IPs if os.path.exists(self.shared_data.netkbfile): with open(self.shared_data.netkbfile, 'r') as file: netkb_df = pd.read_csv(file) - alive_macs = set(netkb_df[(netkb_df["Alive"] == 1) & (netkb_df["MAC Address"] != "STANDALONE")]["MAC Address"]) # Get all alive MACs based on the 'Alive' column and ignore "STANDALONE" + alive_macs = set(netkb_df[(netkb_df["Alive"] == 1) & (netkb_df["MAC Address"] != "STANDALONE")]["MAC Address"]) else: alive_macs = set() @@ -122,19 +142,17 @@ class Display: for index, row in df.iterrows(): mac_address = row["MAC Address"] - if mac_address in alive_macs and mac_address != "STANDALONE": # Ignore "STANDALONE" MAC addresses + if mac_address in alive_macs and mac_address != "STANDALONE": vulnerabilities = row["Vulnerabilities"] - if pd.isna(vulnerabilities) or not isinstance(vulnerabilities, str): # Check if vulnerabilities is NaN or not a string - # logger.debug(f"No valid vulnerabilities for MAC Address: {mac_address}") + if pd.isna(vulnerabilities) or not isinstance(vulnerabilities, str): continue if vulnerabilities and isinstance(vulnerabilities, str): - all_vulnerabilities.update(vulnerabilities.split("; ")) # Add the vulnerabilities to the set + all_vulnerabilities.update(vulnerabilities.split("; ")) self.shared_data.vulnnbr = len(all_vulnerabilities) logger.debug(f"Updated vulnerabilities count: {self.shared_data.vulnnbr}") - # Update the livestatusfile if os.path.exists(self.shared_data.livestatusfile): with open(self.shared_data.livestatusfile, 'r+') as livestatus_file: livestatus_df = pd.read_csv(livestatus_file) @@ -146,50 +164,43 @@ class Display: except Exception as e: logger.error(f"An error occurred in update_vuln_count: {e}") - def update_shared_data(self): """Update the shared data with the latest system information.""" with self.semaphore: - """ - Update shared data from CSV files live_status.csv and cracked_passwords.csv. - """ try: with open(self.shared_data.livestatusfile, 'r') as file: livestatus_df = pd.read_csv(file) - self.shared_data.portnbr = livestatus_df['Total Open Ports'].iloc[0] # Get the total number of open ports - self.shared_data.targetnbr = livestatus_df['Alive Hosts Count'].iloc[0] # Get the total number of alive hosts - self.shared_data.networkkbnbr = livestatus_df['All Known Hosts Count'].iloc[0] # Get the total number of known hosts - self.shared_data.vulnnbr = livestatus_df['Vulnerabilities Count'].iloc[0] # Get the total number of vulnerable ports + self.shared_data.portnbr = livestatus_df['Total Open Ports'].iloc[0] + self.shared_data.targetnbr = livestatus_df['Alive Hosts Count'].iloc[0] + self.shared_data.networkkbnbr = livestatus_df['All Known Hosts Count'].iloc[0] + self.shared_data.vulnnbr = livestatus_df['Vulnerabilities Count'].iloc[0] - crackedpw_files = glob.glob(f"{self.shared_data.crackedpwddir}/*.csv") # Get all CSV files in the cracked password directory + crackedpw_files = glob.glob(f"{self.shared_data.crackedpwddir}/*.csv") total_passwords = 0 for file in crackedpw_files: with open(file, 'r') as f: total_passwords += len(pd.read_csv(f, usecols=[0])) - self.shared_data.crednbr = total_passwords # Set the total number of cracked passwords to shared data + self.shared_data.crednbr = total_passwords - total_data = sum([len(files) for r, d, files in os.walk(self.shared_data.datastolendir)]) # Get the total number of data files in the data store directory + total_data = sum([len(files) for r, d, files in os.walk(self.shared_data.datastolendir)]) self.shared_data.datanbr = total_data - total_zombies = sum([len(files) for r, d, files in os.walk(self.shared_data.zombiesdir)]) # Get the total number of zombies in the zombies directory + total_zombies = sum([len(files) for r, d, files in os.walk(self.shared_data.zombiesdir)]) self.shared_data.zombiesnbr = total_zombies total_attacks = sum([len(files) for r, d, files in os.walk(self.shared_data.actions_dir) if not r.endswith("__pycache__")]) - 2 self.shared_data.attacksnbr = total_attacks self.shared_data.update_stats() - # Update Bluetooth, WiFi, and PAN connection status self.shared_data.manual_mode = self.is_manual_mode() if self.shared_data.manual_mode: self.manual_mode_txt = "M" else: self.manual_mode_txt = "A" - # # self.shared_data.bluetooth_active = self.is_bluetooth_connected() self.shared_data.wifi_connected = self.is_wifi_connected() self.shared_data.usb_active = self.is_usb_connected() - ########self.shared_data.pan_connected = self.is_interface_connected('pan0') or self.is_interface_connected('usb0') self.get_open_files() except (FileNotFoundError, pd.errors.EmptyDataError) as e: @@ -198,11 +209,11 @@ class Display: logger.error(f"Error updating shared data: {e}") def display_comment(self, status): - """ Display the comment based on the status of the BjornOrch. """ - comment = self.commentaire_ia.get_commentaire(status) # Get the comment from Commentaireia + """Display the comment based on the status of the BjornOrch.""" + comment = self.commentaire_ia.get_commentaire(status) if comment: - self.shared_data.bjornsay = comment # Set the comment to shared data - self.shared_data.bjornstatustext = self.shared_data.bjornorch_status # Set the status to shared data + self.shared_data.bjornsay = comment + self.shared_data.bjornstatustext = self.shared_data.bjornorch_status else: pass @@ -222,16 +233,14 @@ class Display: # # # return False def is_wifi_connected(self): - """ - Check if WiFi is connected by checking the current SSID. - """ + """Check if WiFi is connected by checking the current SSID.""" try: result = subprocess.Popen(['iwgetid', '-r'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) ssid, error = result.communicate() if result.returncode != 0: logger.error(f"Error executing 'iwgetid -r': {error}") return False - return bool(ssid.strip()) # Return True if connected to a WiFi network + return bool(ssid.strip()) except Exception as e: logger.error(f"Error checking WiFi status: {e}") return False @@ -241,10 +250,7 @@ class Display: return self.shared_data.manual_mode def is_interface_connected(self, interface): - """ - Check if any device is connected to the specified interface (pan0 or usb0) - by checking the output of 'ip neigh show dev '. - """ + """Check if any device is connected to the specified interface.""" try: result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', interface], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) output, error = result.communicate() @@ -257,9 +263,7 @@ class Display: return False def is_usb_connected(self): - """ - Check if any device is connected to the USB (usb0) interface by checking the output of 'ip neigh show dev usb0'. - """ + """Check if any device is connected to the USB interface.""" try: result = subprocess.Popen(['ip', 'neigh', 'show', 'dev', 'usb0'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) output, error = result.communicate() @@ -272,19 +276,18 @@ class Display: return False def run(self): - """ - Main loop for updating the EPD display with shared data. - """ + """Main loop for updating the EPD display with shared data.""" self.manual_mode_txt = "" while not self.shared_data.display_should_exit: try: self.epd_helper.init_partial_update() - self.display_comment(self.shared_data.bjornorch_status) # Display the comment + self.display_comment(self.shared_data.bjornorch_status) image = Image.new('1', (self.shared_data.width, self.shared_data.height)) draw = ImageDraw.Draw(image) draw.rectangle((0, 0, self.shared_data.width, self.shared_data.height), fill=255) draw.text((int(37 * self.scale_factor_x), int(5 * self.scale_factor_y)), "BJORN", font=self.shared_data.font_viking, fill=0) draw.text((int(110 * self.scale_factor_x), int(170 * self.scale_factor_y)), self.manual_mode_txt, font=self.shared_data.font_arial14, fill=0) + if self.shared_data.wifi_connected: image.paste(self.shared_data.wifi, (int(3 * self.scale_factor_x), int(3 * self.scale_factor_y))) # # # if self.shared_data.bluetooth_active: @@ -316,11 +319,9 @@ class Display: draw.text((int(35 * self.scale_factor_x), int(65 * self.scale_factor_y)), self.shared_data.bjornstatustext, font=self.shared_data.font_arial9, fill=0) draw.text((int(35 * self.scale_factor_x), int(75 * self.scale_factor_y)), self.shared_data.bjornstatustext2, font=self.shared_data.font_arial9, fill=0) - #Fix the frise for the 2in7 display - if self.config["epd_type"] == "epd2in7": - image.paste(self.shared_data.frise, (int(50 * self.scale_factor_x), int(160 * self.scale_factor_y))) - - image.paste(self.shared_data.frise, (int(0 * self.scale_factor_x), int(160 * self.scale_factor_y))) + # Get frise position based on display type + frise_x, frise_y = self.get_frise_position() + image.paste(self.shared_data.frise, (frise_x, frise_y)) draw.rectangle((1, 1, self.shared_data.width - 1, self.shared_data.height - 1), outline=0) draw.line((1, 20, self.shared_data.width - 1, 20), fill=0) @@ -336,11 +337,11 @@ class Display: logger.error("Main image not found in shared_data.") for line in lines: - draw.text((int(4 * self.scale_factor_x), y_text), line, font=self.shared_data.font_arialbold, fill=0) # Display the comment - y_text += (self.shared_data.font_arialbold.getbbox(line)[3] - self.shared_data.font_arialbold.getbbox(line)[1]) + 3 # Calculate the height of the text depending on the font size, 3 means the space between lines - if self.screen_reversed: - image = image.transpose(Image.ROTATE_180) + draw.text((int(4 * self.scale_factor_x), y_text), line, font=self.shared_data.font_arialbold, fill=0) + y_text += (self.shared_data.font_arialbold.getbbox(line)[3] - self.shared_data.font_arialbold.getbbox(line)[1]) + 3 + if self.screen_reversed: + image = image.transpose(Image.ROTATE_180) self.epd_helper.display_partial(image) self.epd_helper.display_partial(image) @@ -354,12 +355,12 @@ class Display: time.sleep(self.shared_data.screen_delay) except Exception as e: - logger.error(f"An exception occurred: {e}") + logger.error(f"An error occurred: {e}") def handle_exit_display(signum, frame, display_thread): """Handle the exit signal and close the display.""" global should_exit - shared_data.display_should_exit = True + shared_data.display_should_exit = True logger.info("Exit signal received. Waiting for the main loop to finish...") try: if main_loop and main_loop.epd: @@ -369,7 +370,7 @@ def handle_exit_display(signum, frame, display_thread): logger.error(f"Error while closing the display: {e}") display_thread.join() logger.info("Main loop finished. Clean exit.") - sys.exit(0) # Used sys.exit(0) instead of exit(0) + sys.exit(0) # Declare main_loop globally main_loop = None @@ -387,4 +388,4 @@ if __name__ == "__main__": except Exception as e: logger.error(f"An exception occurred during program execution: {e}") handle_exit_display(signal.SIGINT, None, display_thread) - sys.exit(1) # Used sys.exit(1) instead of exit(1) + sys.exit(1) \ No newline at end of file