diff --git a/README.md b/README.md index d6b049f..ddd7213 100644 --- a/README.md +++ b/README.md @@ -22,13 +22,14 @@ Install and have your USB Rubber Ducky working in less than 5 minutes. 1. Clone the repo to get a local copy of the files. `git clone https://github.com/dbisu/pico-ducky.git` -2. Download [CircuitPython for the Raspberry Pi Pico](https://circuitpython.org/board/raspberry_pi_pico/). *Updated to 7.0.0 +2. Download [CircuitPython for the Raspberry Pi Pico](https://circuitpython.org/board/raspberry_pi_pico/). *Updated to 8.0.0 + Download [CircuitPython for the Raspberry Pi Pico W](https://circuitpython.org/board/raspberry_pi_pico_w/). *Updated to 8.0.0 3. Plug the device into a USB port while holding the boot button. It will show up as a removable media device named `RPI-RP2`. 4. Copy the downloaded `.uf2` file to the root of the Pico (`RPI-RP2`). The device will reboot and after a second or so, it will reconnect as `CIRCUITPY`. -5. Download `adafruit-circuitpython-bundle-7.x-mpy-YYYYMMDD.zip` [here](https://github.com/adafruit/Adafruit_CircuitPython_Bundle/releases/latest) and extract it outside the device. +5. Download `adafruit-circuitpython-bundle-8.x-mpy-YYYYMMDD.zip` [here](https://github.com/adafruit/Adafruit_CircuitPython_Bundle/releases/latest) and extract it outside the device. 6. Navigate to `lib` in the recently extracted folder and copy `adafruit_hid` to the `lib` folder on your Raspberry Pi Pico. @@ -36,35 +37,67 @@ Install and have your USB Rubber Ducky working in less than 5 minutes. 8. Copy `asyncio` to the `lib` folder on your Pico. -9. Copy `boot.py` from your clone to the root of your Pico. +9. Copy `adafruit_wsgi` to the `lib` folder on your Pico. -10. Copy `duckyinpython.py` as `code.py` in the root of the Raspberry Pi Pico, overwriting the previous file. - Linux: `cp duckyinpython.py +/write/ +/run/ +``` + +API endpoints +``` +/api/run/ +``` ### Setup mode To edit the payload, enter setup mode by connecting the pin 1 (`GP0`) to pin 3 (`GND`), this will stop the pico-ducky from injecting the payload in your own machine. -The easiest way to so is by using a jumper wire between those pins as seen bellow. +The easiest way to do so is by using a jumper wire between those pins as seen bellow. ![Setup mode with a jumper](images/setup-mode.png) ### USB enable/disable mode If you need the pico-ducky to not show up as a USB mass storage device for stealth, follow these instructions. -Enter setup mode. -Copy your payload script to the pico-ducky. -Disconnect the pico from your host PC. -Connect a jumper wire between pin 18 (`GND`) and pin 20 (`GPIO15`). +- Enter setup mode. +- Copy your payload script to the pico-ducky. +- Disconnect the pico from your host PC. +- Connect a jumper wire between pin 18 (`GND`) and pin 20 (`GPIO15`). This will prevent the pico-ducky from showing up as a USB drive when plugged into the target computer. -Remove the jumper and reconnect to your PC to reprogram. -The default mode is USB mass storage enabled. +- Remove the jumper and reconnect to your PC to reprogram. + +Pico: The default mode is USB mass storage enabled. +Pico W: The default mode is USB mass storage **disabled** ![USB enable/disable mode](images/usb-boot-mode.png) +### Multiple payloads + +Multiple payloads can be stored on the Pico and Pico W. +To select a payload, ground one of these pins: +- GP4 - payload.dd +- GP5 - payload2.dd +- GP10 - payload3.dd +- GP11 - payload4.dd + ### Changing Keyboard Layouts Copied from [Neradoc/Circuitpython_Keyboard_Layouts](https://github.com/Neradoc/Circuitpython_Keyboard_Layouts/blob/main/PICODUCKY.md) diff --git a/boot.py b/boot.py index dbf2b1e..7e7d434 100644 --- a/boot.py +++ b/boot.py @@ -1,13 +1,38 @@ +# License : GPLv2.0 +# copyright (c) 2023 Dave Bailey +# Author: Dave Bailey (dbisu, @daveisu) +# Pico and Pico W board support + from board import * +import board import digitalio import storage -noStorageStatus = False +noStorage = False noStoragePin = digitalio.DigitalInOut(GP15) noStoragePin.switch_to_input(pull=digitalio.Pull.UP) -noStorageStatus = not noStoragePin.value +noStorageStatus = noStoragePin.value -if(noStorageStatus == True): +# If GP15 is not connected, it will default to being pulled high (True) +# If GP is connected to GND, it will be low (False) + +# Pico: +# GP15 not connected == USB visible +# GP15 connected to GND == USB not visible + +# Pico W: +# GP15 not connected == USB NOT visible +# GP15 connected to GND == USB visible + +if(board.board_id == 'raspberry_pi_pico'): + # On Pi Pico, default to USB visible + noStorage = not noStorageStatus +elif(board.board_id == 'raspberry_pi_pico_w'): + # on Pi Pico W, default to USB hidden by default + # so webapp can access storage + noStorage = noStorageStatus + +if(noStorage == True): # don't show USB drive to host PC storage.disable_usb_drive() print("Disabling USB drive") diff --git a/code.py b/code.py new file mode 100644 index 0000000..2dd40f5 --- /dev/null +++ b/code.py @@ -0,0 +1,94 @@ +# License : GPLv2.0 +# copyright (c) 2023 Dave Bailey +# Author: Dave Bailey (dbisu, @daveisu) +# Pico and Pico W board support + + +import usb_hid +from adafruit_hid.keyboard import Keyboard + +# comment out these lines for non_US keyboards +from adafruit_hid.keyboard_layout_us import KeyboardLayoutUS as KeyboardLayout +from adafruit_hid.keycode import Keycode + +# uncomment these lines for non_US keyboards +# replace LANG with appropriate language +#from keyboard_layout_win_LANG import KeyboardLayout +#from keycode_win_LANG import Keycode + +import supervisor + + +import time +import digitalio +from board import * +import board +from duckyinpython import * +if(board.board_id == 'raspberry_pi_pico_w'): + import wifi + from webapp import * + + +# sleep at the start to allow the device to be recognized by the host computer +time.sleep(.5) + +def startWiFi(): + import ipaddress + # Get wifi details and more from a secrets.py file + try: + from secrets import secrets + except ImportError: + print("WiFi secrets are kept in secrets.py, please add them there!") + raise + + print("Connect wifi") + #wifi.radio.connect(secrets['ssid'],secrets['password']) + wifi.radio.start_ap(secrets['ssid'],secrets['password']) + + HOST = repr(wifi.radio.ipv4_address_ap) + PORT = 80 # Port to listen on + print(HOST,PORT) + +# turn off automatically reloading when files are written to the pico +#supervisor.disable_autoreload() +supervisor.runtime.autoreload = False + +if(board.board_id == 'raspberry_pi_pico'): + led = pwmio.PWMOut(board.LED, frequency=5000, duty_cycle=0) +elif(board.board_id == 'raspberry_pi_pico_w'): + led = digitalio.DigitalInOut(board.LED) + led.switch_to_output() + + +progStatus = False +progStatus = getProgrammingStatus() +print("progStatus", progStatus) +if(progStatus == False): + print("Finding payload") + # not in setup mode, inject the payload + payload = selectPayload() + print("Running ", payload) + runScript(payload) + + print("Done") +else: + print("Update your payload") + +led_state = False + +async def main_loop(): + global led,button1 + + button_task = asyncio.create_task(monitor_buttons(button1)) + if(board.board_id == 'raspberry_pi_pico_w'): + pico_led_task = asyncio.create_task(blink_pico_w_led(led)) + print("Starting Wifi") + startWiFi() + print("Starting Web Service") + webservice_task = asyncio.create_task(startWebService()) + await asyncio.gather(pico_led_task, button_task, webservice_task) + else: + pico_led_task = asyncio.create_task(blink_pico_led(led)) + await asyncio.gather(pico_led_task, button_task) + +asyncio.run(main_loop()) diff --git a/duckyinpython.py b/duckyinpython.py index 0eaa54f..a0a6087 100644 --- a/duckyinpython.py +++ b/duckyinpython.py @@ -1,7 +1,16 @@ # License : GPLv2.0 -# copyright (c) 2021 Dave Bailey +# copyright (c) 2023 Dave Bailey # Author: Dave Bailey (dbisu, @daveisu) + +import time +import digitalio +from digitalio import DigitalInOut, Pull +from adafruit_debouncer import Debouncer +import board +from board import * +import pwmio +import asyncio import usb_hid from adafruit_hid.keyboard import Keyboard @@ -14,34 +23,6 @@ from adafruit_hid.keycode import Keycode #from keyboard_layout_win_LANG import KeyboardLayout #from keycode_win_LANG import Keycode -import supervisor - -import time -import digitalio -from digitalio import DigitalInOut, Pull -from adafruit_debouncer import Debouncer -from board import * -import pwmio -import asyncio - -led = pwmio.PWMOut(LED, frequency=5000, duty_cycle=0) - -def led_pwm_up(led): - for i in range(100): - # PWM LED up and down - if i < 50: - led.duty_cycle = int(i * 2 * 65535 / 100) # Up - time.sleep(0.01) -def led_pwm_down(led): - for i in range(100): - # PWM LED up and down - if i >= 50: - led.duty_cycle = 65535 - int((i - 50) * 2 * 65535 / 100) # Down - time.sleep(0.01) - -# led = digitalio.DigitalInOut(LED) -# led.direction = digitalio.Direction.OUTPUT - duckyCommands = { 'WINDOWS': Keycode.WINDOWS, 'GUI': Keycode.GUI, 'APP': Keycode.APPLICATION, 'MENU': Keycode.APPLICATION, 'SHIFT': Keycode.SHIFT, @@ -123,13 +104,8 @@ def parseLine(line): kbd = Keyboard(usb_hid.devices) layout = KeyboardLayout(kbd) -# turn off automatically reloading when files are written to the pico -supervisor.disable_autoreload() -# sleep at the start to allow the device to be recognized by the host computer -time.sleep(.5) -led_pwm_up(led) #init button button1_pin = DigitalInOut(GP22) # defaults to input @@ -191,7 +167,6 @@ def selectPayload(): payload3State = not payload3Pin.value payload4State = not payload4Pin.value - if(payload1State == True): payload = "payload.dd" @@ -209,9 +184,14 @@ def selectPayload(): # default to payload1 payload = "payload.dd" - return payload +async def blink_led(led): + print("Blink") + if(board.board_id == 'raspberry_pi_pico'): + blink_pico_led(led) + elif(board.board_id == 'raspberry_pi_pico_w'): + blink_pico_w_led(led) async def blink_pico_led(led): print("starting blink_pico_led") @@ -219,7 +199,7 @@ async def blink_pico_led(led): while True: if led_state: #led_pwm_up(led) - print("led up") + #print("led up") for i in range(100): # PWM LED up and down if i < 50: @@ -228,7 +208,7 @@ async def blink_pico_led(led): led_state = False else: #led_pwm_down(led) - print("led down") + #print("led down") for i in range(100): # PWM LED up and down if i >= 50: @@ -237,6 +217,22 @@ async def blink_pico_led(led): led_state = True await asyncio.sleep(0) +async def blink_pico_w_led(led): + print("starting blink_pico_w_led") + led_state = False + while True: + if led_state: + #print("led on") + led.value = 1 + await asyncio.sleep(0.5) + led_state = False + else: + #print("led off") + led.value = 0 + await asyncio.sleep(0.5) + led_state = True + await asyncio.sleep(0.5) + async def monitor_buttons(button1): global inBlinkeyMode, inMenu, enableRandomBeep, enableSirenMode,pixel print("starting monitor_buttons") @@ -266,28 +262,3 @@ async def monitor_buttons(button1): button1Down = False await asyncio.sleep(0) - - - -progStatus = False -progStatus = getProgrammingStatus() - -if(progStatus == False): - # not in setup mode, inject the payload - payload = selectPayload() - print("Running ", payload) - runScript(payload) - - print("Done") -else: - print("Update your payload") - -led_state = False - -async def main_loop(): - global led,button1 - pico_led_task = asyncio.create_task(blink_pico_led(led)) - button_task = asyncio.create_task(monitor_buttons(button1)) - await asyncio.gather(pico_led_task, button_task) - -asyncio.run(main_loop()) diff --git a/webapp.py b/webapp.py new file mode 100644 index 0000000..88e6fb1 --- /dev/null +++ b/webapp.py @@ -0,0 +1,214 @@ +# License : GPLv2.0 +# copyright (c) 2023 Dave Bailey +# Author: Dave Bailey (dbisu, @daveisu) +# FeatherS2 board support + +import socketpool +import time +import os +import storage + +import wsgiserver as server +from adafruit_wsgi.wsgi_app import WSGIApp +import wifi + +from duckyinpython import * + +payload_html = """ + + Pico W Ducky +

Pico W Ducky

+ {}
PayloadActions
+
+ New Script + + +""" + +edit_html = """ + + + Script Editor + + +
+ +
+ +
+
+ Home + + +""" + +new_html = """ + + + New Script + + +
+ Script Name
+ + Script
+ +
+ +
+
+ Home + + +""" + +response_html = """ + + Pico W Ducky +

Pico W Ducky

+ {} +
+ Home + + +""" + +newrow_html = "{}Edit / Run" + +def setPayload(payload_number): + if(payload_number == 1): + payload = "payload.dd" + + else: + payload = "payload"+str(payload_number)+".dd" + + return(payload) + + +def ducky_main(request): + print("Ducky main") + payloads = [] + rows = "" + files = os.listdir() + #print(files) + for f in files: + if ('.dd' in f) == True: + payloads.append(f) + newrow = newrow_html.format(f,f,f) + #print(newrow) + rows = rows + newrow + + response = payload_html.format(rows) + + return(response) + +def cleanup_text(buffer): + return_buffer = buffer.replace('+', ' ').replace('%0D%0A', '\n') + '\n' + #print(return_buffer) + return(return_buffer) + +web_app = WSGIApp() + +@web_app.route("/ducky") +def duck_main(request): + response = ducky_main(request) + return("200 OK", [('Content-Type', 'text/html')], response) + +@web_app.route("/edit/") +def edit(request, filename): + print("Editing ", filename) + f = open(filename,"r",encoding='utf-8') + textbuffer = '' + for line in f: + textbuffer = textbuffer + line + f.close() + response = edit_html.format(filename,textbuffer) + #print(response) + + return("200 OK",[('Content-Type', 'text/html')], response) + +@web_app.route("/write/",methods=["POST"]) +def write_script(request, filename): + + data = request.body.getvalue() + fields = data.split("&") + form_data = {} + for field in fields: + key,value = field.split('=') + form_data[key] = value + + #print(form_data) + storage.remount("/",readonly=False) + f = open(filename,"w",encoding='utf-8') + textbuffer = form_data['scriptData'] + textbuffer = cleanup_text(textbuffer) + #print(textbuffer) + for line in textbuffer: + f.write(line) + f.close() + storage.remount("/",readonly=True) + response = response_html.format("Wrote script " + filename) + return("200 OK",[('Content-Type', 'text/html')], response) + +@web_app.route("/new",methods=['GET','POST']) +def write_new_script(request): + response = '' + if(request.method == 'GET'): + response = new_html + else: + data = request.body.getvalue() + fields = data.split("&") + form_data = {} + for field in fields: + key,value = field.split('=') + form_data[key] = value + #print(form_data) + filename = form_data['scriptName'] + textbuffer = form_data['scriptData'] + textbuffer = cleanup_text(textbuffer) + storage.remount("/",readonly=False) + f = open(filename,"w",encoding='utf-8') + for line in textbuffer: + f.write(line) + f.close() + storage.remount("/",readonly=True) + response = response_html.format("Wrote script " + filename) + return("200 OK",[('Content-Type', 'text/html')], response) + +@web_app.route("/run/") +def run_script(request, filename): + print("run_script ", filename) + response = response_html.format("Running script " + filename) + #print(response) + runScript(filename) + return("200 OK",[('Content-Type', 'text/html')], response) + +@web_app.route("/") +def index(request): + response = ducky_main(request) + return("200 OK", [('Content-Type', 'text/html')], response) + +@web_app.route("/api/run/") +def run_script(request, filenumber): + filename = setPayload(int(filenumber)) + print("run_script ", filenumber) + response = response_html.format("Running script " + filename) + #print(response) + runScript(filename) + return("200 OK",[('Content-Type', 'text/html')], response) + +async def startWebService(): + + HOST = repr(wifi.radio.ipv4_address_ap) + PORT = 80 # Port to listen on + print(HOST,PORT) + + wsgiServer = server.WSGIServer(80, application=web_app) + + print(f"open this IP in your browser: http://{HOST}:{PORT}/") + + # Start the server + wsgiServer.start() + while True: + wsgiServer.update_poll() + await asyncio.sleep(0) diff --git a/wsgiserver.py b/wsgiserver.py new file mode 100755 index 0000000..ff87362 --- /dev/null +++ b/wsgiserver.py @@ -0,0 +1,286 @@ +# SPDX-FileCopyrightText: Copyright (c) 2019 Matt Costi for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +""" +`adafruit_esp32spi_wsgiserver` +================================================================================ + +A simple WSGI (Web Server Gateway Interface) server that interfaces with the ESP32 over SPI. +Opens a specified port on the ESP32 to listen for incoming HTTP Requests and +Accepts an Application object that must be callable, which gets called +whenever a new HTTP Request has been received. + +The Application MUST accept 2 ordered parameters: + 1. environ object (incoming request data) + 2. start_response function. Must be called before the Application + callable returns, in order to set the response status and headers. + +The Application MUST return a single string in a list, +which is the response data + +Requires update_poll being called in the applications main event loop. + +For more details about Python WSGI see: +https://www.python.org/dev/peps/pep-0333/ + +* Author(s): Matt Costi +""" +# pylint: disable=no-name-in-module + +import io +import gc +from micropython import const +import socketpool +import wifi + +class BadRequestError(Exception): + """Raised when the client sends an unexpected empty line""" + pass + +_BUFFER_SIZE = 32 +buffer = bytearray(_BUFFER_SIZE) +def readline(socketin): + """ + Implement readline() for native wifi using recv_into + """ + data_string = b"" + while True: + try: + num = socketin.recv_into(buffer, 1) + data_string += str(buffer, 'utf8')[:num] + if num == 0: + return data_string + if data_string[-2:] == b"\r\n": + return data_string[:-2] + except OSError as ex: + # if ex.errno == 9: # [Errno 9] EBADF + # return None + if ex.errno == 11: # [Errno 11] EAGAIN + continue + raise + + +def read(socketin,length = -1): + total = 0 + data_string = b"" + try: + if length > 0: + while total < length: + reste = length - total + num = socketin.recv_into(buffer, min(_BUFFER_SIZE, reste)) + # + if num == 0: + # timeout + # raise OSError(110) + return data_string + # + data_string += buffer[:num] + total = total + num + return data_string + else: + while True: + num = socketin.recv_into(buffer, 1) + data_string += str(buffer, 'utf8')[:num] + if num == 0: + return data_string + except OSError as ex: + if ex.errno == 11: # [Errno 11] EAGAIN + return data_string + raise + +def parse_headers(sock): + """ + Parses the header portion of an HTTP request/response from the socket. + Expects first line of HTTP request/response to have been read already + return: header dictionary + rtype: Dict + """ + headers = {} + while True: + line = readline(sock) + if not line or line == b"\r\n": + break + + #print("**line: ", line) + title, content = line.split(b': ', 1) + if title and content: + title = str(title.lower(), 'utf-8') + content = str(content, 'utf-8') + headers[title] = content + return headers + + +pool = socketpool.SocketPool(wifi.radio) + +NO_SOCK_AVAIL = const(255) + +# pylint: disable=invalid-name +class WSGIServer: + """ + A simple server that implements the WSGI interface + """ + + def __init__(self, port=80, debug=False, application=None): + self.application = application + self.port = port + self._server_sock = None + self._client_sock = None + self._debug = debug + + self._response_status = None + self._response_headers = [] + + def start(self): + """ + starts the server and begins listening for incoming connections. + Call update_poll in the main loop for the application callable to be + invoked on receiving an incoming request. + """ + self._server_sock = pool.socket(pool.AF_INET,pool.SOCK_STREAM) + HOST = repr(wifi.radio.ipv4_address_ap) + self._server_sock.bind((repr(wifi.radio.ipv4_address_ap), self.port)) + self._server_sock.listen(1) +# if self._debug: +# ip = _the_interface.pretty_ip(_the_interface.ip_address) +# print("Server available at {0}:{1}".format(ip, self.port)) +# print( +# "Sever status: ", +# _the_interface.get_server_state(self._server_sock.socknum), +# ) + + def pretty_ip(self): + return f"http://{wifi.radio.ipv4_address_ap}:{self.port}" + + def update_poll(self): + """ + Call this method inside your main event loop to get the server + check for new incoming client requests. When a request comes in, + the application callable will be invoked. + """ + self.client_available() + if self._client_sock: + try: + environ = self._get_environ(self._client_sock) + result = self.application(environ, self._start_response) + self.finish_response(result) + except BadRequestError: + self._start_response("400 Bad Request", []) + self.finish_response([]) + + def finish_response(self, result): + """ + Called after the application callbile returns result data to respond with. + Creates the HTTP Response payload from the response_headers and results data, + and sends it back to client. + + :param string result: the data string to send back in the response to the client. + """ + try: + response = "HTTP/1.1 {0}\r\n".format(self._response_status) + for header in self._response_headers: + response += "{0}: {1}\r\n".format(*header) + response += "\r\n" + self._client_sock.send(response.encode("utf-8")) + for data in result: + if isinstance(data, str): + data = data.encode("utf-8") + elif not isinstance(data, bytes): + data = str(data).encode("utf-8") + bytes_sent = 0 + while bytes_sent < len(data): + try: + bytes_sent += self._client_sock.send(data[bytes_sent:]) + except OSError as ex: + if ex.errno != 11: # [Errno 11] EAGAIN + raise + gc.collect() + except OSError as ex: + if ex.errno != 104: # [Errno 104] ECONNRESET + raise + finally: + #print("closing") + self._client_sock.close() + self._client_sock = None + + def client_available(self): + """ + returns a client socket connection if available. + Otherwise, returns None + :return: the client + :rtype: Socket + """ + sock = None + if not self._server_sock: + print("Server has not been started, cannot check for clients!") + elif not self._client_sock: + self._server_sock.setblocking(False) + try: + self._client_sock, addr = self._server_sock.accept() + except OSError as ex: + if ex.errno != 11: # [Errno 11] EAGAIN + raise + + return None + + def _start_response(self, status, response_headers): + """ + The application callable will be given this method as the second param + This is to be called before the application callable returns, to signify + the response can be started with the given status and headers. + + :param string status: a status string including the code and reason. ex: "200 OK" + :param list response_headers: a list of tuples to represent the headers. + ex ("header-name", "header value") + """ + self._response_status = status + self._response_headers = [("Server", "esp32WSGIServer")] + response_headers + + def _get_environ(self, client): + """ + The application callable will be given the resulting environ dictionary. + It contains metadata about the incoming request and the request body ("wsgi.input") + + :param Socket client: socket to read the request from + """ + env = {} + line = readline(client).decode("utf-8") + try: + (method, path, ver) = line.rstrip("\r\n").split(None, 2) + except ValueError: + raise BadRequestError("Unknown request from client.") + + env["wsgi.version"] = (1, 0) + env["wsgi.url_scheme"] = "http" + env["wsgi.multithread"] = False + env["wsgi.multiprocess"] = False + env["wsgi.run_once"] = False + + env["REQUEST_METHOD"] = method + env["SCRIPT_NAME"] = "" + env["SERVER_NAME"] = str(wifi.radio.ipv4_address_ap) + env["SERVER_PROTOCOL"] = ver + env["SERVER_PORT"] = self.port + if path.find("?") >= 0: + env["PATH_INFO"] = path.split("?")[0] + env["QUERY_STRING"] = path.split("?")[1] + else: + env["PATH_INFO"] = path + + headers = parse_headers(client) + if "content-type" in headers: + env["CONTENT_TYPE"] = headers.get("content-type") + if "content-length" in headers: + env["CONTENT_LENGTH"] = headers.get("content-length") + body = read(client, int(env["CONTENT_LENGTH"])) + env["wsgi.input"] = io.StringIO(body) + else: + body = read(client) + env["wsgi.input"] = io.StringIO(body) + for name, value in headers.items(): + key = "HTTP_" + name.replace("-", "_").upper() + if key in env: + value = "{0},{1}".format(env[key], value) + env[key] = value + + return env