From fe7d62fbda2e017432f0129be28b081d16637d04 Mon Sep 17 00:00:00 2001 From: Ian Burgwin Date: Sun, 9 Feb 2020 02:28:51 -0800 Subject: [PATCH] remove pyctr copy, add to requirements.txt --- pyctr/.DS_Store | Bin 6148 -> 0 bytes pyctr/README.md | 7 - pyctr/__init__.py | 0 pyctr/common.py | 82 ----- pyctr/crypto.py | 747 --------------------------------------- pyctr/fileio.py | 107 ------ pyctr/type/__init__.py | 0 pyctr/type/base/title.py | 12 - pyctr/type/cci.py | 145 -------- pyctr/type/cia.py | 206 ----------- pyctr/type/exefs.py | 316 ----------------- pyctr/type/ncch.py | 538 ---------------------------- pyctr/type/romfs.py | 233 ------------ pyctr/type/smdh.py | 111 ------ pyctr/type/tmd.py | 316 ----------------- pyctr/util.py | 41 --- requirements.txt | 1 + 17 files changed, 1 insertion(+), 2861 deletions(-) delete mode 100644 pyctr/.DS_Store delete mode 100644 pyctr/README.md delete mode 100644 pyctr/__init__.py delete mode 100644 pyctr/common.py delete mode 100644 pyctr/crypto.py delete mode 100644 pyctr/fileio.py delete mode 100644 pyctr/type/__init__.py delete mode 100644 pyctr/type/base/title.py delete mode 100644 pyctr/type/cci.py delete mode 100644 pyctr/type/cia.py delete mode 100644 pyctr/type/exefs.py delete mode 100644 pyctr/type/ncch.py delete mode 100644 pyctr/type/romfs.py delete mode 100644 pyctr/type/smdh.py delete mode 100644 pyctr/type/tmd.py delete mode 100644 pyctr/util.py diff --git a/pyctr/.DS_Store b/pyctr/.DS_Store deleted file mode 100644 index b1d2f03a919a9c21483da04a6d0edefade33ab0e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKy-veG4EBW#L0vjB-oVCGB=%5+fjJLQXj;*sRWiW9mW>5=HU=bK0v29^M_}M7 z_eM|clM2x1Z-ep z3>X8(z!orop3RaB1Z_12i~(byVSv969?F<1)`H>FfhCLpz!>HrxaL`cbG%}zSPQ}f zagqv@RHq|`lXTd<#-)n2prn)2;lt_9PA3!>cE|I52q%{c+G-3K16>Ao' - - @_raise_if_closed - def read(self, size: int = -1) -> bytes: - if size == -1: - size = self._info.size - self._seek - data = self._reader.get_data(self._info, self._seek, size) - self._seek += len(data) - return data - - read1 = read # probably make this act like read1 should, but this for now enables some other things to work - - @_raise_if_closed - def seek(self, seek: int, whence: int = 0) -> int: - if whence == 0: - if seek < 0: - raise ValueError(f'negative seek value {seek}') - self._seek = min(seek, self._info.size) - elif whence == 1: - self._seek = max(self._seek + seek, 0) - elif whence == 2: - self._seek = max(self._info.size + seek, 0) - return self._seek - - @_raise_if_closed - def tell(self) -> int: - return self._seek - - @_raise_if_closed - def readable(self) -> bool: - return True - - @_raise_if_closed - def writable(self) -> bool: - return False - - @_raise_if_closed - def seekable(self) -> bool: - return True diff --git a/pyctr/crypto.py b/pyctr/crypto.py deleted file mode 100644 index 681d438..0000000 --- a/pyctr/crypto.py +++ /dev/null @@ -1,747 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from enum import IntEnum -from functools import wraps -from hashlib import sha256 -from io import BufferedIOBase -from os import environ -from os.path import getsize, join as pjoin -from struct import pack, unpack -from typing import TYPE_CHECKING - -from Cryptodome.Cipher import AES -from Cryptodome.Hash import CMAC -from Cryptodome.Util import Counter - -from .common import PyCTRError, _raise_if_closed -from .util import config_dirs, readbe, readle - -if TYPE_CHECKING: - # noinspection PyProtectedMember - from Cryptodome.Cipher._mode_cbc import CbcMode - # noinspection PyProtectedMember - from Cryptodome.Cipher._mode_ctr import CtrMode - # noinspection PyProtectedMember - from Cryptodome.Cipher._mode_ecb import EcbMode - from Cryptodome.Hash.CMAC import CMAC as CMACObject - from typing import BinaryIO, Dict, List, Optional, Union - -__all__ = ['CryptoError', 'OTPLengthError', 'CorruptBootromError', 'KeyslotMissingError', 'TicketLengthError', - 'BootromNotFoundError', 'CorruptOTPError', 'Keyslot', 'CryptoEngine'] - - -class CryptoError(PyCTRError): - """Generic exception for cryptography operations.""" - - -class OTPLengthError(CryptoError): - """OTP is the wrong length.""" - - -class CorruptOTPError(CryptoError): - """OTP hash does not match.""" - - -class KeyslotMissingError(CryptoError): - """Normal key is not set up for the keyslot.""" - - -class BadMovableSedError(CryptoError): - """movable.sed provided is invalid.""" - - -class TicketLengthError(CryptoError): - """Ticket is too small.""" - def __init__(self, length): - super().__init__(length) - - def __str__(self): - return f'0x350 expected, {self.args[0]:#x} given' - - -# wonder if I'm doing this right... -class BootromNotFoundError(CryptoError): - """ARM9 bootROM was not found. Main argument is a tuple of checked paths.""" - - -class CorruptBootromError(CryptoError): - """ARM9 bootROM hash does not match.""" - - -class Keyslot(IntEnum): - TWLNAND = 0x03 - CTRNANDOld = 0x04 - CTRNANDNew = 0x05 - FIRM = 0x06 - AGB = 0x07 - - CMACNANDDB = 0x0B - - NCCH93 = 0x18 - CMACCardSaveNew = 0x19 - CardSaveNew = 0x1A - NCCH96 = 0x1B - - CMACAGB = 0x24 - NCCH70 = 0x25 - - NCCH = 0x2C - UDSLocalWAN = 0x2D - StreetPass = 0x2E - Save60 = 0x2F - CMACSDNAND = 0x30 - - CMACCardSave = 0x33 - SD = 0x34 - - CardSave = 0x37 - BOSS = 0x38 - DownloadPlay = 0x39 - - DSiWareExport = 0x3A - - CommonKey = 0x3D - - # anything after 0x3F is custom to PyCTR - DecryptedTitlekey = 0x40 - - -BOOT9_PROT_HASH = '7331f7edece3dd33f2ab4bd0b3a5d607229fd19212c10b734cedcaf78c1a7b98' - -DEV_COMMON_KEY_0 = bytes.fromhex('55A3F872BDC80C555A654381139E153B') - -common_key_y = ( - # eShop - 0xD07B337F9CA4385932A2E25723232EB9, - # System - 0x0C767230F0998F1C46828202FAACBE4C, - # Unknown - 0xC475CB3AB8C788BB575E12A10907B8A4, - # Unknown - 0xE486EEE3D0C09C902F6686D4C06F649F, - # Unknown - 0xED31BA9C04B067506C4497A35B7804FC, - # Unknown - 0x5E66998AB4E8931606850FD7A16DD755 -) - -base_key_x = { - # New3DS 9.3 NCCH - 0x18: (0x82E9C9BEBFB8BDB875ECC0A07D474374, 0x304BF1468372EE64115EBD4093D84276), - # New3DS 9.6 NCCH - 0x1B: (0x45AD04953992C7C893724A9A7BCE6182, 0x6C8B2944A0726035F941DFC018524FB6), - # 7x NCCH - 0x25: (0xCEE7D8AB30C00DAE850EF5E382AC5AF3, 0x81907A4B6F1B47323A677974CE4AD71B), -} - -# global values to be copied to new CryptoEngine instances after the first one -_b9_key_x: 'Dict[int, int]' = {} -_b9_key_y: 'Dict[int, int]' = {} -_b9_key_normal: 'Dict[int, bytes]' = {} -_b9_extdata_otp: bytes = None -_b9_extdata_keygen: bytes = None -_b9_path: str = None -_otp_key: bytes = None -_otp_iv: bytes = None - -b9_paths: 'List[str]' = [] -for p in config_dirs: - b9_paths.append(pjoin(p, 'boot9.bin')) - b9_paths.append(pjoin(p, 'boot9_prot.bin')) -try: - b9_paths.insert(0, environ['BOOT9_PATH']) -except KeyError: - pass - - -def _requires_bootrom(method): - @wraps(method) - def wrapper(self, *args, **kwargs): - if not self.b9_keys_set: - raise KeyslotMissingError('bootrom is required to set up keys, see setup_keys_from_boot9') - return method(self, *args, **kwargs) - return wrapper - - -# used from http://www.falatic.com/index.php/108/python-and-bitwise-rotation -# converted to def because pycodestyle complained to me -def rol(val: int, r_bits: int, max_bits: int) -> int: - return (val << r_bits % max_bits) & (2 ** max_bits - 1) |\ - ((val & (2 ** max_bits - 1)) >> (max_bits - (r_bits % max_bits))) - - -class _TWLCryptoWrapper: - def __init__(self, cipher: 'CbcMode'): - self._cipher = cipher - - def encrypt(self, data: bytes) -> bytes: - data_len = len(data) - data_rev = bytearray(data_len) - for i in range(0, data_len, 0x10): - data_rev[i:i + 0x10] = data[i:i + 0x10][::-1] - - data_out = bytearray(self._cipher.encrypt(bytes(data_rev))) - - for i in range(0, data_len, 0x10): - data_out[i:i + 0x10] = data_out[i:i + 0x10][::-1] - return bytes(data_out[0:data_len]) - - decrypt = encrypt - - -class CryptoEngine: - """Class for 3DS crypto operations, including encryption and key generation.""" - - b9_keys_set: bool = False - b9_path: str = None - - _b9_extdata_otp: bytes = None - _b9_extdata_keygen: bytes = None - - _otp_device_id: int = None - _otp_key: bytes = None - _otp_iv: bytes = None - - _id0: bytes = None - - def __init__(self, boot9: str = None, dev: int = 0, setup_b9_keys: bool = True): - self.key_x: Dict[int, int] = {} - self.key_y: Dict[int, int] = {0x03: 0xE1A00005202DDD1DBD4DC4D30AB9DC76, - 0x05: 0x4D804F4E9990194613A204AC584460BE} - self.key_normal: Dict[int, bytes] = {} - - self.dev = dev - - for keyslot, keys in base_key_x.items(): - self.key_x[keyslot] = keys[dev] - - if setup_b9_keys: - self.setup_keys_from_boot9_file(boot9) - - @property - @_requires_bootrom - def b9_extdata_otp(self) -> bytes: - return self._b9_extdata_otp - - @property - @_requires_bootrom - def b9_extdata_keygen(self) -> bytes: - return self._b9_extdata_keygen - - @property - @_requires_bootrom - def otp_device_id(self) -> int: - return self._otp_device_id - - @property - @_requires_bootrom - def otp_key(self) -> bytes: - return self._otp_key - - @property - @_requires_bootrom - def otp_iv(self) -> bytes: - return self._otp_iv - - @property - def id0(self) -> bytes: - if not self._id0: - raise KeyslotMissingError('load a movable.sed with setup_sd_key') - return self._id0 - - def create_cbc_cipher(self, keyslot: Keyslot, iv: bytes) -> 'CbcMode': - """Create AES-CBC cipher with the given keyslot.""" - try: - key = self.key_normal[keyslot] - except KeyError: - raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up') - - return AES.new(key, AES.MODE_CBC, iv) - - def create_ctr_cipher(self, keyslot: Keyslot, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]': - """ - Create an AES-CTR cipher with the given keyslot. - - Normal and DSi crypto will be automatically chosen depending on keyslot. - """ - try: - key = self.key_normal[keyslot] - except KeyError: - raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up') - - cipher = AES.new(key, AES.MODE_CTR, counter=Counter.new(128, initial_value=ctr)) - - if keyslot < 0x04: - return _TWLCryptoWrapper(cipher) - else: - return cipher - - def create_ecb_cipher(self, keyslot: Keyslot) -> 'EcbMode': - """Create an AES-ECB cipher with the given keyslot.""" - try: - key = self.key_normal[keyslot] - except KeyError: - raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up') - - return AES.new(key, AES.MODE_ECB) - - def create_cmac_object(self, keyslot: Keyslot) -> 'CMACObject': - """Create a CMAC object with the given keyslot.""" - try: - key = self.key_normal[keyslot] - except KeyError: - raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up') - - return CMAC.new(key, ciphermod=AES) - - def create_ctr_io(self, keyslot: Keyslot, fh: 'BinaryIO', ctr: int): - """Create an AES-CTR read-write file object with the given keyslot.""" - return CTRFileIO(fh, self, keyslot, ctr) - - def create_cbc_io(self, keyslot: Keyslot, fh: 'BinaryIO', iv: bytes): - """Create an AES-CBC read-only file object with the given keyslot.""" - return CBCFileIO(fh, self, keyslot, iv) - - @staticmethod - def sd_path_to_iv(path: str) -> int: - # ensure the path is lowercase - path = path.lower() - - # SD Save Data Backup does a copy of the raw, encrypted file from the game's data directory - # so we need to handle this and fake the path - if path.startswith('/backup') and len(path) > 28: - tid_upper = path[12:20] - tid_lower = path[20:28] - path = f'/title/{tid_upper}/{tid_lower}/data' + path[28:] - - path_hash = sha256(path.encode('utf-16le') + b'\0\0').digest() - hash_p1 = readbe(path_hash[0:16]) - hash_p2 = readbe(path_hash[16:32]) - return hash_p1 ^ hash_p2 - - def load_from_ticket(self, ticket: bytes): - """Load a titlekey from a ticket and set keyslot 0x40 to the decrypted titlekey.""" - ticket_len = len(ticket) - # TODO: probably support other sig types which would be different lengths - # unlikely to happen in practice, but I would still like to - if ticket_len < 0x2AC: - raise TicketLengthError(ticket_len) - - titlekey_enc = ticket[0x1BF:0x1CF] - title_id = ticket[0x1DC:0x1E4] - common_key_index = ticket[0x1F1] - - if self.dev and common_key_index == 0: - self.set_normal_key(0x3D, DEV_COMMON_KEY_0) - else: - self.set_keyslot('y', 0x3D, common_key_y[common_key_index]) - - cipher = self.create_cbc_cipher(0x3D, title_id + (b'\0' * 8)) - self.set_normal_key(0x40, cipher.decrypt(titlekey_enc)) - - def set_keyslot(self, xy: str, keyslot: int, key: 'Union[int, bytes]'): - """Sets a keyslot to the specified key.""" - to_use = None - if xy == 'x': - to_use = self.key_x - elif xy == 'y': - to_use = self.key_y - if isinstance(key, bytes): - key = int.from_bytes(key, 'big' if keyslot > 0x03 else 'little') - to_use[keyslot] = key - try: - self.key_normal[keyslot] = self.keygen(keyslot) - except KeyError: - pass - - def set_normal_key(self, keyslot: int, key: bytes): - self.key_normal[keyslot] = key - - def keygen(self, keyslot: int) -> bytes: - """Generate a normal key based on the keyslot.""" - if keyslot < 0x04: - # DSi - return self.keygen_twl_manual(self.key_x[keyslot], self.key_y[keyslot]) - else: - # 3DS - return self.keygen_manual(self.key_x[keyslot], self.key_y[keyslot]) - - @staticmethod - def keygen_manual(key_x: int, key_y: int) -> bytes: - """Generate a normal key using the 3DS AES keyscrambler.""" - return rol((rol(key_x, 2, 128) ^ key_y) + 0x1FF9E9AAC5FE0408024591DC5D52768A, 87, 128).to_bytes(0x10, 'big') - - @staticmethod - def keygen_twl_manual(key_x: int, key_y: int) -> bytes: - """Generate a normal key using the DSi AES keyscrambler.""" - # usually would convert to LE bytes in the end then flip with [::-1], but those just cancel out - return rol((key_x ^ key_y) + 0xFFFEFB4E295902582A680F5F1A4F3E79, 42, 128).to_bytes(0x10, 'big') - - def _copy_global_keys(self): - self.key_x.update(_b9_key_x) - self.key_y.update(_b9_key_y) - self.key_normal.update(_b9_key_normal) - self._otp_key = _otp_key - self._otp_iv = _otp_iv - self._b9_extdata_otp = _b9_extdata_otp - self._b9_extdata_keygen = _b9_extdata_keygen - - self.b9_keys_set = True - - def setup_keys_from_boot9(self, b9: bytes): - """Set up certain keys from an ARM9 bootROM dump.""" - global _otp_key, _otp_iv, _b9_extdata_otp, _b9_extdata_keygen - if self.b9_keys_set: - return - - if _b9_key_x: - self._copy_global_keys() - return - - b9_len = len(b9) - if b9_len != 0x8000: - raise CorruptBootromError(f'wrong length: {b9_len}') - - b9_hash_digest: str = sha256(b9).hexdigest() - if b9_hash_digest != BOOT9_PROT_HASH: - raise CorruptBootromError(f'expected: {BOOT9_PROT_HASH}; returned: {b9_hash_digest}') - - keyblob_offset = 0x5860 - otp_key_offset = 0x56E0 - if self.dev: - keyblob_offset += 0x400 - otp_key_offset += 0x20 - - _otp_key = b9[otp_key_offset:otp_key_offset + 0x10] - _otp_iv = b9[otp_key_offset + 0x10:otp_key_offset + 0x20] - - keyblob: bytes = b9[keyblob_offset:keyblob_offset + 0x400] - - _b9_extdata_keygen = keyblob[0:0x200] - _b9_extdata_otp = keyblob[0:0x24] - - # Original NCCH key, UDS local-WLAN CCMP key, StreetPass key, 6.0 save key - _b9_key_x[0x2C] = _b9_key_x[0x2D] = _b9_key_x[0x2E] = _b9_key_x[0x2F] = readbe(keyblob[0x170:0x180]) - - # SD/NAND AES-CMAC key, APT wrap key, Unknown, Gamecard savedata AES-CMAC - _b9_key_x[0x30] = _b9_key_x[0x31] = _b9_key_x[0x32] = _b9_key_x[0x33] = readbe(keyblob[0x180:0x190]) - - # SD key (loaded from movable.sed), movable.sed key, Unknown (used by friends module), - # Gamecard savedata actual key - _b9_key_x[0x34] = _b9_key_x[0x35] = _b9_key_x[0x36] = _b9_key_x[0x37] = readbe(keyblob[0x190:0x1A0]) - - # BOSS key, Download Play key + actual NFC key for generating retail amiibo keys, CTR-CARD hardware-crypto seed - # decryption key - _b9_key_x[0x38] = _b9_key_x[0x39] = _b9_key_x[0x3A] = _b9_key_x[0x3B] = readbe(keyblob[0x1A0:0x1B0]) - - # Unused - _b9_key_x[0x3C] = readbe(keyblob[0x1B0:0x1C0]) - - # Common key (titlekey crypto) - _b9_key_x[0x3D] = readbe(keyblob[0x1C0:0x1D0]) - - # Unused - _b9_key_x[0x3E] = readbe(keyblob[0x1D0:0x1E0]) - - # NAND partition keys - _b9_key_y[0x04] = readbe(keyblob[0x1F0:0x200]) - # correct 0x05 KeyY not set by boot9. - _b9_key_y[0x06] = readbe(keyblob[0x210:0x220]) - _b9_key_y[0x07] = readbe(keyblob[0x220:0x230]) - - # Unused, Unused, DSiWare export key, NAND dbs/movable.sed AES-CMAC key - _b9_key_y[0x08] = readbe(keyblob[0x230:0x240]) - _b9_key_y[0x09] = readbe(keyblob[0x240:0x250]) - _b9_key_y[0x0A] = readbe(keyblob[0x250:0x260]) - _b9_key_y[0x0B] = readbe(keyblob[0x260:0x270]) - - _b9_key_normal[0x0D] = keyblob[0x270:0x280] - - self._copy_global_keys() - - def setup_keys_from_boot9_file(self, path: str = None): - """Set up certain keys from an ARM9 bootROM file.""" - global _b9_path - if self.b9_keys_set: - return - - if _b9_key_x: - self.b9_path = _b9_path - self._copy_global_keys() - return - - paths = (path,) if path else b9_paths - - for p in paths: - try: - b9_size = getsize(p) - if b9_size in {0x8000, 0x10000}: - with open(p, 'rb') as f: - if b9_size == 0x10000: - f.seek(0x8000) - self.setup_keys_from_boot9(f.read(0x8000)) - _b9_path = p - self.b9_path = p - return - except FileNotFoundError: - continue - - # if keys are not set... - raise BootromNotFoundError(paths) - - @_requires_bootrom - def setup_keys_from_otp(self, otp: bytes): - """Set up console-unique keys from an OTP dump. Encrypted and decrypted are supported.""" - otp_len = len(otp) - if otp_len != 0x100: - raise OTPLengthError(otp_len) - - cipher_otp = AES.new(self.otp_key, AES.MODE_CBC, self.otp_iv) - if otp[0:4] == b'\x0f\xb0\xad\xde': - # decrypted otp - otp_enc: bytes = cipher_otp.encrypt(otp) - otp_dec = otp - else: - # encrypted otp - otp_enc = otp - otp_dec: bytes = cipher_otp.decrypt(otp) - - self._otp_device_id = int.from_bytes(otp_dec[4:8], 'little') - - otp_hash: bytes = otp_dec[0xE0:0x100] - otp_hash_digest: bytes = sha256(otp_dec[0:0xE0]).digest() - if otp_hash_digest != otp_hash: - raise CorruptOTPError(f'expected: {otp_hash.hex()}; result: {otp_hash_digest.hex()}') - - otp_keysect_hash: bytes = sha256(otp_enc[0:0x90]).digest() - - self.set_keyslot('x', 0x11, otp_keysect_hash[0:0x10]) - self.set_keyslot('y', 0x11, otp_keysect_hash[0:0x10]) - - # most otp code from https://github.com/Stary2001/3ds_tools/blob/master/three_ds/aesengine.py - - twl_cid_lo, twl_cid_hi = readle(otp_dec[0x08:0xC]), readle(otp_dec[0xC:0x10]) - twl_cid_lo ^= 0xB358A6AF - twl_cid_lo |= 0x80000000 - twl_cid_hi ^= 0x08C267B7 - twl_cid_lo = twl_cid_lo.to_bytes(4, 'little') - twl_cid_hi = twl_cid_hi.to_bytes(4, 'little') - self.set_keyslot('x', 0x03, twl_cid_lo + b'NINTENDO' + twl_cid_hi) - - console_key_xy: bytes = sha256(otp_dec[0x90:0xAC] + self.b9_extdata_otp).digest() - self.set_keyslot('x', 0x3F, console_key_xy[0:0x10]) - self.set_keyslot('y', 0x3F, console_key_xy[0x10:0x20]) - - extdata_off = 0 - - def gen(n: int) -> bytes: - nonlocal extdata_off - extdata_off += 36 - iv = self.b9_extdata_keygen[extdata_off:extdata_off+16] - extdata_off += 16 - - data = self.create_cbc_cipher(0x3F, iv).encrypt(self.b9_extdata_keygen[extdata_off:extdata_off + 64]) - - extdata_off += n - return data - - a = gen(64) - for i in range(0x4, 0x8): - self.set_keyslot('x', i, a[0:16]) - - for i in range(0x8, 0xc): - self.set_keyslot('x', i, a[16:32]) - - for i in range(0xc, 0x10): - self.set_keyslot('x', i, a[32:48]) - - self.set_keyslot('x', 0x10, a[48:64]) - - b = gen(16) - off = 0 - for i in range(0x14, 0x18): - self.set_keyslot('x', i, b[off:off + 16]) - off += 16 - - c = gen(64) - for i in range(0x18, 0x1c): - self.set_keyslot('x', i, c[0:16]) - - for i in range(0x1c, 0x20): - self.set_keyslot('x', i, c[16:32]) - - for i in range(0x20, 0x24): - self.set_keyslot('x', i, c[32:48]) - - self.set_keyslot('x', 0x24, c[48:64]) - - d = gen(16) - off = 0 - - for i in range(0x28, 0x2c): - self.set_keyslot('x', i, d[off:off + 16]) - off += 16 - - @_requires_bootrom - def setup_keys_from_otp_file(self, path: str): - """Set up console-unique keys from an OTP file. Encrypted and decrypted are supported.""" - with open(path, 'rb') as f: - self.setup_keys_from_otp(f.read(0x100)) - - def setup_sd_key(self, data: bytes): - """Set up the SD key from movable.sed. Must be 0x10 (only key), 0x120 (no cmac), or 0x140 (with cmac).""" - if len(data) == 0x10: - key = data - elif len(data) in {0x120, 0x140}: - key = data[0x110:0x120] - else: - raise BadMovableSedError(f'invalid length ({len(data):#x}') - - self.set_keyslot('y', Keyslot.SD, key) - self.set_keyslot('y', Keyslot.CMACSDNAND, key) - self.set_keyslot('y', Keyslot.DSiWareExport, key) - - key_hash = sha256(key).digest()[0:16] - hash_parts = unpack('IIII', *hash_parts) - - def setup_sd_key_from_file(self, path: str): - """Set up the SD key from a movable.sed file.""" - with open(path, 'rb') as f: - self.setup_sd_key(f.read(0x140)) - - -class _CryptoFileBase(BufferedIOBase): - """Base class for CTR and CBC IO classes.""" - - closed = False - _reader: 'BinaryIO' - - def close(self): - self.closed = True - - __del__ = close - - @_raise_if_closed - def flush(self): - self._reader.flush() - - @_raise_if_closed - def tell(self) -> int: - return self._reader.tell() - - @_raise_if_closed - def readable(self) -> bool: - return self._reader.readable() - - @_raise_if_closed - def writable(self) -> bool: - return self._reader.writable() - - @_raise_if_closed - def seekable(self) -> bool: - return self._reader.seekable() - - -class CTRFileIO(_CryptoFileBase): - """Provides transparent read-write AES-CTR encryption as a file-like object.""" - - def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, counter: int): - self._reader = file - self._crypto = crypto - self._keyslot = keyslot - self._counter = counter - - def __repr__(self): - return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, counter={self._counter!r})' - - @_raise_if_closed - def read(self, size: int = -1) -> bytes: - cur_offset = self.tell() - data = self._reader.read(size) - counter = self._counter + (cur_offset >> 4) - cipher = self._crypto.create_ctr_cipher(self._keyslot, counter) - # beginning padding - cipher.decrypt(b'\0' * (cur_offset % 0x10)) - return cipher.decrypt(data) - - read1 = read # probably make this act like read1 should, but this for now enables some other things to work - - @_raise_if_closed - def write(self, data: bytes) -> int: - cur_offset = self.tell() - counter = self._counter + (cur_offset >> 4) - cipher = self._crypto.create_ctr_cipher(self._keyslot, counter) - # beginning padding - cipher.encrypt(b'\0' * (cur_offset % 0x10)) - return self._reader.write(cipher.encrypt(data)) - - @_raise_if_closed - def seek(self, seek: int, whence: int = 0) -> int: - # TODO: if the seek goes past the file, the data between the former EOF and seek point should also be encrypted. - return self._reader.seek(seek, whence) - - def truncate(self, size: 'Optional[int]' = None) -> int: - return self._reader.truncate(size) - - -class CBCFileIO(_CryptoFileBase): - """Provides transparent read-only AES-CBC encryption as a file-like object.""" - - def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, iv: bytes): - self._reader = file - self._crypto = crypto - self._keyslot = keyslot - self._iv = iv - - def __repr__(self): - return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, iv={self._iv!r})' - - @_raise_if_closed - def read(self, size: int = -1): - offset = self._reader.tell() - - # if encrypted, the block needs to be decrypted first - # CBC requires a full block (0x10 in this case). and the previous - # block is used as the IV. so that's quite a bit to read if the - # application requires just a few bytes. - # thanks Stary2001 for help with random-access crypto - - before = offset % 16 - if offset - before == 0: - iv = self._iv - else: - # seek back one block to read it as iv - self._reader.seek(-0x10 - before, 1) - iv = self._reader.read(0x10) - # this is done since we may not know the original size of the file - # and the caller may have requested -1 to read all the remaining data - data_before = self._reader.read(before) - data_requested = self._reader.read(size) - data_requested_len = len(data_requested) - data_total_len = len(data_before) + data_requested_len - if data_total_len % 16: - data_after = self._reader.read(16 - (data_total_len % 16)) - self._reader.seek(-len(data_after), 1) - else: - data_after = b'' - cipher = self._crypto.create_cbc_cipher(self._keyslot, iv) - # decrypt data, and cut off extra bytes - return cipher.decrypt( - b''.join((data_before, data_requested, data_after)) - )[before:data_requested_len + before] - - read1 = read # probably make this act like read1 should, but this for now enables some other things to work - - @_raise_if_closed - def seek(self, seek: int, whence: int = 0): - # even though read re-seeks to read required data, this allows the underlying object to handle seek how it wants - return self._reader.seek(seek, whence) - - @_raise_if_closed - def writable(self) -> bool: - return False diff --git a/pyctr/fileio.py b/pyctr/fileio.py deleted file mode 100644 index 992b85c..0000000 --- a/pyctr/fileio.py +++ /dev/null @@ -1,107 +0,0 @@ -from io import BufferedIOBase -from threading import Lock -from weakref import WeakValueDictionary -from typing import TYPE_CHECKING - -from .common import _raise_if_closed - -if TYPE_CHECKING: - from typing import BinaryIO - -# this prevents two SubsectionIO instances on the same file object from interfering with eachother -_lock_objects = WeakValueDictionary() - - -class SubsectionIO(BufferedIOBase): - """Provides read-write access to a subsection of a file.""" - - closed = False - _seek = 0 - - def __init__(self, file: 'BinaryIO', offset: int, size: int): - # get existing Lock object for file, or create a new one - file_id = id(file) - try: - self._lock = _lock_objects[file_id] - except KeyError: - self._lock = Lock() - _lock_objects[file_id] = self._lock - - self._reader = file - self._offset = offset - self._size = size - # subsection end is stored for convenience - self._end = offset + size - - def __repr__(self): - return f'{type(self).__name__}(file={self._reader!r}, offset={self._offset!r}, size={self._size!r})' - - def close(self): - self.closed = True - # remove Lock reference, so it can be automatically removed from the WeakValueDictionary once all SubsectionIO - # instances for the base file are closed - self._lock = None - - __del__ = close - - @_raise_if_closed - def read(self, size: int = -1) -> bytes: - if size == -1: - size = self._size - self._seek - if self._offset + self._seek > self._end: - # if attempting to read after the section, return nothing - return b'' - if self._seek + size > self._size: - size = self._size - self._seek - - with self._lock: - self._reader.seek(self._seek + self._offset) - data = self._reader.read(size) - - self._seek += len(data) - return data - - @_raise_if_closed - def seek(self, seek: int, whence: int = 0) -> int: - if whence == 0: - if seek < 0: - raise ValueError(f'negative seek value {seek}') - self._seek = min(seek, self._size) - elif whence == 1: - self._seek = max(self._seek + seek, 0) - elif whence == 2: - self._seek = max(self._size + seek, 0) - else: - if not isinstance(whence, int): - raise TypeError(f'an integer is required (got type {type(whence).__name__}') - raise ValueError(f'invalid whence ({seek}, should be 0, 1 or 2)') - return self._seek - - @_raise_if_closed - def write(self, data: bytes) -> int: - if self._seek > self._size: - # attempting to write past subsection - return 0 - data_len = len(data) - data_end = data_len + self._seek - if data_end > self._size: - data = data[:-(data_end - self._size)] - - with self._lock: - self._reader.seek(self._seek + self._offset) - data_written = self._reader.write(data) - - self._seek += data_written - return data_written - - @_raise_if_closed - def readable(self) -> bool: - return self._reader.readable() - - @_raise_if_closed - def writable(self) -> bool: - return self._reader.writable() - - @_raise_if_closed - def seekable(self) -> bool: - return self._reader.seekable() diff --git a/pyctr/type/__init__.py b/pyctr/type/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pyctr/type/base/title.py b/pyctr/type/base/title.py deleted file mode 100644 index be62d5e..0000000 --- a/pyctr/type/base/title.py +++ /dev/null @@ -1,12 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - - -class TitleReaderBase: - - closed = False - - diff --git a/pyctr/type/cci.py b/pyctr/type/cci.py deleted file mode 100644 index 3c6e1cf..0000000 --- a/pyctr/type/cci.py +++ /dev/null @@ -1,145 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from enum import IntEnum -from threading import Lock -from typing import TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError -from ..fileio import SubsectionIO -from ..type.ncch import NCCHReader -from ..util import readle - -if TYPE_CHECKING: - from typing import BinaryIO, Dict, Union - -CCI_MEDIA_UNIT = 0x200 - - -class CCIError(PyCTRError): - """Generic error for CCI operations.""" - - -class InvalidCCIError(CCIError): - """Invalid CCI header exception.""" - - -class CCISection(IntEnum): - Header = -3 - CardInfo = -2 - DevInfo = -1 - - Application = 0 - Manual = 1 - DownloadPlayChild = 2 - Unk3 = 3 - Unk4 = 4 - Unk5 = 5 - UpdateOld3DS = 6 - UpdateNew3DS = 7 - - -class CCIRegion(NamedTuple): - section: 'Union[int, CCISection]' - offset: int - size: int - - -class CCIReader: - """Class for the 3DS CCI container.""" - - closed = False - - def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, dev: bool = False, - load_contents: bool = True, assume_decrypted: bool = False): - if isinstance(fp, str): - fp = open(fp, 'rb') - - # store the starting offset so the CCI can be read from any point in the base file - self._start = fp.tell() - self._fp = fp - # store case-insensitivity for RomFSReader - self._case_insensitive = case_insensitive - # threading lock - self._lock = Lock() - - # ignore the signature, we don't need it - self._fp.seek(0x100, 1) - header = fp.read(0x100) - if header[0:4] != b'NCSD': - raise InvalidCCIError('NCSD magic not found') - - # make sure the Media ID is not 00, which is used for the NAND header - self.media_id = header[0x8:0x10][::-1].hex() - if self.media_id == '00' * 8: - raise InvalidCCIError('Media ID is ' + self.media_id) - - self.image_size = readle(header[4:8]) * CCI_MEDIA_UNIT - - # this contains the location of each section - self.sections: Dict[CCISection, CCIRegion] = {} - - # this contains loaded sections - self.contents: Dict[CCISection, NCCHReader] = {} - - def add_region(section: 'CCISection', offset: int, size: int): - region = CCIRegion(section=section, offset=offset, size=size) - self.sections[section] = region - - # add each part of the header - add_region(CCISection.Header, 0, 0x200) - add_region(CCISection.CardInfo, 0x200, 0x1000) - add_region(CCISection.DevInfo, 0x1200, 0x300) - - # use a CCISection value for section keys - partition_sections = [x for x in CCISection if x >= 0] - - part_raw = header[0x20:0x60] - - # the first content always starts at 0x4000 but this code makes no assumptions about it - for idx, info_offset in enumerate(range(0, 0x40, 0x8)): - part_info = part_raw[info_offset:info_offset + 8] - part_offset = int.from_bytes(part_info[0:4], 'little') * CCI_MEDIA_UNIT - part_size = int.from_bytes(part_info[4:8], 'little') * CCI_MEDIA_UNIT - if part_offset: - section_id = partition_sections[idx] - add_region(section_id, part_offset, part_size) - - if load_contents: - content_fp = self.open_raw_section(section_id) - self.contents[section_id] = NCCHReader(content_fp, case_insensitive=case_insensitive, dev=dev, - assume_decrypted=assume_decrypted) - - def close(self): - self.closed = True - try: - self._fp.close() - except AttributeError: - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - __del__ = close - - def __repr__(self): - info = [('media_id', self.media_id)] - try: - info.append(('title_name', - repr(self.contents[CCISection.Application].exefs.icon.get_app_title().short_desc))) - except KeyError: - info.append(('title_name', 'unknown')) - info.append(('partition_count', len(self.contents))) - info_final = " ".join(x + ": " + str(y) for x, y in info) - return f'<{type(self).__name__} {info_final}>' - - def open_raw_section(self, section: 'CCISection'): - """Open a raw CCI section for reading.""" - region = self.sections[section] - return SubsectionIO(self._fp, self._start + region.offset, region.size) diff --git a/pyctr/type/cia.py b/pyctr/type/cia.py deleted file mode 100644 index f973e2a..0000000 --- a/pyctr/type/cia.py +++ /dev/null @@ -1,206 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from enum import IntEnum -from io import BytesIO -from threading import Lock -from typing import TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError -from ..crypto import CryptoEngine, Keyslot -from ..fileio import SubsectionIO -from ..type.ncch import NCCHReader -from ..type.tmd import TitleMetadataReader -from ..util import readle, roundup - -if TYPE_CHECKING: - from typing import BinaryIO, Dict, Optional, Union - -ALIGN_SIZE = 64 - - -class CIAError(PyCTRError): - """Generic error for CIA operations.""" - - -class InvalidCIAError(CIAError): - """Invalid CIA header exception.""" - - -class CIASection(IntEnum): - # these values as negative, as positive ones are used for contents - ArchiveHeader = -4 - CertificateChain = -3 - Ticket = -2 - TitleMetadata = -1 - Application = 0 - Manual = 1 - DownloadPlayChild = 2 - Meta = -5 - - -class CIARegion(NamedTuple): - section: 'Union[int, CIASection]' - offset: int - size: int - iv: bytes # only used for encrypted sections - - -class CIAReader: - """Class for the 3DS CIA container.""" - - closed = False - - def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None, - dev: bool = False, seeddb: str = None, load_contents: bool = True): - if isinstance(fp, str): - fp = open(fp, 'rb') - - if crypto: - self._crypto = crypto - else: - self._crypto = CryptoEngine(dev=dev) - - # store the starting offset so the CIA can be read from any point in the base file - self._start = fp.tell() - self._fp = fp - # store case-insensitivity for RomFSReader - self._case_insensitive = case_insensitive - # threading lock - self._lock = Lock() - - header = fp.read(0x20) - - archive_header_size = readle(header[0x0:0x4]) - if archive_header_size != 0x2020: - raise InvalidCIAError('Archive Header Size is not 0x2020') - # in practice, the certificate chain is the same for all retail titles - cert_chain_size = readle(header[0x8:0xC]) - # the ticket size usually never changes from 0x350 - # there is one ticket (without an associated title) that is smaller though - ticket_size = readle(header[0xC:0x10]) - # tmd contains info about the contents of the title - tmd_size = readle(header[0x10:0x14]) - # meta contains info such as the SMDH and Title ID dependency list - meta_size = readle(header[0x14:0x18]) - # content size is the total size of the contents - # I'm not sure what happens yet if one of the contents is not aligned to 0x40 bytes. - content_size = readle(header[0x18:0x20]) - # the content index determines what contents are in the CIA - # this is not stored as int, so it's faster to parse(?) - content_index = fp.read(archive_header_size - 0x20) - - active_contents = set() - for idx, b in enumerate(content_index): - offset = idx * 8 - curr = b - for x in range(7, -1, -1): - if curr & 1: - active_contents.add(x + offset) - curr >>= 1 - - # the header only stores sizes; offsets need to be calculated. - # the sections are aligned to 64(0x40) bytes. for example, if something is 0x78, - # it will take up 0x80, with the remaining 0x8 being padding. - cert_chain_offset = roundup(archive_header_size, ALIGN_SIZE) - ticket_offset = cert_chain_offset + roundup(cert_chain_size, ALIGN_SIZE) - tmd_offset = ticket_offset + roundup(ticket_size, ALIGN_SIZE) - content_offset = tmd_offset + roundup(tmd_size, ALIGN_SIZE) - meta_offset = content_offset + roundup(content_size, ALIGN_SIZE) - - # lazy method to get the total size - self.total_size = meta_offset + meta_size - - # this contains the location of each section, as well as the IV of encrypted ones - self.sections: Dict[Union[int, CIASection], CIARegion] = {} - - def add_region(section: 'Union[int, CIASection]', offset: int, size: int, iv: 'Optional[bytes]'): - region = CIARegion(section=section, offset=offset, size=size, iv=iv) - self.sections[section] = region - - # add each part of the header - add_region(CIASection.ArchiveHeader, 0, archive_header_size, None) - add_region(CIASection.CertificateChain, cert_chain_offset, cert_chain_size, None) - add_region(CIASection.Ticket, ticket_offset, ticket_size, None) - add_region(CIASection.TitleMetadata, tmd_offset, tmd_size, None) - if meta_size: - add_region(CIASection.Meta, meta_offset, meta_size, None) - - # this will load the titlekey to decrypt the contents - self._fp.seek(self._start + ticket_offset) - ticket = self._fp.read(ticket_size) - self._crypto.load_from_ticket(ticket) - - # the tmd describes the contents: ID, index, size, and hash - self._fp.seek(self._start + tmd_offset) - tmd_data = self._fp.read(tmd_size) - self.tmd = TitleMetadataReader.load(BytesIO(tmd_data)) - - active_contents_tmd = set() - self.content_info = [] - - # this does a first check to make sure there are no missing contents that are marked active in content_index - for record in self.tmd.chunk_records: - if record.cindex in active_contents: - active_contents_tmd.add(record.cindex) - self.content_info.append(record) - - # if the result of this is not an empty set, it means there are contents enabled in content_index - # that are not in the tmd, which is bad - if active_contents ^ active_contents_tmd: - raise InvalidCIAError('Missing active contents in the TMD') - - self.contents = {} - - # this goes through the contents and figures out their regions, then creates an NCCHReader - curr_offset = content_offset - for record in self.content_info: - iv = None - if record.type.encrypted: - iv = record.cindex.to_bytes(2, 'big') + (b'\0' * 14) - add_region(record.cindex, curr_offset, record.size, iv) - if load_contents: - # check if the content is a Nintendo DS ROM (SRL) first - is_srl = record.cindex == 0 and self.tmd.title_id[3:5] == '48' - if not is_srl: - content_fp = self.open_raw_section(record.cindex) - self.contents[record.cindex] = NCCHReader(content_fp, case_insensitive=case_insensitive, - dev=dev, seeddb=seeddb) - - curr_offset += record.size - - def close(self): - self.closed = True - try: - self._fp.close() - except AttributeError: - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - __del__ = close - - def __repr__(self): - info = [('title_id', self.tmd.title_id)] - try: - info.append(('title_name', repr(self.contents[0].exefs.icon.get_app_title().short_desc))) - except KeyError: - info.append(('title_name', 'unknown')) - info.append(('content_count', len(self.contents))) - info_final = " ".join(x + ": " + str(y) for x, y in info) - return f'<{type(self).__name__} {info_final}>' - - def open_raw_section(self, section: 'CIASection'): - """Open a raw CIA section for reading.""" - region = self.sections[section] - fh = SubsectionIO(self._fp, self._start + region.offset, region.size) - if region.iv: - fh = self._crypto.create_cbc_io(Keyslot.DecryptedTitlekey, fh, region.iv) - return fh diff --git a/pyctr/type/exefs.py b/pyctr/type/exefs.py deleted file mode 100644 index fe94d2e..0000000 --- a/pyctr/type/exefs.py +++ /dev/null @@ -1,316 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from hashlib import sha256 -from threading import Lock -from typing import TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError, _ReaderOpenFileBase -from ..fileio import SubsectionIO -from ..util import readle -from ..type.smdh import SMDH, InvalidSMDHError - -if TYPE_CHECKING: - from typing import BinaryIO, Dict, Union - -__all__ = ['EXEFS_EMPTY_ENTRY', 'EXEFS_ENTRY_SIZE', 'EXEFS_ENTRY_COUNT', 'EXEFS_HEADER_SIZE', 'ExeFSError', - 'ExeFSFileNotFoundError', 'InvalidExeFSError', 'ExeFSNameError', 'BadOffsetError', 'CodeDecompressionError', - 'decompress_code', 'ExeFSReader'] - -EXEFS_ENTRY_SIZE = 0x10 -EXEFS_ENTRY_COUNT = 10 -EXEFS_EMPTY_ENTRY = b'\0' * EXEFS_ENTRY_SIZE -EXEFS_HEADER_SIZE = 0x200 - -CODE_DECOMPRESSED_NAME = '.code-decompressed' - - -class ExeFSError(PyCTRError): - """Generic exception for ExeFS operations.""" - - -class ExeFSFileNotFoundError(ExeFSError): - """File not found in the ExeFS.""" - - -class InvalidExeFSError(ExeFSError): - """Invalid ExeFS header.""" - - -class ExeFSNameError(InvalidExeFSError): - """Name could not be decoded, likely making the file not a valid ExeFS.""" - - def __str__(self): - return f'could not decode from ascii: {self.args[0]!r}' - - -class BadOffsetError(InvalidExeFSError): - """Offset is not a multiple of 0x200. This kind of ExeFS will not work on a 3DS.""" - - def __str__(self): - return f'offset is not a multiple of 0x200: {self.args[0]:#x}' - - -class CodeDecompressionError(ExeFSError): - """Exception when attempting to decompress ExeFS .code.""" - - -# lazy check -CODE_MAX_SIZE = 0x2300000 - - -def decompress_code(code: bytes) -> bytes: - # remade from C code, this could probably be done better - # https://github.com/d0k3/GodMode9/blob/689f6f7cf4280bf15885cbbf848d8dce81def36b/arm9/source/game/codelzss.c#L25-L93 - off_size_comp = int.from_bytes(code[-8:-4], 'little') - add_size = int.from_bytes(code[-4:], 'little') - comp_start = 0 - code_len = len(code) - - code_comp_size = off_size_comp & 0xFFFFFF - code_comp_end = code_comp_size - ((off_size_comp >> 24) % 0xFF) - code_dec_size = code_len + add_size - - if code_len < 8: - raise CodeDecompressionError('code_len < 8') - if code_len > CODE_MAX_SIZE: - raise CodeDecompressionError('code_len > CODE_MAX_SIZE') - - if code_comp_size <= code_len: - comp_start = code_len - code_comp_size - - if code_comp_end < 0: - raise CodeDecompressionError('code_comp_end < 0') - if code_dec_size > CODE_MAX_SIZE: - raise CodeDecompressionError('code_dec_size > CODE_MAX_SIZE') - - dec = bytearray(code) - dec.extend(b'\0' * add_size) - - data_end = comp_start + code_dec_size - ptr_in = comp_start + code_comp_end - ptr_out = code_dec_size - - while ptr_in > comp_start and ptr_out > comp_start: - if ptr_out < ptr_in: - raise CodeDecompressionError('ptr_out < ptr_in') - - ptr_in -= 1 - ctrl_byte = dec[ptr_in] - for i in range(7, -1, -1): - if ptr_in <= comp_start or ptr_out <= comp_start: - break - - if (ctrl_byte >> i) & 1: - ptr_in -= 2 - seg_code = int.from_bytes(dec[ptr_in:ptr_in + 2], 'little') - if ptr_in < comp_start: - raise CodeDecompressionError('ptr_in < comp_start') - seg_off = (seg_code & 0x0FFF) + 2 - seg_len = ((seg_code >> 12) & 0xF) + 3 - - if ptr_out - seg_len < comp_start: - raise CodeDecompressionError('ptr_out - seg_len < comp_start') - if ptr_out + seg_off >= data_end: - raise CodeDecompressionError('ptr_out + seg_off >= data_end') - - c = 0 - while c < seg_len: - byte = dec[ptr_out + seg_off] - ptr_out -= 1 - dec[ptr_out] = byte - c += 1 - else: - if ptr_out == comp_start: - raise CodeDecompressionError('ptr_out == comp_start') - if ptr_in == comp_start: - raise CodeDecompressionError('ptr_in == comp_start') - - ptr_out -= 1 - ptr_in -= 1 - dec[ptr_out] = dec[ptr_in] - - if ptr_in != comp_start: - raise CodeDecompressionError('ptr_in != comp_start') - if ptr_out != comp_start: - raise CodeDecompressionError('ptr_out != comp_start') - - return bytes(dec) - - -class ExeFSEntry(NamedTuple): - name: str - offset: int - size: int - hash: bytes - - -def _normalize_path(p: str): - """Fix a given path to work with ExeFS filenames.""" - if p.startswith('/'): - p = p[1:] - # while it is technically possible for an ExeFS entry to contain ".bin", - # this would not happen in practice. - # even so, normalization can be disabled by passing normalize=False to - # ExeFSReader.open - if p.lower().endswith('.bin'): - p = p[:4] - return p - - -class _ExeFSOpenFile(_ReaderOpenFileBase): - """Class for open ExeFS file entries.""" - - def __init__(self, reader: 'ExeFSReader', path: str): - super().__init__(reader, path) - self._info = reader.entries[self._path] - - -class ExeFSReader: - """ - Class to read the 3DS ExeFS container. - - http://3dbrew.org/wiki/ExeFS - """ - - closed = False - _code_dec = None - icon: 'SMDH' = None - - def __init__(self, fp: 'Union[str, BinaryIO]', *, closefd: bool = True, _load_icon: bool = True): - if isinstance(fp, str): - fp = open(fp, 'rb') - - # storing the starting offset lets it work from anywhere in the file - self._start = fp.tell() - self._fp = fp - self._lock = Lock() - self._closefd = closefd - - self.entries: 'Dict[str, ExeFSEntry]' = {} - - header = fp.read(EXEFS_HEADER_SIZE) - - # ExeFS entries can fit up to 10 names. hashes are stored in reverse order - # (e.g. the first entry would have the hash at the very end - 0x1E0) - for entry_n, hash_n in zip(range(0, EXEFS_ENTRY_COUNT * EXEFS_ENTRY_SIZE, EXEFS_ENTRY_SIZE), - range(0x1E0, 0xA0, -0x20)): - entry_raw = header[entry_n:entry_n + 0x10] - entry_hash = header[hash_n:hash_n + 0x20] - if entry_raw == EXEFS_EMPTY_ENTRY: - continue - - try: - # ascii is used since only a-z would be used in practice - name = entry_raw[0:8].rstrip(b'\0').decode('ascii') - except UnicodeDecodeError: - raise ExeFSNameError(entry_raw[0:8]) - - entry = ExeFSEntry(name=name, - offset=readle(entry_raw[8:12]), - size=readle(entry_raw[12:16]), - hash=entry_hash) - - # the 3DS fails to parse an ExeFS with an offset that isn't a multiple of 0x200 - # so we should do the same here - if entry.offset % 0x200: - raise BadOffsetError(entry.offset) - - self.entries[name] = entry - - # this sometimes needs to be loaded outside, since reading it here may cause encryption problems - # when the NCCH has not fully initialized yet and needs to figure out what ExeFS regions need - # to be decrypted with the Original NCCH key - if _load_icon: - self._load_icon() - - def _load_icon(self): - try: - with self.open('icon') as f: - self.icon = SMDH.load(f) - except (ExeFSFileNotFoundError, InvalidSMDHError): - pass - - def __len__(self) -> int: - """Return the amount of entries in the ExeFS.""" - return len(self.entries) - - def close(self): - self.closed = True - if self._closefd: - try: - self._fp.close() - except AttributeError: - pass - - __del__ = close - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def open(self, path: str, *, normalize: bool = True): - """Open a file in the ExeFS for reading.""" - if normalize: - # remove beginning "/" and ending ".bin" - path = _normalize_path(path) - try: - entry = self.entries[path] - except KeyError: - raise ExeFSFileNotFoundError(path) - if entry.offset == -1: - # this would be the decompressed .code, if the original .code was compressed - return _ExeFSOpenFile(self, path) - else: - return SubsectionIO(self._fp, self._start + EXEFS_HEADER_SIZE + entry.offset, entry.size) - - def get_data(self, info: ExeFSEntry, offset: int, size: int) -> bytes: - if offset + size > info.size: - size = info.size - offset - with self._lock: - if info.offset == -1: - # return the decompressed code instead - return self._code_dec[offset:offset + size] - else: - # data for ExeFS entries start relative to the end of the header - self._fp.seek(self._start + EXEFS_HEADER_SIZE + info.offset + offset) - return self._fp.read(size) - - def decompress_code(self) -> bool: - """ - Decompress '.code' in the container. The result will be available as '.code-decompressed'. - - The return value is if '.code' was actually decompressed. - """ - with self.open('.code') as f: - code = f.read() - - # if it's already decompressed, this would return the code unmodified - code_dec = decompress_code(code) - - decompressed = code_dec != code - - if decompressed: - code_dec_hash = sha256(code_dec) - entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME, - offset=-1, - size=len(code_dec), - hash=code_dec_hash.digest()) - self._code_dec = code_dec - else: - # if the code was already decompressed, don't store a second copy in memory - code_entry = self.entries['.code'] - entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME, - offset=code_entry.offset, - size=code_entry.size, - hash=code_entry.hash) - - self.entries[CODE_DECOMPRESSED_NAME] = entry - - # returns if the code was actually decompressed or not - return decompressed diff --git a/pyctr/type/ncch.py b/pyctr/type/ncch.py deleted file mode 100644 index 10e0efa..0000000 --- a/pyctr/type/ncch.py +++ /dev/null @@ -1,538 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from hashlib import sha256 -from enum import IntEnum -from math import ceil -from os import environ -from os.path import join as pjoin -from threading import Lock -from typing import TYPE_CHECKING, NamedTuple - -from .exefs import ExeFSReader, EXEFS_HEADER_SIZE -from .romfs import RomFSReader -from ..common import PyCTRError, _ReaderOpenFileBase -from ..crypto import CryptoEngine, Keyslot -from ..fileio import SubsectionIO -from ..util import config_dirs, readle, roundup - -if TYPE_CHECKING: - from typing import BinaryIO, Dict, List, Optional, Tuple, Union - -__all__ = ['NCCH_MEDIA_UNIT', 'NO_ENCRYPTION', 'EXEFS_NORMAL_CRYPTO_FILES', 'FIXED_SYSTEM_KEY', 'NCCHError', - 'InvalidNCCHError', 'NCCHSeedError', 'MissingSeedError', 'SeedDBNotFoundError', 'get_seed', - 'extra_cryptoflags', 'NCCHSection', 'NCCHRegion', 'NCCHFlags', 'NCCHReader'] - - -class NCCHError(PyCTRError): - """Generic exception for NCCH operations.""" - - -class InvalidNCCHError(NCCHError): - """Invalid NCCH header exception.""" - - -class NCCHSeedError(NCCHError): - """NCCH seed is not set up, or attempted to set up seed when seed crypto is not used.""" - - -class MissingSeedError(NCCHSeedError): - """Seed could not be found.""" - - -class SeedDBNotFoundError(NCCHSeedError): - """SeedDB was not found. Main argument is a tuple of checked paths.""" - - -def get_seed(f: 'BinaryIO', program_id: int) -> bytes: - """Get a seed in a seeddb.bin from an I/O stream.""" - # convert the Program ID to little-endian bytes, as the TID is stored in seeddb.bin this way - tid_bytes = program_id.to_bytes(0x8, 'little') - f.seek(0) - # get the amount of seeds - seed_count = readle(f.read(4)) - f.seek(0x10) - for _ in range(seed_count): - entry = f.read(0x20) - if entry[0:8] == tid_bytes: - return entry[0x8:0x18] - raise NCCHSeedError(f'missing seed for {program_id:016X} from seeddb.bin') - - -seeddb_paths = [pjoin(x, 'seeddb.bin') for x in config_dirs] -try: - # try to insert the path in the SEEDDB_PATH environment variable - seeddb_paths.insert(0, environ['SEEDDB_PATH']) -except KeyError: - pass - -# NCCH sections are stored in media units -# for example, ExeFS may be stored in 13 media units, which is 0x1A00 bytes (13 * 0x200) -NCCH_MEDIA_UNIT = 0x200 -# depending on the crypto_method flag, a different keyslot may be used for RomFS and parts of ExeFS. -extra_cryptoflags = {0x00: Keyslot.NCCH, 0x01: Keyslot.NCCH70, 0x0A: Keyslot.NCCH93, 0x0B: Keyslot.NCCH96} - -# if fixed_crypto_key is enabled, the normal key is normally all zeros. -# however is (program_id & (0x10 << 32)) is true, this key is used instead. -FIXED_SYSTEM_KEY = 0x527CE630A9CA305F3696F3CDE954194B - - -# this is IntEnum to make generating the IV easier -class NCCHSection(IntEnum): - ExtendedHeader = 1 - ExeFS = 2 - RomFS = 3 - - # no crypto - Header = 4 - Logo = 5 - Plain = 6 - - # special - FullDecrypted = 7 - Raw = 8 - - -# these sections don't use encryption at all -NO_ENCRYPTION = {NCCHSection.Header, NCCHSection.Logo, NCCHSection.Plain, NCCHSection.Raw} -# the contents of these files in the ExeFS, plus the header, will always use the Original NCCH keyslot -# therefore these regions need to be stored to check what keyslot is used to decrypt -EXEFS_NORMAL_CRYPTO_FILES = {'icon', 'banner'} - - -class NCCHRegion(NamedTuple): - section: 'NCCHSection' - offset: int - size: int - end: int # this is just offset + size, stored to avoid re-calculation later on - # not all sections will actually use this (see NCCHSection), so some have a useless value - iv: int - - -class NCCHFlags(NamedTuple): - # determines the extra keyslot used for RomFS and parts of ExeFS - crypto_method: int - # if this is a CXI (CTR Executable Image) or CFA (CTR File Archive) - # in the raw flags, "Data" has to be set for it to be a CFA, while "Executable" is unset. - executable: bool - # if the content is encrypted using a fixed normal key. - fixed_crypto_key: bool - # if RomFS is to be ignored - no_romfs: bool - # if the NCCH has no encryption - no_crypto: bool - # if a seed must be loaded to load RomFS and parts of ExeFS - uses_seed: bool - - -class _NCCHSectionFile(_ReaderOpenFileBase): - """Provides a raw, decrypted NCCH section as a file-like object.""" - - def __init__(self, reader: 'NCCHReader', path: 'NCCHSection'): - super().__init__(reader, path) - self._info = reader.sections[path] - - -class NCCHReader: - """Class for 3DS NCCH container.""" - - seed_set_up = False - seed: 'Optional[bytes]' = None - # this is the KeyY when generated using the seed - _seeded_key_y = None - closed = False - - # this lists the ranges of the ExeFS to decrypt with Original NCCH (see load_sections) - _exefs_keyslot_normal_range: 'List[Tuple[int, int]]' - exefs: 'Optional[ExeFSReader]' = None - romfs: 'Optional[RomFSReader]' = None - - def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None, - dev: bool = False, seeddb: str = None, load_sections: bool = True, assume_decrypted: bool = False): - if isinstance(fp, str): - fp = open(fp, 'rb') - - if crypto: - self._crypto = crypto - else: - self._crypto = CryptoEngine(dev=dev) - - # old decryption methods did not fix the flags, so sometimes we have to assume it is decrypted - self.assume_decrypted = assume_decrypted - - # store the starting offset so the NCCH can be read from any point in the base file - self._start = fp.tell() - self._fp = fp - # store case-insensitivity for RomFSReader - self._case_insensitive = case_insensitive - # threaing lock - self._lock = Lock() - - header = fp.read(0x200) - - # load the Key Y from the first 0x10 of the signature - self._key_y = header[0x0:0x10] - # store the ncch version - self.version = readle(header[0x112:0x114]) - # get the total size of the NCCH container, and store it in bytes - self.content_size = readle(header[0x104:0x108]) * NCCH_MEDIA_UNIT - # get the Partition ID, which is used in the encryption - # this is generally different for each content in a title, except for DLC - self.partition_id = readle(header[0x108:0x110]) - # load the seed verify field, which is part of an sha256 hash to verify if - # a seed is correct for this title - self._seed_verify = header[0x114:0x118] - # load the Product Code store it as a unicode string - self.product_code = header[0x150:0x160].decode('ascii').strip('\0') - # load the Program ID - # this is the Title ID, and - self.program_id = readle(header[0x118:0x120]) - # load the extheader size, but this code only uses it to determine if it exists - extheader_size = readle(header[0x180:0x184]) - - # each section is stored with the section ID, then the region information (offset, size, IV) - self.sections: 'Dict[NCCHSection, NCCHRegion]' = {} - # same as above, but includes non-existant regions too, for the full-decrypted handler - self._all_sections: 'Dict[NCCHSection, NCCHRegion]' = {} - - def add_region(section: 'NCCHSection', starting_unit: int, units: int): - offset = starting_unit * NCCH_MEDIA_UNIT - size = units * NCCH_MEDIA_UNIT - region = NCCHRegion(section=section, - offset=offset, - size=size, - end=offset + size, - iv=self.partition_id << 64 | (section << 56)) - self._all_sections[section] = region - if units != 0: # only add existing regions - self.sections[section] = region - - # add the header as the first region - add_region(NCCHSection.Header, 0, 1) - - # add the full decrypted content, which when read, simulates a fully decrypted NCCH container - add_region(NCCHSection.FullDecrypted, 0, self.content_size // NCCH_MEDIA_UNIT) - # add the full raw content - add_region(NCCHSection.Raw, 0, self.content_size // NCCH_MEDIA_UNIT) - - # only care about the exheader if it's the expected size - if extheader_size == 0x400: - add_region(NCCHSection.ExtendedHeader, 1, 4) - else: - add_region(NCCHSection.ExtendedHeader, 0, 0) - - # add the remaining NCCH regions - # some of these may not exist, and won't be added if units (second value) is 0 - add_region(NCCHSection.Logo, readle(header[0x198:0x19C]), readle(header[0x19C:0x1A0])) - add_region(NCCHSection.Plain, readle(header[0x190:0x194]), readle(header[0x194:0x198])) - add_region(NCCHSection.ExeFS, readle(header[0x1A0:0x1A4]), readle(header[0x1A4:0x1A8])) - add_region(NCCHSection.RomFS, readle(header[0x1B0:0x1B4]), readle(header[0x1B4:0x1B8])) - - # parse flags - flags_raw = header[0x188:0x190] - self.flags = NCCHFlags(crypto_method=flags_raw[3], executable=bool(flags_raw[5] & 0x2), - fixed_crypto_key=bool(flags_raw[7] & 0x1), no_romfs=bool(flags_raw[7] & 0x2), - no_crypto=bool(flags_raw[7] & 0x4), uses_seed=bool(flags_raw[7] & 0x20)) - - # load the original (non-seeded) KeyY into the Original NCCH slot - self._crypto.set_keyslot('y', Keyslot.NCCH, self.get_key_y(original=True)) - - # load the seed if needed - if self.flags.uses_seed: - self.load_seed_from_seeddb(seeddb) - - # load the (seeded, if needed) key into the extra keyslot - self._crypto.set_keyslot('y', self.extra_keyslot, self.get_key_y()) - - # load the sections using their specific readers - if load_sections: - self.load_sections() - - def close(self): - self.closed = True - try: - self._fp.close() - except AttributeError: - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - __del__ = close - - def load_sections(self): - """Load the sections of the NCCH (Extended Header, ExeFS, and RomFS).""" - - # try to load the ExeFS - try: - self._fp.seek(self._start + self.sections[NCCHSection.ExeFS].offset) - except KeyError: - pass # no ExeFS - else: - # this is to generate what regions should be decrypted with the Original NCCH keyslot - # technically, it's not actually 0x200 chunks or units. the actual space of the file - # is encrypted with the different key. for example, if .code is 0x300 bytes, that - # means the first 0x300 are encrypted with the NCCH 7.x key, and the remaining - # 0x100 uses Original NCCH. however this would be quite a pain to implement properly - # with random access, so I only work with 0x200 chunks here. after all, the space - # after the file is effectively unused. it makes no difference, except for - # perfectionists who want it perfectly decrypted. GodMode9 does it properly I think, - # if that is what you want. or you can fix the empty space yourself with a hex editor. - self._exefs_keyslot_normal_range = [(0, 0x200)] - exefs_fp = self.open_raw_section(NCCHSection.ExeFS) - # load the RomFS reader - self.exefs = ExeFSReader(exefs_fp, _load_icon=False) - - for entry in self.exefs.entries.values(): - if entry.name in EXEFS_NORMAL_CRYPTO_FILES: - # this will add the offset (relative to ExeFS start), with the size - # rounded up to 0x200 chunks - r = (entry.offset + EXEFS_HEADER_SIZE, - entry.offset + EXEFS_HEADER_SIZE + roundup(entry.size, NCCH_MEDIA_UNIT)) - self._exefs_keyslot_normal_range.append(r) - - self.exefs._load_icon() - - # try to load RomFS - if not self.flags.no_romfs: - try: - self._fp.seek(self._start + self.sections[NCCHSection.RomFS].offset) - except KeyError: - pass # no RomFS - else: - romfs_fp = self.open_raw_section(NCCHSection.RomFS) - # load the RomFS reader - self.romfs = RomFSReader(romfs_fp, case_insensitive=self._case_insensitive) - - def open_raw_section(self, section: 'NCCHSection'): - """Open a raw NCCH section for reading.""" - # check if the region is ExeFS and uses a newer keyslot, or is fulldec, and use a specific file class - if (section == NCCHSection.ExeFS and self.extra_keyslot) or (section == NCCHSection.FullDecrypted): - return _NCCHSectionFile(self, section) - else: - region = self.sections[section] - fh = SubsectionIO(self._fp, self._start + region.offset, region.size) - # if the region is encrypted (not ExeFS if an extra keyslot is in use), wrap it in CTRFileIO - if not (self.assume_decrypted or self.flags.no_crypto or section in NO_ENCRYPTION): - keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH - fh = self._crypto.create_ctr_io(keyslot, fh, region.iv) - return fh - - def get_key_y(self, original: bool = False) -> bytes: - if original or not self.flags.uses_seed: - return self._key_y - if self.flags.uses_seed and not self.seed_set_up: - raise MissingSeedError('NCCH uses seed crypto, but seed is not set up') - else: - return self._seeded_key_y - - @property - def extra_keyslot(self) -> int: - return extra_cryptoflags[self.flags.crypto_method] - - def check_for_extheader(self) -> bool: - return NCCHSection.ExtendedHeader in self.sections - - def setup_seed(self, seed: bytes): - if not self.flags.uses_seed: - raise NCCHSeedError('NCCH does not use seed crypto') - seed_verify_hash = sha256(seed + self.program_id.to_bytes(0x8, 'little')).digest() - if seed_verify_hash[0x0:0x4] != self._seed_verify: - raise NCCHSeedError('given seed does not match with seed verify hash in header') - self.seed = seed - self._seeded_key_y = sha256(self._key_y + seed).digest()[0:16] - self.seed_set_up = True - - def load_seed_from_seeddb(self, path: str = None): - if not self.flags.uses_seed: - raise NCCHSeedError('NCCH does not use seed crypto') - if path: - # if a path was provided, use only that - paths = (path,) - else: - # use the fixed set of paths - paths = seeddb_paths - for fn in paths: - try: - with open(fn, 'rb') as f: - # try to load the seed from the file - self.setup_seed(get_seed(f, self.program_id)) - return - except FileNotFoundError: - continue - - # if keys are not set... - raise InvalidNCCHError(paths) - - def get_data(self, section: 'Union[NCCHRegion, NCCHSection]', offset: int, size: int) -> bytes: - try: - region = self._all_sections[section] - except KeyError: - region = section - if offset + size > region.size: - # prevent reading past the region - size = region.size - offset - - # the full-decrypted handler is done outside of the thread lock - if region.section == NCCHSection.FullDecrypted: - before = offset % 0x200 - aligned_offset = offset - before - aligned_size = size + before - - def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int): - # get the offset of the end of the last chunk - end = al_offset + (ceil(al_size / 0x200) * 0x200) - - # store the sections to read - # dict is ordered by default in CPython since 3.6.0, and part of the language spec since 3.7.0 - to_read: Dict[Tuple[NCCHSection, int], List[int]] = {} - - # get each section to a local variable for easier access - header = self._all_sections[NCCHSection.Header] - extheader = self._all_sections[NCCHSection.ExtendedHeader] - logo = self._all_sections[NCCHSection.Logo] - plain = self._all_sections[NCCHSection.Plain] - exefs = self._all_sections[NCCHSection.ExeFS] - romfs = self._all_sections[NCCHSection.RomFS] - - last_region = False - - # this is somewhat hardcoded for performance reasons. this may be optimized better later. - for chunk_offset in range(al_offset, end, 0x200): - # RomFS check first, since it might be faster - if romfs.offset <= chunk_offset < romfs.end: - region = (NCCHSection.RomFS, 0) - curr_offset = romfs.offset - - # ExeFS check second, since it might be faster - elif exefs.offset <= chunk_offset < exefs.end: - region = (NCCHSection.ExeFS, 0) - curr_offset = exefs.offset - - elif header.offset <= chunk_offset < header.end: - region = (NCCHSection.Header, 0) - curr_offset = header.offset - - elif extheader.offset <= chunk_offset < extheader.end: - region = (NCCHSection.ExtendedHeader, 0) - curr_offset = extheader.offset - - elif logo.offset <= chunk_offset < logo.end: - region = (NCCHSection.Logo, 0) - curr_offset = logo.offset - - elif plain.offset <= chunk_offset < plain.end: - region = (NCCHSection.Plain, 0) - curr_offset = plain.offset - - else: - region = (NCCHSection.Raw, chunk_offset) - curr_offset = 0 - - if region not in to_read: - to_read[region] = [chunk_offset - curr_offset, 0] - to_read[region][1] += 0x200 - last_region = region - - is_start = True - for region, info in to_read.items(): - new_data = self.get_data(region[0], info[0], info[1]) - if region[0] == NCCHSection.Header: - # fix crypto flags - ncch_array = bytearray(new_data) - ncch_array[0x18B] = 0 - ncch_array[0x18F] = 4 - new_data = bytes(ncch_array) - if is_start: - new_data = new_data[cut_start:] - is_start = False - if region == last_region and cut_end != 0x200: - new_data = new_data[:-cut_end] - - yield new_data - - return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200))) - - with self._lock: - # check if decryption is really needed - if self.assume_decrypted or self.flags.no_crypto or region.section in NO_ENCRYPTION: - # this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO. - self._fp.seek(self._start + region.offset + offset) - return self._fp.read(size) - - # thanks Stary2001 for help with random-access crypto - - # if the region is ExeFS and extra crypto is being used, special handling is required - # because different parts use different encryption methods - if region.section == NCCHSection.ExeFS and self.flags.crypto_method != 0x00: - # get the amount to cut off at the beginning - before = offset % 0x200 - - # get the offset of the starting chunk - aligned_offset = offset - before - - # get the real offset of the starting chunk - aligned_real_offset = self._start + region.offset + aligned_offset - - # get the aligned total size of the requested size - aligned_size = size + before - self._fp.seek(aligned_real_offset) - - def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int): - # get the offset of the end of the last chunk - end = al_offset + (ceil(al_size / 0x200) * 0x200) - - # get the offset to the last chunk - last_chunk_offset = end - 0x200 - - # noinspection PyTypeChecker - for chunk in range(al_offset, end, 0x200): - # generate the IV for this chunk - iv = region.iv + (chunk >> 4) - - # get the extra keyslot - keyslot = self.extra_keyslot - - for r in self._exefs_keyslot_normal_range: - if r[0] <= self._fp.tell() - region.offset < r[1]: - # if the chunk is within the "normal keyslot" ranges, - # use the Original NCCH keyslot instead - keyslot = Keyslot.NCCH - - # decrypt the data - out = self._crypto.create_ctr_cipher(keyslot, iv).decrypt(self._fp.read(0x200)) - if chunk == al_offset: - # cut off the beginning if it's the first chunk - out = out[cut_start:] - if chunk == last_chunk_offset and cut_end != 0x200: - # cut off the end of it's the last chunk - out = out[:-cut_end] - yield out - - # join all the chunks into one bytes result and return it - return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200))) - else: - # this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO. - - # seek to the real offset of the section + the requested offset - self._fp.seek(self._start + region.offset + offset) - data = self._fp.read(size) - - # choose the extra keyslot only for RomFS here - # ExeFS needs special handling if a newer keyslot is used, therefore it's not checked here - keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH - - # get the amount of padding required at the beginning - before = offset % 16 - - # pad the beginning of the data if needed (the ending part doesn't need padding) - data = (b'\0' * before) + data - - # decrypt the data, then cut off the padding - return self._crypto.create_ctr_cipher(keyslot, region.iv + (offset >> 4)).decrypt(data)[before:] diff --git a/pyctr/type/romfs.py b/pyctr/type/romfs.py deleted file mode 100644 index 760208a..0000000 --- a/pyctr/type/romfs.py +++ /dev/null @@ -1,233 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from io import TextIOWrapper -from threading import Lock -from typing import overload, TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError, _ReaderOpenFileBase -from ..fileio import SubsectionIO -from ..util import readle, roundup - -if TYPE_CHECKING: - from typing import BinaryIO, Optional, Tuple, Union - -__all__ = ['IVFC_HEADER_SIZE', 'IVFC_ROMFS_MAGIC_NUM', 'ROMFS_LV3_HEADER_SIZE', 'RomFSError', 'InvalidIVFCError', - 'InvalidRomFSHeaderError', 'RomFSEntryError', 'RomFSFileNotFoundError', 'RomFSReader'] - -IVFC_HEADER_SIZE = 0x5C -IVFC_ROMFS_MAGIC_NUM = 0x10000 -ROMFS_LV3_HEADER_SIZE = 0x28 - - -class RomFSError(PyCTRError): - """Generic exception for RomFS operations.""" - - -class InvalidIVFCError(RomFSError): - """Invalid IVFC header exception.""" - - -class InvalidRomFSHeaderError(RomFSError): - """Invalid RomFS Level 3 header.""" - - -class RomFSEntryError(RomFSError): - """Error with RomFS Directory or File entry.""" - - -class RomFSFileNotFoundError(RomFSEntryError): - """Invalid file path in RomFS Level 3.""" - - -class RomFSIsADirectoryError(RomFSEntryError): - """Attempted to open a directory as a file.""" - - -class RomFSRegion(NamedTuple): - offset: int - size: int - - -class RomFSDirectoryEntry(NamedTuple): - name: str - type: str - contents: 'Tuple[str, ...]' - - -class RomFSFileEntry(NamedTuple): - name: str - type: str - offset: int - size: int - - -class RomFSReader: - """ - Class for 3DS RomFS Level 3 partition. - - https://www.3dbrew.org/wiki/RomFS - """ - - closed = False - lv3_offset = 0 - data_offset = 0 - - def __init__(self, fp: 'Union[str, BinaryIO]', case_insensitive: bool = False): - if isinstance(fp, str): - fp = open(fp, 'rb') - - self._start = fp.tell() - self._fp = fp - self.case_insensitive = case_insensitive - self._lock = Lock() - - lv3_offset = fp.tell() - magic = fp.read(4) - - # detect ivfc and get the lv3 offset - if magic == b'IVFC': - ivfc = magic + fp.read(0x54) # IVFC_HEADER_SIZE - 4 - ivfc_magic_num = readle(ivfc[0x4:0x8]) - if ivfc_magic_num != IVFC_ROMFS_MAGIC_NUM: - raise InvalidIVFCError(f'IVFC magic number is invalid ' - f'({ivfc_magic_num:#X} instead of {IVFC_ROMFS_MAGIC_NUM:#X})') - master_hash_size = readle(ivfc[0x8:0xC]) - lv3_block_size = readle(ivfc[0x4C:0x50]) - lv3_hash_block_size = 1 << lv3_block_size - lv3_offset += roundup(0x60 + master_hash_size, lv3_hash_block_size) - fp.seek(self._start + lv3_offset) - magic = fp.read(4) - self.lv3_offset = lv3_offset - - lv3_header = magic + fp.read(0x24) # ROMFS_LV3_HEADER_SIZE - 4 - - # get offsets and sizes from lv3 header - lv3_header_size = readle(magic) - lv3_dirhash = RomFSRegion(offset=readle(lv3_header[0x4:0x8]), size=readle(lv3_header[0x8:0xC])) - lv3_dirmeta = RomFSRegion(offset=readle(lv3_header[0xC:0x10]), size=readle(lv3_header[0x10:0x14])) - lv3_filehash = RomFSRegion(offset=readle(lv3_header[0x14:0x18]), size=readle(lv3_header[0x18:0x1C])) - lv3_filemeta = RomFSRegion(offset=readle(lv3_header[0x1C:0x20]), size=readle(lv3_header[0x20:0x24])) - lv3_filedata_offset = readle(lv3_header[0x24:0x28]) - self.data_offset = lv3_offset + lv3_filedata_offset - - # verify lv3 header - if lv3_header_size != ROMFS_LV3_HEADER_SIZE: - raise InvalidRomFSHeaderError('Length in RomFS Lv3 header is not 0x28') - if lv3_dirhash.offset < lv3_header_size: - raise InvalidRomFSHeaderError('Directory Hash offset is before the end of the Lv3 header') - if lv3_dirmeta.offset < lv3_dirhash.offset + lv3_dirhash.size: - raise InvalidRomFSHeaderError('Directory Metadata offset is before the end of the Directory Hash region') - if lv3_filehash.offset < lv3_dirmeta.offset + lv3_dirmeta.size: - raise InvalidRomFSHeaderError('File Hash offset is before the end of the Directory Metadata region') - if lv3_filemeta.offset < lv3_filehash.offset + lv3_filehash.size: - raise InvalidRomFSHeaderError('File Metadata offset is before the end of the File Hash region') - if lv3_filedata_offset < lv3_filemeta.offset + lv3_filemeta.size: - raise InvalidRomFSHeaderError('File Data offset is before the end of the File Metadata region') - - # get entries from dirmeta and filemeta - def iterate_dir(out: dict, raw: bytes, current_path: str): - first_child_dir = readle(raw[0x8:0xC]) - first_file = readle(raw[0xC:0x10]) - - out['type'] = 'dir' - out['contents'] = {} - - # iterate through all child directories - if first_child_dir != 0xFFFFFFFF: - fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + first_child_dir) - while True: - child_dir_meta = fp.read(0x18) - next_sibling_dir = readle(child_dir_meta[0x4:0x8]) - child_dir_name = fp.read(readle(child_dir_meta[0x14:0x18])).decode('utf-16le') - child_dir_name_meta = child_dir_name.lower() if case_insensitive else child_dir_name - if child_dir_name_meta in out['contents']: - print(f'WARNING: Dirname collision! {current_path}{child_dir_name}') - out['contents'][child_dir_name_meta] = {'name': child_dir_name} - - iterate_dir(out['contents'][child_dir_name_meta], child_dir_meta, - f'{current_path}{child_dir_name}/') - if next_sibling_dir == 0xFFFFFFFF: - break - fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + next_sibling_dir) - - if first_file != 0xFFFFFFFF: - fp.seek(self._start + lv3_offset + lv3_filemeta.offset + first_file) - while True: - child_file_meta = fp.read(0x20) - next_sibling_file = readle(child_file_meta[0x4:0x8]) - child_file_offset = readle(child_file_meta[0x8:0x10]) - child_file_size = readle(child_file_meta[0x10:0x18]) - child_file_name = fp.read(readle(child_file_meta[0x1C:0x20])).decode('utf-16le') - child_file_name_meta = child_file_name.lower() if self.case_insensitive else child_file_name - if child_file_name_meta in out['contents']: - print(f'WARNING: Filename collision! {current_path}{child_file_name}') - out['contents'][child_file_name_meta] = {'name': child_file_name, 'type': 'file', - 'offset': child_file_offset, 'size': child_file_size} - - self.total_size += child_file_size - if next_sibling_file == 0xFFFFFFFF: - break - fp.seek(self._start + lv3_offset + lv3_filemeta.offset + next_sibling_file) - - self._tree_root = {'name': 'ROOT'} - self.total_size = 0 - fp.seek(self._start + lv3_offset + lv3_dirmeta.offset) - iterate_dir(self._tree_root, fp.read(0x18), '/') - - def close(self): - self.closed = True - try: - self._fp.close() - except AttributeError: - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - @overload - def open(self, path: str, encoding: str, errors: 'Optional[str]' = None, - newline: 'Optional[str]' = None) -> TextIOWrapper: ... - - @overload - def open(self, path: str, encoding: None = None, errors: 'Optional[str]' = None, - newline: 'Optional[str]' = None) -> SubsectionIO: ... - - def open(self, path, encoding=None, errors=None, newline=None): - """Open a file in the RomFS for reading.""" - file_info = self.get_info_from_path(path) - if not isinstance(file_info, RomFSFileEntry): - raise RomFSIsADirectoryError(path) - f = SubsectionIO(self._fp, self._start + self.data_offset + file_info.offset, file_info.size) - if encoding is not None: - f = TextIOWrapper(f, encoding, errors, newline) - return f - - __del__ = close - - def get_info_from_path(self, path: str) -> 'Union[RomFSDirectoryEntry, RomFSFileEntry]': - """Get a directory or file entry""" - curr = self._tree_root - if self.case_insensitive: - path = path.lower() - if path[0] == '/': - path = path[1:] - for part in path.split('/'): - if part == '': - break - try: - # noinspection PyTypeChecker - curr = curr['contents'][part] - except KeyError: - raise RomFSFileNotFoundError(path) - if curr['type'] == 'dir': - contents = (k['name'] for k in curr['contents'].values()) - return RomFSDirectoryEntry(name=curr['name'], type='dir', contents=(*contents,)) - elif curr['type'] == 'file': - return RomFSFileEntry(name=curr['name'], type='file', offset=curr['offset'], size=curr['size']) diff --git a/pyctr/type/smdh.py b/pyctr/type/smdh.py deleted file mode 100644 index bd3b739..0000000 --- a/pyctr/type/smdh.py +++ /dev/null @@ -1,111 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from types import MappingProxyType -from typing import TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError - -if TYPE_CHECKING: - from typing import BinaryIO, Dict, Mapping, Optional, Tuple, Union - -SMDH_SIZE = 0x36C0 - -region_names = ( - 'Japanese', - 'English', - 'French', - 'German', - 'Italian', - 'Spanish', - 'Simplified Chinese', - 'Korean', - 'Dutch', - 'Portuguese', - 'Russian', - 'Traditional Chinese', -) - -# the order of the SMDH names to check. the difference here is that English is put before Japanese. -_region_order_check = ( - 'English', - 'Japanese', - 'French', - 'German', - 'Italian', - 'Spanish', - 'Simplified Chinese', - 'Korean', - 'Dutch', - 'Portuguese', - 'Russian', - 'Traditional Chinese', -) - - -class SMDHError(PyCTRError): - """Generic exception for SMDH operations.""" - - -class InvalidSMDHError(SMDHError): - """Invalid SMDH contents.""" - - -class AppTitle(NamedTuple): - short_desc: str - long_desc: str - publisher: str - - -class SMDH: - """ - Class for 3DS SMDH. Icon data is currently not supported. - - https://www.3dbrew.org/wiki/SMDH - """ - - # TODO: support other settings - - def __init__(self, names: 'Dict[str, AppTitle]'): - self.names: Mapping[str, AppTitle] = MappingProxyType({n: names.get(n, None) for n in region_names}) - - def __repr__(self): - return f'<{type(self).__name__} title: {self.get_app_title().short_desc}>' - - def get_app_title(self, language: 'Union[str, Tuple[str, ...]]' = _region_order_check) -> 'Optional[AppTitle]': - if isinstance(language, str): - language = (language,) - - for l in language: - apptitle = self.names[l] - if apptitle: - return apptitle - - # if, for some reason, it fails to return... - return AppTitle('unknown', 'unknown', 'unknown') - - @classmethod - def load(cls, fp: 'BinaryIO') -> 'SMDH': - """Load an SMDH from a file-like object.""" - smdh = fp.read(SMDH_SIZE) - if len(smdh) != SMDH_SIZE: - raise InvalidSMDHError(f'invalid size (expected: {SMDH_SIZE:#6x}, got: {len(smdh):#6x}') - if smdh[0:4] != b'SMDH': - raise InvalidSMDHError('SMDH magic not found') - - app_structs = smdh[8:0x2008] - names: Dict[str, AppTitle] = {} - # due to region_names only being 12 elements, this will only process 12. the other 4 are unused. - for app_title, region in zip((app_structs[x:x + 0x200] for x in range(0, 0x2000, 0x200)), region_names): - names[region] = AppTitle(app_title[0:0x80].decode('utf-16le').strip('\0'), - app_title[0x80:0x180].decode('utf-16le').strip('\0'), - app_title[0x180:0x200].decode('utf-16le').strip('\0')) - return cls(names) - - @classmethod - def from_file(cls, fn: str) -> 'SMDH': - with open(fn, 'rb') as f: - return cls.load(f) diff --git a/pyctr/type/tmd.py b/pyctr/type/tmd.py deleted file mode 100644 index 0da6fa1..0000000 --- a/pyctr/type/tmd.py +++ /dev/null @@ -1,316 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -from hashlib import sha256 -from struct import pack -from typing import TYPE_CHECKING, NamedTuple - -from ..common import PyCTRError -from ..util import readbe, readle - -if TYPE_CHECKING: - from typing import BinaryIO, Iterable - -__all__ = ['CHUNK_RECORD_SIZE', 'TitleMetadataError', 'InvalidSignatureTypeError', 'InvalidHashError', - 'ContentInfoRecord', 'ContentChunkRecord', 'ContentTypeFlags', 'TitleVersion', 'TitleMetadataReader'] - -CHUNK_RECORD_SIZE = 0x30 - -# sig-type: (sig-size, padding) -signature_types = { - # RSA_4096 SHA1 (unused on 3DS) - 0x00010000: (0x200, 0x3C), - # RSA_2048 SHA1 (unused on 3DS) - 0x00010001: (0x100, 0x3C), - # Elliptic Curve with SHA1 (unused on 3DS) - 0x00010002: (0x3C, 0x40), - # RSA_4096 SHA256 - 0x00010003: (0x200, 0x3C), - # RSA_2048 SHA256 - 0x00010004: (0x100, 0x3C), - # ECDSA with SHA256 - 0x00010005: (0x3C, 0x40), -} - -BLANK_SIG_PAIR = (0x00010004, b'\xFF' * signature_types[0x00010004][0]) - - -class TitleMetadataError(PyCTRError): - """Generic exception for TitleMetadata operations.""" - - -class InvalidTMDError(TitleMetadataError): - """Title Metadata is invalid.""" - - -class InvalidSignatureTypeError(InvalidTMDError): - """Invalid signature type was used.""" - - def __init__(self, sig_type): - super().__init__(sig_type) - - def __str__(self): - return f'{self.args[0]:#010x}' - - -class InvalidHashError(InvalidTMDError): - """Hash mismatch in the Title Metadata.""" - - -class InvalidInfoRecordError(InvalidHashError): - """Hash mismatch in the Content Info Records.""" - - def __init__(self, info_record): - super().__init__(info_record) - - def __str__(self): - return f'Invalid info record: {self.args[0]}' - - -class UnusualInfoRecordError(InvalidTMDError): - """Encountered Content Info Record that attempts to hash a Content Chunk Record that has already been hashed.""" - - def __init__(self, info_record, chunk_record): - super().__init__(info_record, chunk_record) - - def __str__(self): - return f'Attempted to hash twice: {self.args[0]}, {self.args[1]}' - - -class ContentTypeFlags(NamedTuple): - encrypted: bool - disc: bool - cfm: bool - optional: bool - shared: bool - - def __index__(self) -> int: - return self.encrypted | (self.disc << 1) | (self.cfm << 2) | (self.optional << 14) | (self.shared << 15) - - __int__ = __index__ - - def __format__(self, format_spec: str) -> str: - return self.__int__().__format__(format_spec) - - @classmethod - def from_int(cls, flags: int) -> 'ContentTypeFlags': - # noinspection PyArgumentList - return cls(bool(flags & 1), bool(flags & 2), bool(flags & 4), bool(flags & 0x4000), bool(flags & 0x8000)) - - -class ContentInfoRecord(NamedTuple): - index_offset: int - command_count: int - hash: bytes - - def __bytes__(self) -> bytes: - return b''.join((self.index_offset.to_bytes(2, 'big'), self.command_count.to_bytes(2, 'big'), self.hash)) - - -class ContentChunkRecord(NamedTuple): - id: str - cindex: int - type: ContentTypeFlags - size: int - hash: bytes - - def __bytes__(self) -> bytes: - return b''.join((bytes.fromhex(self.id), self.cindex.to_bytes(2, 'big'), int(self.type).to_bytes(2, 'big'), - self.size.to_bytes(8, 'big'), self.hash)) - - -class TitleVersion(NamedTuple): - major: int - minor: int - micro: int - - def __str__(self) -> str: - return f'{self.major}.{self.minor}.{self.micro}' - - def __index__(self) -> int: - return (self.major << 10) | (self.minor << 4) | self.micro - - __int__ = __index__ - - def __format__(self, format_spec: str) -> str: - return self.__int__().__format__(format_spec) - - @classmethod - def from_int(cls, ver: int) -> 'TitleVersion': - # noinspection PyArgumentList - return cls((ver >> 10) & 0x3F, (ver >> 4) & 0x3F, ver & 0xF) - - -class TitleMetadataReader: - """ - Class for 3DS Title Metadata. - - https://www.3dbrew.org/wiki/Title_metadata - """ - - __slots__ = ('title_id', 'save_size', 'srl_save_size', 'title_version', 'info_records', - 'chunk_records', 'content_count', 'signature', '_u_issuer', '_u_version', '_u_ca_crl_version', - '_u_signer_crl_version', '_u_reserved1', '_u_system_version', '_u_title_type', '_u_group_id', - '_u_reserved2', '_u_srl_flag', '_u_reserved3', '_u_access_rights', '_u_boot_count', '_u_padding') - - # arguments prefixed with _u_ are values unused by the 3DS and/or are only kept around to generate the final tmd - def __init__(self, *, title_id: str, save_size: int, srl_save_size: int, title_version: TitleVersion, - info_records: 'Iterable[ContentInfoRecord]', chunk_records: 'Iterable[ContentChunkRecord]', - signature=BLANK_SIG_PAIR, _u_issuer='Root-CA00000003-CP0000000b', _u_version=1, _u_ca_crl_version=0, - _u_signer_crl_version=0, _u_reserved1=0, _u_system_version=b'\0' * 8, _u_title_type=b'\0\0\0@', - _u_group_id=b'\0\0', _u_reserved2=b'\0\0\0\0', _u_srl_flag=0, _u_reserved3=b'\0' * 0x31, - _u_access_rights=b'\0' * 4, _u_boot_count=b'\0\0', _u_padding=b'\0\0'): - # TODO: add checks - self.title_id = title_id.lower() - self.save_size = save_size - self.srl_save_size = srl_save_size - self.title_version = title_version - self.info_records = tuple(info_records) - self.chunk_records = tuple(chunk_records) - self.content_count = len(self.chunk_records) - self.signature = signature # TODO: store this differently - - # unused values - self._u_issuer = _u_issuer - self._u_version = _u_version - self._u_ca_crl_version = _u_ca_crl_version - self._u_signer_crl_version = _u_signer_crl_version - self._u_reserved1 = _u_reserved1 - self._u_system_version = _u_system_version - self._u_title_type = _u_title_type - self._u_group_id = _u_group_id - self._u_reserved2 = _u_reserved2 - self._u_srl_flag = _u_srl_flag - self._u_reserved3 = _u_reserved3 - self._u_access_rights = _u_access_rights - self._u_boot_count = _u_boot_count - self._u_padding = _u_padding - - def __hash__(self) -> int: - return hash((self.title_id, self.save_size, self.srl_save_size, self.title_version, - self.info_records, self.chunk_records)) - - def __repr__(self) -> str: - return (f'') - - def __bytes__(self) -> bytes: - sig_data = pack(f'>I {signature_types[self.signature[0]][0]}s {signature_types[self.signature[0]][1]}x', - self.signature[0], self.signature[1]) - - info_records = b''.join(bytes(x) for x in self.info_records).ljust(0x900, b'\0') - - header = pack('>64s b b b b 8s 8s 4s 2s I I 4s b 49s 4s H H 2s 2s 32s', self._u_issuer.encode('ascii'), - self._u_version, self._u_ca_crl_version, self._u_signer_crl_version, self._u_reserved1, - self._u_system_version, bytes.fromhex(self.title_id), self._u_title_type, self._u_group_id, - self.save_size, self.srl_save_size, self._u_reserved2, self._u_srl_flag, self._u_reserved3, - self._u_access_rights, self.title_version, self.content_count, self._u_boot_count, - self._u_padding, sha256(info_records).digest()) - - chunk_records = b''.join(bytes(x) for x in self.chunk_records) - - return sig_data + header + info_records + chunk_records - - @classmethod - def load(cls, fp: 'BinaryIO', verify_hashes: bool = True) -> 'TitleMetadataReader': - """Load a tmd from a file-like object.""" - sig_type = readbe(fp.read(4)) - try: - sig_size, sig_padding = signature_types[sig_type] - except KeyError: - raise InvalidSignatureTypeError(sig_type) - - signature = fp.read(sig_size) - try: - fp.seek(sig_padding, 1) - except Exception: - # most streams are probably seekable, but for some that aren't... - fp.read(sig_padding) - - header = fp.read(0xC4) - if len(header) != 0xC4: - raise InvalidTMDError('Header length is not 0xC4') - - # only values that actually have a use are loaded here. (currently) - # several fields in were left in from the Wii tmd and have no function on 3DS. - title_id = header[0x4C:0x54].hex() - save_size = readle(header[0x5A:0x5E]) - srl_save_size = readle(header[0x5E:0x62]) - title_version = TitleVersion.from_int(readbe(header[0x9C:0x9E])) - content_count = readbe(header[0x9E:0xA0]) - - content_info_records_hash = header[0xA4:0xC4] - - content_info_records_raw = fp.read(0x900) - if len(content_info_records_raw) != 0x900: - raise InvalidTMDError('Content info records length is not 0x900') - - if verify_hashes: - real_hash = sha256(content_info_records_raw) - if content_info_records_hash != real_hash.digest(): - raise InvalidHashError('Content Info Records hash is invalid') - - content_chunk_records_raw = fp.read(content_count * CHUNK_RECORD_SIZE) - - chunk_records = [] - for cr_raw in (content_chunk_records_raw[i:i + CHUNK_RECORD_SIZE] for i in - range(0, content_count * CHUNK_RECORD_SIZE, CHUNK_RECORD_SIZE)): - chunk_records.append(ContentChunkRecord(id=cr_raw[0:4].hex(), - cindex=readbe(cr_raw[4:6]), - type=ContentTypeFlags.from_int(readbe(cr_raw[6:8])), - size=readbe(cr_raw[8:16]), - hash=cr_raw[16:48])) - - info_records = [] - for ir_raw in (content_info_records_raw[i:i + 0x24] for i in range(0, 0x900, 0x24)): - if ir_raw != b'\0' * 0x24: - info_records.append(ContentInfoRecord(index_offset=readbe(ir_raw[0:2]), - command_count=readbe(ir_raw[2:4]), - hash=ir_raw[4:36])) - - if verify_hashes: - chunk_records_hashed = set() - for ir in info_records: - to_hash = [] - for cr in chunk_records[ir.index_offset:ir.index_offset + ir.command_count]: - if cr in chunk_records_hashed: - raise InvalidTMDError('attempting to hash chunk record twice') - - chunk_records_hashed.add(cr) - to_hash.append(cr) - - hashed = sha256(b''.join(bytes(x) for x in to_hash)) - if hashed.digest() != ir.hash: - raise InvalidInfoRecordError(ir) - - # unused vales are loaded only for use when re-building the binary tmd - u_issuer = header[0:0x40].decode('ascii').rstrip('\0') - u_version = header[0x40] - u_ca_crl_version = header[0x41] - u_signer_crl_version = header[0x42] - u_reserved1 = header[0x43] - u_system_version = header[0x44:0x4C] - u_title_type = header[0x54:0x58] - u_group_id = header[0x58:0x5A] - u_reserved2 = header[0x62:0x66] - u_srl_flag = header[0x66] # is this one used for anything? - u_reserved3 = header[0x67:0x98] - u_access_rights = header[0x98:0x9C] - u_boot_count = header[0xA0:0xA2] - u_padding = header[0xA2:0xA4] - - return cls(title_id=title_id, save_size=save_size, srl_save_size=srl_save_size, title_version=title_version, - info_records=info_records, chunk_records=chunk_records, signature=(sig_type, signature), - _u_issuer=u_issuer, _u_version=u_version, _u_ca_crl_version=u_ca_crl_version, - _u_signer_crl_version=u_signer_crl_version, _u_reserved1=u_reserved1, - _u_system_version=u_system_version, _u_title_type=u_title_type, _u_group_id=u_group_id, - _u_reserved2=u_reserved2, _u_srl_flag=u_srl_flag, _u_reserved3=u_reserved3, - _u_access_rights=u_access_rights, _u_boot_count=u_boot_count, _u_padding=u_padding) - - @classmethod - def from_file(cls, fn: str, *, verify_hashes: bool = True) -> 'TitleMetadataReader': - with open(fn, 'rb') as f: - return cls.load(f, verify_hashes=verify_hashes) diff --git a/pyctr/util.py b/pyctr/util.py deleted file mode 100644 index 1206d8b..0000000 --- a/pyctr/util.py +++ /dev/null @@ -1,41 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - -import os -from math import ceil -from sys import platform -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import List - -__all__ = ['windows', 'macos', 'readle', 'readbe', 'roundup', 'config_dirs'] - -windows = platform in {'win32', 'cygwin'} -macos = platform == 'darwin' - - -def readle(b: bytes) -> int: - """Convert little-endian bytes to an int.""" - return int.from_bytes(b, 'little') - - -def readbe(b: bytes) -> int: - """Convert big-endian bytes to an int.""" - return int.from_bytes(b, 'big') - - -def roundup(offset: int, alignment: int) -> int: - """Round up a number to a provided alignment.""" - return int(ceil(offset / alignment) * alignment) - - -_home = os.path.expanduser('~') -config_dirs: 'List[str]' = [os.path.join(_home, '.3ds'), os.path.join(_home, '3ds')] -if windows: - config_dirs.insert(0, os.path.join(os.environ.get('APPDATA'), '3ds')) -elif macos: - config_dirs.insert(0, os.path.join(_home, 'Library', 'Application Support', '3ds')) diff --git a/requirements.txt b/requirements.txt index 9a12844..e9e3817 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pycryptodomex==3.9.4 events==0.3 +pyctr==0.1.0