2 Commits

14 changed files with 104 additions and 1593 deletions

View File

@@ -1,34 +0,0 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Debug info**
If possible, include debug serial data.
On Windows, use PuTTY, connect to the debug serial port (commonly COM3, but could vary)
On Linux, use minicom (minicom -b 115200 -o -D /dev/ttyACM0, where ttyACM0 corresponds to your device)
**Additional context**
Add any other context about the problem here.

View File

@@ -1 +0,0 @@
blank_issues_enabled: false

View File

@@ -1,29 +0,0 @@
---
name: Not Working
about: Create a report if something is not working correctly
title: ''
labels: ''
assignees: ''
---
**Describe the issue**
A clear and concise description of the symptoms
**To Reproduce**
Steps to reproduce the behavior:
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Debug info**
If possible, include debug serial data.
On Windows, use PuTTY, connect to the debug serial port (commonly COM3, but could vary)
On Linux, use minicom (minicom -b 115200 -o -D /dev/ttyACM0, where ttyACM0 corresponds to your device)
**Additional context**
Add any other context about the problem here.

View File

@@ -1,6 +0,0 @@
# Instructions on how to collect debug logs
* On Windows, use [PuTTY](https://www.chiark.greenend.org.uk/~sgtatham/putty/latest.html) to connect to the debug serial port (commonly COM3, but could vary on your machine).
* On Linux, use minicom (minicom -b 115200 -o -D /dev/ttyACM0, where ttyACM0 corresponds to your device).
* On Ubuntu: `sudo apt install minicom`

179
README.md
View File

@@ -16,156 +16,48 @@
<br />
## Quick Start Guide
Install and have your USB Rubber Ducky working in less than 5 minutes.
1. Download the latest release from the [Releases](https://github.com/dbisu/pico-ducky/releases) page.
2. Plug the device into a USB port while holding the boot button. It will show up as a removable media device named RPI-RP2.
3. Install CircutlPython on the Pico or Pico W
If using a Pico board:
Copy the adafruit-circuitpython-raspberry_pi_pico-en_US-9.2.1.uf2 file to the root of the Pico (RPI-RP2). The device will reboot and after a second or so, it will reconnect as CIRCUITPY.
If using a Pico W board:
Copy the adafruit-circuitpython-raspberry_pi_pico_w-en_US-9.2.1.uf2 file to the root of the Pico (RPI-RP2). The device will reboot and after a second or so, it will reconnect as CIRCUITPY.
If using a Pico 2 board:
Copy the adafruit-circuitpython-raspberry_pi_pico2-en_US-9.2.1.uf2 file to the root of the Pico (RPI-RP2). The device will reboot and after a second or so, it will reconnect as CIRCUITPY.
If using a Pico 2W board:
Copy the adafruit-circuitpython-raspberry_pi_pico2_w-en_US-9.2.1.uf2 file to the root of the Pico (RPI-RP2). The device will reboot and after a second or so, it will reconnect as CIRCUITPY.
4. Copy the lib folder to the root of the CIRCUITPY
5. Copy *.py to the root of the CIRCUITPY
6. Follow the instructions in README.md to enter setup mode
7. Copy your payload as payload.dd to the root of the CIRCUITPY
8. Unplug the device from the USB port and remove the setup jumper.
Enjoy your Pico-Ducky.
## Setup mode
To edit the payload, enter setup mode by connecting the pin 1 (`GP0`) to pin 3 (`GND`), this will stop the pico-ducky from injecting the payload in your own machine.
The easiest way to do so is by using a jumper wire between those pins as seen bellow.
![Setup mode with a jumper](images/setup-mode.png)
## USB enable/disable mode
If you need the pico-ducky to not show up as a USB mass storage device for stealth, follow these instructions.
- Enter setup mode.
- Copy your payload script to the pico-ducky.
- Disconnect the pico from your host PC.
- Connect a jumper wire between pin 18 (`GND`) and pin 20 (`GPIO15`).
This will prevent the pico-ducky from showing up as a USB drive when plugged into the target computer.
- Remove the jumper and reconnect to your PC to reprogram.
Pico: The default mode is USB mass storage enabled.
Pico W: The default mode is USB mass storage **disabled**
![USB enable/disable mode](images/usb-boot-mode.png)
-----
# Full Install Instructions
## Install
Install and have your USB Rubber Ducky working in less than 5 minutes.
1. Clone the repo to get a local copy of the files. `git clone https://github.com/dbisu/pico-ducky.git`
1. Download [CircuitPython for the Raspberry Pi Pico](https://circuitpython.org/board/raspberry_pi_pico/). *Updated to 7.0.0
2. Download [CircuitPython for the Raspberry Pi Pico](https://circuitpython.org/board/raspberry_pi_pico/). *Updated to 9.2.1
Download [CircuitPython for the Raspberry Pi Pico W](https://circuitpython.org/board/raspberry_pi_pico_w/). *Updated to 9.2.1
Download [CircuitPython for the Raspberry Pi Pico 2](https://circuitpython.org/board/raspberry_pi_pico2/). *Updated to 9.2.1
Download [CircuitPython for the Raspberry Pi Pico 2W](https://circuitpython.org/board/raspberry_pi_pico2_w/). *Updated to 9.2.1
2. Plug the device into a USB port while holding the boot button. It will show up as a removable media device named `RPI-RP2`.
3. Plug the device into a USB port while holding the boot button. It will show up as a removable media device named `RPI-RP2`.
3. Copy the downloaded `.uf2` file to the root of the Pico (`RPI-RP2`). The device will reboot and after a second or so, it will reconnect as `CIRCUITPY`.
4. Copy the downloaded `.uf2` file to the root of the Pico (`RPI-RP2`). The device will reboot and after a second or so, it will reconnect as `CIRCUITPY`.
4. Download `adafruit-circuitpython-bundle-7.x-mpy-YYYYMMDD.zip` [here](https://github.com/adafruit/Adafruit_CircuitPython_Bundle/releases/latest) and extract it outside the device.
5. Download `adafruit-circuitpython-bundle-9.x-mpy-YYYYMMDD.zip` [here](https://github.com/adafruit/Adafruit_CircuitPython_Bundle/releases/latest) and extract it outside the device.
5. Navigate to `lib` in the recently extracted folder and copy `adafruit_hid` to the `lib` folder in your Raspberry Pi Pico.
6. Navigate to `lib` in the recently extracted folder and copy `adafruit_hid` to the `lib` folder on your Raspberry Pi Pico.
6. Click [here](https://raw.githubusercontent.com/dbisu/pico-ducky/main/duckyinpython.py), press CTRL + S and save the file as `code.py` in the root of the Raspberry Pi Pico, overwriting the previous file.
7. Copy `adafruit_debouncer.mpy` and `adafruit_ticks.mpy` to the `lib` folder on your Raspberry Pi Pico.
7. Find a script [here](https://github.com/hak5darren/USB-Rubber-Ducky/wiki/Payloads) or [create your own one using Ducky Script](https://github.com/hak5darren/USB-Rubber-Ducky/wiki/Duckyscript) and save it as `payload.dd` in the Pico.
8. Copy `asyncio` to the `lib` folder on your Pico.
8. Be careful, if your device isn't in [setup mode](#setup-mode), the device will reboot and after half a second, the script will run.
9. Copy `adafruit_wsgi` to the `lib` folder on your Pico.
10. Copy `boot.py` from your clone to the root of your Pico.
11. Copy `duckyinpython.py`, `code.py`, `webapp.py`, `wsgiserver.py` to the root folder of the Pico.
12. *For Pico W Only* Create the file `secrets.py` in the root of the Pico W. This contains the AP name and password to be created by the Pico W.
`secrets = { 'ssid' : "BadAPName", 'password' : "badpassword" }`
13. Find a script [here](https://github.com/hak5/usbrubberducky-payloads) or [create your own one using Ducky Script](https://docs.hak5.org/hak5-usb-rubber-ducky/ducky-script-basics/hello-world) and save it as `payload.dd` in the Pico. Currently, pico-ducky only supports DuckyScript 1.0, and some of 3.0.
14. Be careful, if your device isn't in [setup mode](#setup-mode), the device will reboot and after half a second, the script will run.
15. **Please note:** by default Pico W will not show as a USB drive
### Pico W Web Service
The Pico W AP defaults to ip address `192.168.4.1`. You should be able to find the webservice at `http://192.168.4.1:80`
The following endpoints are available on the webservice:
```
/
/new
/ducky
/edit/<filename>
/write/<filename>
/run/<filename>
```
API endpoints
```
/api/run/<filenumber>
```
## Setup mode
### Setup mode
To edit the payload, enter setup mode by connecting the pin 1 (`GP0`) to pin 3 (`GND`), this will stop the pico-ducky from injecting the payload in your own machine.
The easiest way to do so is by using a jumper wire between those pins as seen bellow.
The easiest way to so is by using a jumper wire between those pins as seen bellow.
![Setup mode with a jumper](images/setup-mode.png)
## USB enable/disable mode
### USB enable/disable mode
If you need the pico-ducky to not show up as a USB mass storage device for stealth, follow these instructions.
- Enter setup mode.
- Copy your payload script to the pico-ducky.
- Disconnect the pico from your host PC.
- Connect a jumper wire between pin 18 (`GND`) and pin 20 (`GPIO15`).
Enter setup mode.
Copy boot.py to the root of the pico-ducky.
Copy your payload script to the pico-ducky.
Disconnect the pico from your host PC.
Connect a jumper wire between pin 18 and pin 20.
This will prevent the pico-ducky from showing up as a USB drive when plugged into the target computer.
- Remove the jumper and reconnect to your PC to reprogram.
Pico: The default mode is USB mass storage enabled.
Pico W: The default mode is USB mass storage **disabled**
Remove the jumper and reconnect to your PC to reprogram.
The default mode is USB mass storage enabled.
![USB enable/disable mode](images/usb-boot-mode.png)
## Multiple payloads
Multiple payloads can be stored on the Pico and Pico W.
To select a payload, ground one of these pins:
- GP4 - payload.dd
- GP5 - payload2.dd
- GP10 - payload3.dd
- GP11 - payload4.dd
## Changing Keyboard Layouts
### Changing Keyboard Layouts
Copied from [Neradoc/Circuitpython_Keyboard_Layouts](https://github.com/Neradoc/Circuitpython_Keyboard_Layouts/blob/main/PICODUCKY.md)
@@ -219,33 +111,8 @@ from keyboard_layout_win_LANG import KeyboardLayout
from keycode_win_LANG import Keycode
```
##### Example: Set to German Keyboard (WIN_DE)
```py
from keyboard_layout_win_de import KeyboardLayout
from keycode_win_de import Keycode
```
Copy the files keyboard_layout_win_de.mpy and keycode_win_de.mpy to the /lib folder on the Pico board
```
adafruit_hid/
keyboard_layout_win_de.mpy
keycode_win_de.mpy
```
## Useful links and resources
### How to recover your Pico if it becomes corrupted or doesn't boot.
[Reset Instructions](RESET.md)
### Installation Tool
[ryo-yamada](https://github.com/ryo-yamada) Created a tool to convert a blank RPi Pico to a ducky.
You can find the tool [here](https://github.com/ryo-yamada/PicoDuckyBuilder)
### Docs
[CircuitPython](https://circuitpython.readthedocs.io/en/6.3.x/README.html)
@@ -261,9 +128,3 @@ You can find the tool [here](https://github.com/ryo-yamada/PicoDuckyBuilder)
[USB Rubber Ducky playlist by **Hak5**](https://www.youtube.com/playlist?list=PLW5y1tjAOzI0YaJslcjcI4zKI366tMBYk)
[CircuitPython tutorial on the Raspberry Pi Pico by **DroneBot Workshop**](https://www.youtube.com/watch?v=07vG-_CcDG0)
## Related Projects
[Defcon31-ducky](https://github.com/iot-pwn/defcon31-ducky)
There are still a few of these available to purchase, US only.

View File

@@ -1,7 +0,0 @@
# Instructions on how to reset a Raspberry Pi Pico
Follow these instructions if your Pico ends up in an odd state
1. Download the reset firmware from [flash_nuke.uf2](https://datasheets.raspberrypi.com/soft/flash_nuke.uf2)
2. While holding the BOOTSEL button on the Pico, plug in the USB cable to your computer.
3. When the RPI-RP2 drive shows up on your computer, copy the flash_nuke.uf2 file to the Pico
4. After the device reboots, follow the Install instructions [here](https://github.com/dbisu/pico-ducky/blob/main/README.md)

50
boot.py
View File

@@ -1,58 +1,20 @@
# License : GPLv2.0
# copyright (c) 2023 Dave Bailey
# Author: Dave Bailey (dbisu, @daveisu)
# Pico and Pico W board support
from board import *
import board
import digitalio
import storage
import os
import usb_hid
def is_exfil_enabled(payload_path="payload.dd"):
try:
with open(payload_path, "r") as f:
for line in f:
if "$_EXFIL_MODE_ENABLED" in line and "TRUE" in line.upper():
return True
except OSError:
pass
return False
usb_hid.disable()
usb_hid.enable((usb_hid.Device.KEYBOARD,))
exfil_enabled = is_exfil_enabled()
loot_exists = "loot.bin" in os.listdir("/")
noStorage = False
noStorageStatus = False
noStoragePin = digitalio.DigitalInOut(GP15)
noStoragePin.switch_to_input(pull=digitalio.Pull.UP)
noStorageStatus = noStoragePin.value
noStorageStatus = not noStoragePin.value
# If GP15 is not connected, it will default to being pulled high (True)
# If GP is connected to GND, it will be low (False)
# Pico:
# GP15 not connected == USB visible
# GP15 connected to GND == USB not visible
# Pico W:
# GP15 not connected == USB NOT visible
# GP15 connected to GND == USB visible
if exfil_enabled:
if not loot_exists:
storage.disable_usb_drive()
if(board.board_id == 'raspberry_pi_pico' or board.board_id == 'raspberry_pi_pico2'):
# On Pi Pico, default to USB visible
noStorage = not noStorageStatus
elif(board.board_id == 'raspberry_pi_pico_w' or board.board_id == 'raspberry_pi_pico2_w'):
# on Pi Pico W, default to USB hidden by default
# so webapp can access storage
noStorage = noStorageStatus
if(noStorage == True):
if(noStorageStatus == True):
# don't show USB drive to host PC
storage.disable_usb_drive()
print("Disabling USB drive")
else:
# normal boot
print("USB drive enabled")

View File

@@ -1,125 +0,0 @@
import os
import shutil
import re
import sys
import zipfile
languages = [ "MAC_FR",
"US_DVO",
"WIN_BR",
"WIN_CZ",
"WIN_CZ1",
"WIN_DA",
"WIN_DE",
"WIN_ES",
"WIN_FR",
"WIN_HU",
"WIN_IT",
"WIN_PO",
"WIN_SW",
"WIN_TR",
"WIN_UK" ]
supported_boards = ["raspberry_pi_pico",
"raspberry_pi_pico_w",
"raspberry_pi_pico2",
"raspberry_pi_pico2_w"]
files_to_bundle = ["boot.py",
"code.py",
"duckyinpython.py",
"wsgiserver.py",
"webapp.py",
"secrets.py",
"payload.dd",
"payload2.dd",
"payload3.dd",
"payload4.dd",
"INSTALL.txt"]
dirs_to_bundle = ["lib"]
def bundle_files_to_zip(source_dir, destination_dir, file_list, target_file, replacement_dict, version):
"""
Bundles files from a source directory into a new directory with a unique name.
Args:
source_dir: Path to the source directory containing the files.
destination_dir: Path to the destination directory where bundles will be created.
file_list: List of filenames to be included in the bundle.
target_file: Filename of the file to be modified.
replacement_dict: Dictionary containing key-value pairs for text replacements.
Returns:
None
"""
if not os.path.exists(destination_dir):
os.makedirs(destination_dir)
# Generate a unique bundle name (e.g., using a timestamp)
bundle_name = f"pico-ducky-{version}-{destination_dir}.zip"
bundle_path = os.path.join(destination_dir, bundle_name)
# Create a temporary directory for the bundle contents
temp_dir = os.path.join(destination_dir, "temp_bundle")
os.makedirs(temp_dir)
for filename in file_list:
source_file = os.path.join(source_dir, filename)
destination_file = os.path.join(temp_dir, filename)
if filename == target_file:
with open(source_file, 'r') as f:
file_content = f.read()
for key, value in replacement_dict.items():
file_content = re.sub(key, value, file_content)
with open(destination_file, 'w') as f:
f.write(file_content)
else:
shutil.copy2(source_file, destination_file)
for dir in dirs_to_bundle:
shutil.copytree(os.path.join(source_dir,dir),os.path.join(temp_dir,dir))
#find uf2 files for supported boards
for root, dirs, files in os.walk(source_dir):
for file in files:
for board in supported_boards:
if '-'+board+'-' in file:
source_file = os.path.join(source_dir, file)
destination_file = os.path.join(temp_dir, file)
shutil.copy2(source_file, destination_file)
# Create the ZIP archive
with zipfile.ZipFile(bundle_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
archive_path = os.path.relpath(file_path, temp_dir)
zipf.write(file_path, archive_path)
# Remove the temporary directory
shutil.rmtree(temp_dir)
def main(argv):
version = argv[0]
for dest_dir in languages:
source_directory = "US"
target_file_to_modify = "duckyinpython.py"
replacements = {
"#from keyboard_layout_win_LANG": "from keyboard_layout_"+dest_dir.lower(),
"#from keycode_win_LANG": "from keycode_"+dest_dir.lower(),
"from adafruit_hid.keyboard_": "#from adafruit_hid.keyboard_",
"from adafruit_hid.keycode": "#from adafruit_hid.keycode"
}
bundle_files_to_zip(source_directory, dest_dir, files_to_bundle,
target_file_to_modify, replacements, version)
if __name__ == "__main__":
main(sys.argv[1:])

86
code.py
View File

@@ -1,86 +0,0 @@
# License : GPLv2.0
# copyright (c) 2023 Dave Bailey
# Author: Dave Bailey (dbisu, @daveisu)
# Pico and Pico W board support
import supervisor
import os
import pwmio
import time
import digitalio
from board import *
import board
from duckyinpython import *
if(board.board_id == 'raspberry_pi_pico_w' or board.board_id == 'raspberry_pi_pico2_w'):
import wifi
from webapp import *
# sleep at the start to allow the device to be recognized by the host computer
time.sleep(.5)
def startWiFi():
import ipaddress
# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
print("Connect wifi")
#wifi.radio.connect(secrets['ssid'],secrets['password'])
wifi.radio.start_ap(secrets['ssid'],secrets['password'])
HOST = repr(wifi.radio.ipv4_address_ap)
PORT = 80 # Port to listen on
print(HOST,PORT)
# turn off automatically reloading when files are written to the pico
#supervisor.disable_autoreload()
supervisor.runtime.autoreload = False
if(board.board_id == 'raspberry_pi_pico' or board.board_id == 'raspberry_pi_pico2'):
led = pwmio.PWMOut(board.LED, frequency=5000, duty_cycle=0)
elif(board.board_id == 'raspberry_pi_pico_w' or board.board_id == 'raspberry_pi_pico2_w'):
led = digitalio.DigitalInOut(board.LED)
led.switch_to_output()
async def run_payload_on_startup():
progStatus = False
progStatus = getProgrammingStatus()
print("progStatus", progStatus)
if(progStatus == False):
print("Finding payload")
if "loot.bin" in os.listdir("/"):
print("loot.bin exists, skipping payload execution.")
else:
payload = selectPayload()
await asyncio.sleep(0.1)
print("Running")
await runScript(payload)
else:
print("Done")
led_state = False
async def main_loop():
global led,button1
button_task = asyncio.create_task(monitor_buttons(button1))
payload_task = asyncio.create_task(run_payload_on_startup())
led_task = asyncio.create_task(monitor_led_changes())
if(board.board_id == 'raspberry_pi_pico_w' or board.board_id == 'raspberry_pi_pico2_w'):
pico_led_task = asyncio.create_task(blink_pico_w_led(led))
print("Starting Wifi")
startWiFi()
print("Starting Web Service")
webservice_task = asyncio.create_task(startWebService())
await asyncio.gather(pico_led_task, button_task, webservice_task, payload_task, led_task)
else:
pico_led_task = asyncio.create_task(blink_pico_led(led))
await asyncio.gather(pico_led_task, button_task, payload_task, led_task )
asyncio.run(main_loop())

View File

@@ -1,23 +1,9 @@
# License : GPLv2.0
# copyright (c) 2023 Dave Bailey
# copyright (c) 2021 Dave Bailey
# Author: Dave Bailey (dbisu, @daveisu)
#
# TODO: ADD support for the following:
# Add jitter
# Add LED functionality
import re
import time
import random
import digitalio
from digitalio import DigitalInOut, Pull
from adafruit_debouncer import Debouncer
import board
from board import *
import asyncio
import usb_hid
from adafruit_hid.keyboard import Keyboard
from adafruit_hid.consumer_control import ConsumerControl
from adafruit_hid.consumer_control_code import ConsumerControlCode
# comment out these lines for non_US keyboards
from adafruit_hid.keyboard_layout_us import KeyboardLayoutUS as KeyboardLayout
@@ -25,40 +11,21 @@ from adafruit_hid.keycode import Keycode
# uncomment these lines for non_US keyboards
# replace LANG with appropriate language
#from keyboard_layout_win_LANG import KeyboardLayout as KeyboardLayout
#from keyboard_layout_win_LANG import KeyboardLayout
#from keycode_win_LANG import Keycode
def _capsOn():
return kbd.led_on(Keyboard.LED_CAPS_LOCK)
import supervisor
def _numOn():
return kbd.led_on(Keyboard.LED_NUM_LOCK)
import time
import digitalio
from board import *
led = digitalio.DigitalInOut(LED)
led.direction = digitalio.Direction.OUTPUT
def _scrollOn():
return kbd.led_on(Keyboard.LED_SCROLL_LOCK)
def pressLock(key):
kbd.press(key)
kbd.release(key)
def SaveKeyboardLedState():
variables["$_INITIAL_SCROLLLOCK"] = _scrollOn()
variables["$_INITIAL_NUMLOCK"] = _numOn()
variables ["$_INITIAL_CAPSLOCK"] = _capsOn()
def RestoreKeyboardLedState():
if(variables["$_INITIAL_CAPSLOCK"] != _capsOn()):
pressLock(Keycode.CAPS_LOCK)
if(variables["$_INITIAL_NUMLOCK"] != _numOn()):
pressLock(Keycode.NUM_LOCK)
if(variables["$_INITIAL_SCROLLLOCK"] != _scrollOn()):
pressLock(Keycode.SCROLL_LOCK)
duckyKeys = {
'WINDOWS': Keycode.GUI, 'RWINDOWS': Keycode.RIGHT_GUI, 'GUI': Keycode.GUI, 'RGUI': Keycode.RIGHT_GUI, 'COMMAND': Keycode.GUI, 'RCOMMAND': Keycode.RIGHT_GUI,
'APP': Keycode.APPLICATION, 'MENU': Keycode.APPLICATION, 'SHIFT': Keycode.SHIFT, 'RSHIFT': Keycode.RIGHT_SHIFT,
'ALT': Keycode.ALT, 'RALT': Keycode.RIGHT_ALT, 'OPTION': Keycode.ALT, 'ROPTION': Keycode.RIGHT_ALT, 'CONTROL': Keycode.CONTROL, 'CTRL': Keycode.CONTROL, 'RCTRL': Keycode.RIGHT_CONTROL,
duckyCommands = {
'WINDOWS': Keycode.WINDOWS, 'GUI': Keycode.GUI,
'APP': Keycode.APPLICATION, 'MENU': Keycode.APPLICATION, 'SHIFT': Keycode.SHIFT,
'ALT': Keycode.ALT, 'CONTROL': Keycode.CONTROL, 'CTRL': Keycode.CONTROL,
'DOWNARROW': Keycode.DOWN_ARROW, 'DOWN': Keycode.DOWN_ARROW, 'LEFTARROW': Keycode.LEFT_ARROW,
'LEFT': Keycode.LEFT_ARROW, 'RIGHTARROW': Keycode.RIGHT_ARROW, 'RIGHT': Keycode.RIGHT_ARROW,
'UPARROW': Keycode.UP_ARROW, 'UP': Keycode.UP_ARROW, 'BREAK': Keycode.PAUSE,
@@ -76,247 +43,48 @@ duckyKeys = {
'Z': Keycode.Z, 'F1': Keycode.F1, 'F2': Keycode.F2, 'F3': Keycode.F3,
'F4': Keycode.F4, 'F5': Keycode.F5, 'F6': Keycode.F6, 'F7': Keycode.F7,
'F8': Keycode.F8, 'F9': Keycode.F9, 'F10': Keycode.F10, 'F11': Keycode.F11,
'F12': Keycode.F12, 'F13': Keycode.F13, 'F14': Keycode.F14, 'F15': Keycode.F15,
'F16': Keycode.F16, 'F17': Keycode.F17, 'F18': Keycode.F18, 'F19': Keycode.F19,
'F20': Keycode.F20, 'F21': Keycode.F21, 'F22': Keycode.F22, 'F23': Keycode.F23,
'F24': Keycode.F24
'F12': Keycode.F12,
}
duckyConsumerKeys = {
'MK_VOLUP': ConsumerControlCode.VOLUME_INCREMENT, 'MK_VOLDOWN': ConsumerControlCode.VOLUME_DECREMENT, 'MK_MUTE': ConsumerControlCode.MUTE,
'MK_NEXT': ConsumerControlCode.SCAN_NEXT_TRACK, 'MK_PREV': ConsumerControlCode.SCAN_PREVIOUS_TRACK,
'MK_PP': ConsumerControlCode.PLAY_PAUSE, 'MK_STOP': ConsumerControlCode.STOP
}
variables = {"$_RANDOM_MIN": 0, "$_RANDOM_MAX": 65535,"$_EXFIL_MODE_ENABLED": False,"$_EXFIL_LEDS_ENABLED": False,"$_INITIAL_SCROLLLOCK": False, "$_INITIAL_NUMLOCK": False, "$_INITIAL_CAPSLOCK": False}
internalVariables = {"$_CAPSLOCK_ON": _capsOn, "$_NUMLOCK_ON": _numOn, "$_SCROLLLOCK_ON": _scrollOn}
defines = {}
functions = {}
letters = "abcdefghijklmnopqrstuvwxyz"
numbers = "0123456789"
specialChars = "!@#$%^&*()"
class IF:
def __init__(self, condition, codeIter):
self.condition = condition
self.codeIter = list(codeIter)
self.lastIfResult = None
def _exitIf(self):
_depth = 0
for line in self.codeIter:
line = self.codeIter.pop(0)
line = line.strip()
if line.upper().startswith("END_IF"):
_depth -= 1
elif line.upper().startswith("IF"):
_depth += 1
if _depth < 0:
print("No else, exiting" + str(list(self.codeIter)))
break
return(self.codeIter)
def runIf(self):
if isinstance(self.condition, str):
self.lastIfResult = evaluateExpression(self.condition)
elif isinstance(self.condition, bool):
self.lastIfResult = self.condition
else:
raise ValueError("Invalid condition type")
# print(f"condition {self.condition} result is {self.lastIfResult} since \"$VAR\" is {variables["$VAR"]}, code is {self.codeIter}")
depth = 0
for line in self.codeIter:
line = self.codeIter.pop(0)
line = line.strip()
if line == "":
continue
# print(line)
if line.startswith("IF"):
depth += 1
elif line.startswith("END_IF"):
if depth == 0:
return(self.codeIter, -1)
depth -=1
elif line.startswith("ELSE") and depth == 0:
# print(f"ELSE LINE {line}, lastIfResult: {self.lastIfResult}")
if self.lastIfResult is False:
line = line[4:].strip() # Remove 'ELSE' and strip whitespace
if line.startswith("IF"):
nestedCondition = _getIfCondition(line)
# print(f"nested IF {nestedCondition}")
self.codeIter, self.lastIfResult = IF(nestedCondition, self.codeIter).runIf()
if self.lastIfResult == -1 or self.lastIfResult == True:
# print(f"self.lastIfResult {self.lastIfResult}")
return(self.codeIter, True)
else:
return IF(True, self.codeIter).runIf() #< Regular ELSE block
else:
self._exitIf()
break
# Process regular lines
elif self.lastIfResult:
# print(f"running line {line}")
self.codeIter = list(parseLine(line, self.codeIter))
# print("end of if")
return(self.codeIter, self.lastIfResult)
def _getIfCondition(line):
return str(line)[2:-4].strip()
def _isCodeBlock(line):
line = line.upper().strip()
if line.startswith("IF") or line.startswith("WHILE"):
return True
return False
def _getCodeBlock(linesIter):
"""Returns the code block starting at the given line."""
code = []
depth = 1
for line in linesIter:
line = line.strip()
if line.upper().startswith("END_"):
depth -= 1
elif _isCodeBlock(line):
depth += 1
if depth <= 0:
break
code.append(line)
return code
def evaluateExpression(expression):
"""Evaluates an expression with variables and returns the result."""
# Replace variables (e.g., $FOO) in the expression with their values
expression = re.sub(r"\$(\w+)", lambda m: str(variables.get(f"${m.group(1)}", 0)), expression)
expression = expression.replace("^", "**") #< Replace ^ with ** for exponentiation
expression = expression.replace("&&", "and")
expression = expression.replace("||", "or")
expression = expression.replace("TRUE", "True")
expression = expression.replace("FALSE", "False")
return eval(expression, {}, variables)
def deepcopy(List):
return(List[:])
def convertLine(line):
commands = []
newline = []
# print(line)
# loop on each key - the filter removes empty values
for key in filter(None, line.split(" ")):
key = key.upper()
# find the keycode for the command in the list
command_keycode = duckyKeys.get(key, None)
command_consumer_keycode = duckyConsumerKeys.get(key, None)
command_keycode = duckyCommands.get(key, None)
if command_keycode is not None:
# if it exists in the list, use it
commands.append(command_keycode)
elif command_consumer_keycode is not None:
# if it exists in the list, use it
commands.append(1000+command_consumer_keycode)
newline.append(command_keycode)
elif hasattr(Keycode, key):
# if it's in the Keycode module, use it (allows any valid keycode)
commands.append(getattr(Keycode, key))
newline.append(getattr(Keycode, key))
else:
# if it's not a known key name, show the error for diagnosis
print(f"Unknown key: <{key}>")
# print(commands)
return commands
# print(newline)
return newline
def runScriptLine(line):
keys = convertLine(line)
for k in keys:
if k > 1000:
consumerControl.press(int(k-1000))
else:
for k in line:
kbd.press(k)
for k in reversed(keys):
if k > 1000:
consumerControl.release()
else:
kbd.release(k)
kbd.release_all()
def sendString(line):
layout.write(line)
def replaceVariables(line):
for var in variables:
line = line.replace(var, str(variables[var]))
for var in internalVariables:
line = line.replace(var, str(internalVariables[var]()))
return line
def replaceDefines(line):
for define, value in defines.items():
line = line.replace(define, value)
return line
async def parseLine(line, script_lines):
global defaultDelay, variables, functions, defines
line = line.strip()
line = line.replace("$_RANDOM_INT", str(random.randint(int(variables.get("$_RANDOM_MIN", 0)), int(variables.get("$_RANDOM_MAX", 65535)))))
line = replaceDefines(line)
if line[:10] == "INJECT_MOD":
line = line[11:]
elif line.startswith("REM_BLOCK"):
while line.startswith("END_REM") == False:
line = next(script_lines).strip()
# print(line)
elif(line[0:3] == "REM"):
def parseLine(line):
global defaultDelay
if(line[0:3] == "REM"):
# ignore ducky script comments
pass
elif line.startswith("HOLD"):
# HOLD command to press and hold a key
key = line[5:].strip().upper()
commandKeycode = duckyKeys.get(key, None)
if commandKeycode:
kbd.press(commandKeycode)
else:
print(f"Unknown key to HOLD: <{key}>")
elif line.startswith("RELEASE"):
# RELEASE command to release a held key
key = line[8:].strip().upper()
commandKeycode = duckyKeys.get(key, None)
if commandKeycode:
kbd.release(commandKeycode)
else:
print(f"Unknown key to RELEASE: <{key}>")
elif(line[0:5] == "DELAY"):
line = replaceVariables(line)
time.sleep(float(line[6:])/1000)
elif line == "STRINGLN": #< stringLN block
line = next(script_lines).strip()
line = replaceVariables(line)
while line.startswith("END_STRINGLN") == False:
sendString(line)
kbd.press(Keycode.ENTER)
kbd.release(Keycode.ENTER)
line = next(script_lines).strip()
line = replaceVariables(line)
line = replaceDefines(line)
elif(line[0:8] == "STRINGLN"):
sendString(replaceVariables(line[9:]))
kbd.press(Keycode.ENTER)
kbd.release(Keycode.ENTER)
elif line == "STRING": #< string block
line = next(script_lines).strip()
line = replaceVariables(line)
while line.startswith("END_STRING") == False:
sendString(line)
line = next(script_lines).strip()
line = replaceVariables(line)
line = replaceDefines(line)
elif(line[0:6] == "STRING"):
sendString(replaceVariables(line[7:]))
sendString(line[7:])
elif(line[0:5] == "PRINT"):
line = replaceVariables(line[6:])
print("[SCRIPT]: " + line)
print("[SCRIPT]: " + line[6:])
elif(line[0:6] == "IMPORT"):
runScript(line[7:])
elif(line[0:13] == "DEFAULT_DELAY"):
@@ -328,151 +96,21 @@ async def parseLine(line, script_lines):
led.value = False
else:
led.value = True
elif(line[0:3] == "LED"):
if(led.value == True):
led.value = False
else:
led.value = True
elif(line[:7] == "LED_OFF"):
led.value = False
elif(line[:5] == "LED_R"):
led.value = True
elif(line[:5] == "LED_G"):
led.value = True
elif(line[0:21] == "WAIT_FOR_BUTTON_PRESS"):
button_pressed = False
# NOTE: we don't use assincio in this case because we want to block code execution
while not button_pressed:
button1.update()
button1Pushed = button1.fell
button1Released = button1.rose
button1Held = not button1.value
if(button1Pushed):
print("Button 1 pushed")
button_pressed = True
elif line.startswith("VAR"):
match = re.match(r"VAR\s+\$(\w+)\s*=\s*(.+)", line)
if match:
varName = f"${match.group(1)}"
value = evaluateExpression(match.group(2))
variables[varName] = value
else:
raise SyntaxError(f"Invalid variable declaration: {line}")
elif line.startswith("$"):
match = re.match(r"\$(\w+)\s*=\s*(.+)", line)
if match:
varName = f"${match.group(1)}"
expression = match.group(2)
value = evaluateExpression(expression)
variables[varName] = value
else:
raise SyntaxError(f"Invalid variable update, declare variable first: {line}")
elif line.startswith("DEFINE"):
defineLocation = line.find(" ")
valueLocation = line.find(" ", defineLocation + 1)
defineName = line[defineLocation+1:valueLocation]
defineValue = line[valueLocation+1:]
defines[defineName] = defineValue
elif line.startswith("FUNCTION"):
# print("ENTER FUNCTION")
func_name = line.split()[1]
functions[func_name] = []
line = next(script_lines).strip()
while line != "END_FUNCTION":
functions[func_name].append(line)
line = next(script_lines).strip()
elif line.startswith("WHILE"):
# print("ENTER WHILE LOOP")
condition = line[5:].strip()
loopCode = list(_getCodeBlock(script_lines))
while evaluateExpression(condition) == True:
currentIterCode = deepcopy(loopCode)
print(loopCode)
while currentIterCode:
loopLine = currentIterCode.pop(0)
currentIterCode = list(parseLine(loopLine, iter(currentIterCode))) #< very inefficient, should be replaced later.
elif line.upper().startswith("IF"):
# print("ENTER IF")
script_lines, ret = IF(_getIfCondition(line), script_lines).runIf()
print(f"IF returned {ret} code")
elif line.upper().startswith("END_IF"):
pass
elif line == "RANDOM_LOWERCASE_LETTER":
sendString(random.choice(letters))
elif line == "RANDOM_UPPERCASE_LETTER":
sendString(random.choice(letters.upper()))
elif line == "RANDOM_LETTER":
sendString(random.choice(letters + letters.upper()))
elif line == "RANDOM_NUMBER":
sendString(random.choice(numbers))
elif line == "RANDOM_SPECIAL":
sendString(random.choice(specialChars))
elif line == "RANDOM_CHAR":
sendString(random.choice(letters + letters.upper() + numbers + specialChars))
elif line == "VID_RANDOM" or line == "PID_RANDOM":
for _ in range(4):
sendString(random.choice("0123456789ABCDEF"))
elif line == "MAN_RANDOM" or line == "PROD_RANDOM":
for _ in range(12):
sendString(random.choice(letters + letters.upper() + numbers))
elif line == "SERIAL_RANDOM":
for _ in range(12):
sendString(random.choice(letters + letters.upper() + numbers + specialChars))
elif line == "RESET":
kbd.release_all()
elif line == "SAVE_HOST_KEYBOARD_LOCK_STATE":
SaveKeyboardLedState()
elif line == "RESTORE_HOST_KEYBOARD_LOCK_STATE":
RestoreKeyboardLedState()
elif line == "WAIT_FOR_SCROLL_CHANGE":
last_scroll_state = _scrollOn()
while True:
current_scroll_state = _scrollOn()
if current_scroll_state != last_scroll_state:
break
await asyncio.sleep(0.01)
elif line in functions:
updated_lines = []
inside_while_block = False
for func_line in functions[line]:
if func_line.startswith("WHILE"):
inside_while_block = True # Start skipping lines
updated_lines.append(func_line)
elif func_line.startswith("END_WHILE"):
inside_while_block = False # Stop skipping lines
updated_lines.append(func_line)
parseLine(updated_lines[0], iter(updated_lines))
updated_lines = [] # Clear updated_lines after parsing
elif inside_while_block:
updated_lines.append(func_line)
elif not (func_line.startswith("END_WHILE") or func_line.startswith("WHILE")):
parseLine(func_line, iter(functions[line]))
else:
runScriptLine(line)
return(script_lines)
newScriptLine = convertLine(line)
runScriptLine(newScriptLine)
kbd = Keyboard(usb_hid.devices)
consumerControl = ConsumerControl(usb_hid.devices)
layout = KeyboardLayout(kbd)
#init button
button1_pin = DigitalInOut(GP22) # defaults to input
button1_pin.pull = Pull.UP # turn on internal pull-up resistor
button1 = Debouncer(button1_pin)
# turn off automatically reloading when files are written to the pico
supervisor.disable_autoreload()
# sleep at the start to allow the device to be recognized by the host computer
time.sleep(.5)
led.value = True
#init payload selection switch
payload1Pin = digitalio.DigitalInOut(GP4)
payload1Pin.switch_to_input(pull=digitalio.Pull.UP)
payload2Pin = digitalio.DigitalInOut(GP5)
payload2Pin.switch_to_input(pull=digitalio.Pull.UP)
payload3Pin = digitalio.DigitalInOut(GP10)
payload3Pin.switch_to_input(pull=digitalio.Pull.UP)
payload4Pin = digitalio.DigitalInOut(GP11)
payload4Pin.switch_to_input(pull=digitalio.Pull.UP)
def getProgrammingStatus():
# check GP0 for setup mode
@@ -485,50 +123,45 @@ def getProgrammingStatus():
defaultDelay = 0
async def runScript(file):
def runScript(file):
global defaultDelay
duckyScriptPath = file
restart = True
try:
while restart:
restart = False
with open(duckyScriptPath, "r", encoding='utf-8') as f:
script_lines = iter(f.readlines())
f = open(duckyScriptPath,"r",encoding='utf-8')
previousLine = ""
for line in script_lines:
print(f"runScript: {line}")
for line in f:
line = line.rstrip()
if(line[0:6] == "REPEAT"):
for i in range(int(line[7:])):
#repeat the last command
parseLine(previousLine, script_lines)
parseLine(previousLine)
time.sleep(float(defaultDelay)/1000)
elif line.startswith("RESTART_PAYLOAD"):
restart = True
break
elif line.startswith("STOP_PAYLOAD"):
restart = False
break
else:
await parseLine(line, script_lines)
parseLine(line)
previousLine = line
time.sleep(float(defaultDelay)/1000)
except OSError as e:
print("Unable to open file", file)
def selectPayload():
global payload1Pin, payload2Pin, payload3Pin, payload4Pin
payload = "payload.dd"
# check switch status
# payload1 = GPIO4 to GND
# payload2 = GPIO5 to GND
# payload3 = GPIO10 to GND
# payload4 = GPIO11 to GND
payload1Pin = digitalio.DigitalInOut(GP4)
payload1Pin.switch_to_input(pull=digitalio.Pull.UP)
payload1State = not payload1Pin.value
payload2Pin = digitalio.DigitalInOut(GP5)
payload2Pin.switch_to_input(pull=digitalio.Pull.UP)
payload2State = not payload2Pin.value
payload3Pin = digitalio.DigitalInOut(GP10)
payload3Pin.switch_to_input(pull=digitalio.Pull.UP)
payload3State = not payload3Pin.value
payload4Pin = digitalio.DigitalInOut(GP11)
payload4Pin.switch_to_input(pull=digitalio.Pull.UP)
payload4State = not payload4Pin.value
if(payload1State == True):
payload = "payload.dd"
@@ -546,132 +179,24 @@ def selectPayload():
# default to payload1
payload = "payload.dd"
return payload
async def blink_led(led):
print("Blink")
if(board.board_id == 'raspberry_pi_pico' or board.board_id == 'raspberry_pi_pico2'):
blink_pico_led(led)
elif(board.board_id == 'raspberry_pi_pico_w' or board.board_id == 'raspberry_pi_pico2_w'):
blink_pico_w_led(led)
progStatus = False
progStatus = getProgrammingStatus()
async def blink_pico_led(led):
print("starting blink_pico_led")
led_state = False
while True:
if(variables.get("$_EXFIL_LEDS_ENABLED")):
led.duty_cycle = 65535
else:
if led_state:
#led_pwm_up(led)
#print("led up")
for i in range(100):
# PWM LED up and down
if i < 50:
led.duty_cycle = int(i * 2 * 65535 / 100) # Up
await asyncio.sleep(0.01)
led_state = False
else:
#led_pwm_down(led)
#print("led down")
for i in range(100):
# PWM LED up and down
if i >= 50:
led.duty_cycle = 65535 - int((i - 50) * 2 * 65535 / 100) # Down
await asyncio.sleep(0.01)
led_state = True
await asyncio.sleep(0)
async def blink_pico_w_led(led):
print("starting blink_pico_w_led")
led_state = False
while True:
if(variables.get("$_EXFIL_LEDS_ENABLED")):
led.value = 1
else:
if led_state:
#print("led on")
led.value = 1
await asyncio.sleep(0.5)
led_state = False
else:
#print("led off")
led.value = 0
await asyncio.sleep(0.5)
led_state = True
await asyncio.sleep(0.5)
async def monitor_buttons(button1):
global inBlinkeyMode, inMenu, enableRandomBeep, enableSirenMode,pixel
print("starting monitor_buttons")
button1Down = False
while True:
button1.update()
button1Pushed = button1.fell
button1Released = button1.rose
button1Held = not button1.value
if(button1Pushed):
print("Button 1 pushed")
button1Down = True
if(button1Released):
print("Button 1 released")
if(button1Down):
print("push and released")
if(button1Released):
if(button1Down):
# Run selected payload
if(progStatus == False):
# not in setup mode, inject the payload
payload = selectPayload()
print("Running ", payload)
await runScript(payload)
runScript(payload)
print("Done")
button1Down = False
await asyncio.sleep(0)
async def monitor_led_changes():
print("starting monitor_led_changes")
else:
print("Update your payload")
while True:
if variables.get("$_EXFIL_MODE_ENABLED"):
try:
bit_list = []
last_caps_state = _capsOn()
last_num_state = _numOn()
last_scroll_state = _scrollOn()
with open("loot.bin", "ab") as file:
while variables.get("$_EXFIL_MODE_ENABLED"):
caps_state = _capsOn()
num_state = _numOn()
scroll_state = _scrollOn()
if caps_state != last_caps_state:
bit_list.append(0)
last_caps_state = caps_state
elif num_state != last_num_state:
bit_list.append(1)
last_num_state = num_state
if len(bit_list) == 8:
byte = 0
for b in bit_list:
byte = (byte << 1) | b
file.write(bytes([byte]))
bit_list = []
if scroll_state != last_scroll_state:
variables["$_EXFIL_LEDS_ENABLED"] = False
break
await asyncio.sleep(0.001)
except Exception as e:
print(f"Error occurred: {e}")
await asyncio.sleep(0.0)
time.sleep(1.0)
led.value = False
time.sleep(1.0)
led.value = True

View File

@@ -1,13 +0,0 @@
REM Example Function
FUNCTION COUNTDOWN()
REM The next four lines open Notepad in Windows and type "Hello World!"
GUI r
DELAY 1000
STRING notepad
ENTER
DELAY 2000
STRING Hello World!
ENTER
END_FUNCTION
COUNTDOWN()

View File

@@ -1,7 +0,0 @@
VAR $TIME = 3
WHILE ($TIME > 0)
STRING .
DELAY 500
STRING While Looop!!
ENTER
END_WHILE

243
webapp.py
View File

@@ -1,243 +0,0 @@
# License : GPLv2.0
# copyright (c) 2023 Dave Bailey
# Author: Dave Bailey (dbisu, @daveisu)
# FeatherS2 board support
import socketpool
import time
import os
import storage
import wsgiserver as server
from adafruit_wsgi.wsgi_app import WSGIApp
import wifi
from duckyinpython import *
payload_html = """<!DOCTYPE html>
<html>
<head> <title>Pico W Ducky</title> </head>
<body> <h1>Pico W Ducky</h1>
<table border="1"> <tr><th>Payload</th><th>Actions</th></tr> {} </table>
<br>
<a href="/new">New Script</a>
</body>
</html>
"""
edit_html = """<!DOCTYPE html>
<html>
<head>
<title>Script Editor</title>
</head>
<body>
<form action="/write/{}" method="POST">
<textarea rows="5" cols="60" name="scriptData">{}</textarea>
<br/>
<input type="submit" value="submit"/>
</form>
<br>
<a href="/ducky">Home</a>
</body>
</html>
"""
new_html = """<!DOCTYPE html>
<html>
<head>
<title>New Script</title>
</head>
<body>
<form action="/new" method="POST">
Script Name<br>
<textarea rows="1" cols="60" name="scriptName"></textarea>
Script<br>
<textarea rows="5" cols="60" name="scriptData"></textarea>
<br/>
<input type="submit" value="submit"/>
</form>
<br>
<a href="/ducky">Home</a>
</body>
</html>
"""
response_html = """<!DOCTYPE html>
<html>
<head> <title>Pico W Ducky</title> </head>
<body> <h1>Pico W Ducky</h1>
{}
<br>
<a href="/ducky">Home</a>
</body>
</html>
"""
newrow_html = "<tr><td>{}</td><td><a href='/edit/{}'>Edit</a> / <a href='/run/{}'>Run</a></tr>"
def setPayload(payload_number):
if(payload_number == 1):
payload = "payload.dd"
else:
payload = "payload"+str(payload_number)+".dd"
return(payload)
def ducky_main(request):
print("Ducky main")
payloads = []
rows = ""
files = os.listdir()
#print(files)
for f in files:
if ('.dd' in f) == True:
payloads.append(f)
newrow = newrow_html.format(f,f,f)
#print(newrow)
rows = rows + newrow
response = payload_html.format(rows)
return(response)
_hexdig = '0123456789ABCDEFabcdef'
_hextobyte = None
def cleanup_text(string):
"""unquote('abc%20def') -> b'abc def'."""
global _hextobyte
if not string:
return b''
if isinstance(string, str):
string = string.encode('utf-8')
bits = string.split(b'%')
if len(bits) == 1:
return string
res = [bits[0]]
append = res.append
if _hextobyte is None:
_hextobyte = {(a + b).encode(): bytes([int(a + b, 16)])
for a in _hexdig for b in _hexdig}
for item in bits[1:]:
try:
append(_hextobyte[item[:2]])
append(item[2:])
except KeyError:
append(b'%')
append(item)
return b''.join(res).decode().replace('+',' ')
web_app = WSGIApp()
@web_app.route("/ducky")
def duck_main(request):
response = ducky_main(request)
return("200 OK", [('Content-Type', 'text/html')], response)
@web_app.route("/edit/<filename>")
def edit(request, filename):
print("Editing ", filename)
f = open(filename,"r",encoding='utf-8')
textbuffer = ''
for line in f:
textbuffer = textbuffer + line
f.close()
response = edit_html.format(filename,textbuffer)
#print(response)
return("200 OK",[('Content-Type', 'text/html')], response)
@web_app.route("/write/<filename>",methods=["POST"])
def write_script(request, filename):
data = request.body.getvalue()
fields = data.split("&")
form_data = {}
for field in fields:
key,value = field.split('=')
form_data[key] = value
#print(form_data)
storage.remount("/",readonly=False)
f = open(filename,"w",encoding='utf-8')
textbuffer = form_data['scriptData']
textbuffer = cleanup_text(textbuffer)
#print(textbuffer)
for line in textbuffer:
f.write(line)
f.close()
storage.remount("/",readonly=True)
response = response_html.format("Wrote script " + filename)
return("200 OK",[('Content-Type', 'text/html')], response)
@web_app.route("/new",methods=['GET','POST'])
def write_new_script(request):
response = ''
if(request.method == 'GET'):
response = new_html
else:
data = request.body.getvalue()
fields = data.split("&")
form_data = {}
for field in fields:
key,value = field.split('=')
form_data[key] = value
#print(form_data)
filename = form_data['scriptName']
textbuffer = form_data['scriptData']
textbuffer = cleanup_text(textbuffer)
storage.remount("/",readonly=False)
f = open(filename,"w",encoding='utf-8')
for line in textbuffer:
f.write(line)
f.close()
storage.remount("/",readonly=True)
response = response_html.format("Wrote script " + filename)
return("200 OK",[('Content-Type', 'text/html')], response)
@web_app.route("/run/<filename>")
def run_script(request, filename):
print("run_script ", filename)
response = response_html.format("Running script " + filename)
#print(response)
runScript(filename)
return("200 OK",[('Content-Type', 'text/html')], response)
@web_app.route("/")
def index(request):
response = ducky_main(request)
return("200 OK", [('Content-Type', 'text/html')], response)
@web_app.route("/api/run/<filenumber>")
def run_script(request, filenumber):
filename = setPayload(int(filenumber))
print("run_script ", filenumber)
response = response_html.format("Running script " + filename)
#print(response)
runScript(filename)
return("200 OK",[('Content-Type', 'text/html')], response)
async def startWebService():
HOST = repr(wifi.radio.ipv4_address_ap)
PORT = 80 # Port to listen on
print(HOST,PORT)
wsgiServer = server.WSGIServer(PORT, application=web_app)
print(f"open this IP in your browser: http://{HOST}:{PORT}/")
# Start the server
wsgiServer.start()
while True:
wsgiServer.update_poll()
await asyncio.sleep(0)

View File

@@ -1,286 +0,0 @@
# SPDX-FileCopyrightText: Copyright (c) 2019 Matt Costi for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_esp32spi_wsgiserver`
================================================================================
A simple WSGI (Web Server Gateway Interface) server that interfaces with the ESP32 over SPI.
Opens a specified port on the ESP32 to listen for incoming HTTP Requests and
Accepts an Application object that must be callable, which gets called
whenever a new HTTP Request has been received.
The Application MUST accept 2 ordered parameters:
1. environ object (incoming request data)
2. start_response function. Must be called before the Application
callable returns, in order to set the response status and headers.
The Application MUST return a single string in a list,
which is the response data
Requires update_poll being called in the applications main event loop.
For more details about Python WSGI see:
https://www.python.org/dev/peps/pep-0333/
* Author(s): Matt Costi
"""
# pylint: disable=no-name-in-module
import io
import gc
from micropython import const
import socketpool
import wifi
class BadRequestError(Exception):
"""Raised when the client sends an unexpected empty line"""
pass
_BUFFER_SIZE = 32
buffer = bytearray(_BUFFER_SIZE)
def readline(socketin):
"""
Implement readline() for native wifi using recv_into
"""
data_string = b""
while True:
try:
num = socketin.recv_into(buffer, 1)
data_string += str(buffer, 'utf8')[:num]
if num == 0:
return data_string
if data_string[-2:] == b"\r\n":
return data_string[:-2]
except OSError as ex:
# if ex.errno == 9: # [Errno 9] EBADF
# return None
if ex.errno == 11: # [Errno 11] EAGAIN
continue
raise
def read(socketin,length = -1):
total = 0
data_string = b""
try:
if length > 0:
while total < length:
reste = length - total
num = socketin.recv_into(buffer, min(_BUFFER_SIZE, reste))
#
if num == 0:
# timeout
# raise OSError(110)
return data_string
#
data_string += buffer[:num]
total = total + num
return data_string
else:
while True:
num = socketin.recv_into(buffer, 1)
data_string += str(buffer, 'utf8')[:num]
if num == 0:
return data_string
except OSError as ex:
if ex.errno == 11: # [Errno 11] EAGAIN
return data_string
raise
def parse_headers(sock):
"""
Parses the header portion of an HTTP request/response from the socket.
Expects first line of HTTP request/response to have been read already
return: header dictionary
rtype: Dict
"""
headers = {}
while True:
line = readline(sock)
if not line or line == b"\r\n":
break
#print("**line: ", line)
title, content = line.split(b': ', 1)
if title and content:
title = str(title.lower(), 'utf-8')
content = str(content, 'utf-8')
headers[title] = content
return headers
pool = socketpool.SocketPool(wifi.radio)
NO_SOCK_AVAIL = const(255)
# pylint: disable=invalid-name
class WSGIServer:
"""
A simple server that implements the WSGI interface
"""
def __init__(self, port=80, debug=False, application=None):
self.application = application
self.port = port
self._server_sock = None
self._client_sock = None
self._debug = debug
self._response_status = None
self._response_headers = []
def start(self):
"""
starts the server and begins listening for incoming connections.
Call update_poll in the main loop for the application callable to be
invoked on receiving an incoming request.
"""
self._server_sock = pool.socket(pool.AF_INET,pool.SOCK_STREAM)
HOST = repr(wifi.radio.ipv4_address_ap)
self._server_sock.bind((repr(wifi.radio.ipv4_address_ap), self.port))
self._server_sock.listen(1)
# if self._debug:
# ip = _the_interface.pretty_ip(_the_interface.ip_address)
# print("Server available at {0}:{1}".format(ip, self.port))
# print(
# "Sever status: ",
# _the_interface.get_server_state(self._server_sock.socknum),
# )
def pretty_ip(self):
return f"http://{wifi.radio.ipv4_address_ap}:{self.port}"
def update_poll(self):
"""
Call this method inside your main event loop to get the server
check for new incoming client requests. When a request comes in,
the application callable will be invoked.
"""
self.client_available()
if self._client_sock:
try:
environ = self._get_environ(self._client_sock)
result = self.application(environ, self._start_response)
self.finish_response(result)
except BadRequestError:
self._start_response("400 Bad Request", [])
self.finish_response([])
def finish_response(self, result):
"""
Called after the application callbile returns result data to respond with.
Creates the HTTP Response payload from the response_headers and results data,
and sends it back to client.
:param string result: the data string to send back in the response to the client.
"""
try:
response = "HTTP/1.1 {0}\r\n".format(self._response_status)
for header in self._response_headers:
response += "{0}: {1}\r\n".format(*header)
response += "\r\n"
self._client_sock.send(response.encode("utf-8"))
for data in result:
if isinstance(data, str):
data = data.encode("utf-8")
elif not isinstance(data, bytes):
data = str(data).encode("utf-8")
bytes_sent = 0
while bytes_sent < len(data):
try:
bytes_sent += self._client_sock.send(data[bytes_sent:])
except OSError as ex:
if ex.errno != 11: # [Errno 11] EAGAIN
raise
gc.collect()
except OSError as ex:
if ex.errno != 104: # [Errno 104] ECONNRESET
raise
finally:
#print("closing")
self._client_sock.close()
self._client_sock = None
def client_available(self):
"""
returns a client socket connection if available.
Otherwise, returns None
:return: the client
:rtype: Socket
"""
sock = None
if not self._server_sock:
print("Server has not been started, cannot check for clients!")
elif not self._client_sock:
self._server_sock.setblocking(False)
try:
self._client_sock, addr = self._server_sock.accept()
except OSError as ex:
if ex.errno != 11: # [Errno 11] EAGAIN
raise
return None
def _start_response(self, status, response_headers):
"""
The application callable will be given this method as the second param
This is to be called before the application callable returns, to signify
the response can be started with the given status and headers.
:param string status: a status string including the code and reason. ex: "200 OK"
:param list response_headers: a list of tuples to represent the headers.
ex ("header-name", "header value")
"""
self._response_status = status
self._response_headers = [("Server", "esp32WSGIServer")] + response_headers
def _get_environ(self, client):
"""
The application callable will be given the resulting environ dictionary.
It contains metadata about the incoming request and the request body ("wsgi.input")
:param Socket client: socket to read the request from
"""
env = {}
line = readline(client).decode("utf-8")
try:
(method, path, ver) = line.rstrip("\r\n").split(None, 2)
except ValueError:
raise BadRequestError("Unknown request from client.")
env["wsgi.version"] = (1, 0)
env["wsgi.url_scheme"] = "http"
env["wsgi.multithread"] = False
env["wsgi.multiprocess"] = False
env["wsgi.run_once"] = False
env["REQUEST_METHOD"] = method
env["SCRIPT_NAME"] = ""
env["SERVER_NAME"] = str(wifi.radio.ipv4_address_ap)
env["SERVER_PROTOCOL"] = ver
env["SERVER_PORT"] = self.port
if path.find("?") >= 0:
env["PATH_INFO"] = path.split("?")[0]
env["QUERY_STRING"] = path.split("?")[1]
else:
env["PATH_INFO"] = path
headers = parse_headers(client)
if "content-type" in headers:
env["CONTENT_TYPE"] = headers.get("content-type")
if "content-length" in headers:
env["CONTENT_LENGTH"] = headers.get("content-length")
body = read(client, int(env["CONTENT_LENGTH"]))
env["wsgi.input"] = io.StringIO(body)
else:
body = read(client)
env["wsgi.input"] = io.StringIO(body)
for name, value in headers.items():
key = "HTTP_" + name.replace("-", "_").upper()
if key in env:
value = "{0},{1}".format(env[key], value)
env[key] = value
return env