diff --git a/README.md b/README.md index 0f6bc93..f2391e6 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ python3 custominstall.py -b boot9.bin -m movable.sed --sd /media/GM9SD file.cia ``` ## License/Credits -`pyctr/` is from [ninfs `d994c78`](https://github.com/ihaveamac/ninfs/tree/d994c78acf5ff3840df1ef5a6aabdc12ca98e806/ninfs/pyctr). +`pyctr/` is from [ninfs `795373d`](https://github.com/ihaveamac/ninfs/tree/795373db07be0cacd60215d8eccf16fe03535984/ninfs/pyctr). [save3ds by wwylele](https://github.com/wwylele/save3ds) is used to interact with the Title Database (details in `bin/README`). diff --git a/pyctr/.DS_Store b/pyctr/.DS_Store new file mode 100644 index 0000000..b1d2f03 Binary files /dev/null and b/pyctr/.DS_Store differ diff --git a/pyctr/__init__.py b/pyctr/__init__.py index bdd69fc..e69de29 100644 --- a/pyctr/__init__.py +++ b/pyctr/__init__.py @@ -1,6 +0,0 @@ -from .types.cia import * -from .types.exefs import * -from .types.ncch import * -from .types.romfs import * -from .types.smdh import * -from .types.tmd import * diff --git a/pyctr/crypto.py b/pyctr/crypto.py index 2e80505..681d438 100644 --- a/pyctr/crypto.py +++ b/pyctr/crypto.py @@ -7,6 +7,7 @@ from enum import IntEnum from functools import wraps from hashlib import sha256 +from io import BufferedIOBase from os import environ from os.path import getsize, join as pjoin from struct import pack, unpack @@ -16,7 +17,7 @@ from Cryptodome.Cipher import AES from Cryptodome.Hash import CMAC from Cryptodome.Util import Counter -from .common import PyCTRError +from .common import PyCTRError, _raise_if_closed from .util import config_dirs, readbe, readle if TYPE_CHECKING: @@ -27,7 +28,7 @@ if TYPE_CHECKING: # noinspection PyProtectedMember from Cryptodome.Cipher._mode_ecb import EcbMode from Cryptodome.Hash.CMAC import CMAC as CMACObject - from typing import Dict, List, Union + from typing import BinaryIO, Dict, List, Optional, Union __all__ = ['CryptoError', 'OTPLengthError', 'CorruptBootromError', 'KeyslotMissingError', 'TicketLengthError', 'BootromNotFoundError', 'CorruptOTPError', 'Keyslot', 'CryptoEngine'] @@ -201,6 +202,7 @@ class CryptoEngine: _b9_extdata_otp: bytes = None _b9_extdata_keygen: bytes = None + _otp_device_id: int = None _otp_key: bytes = None _otp_iv: bytes = None @@ -230,6 +232,11 @@ class CryptoEngine: def b9_extdata_keygen(self) -> bytes: return self._b9_extdata_keygen + @property + @_requires_bootrom + def otp_device_id(self) -> int: + return self._otp_device_id + @property @_requires_bootrom def otp_key(self) -> bytes: @@ -246,7 +253,7 @@ class CryptoEngine: raise KeyslotMissingError('load a movable.sed with setup_sd_key') return self._id0 - def create_cbc_cipher(self, keyslot: int, iv: bytes) -> 'CbcMode': + def create_cbc_cipher(self, keyslot: Keyslot, iv: bytes) -> 'CbcMode': """Create AES-CBC cipher with the given keyslot.""" try: key = self.key_normal[keyslot] @@ -255,9 +262,9 @@ class CryptoEngine: return AES.new(key, AES.MODE_CBC, iv) - def create_ctr_cipher(self, keyslot: int, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]': + def create_ctr_cipher(self, keyslot: Keyslot, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]': """ - Create AES-CTR cipher with the given keyslot. + Create an AES-CTR cipher with the given keyslot. Normal and DSi crypto will be automatically chosen depending on keyslot. """ @@ -273,8 +280,8 @@ class CryptoEngine: else: return cipher - def create_ecb_cipher(self, keyslot: int) -> 'EcbMode': - """Create AES-ECB cipher with the given keyslot.""" + def create_ecb_cipher(self, keyslot: Keyslot) -> 'EcbMode': + """Create an AES-ECB cipher with the given keyslot.""" try: key = self.key_normal[keyslot] except KeyError: @@ -282,7 +289,7 @@ class CryptoEngine: return AES.new(key, AES.MODE_ECB) - def create_cmac_object(self, keyslot: int) -> 'CMACObject': + def create_cmac_object(self, keyslot: Keyslot) -> 'CMACObject': """Create a CMAC object with the given keyslot.""" try: key = self.key_normal[keyslot] @@ -291,6 +298,14 @@ class CryptoEngine: return CMAC.new(key, ciphermod=AES) + def create_ctr_io(self, keyslot: Keyslot, fh: 'BinaryIO', ctr: int): + """Create an AES-CTR read-write file object with the given keyslot.""" + return CTRFileIO(fh, self, keyslot, ctr) + + def create_cbc_io(self, keyslot: Keyslot, fh: 'BinaryIO', iv: bytes): + """Create an AES-CBC read-only file object with the given keyslot.""" + return CBCFileIO(fh, self, keyslot, iv) + @staticmethod def sd_path_to_iv(path: str) -> int: # ensure the path is lowercase @@ -495,6 +510,8 @@ class CryptoEngine: otp_enc = otp otp_dec: bytes = cipher_otp.decrypt(otp) + self._otp_device_id = int.from_bytes(otp_dec[4:8], 'little') + otp_hash: bytes = otp_dec[0xE0:0x100] otp_hash_digest: bytes = sha256(otp_dec[0:0xE0]).digest() if otp_hash_digest != otp_hash: @@ -596,3 +613,135 @@ class CryptoEngine: """Set up the SD key from a movable.sed file.""" with open(path, 'rb') as f: self.setup_sd_key(f.read(0x140)) + + +class _CryptoFileBase(BufferedIOBase): + """Base class for CTR and CBC IO classes.""" + + closed = False + _reader: 'BinaryIO' + + def close(self): + self.closed = True + + __del__ = close + + @_raise_if_closed + def flush(self): + self._reader.flush() + + @_raise_if_closed + def tell(self) -> int: + return self._reader.tell() + + @_raise_if_closed + def readable(self) -> bool: + return self._reader.readable() + + @_raise_if_closed + def writable(self) -> bool: + return self._reader.writable() + + @_raise_if_closed + def seekable(self) -> bool: + return self._reader.seekable() + + +class CTRFileIO(_CryptoFileBase): + """Provides transparent read-write AES-CTR encryption as a file-like object.""" + + def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, counter: int): + self._reader = file + self._crypto = crypto + self._keyslot = keyslot + self._counter = counter + + def __repr__(self): + return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, counter={self._counter!r})' + + @_raise_if_closed + def read(self, size: int = -1) -> bytes: + cur_offset = self.tell() + data = self._reader.read(size) + counter = self._counter + (cur_offset >> 4) + cipher = self._crypto.create_ctr_cipher(self._keyslot, counter) + # beginning padding + cipher.decrypt(b'\0' * (cur_offset % 0x10)) + return cipher.decrypt(data) + + read1 = read # probably make this act like read1 should, but this for now enables some other things to work + + @_raise_if_closed + def write(self, data: bytes) -> int: + cur_offset = self.tell() + counter = self._counter + (cur_offset >> 4) + cipher = self._crypto.create_ctr_cipher(self._keyslot, counter) + # beginning padding + cipher.encrypt(b'\0' * (cur_offset % 0x10)) + return self._reader.write(cipher.encrypt(data)) + + @_raise_if_closed + def seek(self, seek: int, whence: int = 0) -> int: + # TODO: if the seek goes past the file, the data between the former EOF and seek point should also be encrypted. + return self._reader.seek(seek, whence) + + def truncate(self, size: 'Optional[int]' = None) -> int: + return self._reader.truncate(size) + + +class CBCFileIO(_CryptoFileBase): + """Provides transparent read-only AES-CBC encryption as a file-like object.""" + + def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, iv: bytes): + self._reader = file + self._crypto = crypto + self._keyslot = keyslot + self._iv = iv + + def __repr__(self): + return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, iv={self._iv!r})' + + @_raise_if_closed + def read(self, size: int = -1): + offset = self._reader.tell() + + # if encrypted, the block needs to be decrypted first + # CBC requires a full block (0x10 in this case). and the previous + # block is used as the IV. so that's quite a bit to read if the + # application requires just a few bytes. + # thanks Stary2001 for help with random-access crypto + + before = offset % 16 + if offset - before == 0: + iv = self._iv + else: + # seek back one block to read it as iv + self._reader.seek(-0x10 - before, 1) + iv = self._reader.read(0x10) + # this is done since we may not know the original size of the file + # and the caller may have requested -1 to read all the remaining data + data_before = self._reader.read(before) + data_requested = self._reader.read(size) + data_requested_len = len(data_requested) + data_total_len = len(data_before) + data_requested_len + if data_total_len % 16: + data_after = self._reader.read(16 - (data_total_len % 16)) + self._reader.seek(-len(data_after), 1) + else: + data_after = b'' + cipher = self._crypto.create_cbc_cipher(self._keyslot, iv) + # decrypt data, and cut off extra bytes + return cipher.decrypt( + b''.join((data_before, data_requested, data_after)) + )[before:data_requested_len + before] + + read1 = read # probably make this act like read1 should, but this for now enables some other things to work + + @_raise_if_closed + def seek(self, seek: int, whence: int = 0): + # even though read re-seeks to read required data, this allows the underlying object to handle seek how it wants + return self._reader.seek(seek, whence) + + @_raise_if_closed + def writable(self) -> bool: + return False diff --git a/pyctr/fileio.py b/pyctr/fileio.py new file mode 100644 index 0000000..992b85c --- /dev/null +++ b/pyctr/fileio.py @@ -0,0 +1,107 @@ +from io import BufferedIOBase +from threading import Lock +from weakref import WeakValueDictionary +from typing import TYPE_CHECKING + +from .common import _raise_if_closed + +if TYPE_CHECKING: + from typing import BinaryIO + +# this prevents two SubsectionIO instances on the same file object from interfering with eachother +_lock_objects = WeakValueDictionary() + + +class SubsectionIO(BufferedIOBase): + """Provides read-write access to a subsection of a file.""" + + closed = False + _seek = 0 + + def __init__(self, file: 'BinaryIO', offset: int, size: int): + # get existing Lock object for file, or create a new one + file_id = id(file) + try: + self._lock = _lock_objects[file_id] + except KeyError: + self._lock = Lock() + _lock_objects[file_id] = self._lock + + self._reader = file + self._offset = offset + self._size = size + # subsection end is stored for convenience + self._end = offset + size + + def __repr__(self): + return f'{type(self).__name__}(file={self._reader!r}, offset={self._offset!r}, size={self._size!r})' + + def close(self): + self.closed = True + # remove Lock reference, so it can be automatically removed from the WeakValueDictionary once all SubsectionIO + # instances for the base file are closed + self._lock = None + + __del__ = close + + @_raise_if_closed + def read(self, size: int = -1) -> bytes: + if size == -1: + size = self._size - self._seek + if self._offset + self._seek > self._end: + # if attempting to read after the section, return nothing + return b'' + if self._seek + size > self._size: + size = self._size - self._seek + + with self._lock: + self._reader.seek(self._seek + self._offset) + data = self._reader.read(size) + + self._seek += len(data) + return data + + @_raise_if_closed + def seek(self, seek: int, whence: int = 0) -> int: + if whence == 0: + if seek < 0: + raise ValueError(f'negative seek value {seek}') + self._seek = min(seek, self._size) + elif whence == 1: + self._seek = max(self._seek + seek, 0) + elif whence == 2: + self._seek = max(self._size + seek, 0) + else: + if not isinstance(whence, int): + raise TypeError(f'an integer is required (got type {type(whence).__name__}') + raise ValueError(f'invalid whence ({seek}, should be 0, 1 or 2)') + return self._seek + + @_raise_if_closed + def write(self, data: bytes) -> int: + if self._seek > self._size: + # attempting to write past subsection + return 0 + data_len = len(data) + data_end = data_len + self._seek + if data_end > self._size: + data = data[:-(data_end - self._size)] + + with self._lock: + self._reader.seek(self._seek + self._offset) + data_written = self._reader.write(data) + + self._seek += data_written + return data_written + + @_raise_if_closed + def readable(self) -> bool: + return self._reader.readable() + + @_raise_if_closed + def writable(self) -> bool: + return self._reader.writable() + + @_raise_if_closed + def seekable(self) -> bool: + return self._reader.seekable() diff --git a/pyctr/types/__init__.py b/pyctr/type/__init__.py similarity index 100% rename from pyctr/types/__init__.py rename to pyctr/type/__init__.py diff --git a/pyctr/types/base/title.py b/pyctr/type/base/title.py similarity index 100% rename from pyctr/types/base/title.py rename to pyctr/type/base/title.py diff --git a/pyctr/type/cci.py b/pyctr/type/cci.py new file mode 100644 index 0000000..3c6e1cf --- /dev/null +++ b/pyctr/type/cci.py @@ -0,0 +1,145 @@ +# This file is a part of ninfs. +# +# Copyright (c) 2017-2019 Ian Burgwin +# This file is licensed under The MIT License (MIT). +# You can find the full license text in LICENSE.md in the root of this project. + +from enum import IntEnum +from threading import Lock +from typing import TYPE_CHECKING, NamedTuple + +from ..common import PyCTRError +from ..fileio import SubsectionIO +from ..type.ncch import NCCHReader +from ..util import readle + +if TYPE_CHECKING: + from typing import BinaryIO, Dict, Union + +CCI_MEDIA_UNIT = 0x200 + + +class CCIError(PyCTRError): + """Generic error for CCI operations.""" + + +class InvalidCCIError(CCIError): + """Invalid CCI header exception.""" + + +class CCISection(IntEnum): + Header = -3 + CardInfo = -2 + DevInfo = -1 + + Application = 0 + Manual = 1 + DownloadPlayChild = 2 + Unk3 = 3 + Unk4 = 4 + Unk5 = 5 + UpdateOld3DS = 6 + UpdateNew3DS = 7 + + +class CCIRegion(NamedTuple): + section: 'Union[int, CCISection]' + offset: int + size: int + + +class CCIReader: + """Class for the 3DS CCI container.""" + + closed = False + + def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, dev: bool = False, + load_contents: bool = True, assume_decrypted: bool = False): + if isinstance(fp, str): + fp = open(fp, 'rb') + + # store the starting offset so the CCI can be read from any point in the base file + self._start = fp.tell() + self._fp = fp + # store case-insensitivity for RomFSReader + self._case_insensitive = case_insensitive + # threading lock + self._lock = Lock() + + # ignore the signature, we don't need it + self._fp.seek(0x100, 1) + header = fp.read(0x100) + if header[0:4] != b'NCSD': + raise InvalidCCIError('NCSD magic not found') + + # make sure the Media ID is not 00, which is used for the NAND header + self.media_id = header[0x8:0x10][::-1].hex() + if self.media_id == '00' * 8: + raise InvalidCCIError('Media ID is ' + self.media_id) + + self.image_size = readle(header[4:8]) * CCI_MEDIA_UNIT + + # this contains the location of each section + self.sections: Dict[CCISection, CCIRegion] = {} + + # this contains loaded sections + self.contents: Dict[CCISection, NCCHReader] = {} + + def add_region(section: 'CCISection', offset: int, size: int): + region = CCIRegion(section=section, offset=offset, size=size) + self.sections[section] = region + + # add each part of the header + add_region(CCISection.Header, 0, 0x200) + add_region(CCISection.CardInfo, 0x200, 0x1000) + add_region(CCISection.DevInfo, 0x1200, 0x300) + + # use a CCISection value for section keys + partition_sections = [x for x in CCISection if x >= 0] + + part_raw = header[0x20:0x60] + + # the first content always starts at 0x4000 but this code makes no assumptions about it + for idx, info_offset in enumerate(range(0, 0x40, 0x8)): + part_info = part_raw[info_offset:info_offset + 8] + part_offset = int.from_bytes(part_info[0:4], 'little') * CCI_MEDIA_UNIT + part_size = int.from_bytes(part_info[4:8], 'little') * CCI_MEDIA_UNIT + if part_offset: + section_id = partition_sections[idx] + add_region(section_id, part_offset, part_size) + + if load_contents: + content_fp = self.open_raw_section(section_id) + self.contents[section_id] = NCCHReader(content_fp, case_insensitive=case_insensitive, dev=dev, + assume_decrypted=assume_decrypted) + + def close(self): + self.closed = True + try: + self._fp.close() + except AttributeError: + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + __del__ = close + + def __repr__(self): + info = [('media_id', self.media_id)] + try: + info.append(('title_name', + repr(self.contents[CCISection.Application].exefs.icon.get_app_title().short_desc))) + except KeyError: + info.append(('title_name', 'unknown')) + info.append(('partition_count', len(self.contents))) + info_final = " ".join(x + ": " + str(y) for x, y in info) + return f'<{type(self).__name__} {info_final}>' + + def open_raw_section(self, section: 'CCISection'): + """Open a raw CCI section for reading.""" + region = self.sections[section] + return SubsectionIO(self._fp, self._start + region.offset, region.size) diff --git a/pyctr/types/cia.py b/pyctr/type/cia.py similarity index 78% rename from pyctr/types/cia.py rename to pyctr/type/cia.py index c3a11a2..f973e2a 100644 --- a/pyctr/types/cia.py +++ b/pyctr/type/cia.py @@ -9,10 +9,11 @@ from io import BytesIO from threading import Lock from typing import TYPE_CHECKING, NamedTuple -from ..common import PyCTRError, _ReaderOpenFileBase +from ..common import PyCTRError from ..crypto import CryptoEngine, Keyslot -from ..types.ncch import NCCHReader -from ..types.tmd import TitleMetadataReader +from ..fileio import SubsectionIO +from ..type.ncch import NCCHReader +from ..type.tmd import TitleMetadataReader from ..util import readle, roundup if TYPE_CHECKING: @@ -35,6 +36,9 @@ class CIASection(IntEnum): CertificateChain = -3 Ticket = -2 TitleMetadata = -1 + Application = 0 + Manual = 1 + DownloadPlayChild = 2 Meta = -5 @@ -45,14 +49,6 @@ class CIARegion(NamedTuple): iv: bytes # only used for encrypted sections -class _CIASectionFile(_ReaderOpenFileBase): - """Provides a raw CIA section as a file-like object.""" - - def __init__(self, reader: 'CIAReader', path: 'CIASection'): - super().__init__(reader, path) - self._info = reader.sections[path] - - class CIAReader: """Class for the 3DS CIA container.""" @@ -203,35 +199,8 @@ class CIAReader: def open_raw_section(self, section: 'CIASection'): """Open a raw CIA section for reading.""" - return _CIASectionFile(self, section) - - def get_data(self, region: 'CIARegion', offset: int, size: int) -> bytes: - if offset + size > region.size: - # prevent reading past the region - size = region.size - offset - - with self._lock: - if region.iv: - real_size = size - # if encrypted, the block needs to be decrypted first - # CBC requires a full block (0x10 in this case). and the previous - # block is used as the IV. so that's quite a bit to read if the - # application requires just a few bytes. - # thanks Stary2001 for help with random-access crypto - before = offset % 16 - if size % 16 != 0: - size = size + 16 - size % 16 - if offset - before == 0: - iv = region.iv - else: - self._fp.seek(self._start + region.offset + offset - before - 0x10) - iv = self._fp.read(0x10) - # read to block size - self._fp.seek(self._start + region.offset + offset - before) - # adding x10 to the size fixes some kind of decryption bug I think. this needs more testing. - return self._crypto.create_cbc_cipher(Keyslot.DecryptedTitlekey, - iv).decrypt(self._fp.read(size + 0x10))[before:real_size + before] - else: - # no encryption - self._fp.seek(self._start + region.offset + offset) - return self._fp.read(size) + region = self.sections[section] + fh = SubsectionIO(self._fp, self._start + region.offset, region.size) + if region.iv: + fh = self._crypto.create_cbc_io(Keyslot.DecryptedTitlekey, fh, region.iv) + return fh diff --git a/pyctr/types/exefs.py b/pyctr/type/exefs.py similarity index 92% rename from pyctr/types/exefs.py rename to pyctr/type/exefs.py index 0e0a89f..fe94d2e 100644 --- a/pyctr/types/exefs.py +++ b/pyctr/type/exefs.py @@ -9,8 +9,9 @@ from threading import Lock from typing import TYPE_CHECKING, NamedTuple from ..common import PyCTRError, _ReaderOpenFileBase +from ..fileio import SubsectionIO from ..util import readle -from ..types.smdh import SMDH, InvalidSMDHError +from ..type.smdh import SMDH, InvalidSMDHError if TYPE_CHECKING: from typing import BinaryIO, Dict, Union @@ -165,10 +166,7 @@ class _ExeFSOpenFile(_ReaderOpenFileBase): def __init__(self, reader: 'ExeFSReader', path: str): super().__init__(reader, path) - try: - self._info = reader.entries[self._path] - except KeyError: - raise ExeFSFileNotFoundError(self._path) + self._info = reader.entries[self._path] class ExeFSReader: @@ -182,7 +180,7 @@ class ExeFSReader: _code_dec = None icon: 'SMDH' = None - def __init__(self, fp: 'Union[str, BinaryIO]', *, _load_icon: bool = True): + def __init__(self, fp: 'Union[str, BinaryIO]', *, closefd: bool = True, _load_icon: bool = True): if isinstance(fp, str): fp = open(fp, 'rb') @@ -190,6 +188,7 @@ class ExeFSReader: self._start = fp.tell() self._fp = fp self._lock = Lock() + self._closefd = closefd self.entries: 'Dict[str, ExeFSEntry]' = {} @@ -232,7 +231,7 @@ class ExeFSReader: try: with self.open('icon') as f: self.icon = SMDH.load(f) - except InvalidSMDHError: + except (ExeFSFileNotFoundError, InvalidSMDHError): pass def __len__(self) -> int: @@ -241,10 +240,11 @@ class ExeFSReader: def close(self): self.closed = True - try: - self._fp.close() - except AttributeError: - pass + if self._closefd: + try: + self._fp.close() + except AttributeError: + pass __del__ = close @@ -259,7 +259,15 @@ class ExeFSReader: if normalize: # remove beginning "/" and ending ".bin" path = _normalize_path(path) - return _ExeFSOpenFile(self, path) + try: + entry = self.entries[path] + except KeyError: + raise ExeFSFileNotFoundError(path) + if entry.offset == -1: + # this would be the decompressed .code, if the original .code was compressed + return _ExeFSOpenFile(self, path) + else: + return SubsectionIO(self._fp, self._start + EXEFS_HEADER_SIZE + entry.offset, entry.size) def get_data(self, info: ExeFSEntry, offset: int, size: int) -> bytes: if offset + size > info.size: diff --git a/pyctr/types/ncch.py b/pyctr/type/ncch.py similarity index 94% rename from pyctr/types/ncch.py rename to pyctr/type/ncch.py index 92b6f56..10e0efa 100644 --- a/pyctr/types/ncch.py +++ b/pyctr/type/ncch.py @@ -15,8 +15,9 @@ from typing import TYPE_CHECKING, NamedTuple from .exefs import ExeFSReader, EXEFS_HEADER_SIZE from .romfs import RomFSReader from ..common import PyCTRError, _ReaderOpenFileBase -from ..util import config_dirs, readle, roundup from ..crypto import CryptoEngine, Keyslot +from ..fileio import SubsectionIO +from ..util import config_dirs, readle, roundup if TYPE_CHECKING: from typing import BinaryIO, Dict, List, Optional, Tuple, Union @@ -150,7 +151,7 @@ class NCCHReader: romfs: 'Optional[RomFSReader]' = None def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None, - dev: bool = False, seeddb: str = None, load_sections: bool = True): + dev: bool = False, seeddb: str = None, load_sections: bool = True, assume_decrypted: bool = False): if isinstance(fp, str): fp = open(fp, 'rb') @@ -159,6 +160,9 @@ class NCCHReader: else: self._crypto = CryptoEngine(dev=dev) + # old decryption methods did not fix the flags, so sometimes we have to assume it is decrypted + self.assume_decrypted = assume_decrypted + # store the starting offset so the NCCH can be read from any point in the base file self._start = fp.tell() self._fp = fp @@ -308,7 +312,17 @@ class NCCHReader: def open_raw_section(self, section: 'NCCHSection'): """Open a raw NCCH section for reading.""" - return _NCCHSectionFile(self, section) + # check if the region is ExeFS and uses a newer keyslot, or is fulldec, and use a specific file class + if (section == NCCHSection.ExeFS and self.extra_keyslot) or (section == NCCHSection.FullDecrypted): + return _NCCHSectionFile(self, section) + else: + region = self.sections[section] + fh = SubsectionIO(self._fp, self._start + region.offset, region.size) + # if the region is encrypted (not ExeFS if an extra keyslot is in use), wrap it in CTRFileIO + if not (self.assume_decrypted or self.flags.no_crypto or section in NO_ENCRYPTION): + keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH + fh = self._crypto.create_ctr_io(keyslot, fh, region.iv) + return fh def get_key_y(self, original: bool = False) -> bytes: if original or not self.flags.uses_seed: @@ -447,7 +461,8 @@ class NCCHReader: with self._lock: # check if decryption is really needed - if self.flags.no_crypto or region.section in NO_ENCRYPTION: + if self.assume_decrypted or self.flags.no_crypto or region.section in NO_ENCRYPTION: + # this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO. self._fp.seek(self._start + region.offset + offset) return self._fp.read(size) @@ -503,6 +518,8 @@ class NCCHReader: # join all the chunks into one bytes result and return it return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200))) else: + # this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO. + # seek to the real offset of the section + the requested offset self._fp.seek(self._start + region.offset + offset) data = self._fp.read(size) diff --git a/pyctr/types/romfs.py b/pyctr/type/romfs.py similarity index 92% rename from pyctr/types/romfs.py rename to pyctr/type/romfs.py index 6d25125..760208a 100644 --- a/pyctr/types/romfs.py +++ b/pyctr/type/romfs.py @@ -9,6 +9,7 @@ from threading import Lock from typing import overload, TYPE_CHECKING, NamedTuple from ..common import PyCTRError, _ReaderOpenFileBase +from ..fileio import SubsectionIO from ..util import readle, roundup if TYPE_CHECKING: @@ -64,16 +65,6 @@ class RomFSFileEntry(NamedTuple): size: int -class _RomFSOpenFile(_ReaderOpenFileBase): - """Class for open RomFS file entries.""" - - def __init__(self, reader: 'RomFSReader', path: str): - super().__init__(reader, path) - self._info: RomFSFileEntry = reader.get_info_from_path(path) - if not isinstance(self._info, RomFSFileEntry): - raise RomFSIsADirectoryError(path) - - class RomFSReader: """ Class for 3DS RomFS Level 3 partition. @@ -206,11 +197,14 @@ class RomFSReader: @overload def open(self, path: str, encoding: None = None, errors: 'Optional[str]' = None, - newline: 'Optional[str]' = None) -> _RomFSOpenFile: ... + newline: 'Optional[str]' = None) -> SubsectionIO: ... def open(self, path, encoding=None, errors=None, newline=None): """Open a file in the RomFS for reading.""" - f = _RomFSOpenFile(self, path) + file_info = self.get_info_from_path(path) + if not isinstance(file_info, RomFSFileEntry): + raise RomFSIsADirectoryError(path) + f = SubsectionIO(self._fp, self._start + self.data_offset + file_info.offset, file_info.size) if encoding is not None: f = TextIOWrapper(f, encoding, errors, newline) return f @@ -237,10 +231,3 @@ class RomFSReader: return RomFSDirectoryEntry(name=curr['name'], type='dir', contents=(*contents,)) elif curr['type'] == 'file': return RomFSFileEntry(name=curr['name'], type='file', offset=curr['offset'], size=curr['size']) - - def get_data(self, info: RomFSFileEntry, offset: int, size: int) -> bytes: - if offset + size > info.size: - size = info.size - offset - with self._lock: - self._fp.seek(self._start + self.data_offset + info.offset + offset) - return self._fp.read(size) diff --git a/pyctr/types/smdh.py b/pyctr/type/smdh.py similarity index 100% rename from pyctr/types/smdh.py rename to pyctr/type/smdh.py diff --git a/pyctr/types/tmd.py b/pyctr/type/tmd.py similarity index 100% rename from pyctr/types/tmd.py rename to pyctr/type/tmd.py diff --git a/pyctr/types/extheader.py b/pyctr/types/extheader.py deleted file mode 100644 index ceea160..0000000 --- a/pyctr/types/extheader.py +++ /dev/null @@ -1,9 +0,0 @@ -# This file is a part of ninfs. -# -# Copyright (c) 2017-2019 Ian Burgwin -# This file is licensed under The MIT License (MIT). -# You can find the full license text in LICENSE.md in the root of this project. - - -class ExtendedHeaderReader: - def __init__(self):