mirror of
https://github.com/ihaveamac/custom-install.git
synced 2025-12-06 06:41:45 +00:00
pyctr: update
This commit is contained in:
@@ -59,7 +59,7 @@ python3 custominstall.py -b boot9.bin -m movable.sed --sd /media/GM9SD file.cia
|
||||
```
|
||||
|
||||
## License/Credits
|
||||
`pyctr/` is from [ninfs `d994c78`](https://github.com/ihaveamac/ninfs/tree/d994c78acf5ff3840df1ef5a6aabdc12ca98e806/ninfs/pyctr).
|
||||
`pyctr/` is from [ninfs `795373d`](https://github.com/ihaveamac/ninfs/tree/795373db07be0cacd60215d8eccf16fe03535984/ninfs/pyctr).
|
||||
|
||||
[save3ds by wwylele](https://github.com/wwylele/save3ds) is used to interact with the Title Database (details in `bin/README`).
|
||||
|
||||
|
||||
BIN
pyctr/.DS_Store
vendored
Normal file
BIN
pyctr/.DS_Store
vendored
Normal file
Binary file not shown.
@@ -1,6 +0,0 @@
|
||||
from .types.cia import *
|
||||
from .types.exefs import *
|
||||
from .types.ncch import *
|
||||
from .types.romfs import *
|
||||
from .types.smdh import *
|
||||
from .types.tmd import *
|
||||
|
||||
165
pyctr/crypto.py
165
pyctr/crypto.py
@@ -7,6 +7,7 @@
|
||||
from enum import IntEnum
|
||||
from functools import wraps
|
||||
from hashlib import sha256
|
||||
from io import BufferedIOBase
|
||||
from os import environ
|
||||
from os.path import getsize, join as pjoin
|
||||
from struct import pack, unpack
|
||||
@@ -16,7 +17,7 @@ from Cryptodome.Cipher import AES
|
||||
from Cryptodome.Hash import CMAC
|
||||
from Cryptodome.Util import Counter
|
||||
|
||||
from .common import PyCTRError
|
||||
from .common import PyCTRError, _raise_if_closed
|
||||
from .util import config_dirs, readbe, readle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -27,7 +28,7 @@ if TYPE_CHECKING:
|
||||
# noinspection PyProtectedMember
|
||||
from Cryptodome.Cipher._mode_ecb import EcbMode
|
||||
from Cryptodome.Hash.CMAC import CMAC as CMACObject
|
||||
from typing import Dict, List, Union
|
||||
from typing import BinaryIO, Dict, List, Optional, Union
|
||||
|
||||
__all__ = ['CryptoError', 'OTPLengthError', 'CorruptBootromError', 'KeyslotMissingError', 'TicketLengthError',
|
||||
'BootromNotFoundError', 'CorruptOTPError', 'Keyslot', 'CryptoEngine']
|
||||
@@ -201,6 +202,7 @@ class CryptoEngine:
|
||||
_b9_extdata_otp: bytes = None
|
||||
_b9_extdata_keygen: bytes = None
|
||||
|
||||
_otp_device_id: int = None
|
||||
_otp_key: bytes = None
|
||||
_otp_iv: bytes = None
|
||||
|
||||
@@ -230,6 +232,11 @@ class CryptoEngine:
|
||||
def b9_extdata_keygen(self) -> bytes:
|
||||
return self._b9_extdata_keygen
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def otp_device_id(self) -> int:
|
||||
return self._otp_device_id
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def otp_key(self) -> bytes:
|
||||
@@ -246,7 +253,7 @@ class CryptoEngine:
|
||||
raise KeyslotMissingError('load a movable.sed with setup_sd_key')
|
||||
return self._id0
|
||||
|
||||
def create_cbc_cipher(self, keyslot: int, iv: bytes) -> 'CbcMode':
|
||||
def create_cbc_cipher(self, keyslot: Keyslot, iv: bytes) -> 'CbcMode':
|
||||
"""Create AES-CBC cipher with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
@@ -255,9 +262,9 @@ class CryptoEngine:
|
||||
|
||||
return AES.new(key, AES.MODE_CBC, iv)
|
||||
|
||||
def create_ctr_cipher(self, keyslot: int, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]':
|
||||
def create_ctr_cipher(self, keyslot: Keyslot, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]':
|
||||
"""
|
||||
Create AES-CTR cipher with the given keyslot.
|
||||
Create an AES-CTR cipher with the given keyslot.
|
||||
|
||||
Normal and DSi crypto will be automatically chosen depending on keyslot.
|
||||
"""
|
||||
@@ -273,8 +280,8 @@ class CryptoEngine:
|
||||
else:
|
||||
return cipher
|
||||
|
||||
def create_ecb_cipher(self, keyslot: int) -> 'EcbMode':
|
||||
"""Create AES-ECB cipher with the given keyslot."""
|
||||
def create_ecb_cipher(self, keyslot: Keyslot) -> 'EcbMode':
|
||||
"""Create an AES-ECB cipher with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
except KeyError:
|
||||
@@ -282,7 +289,7 @@ class CryptoEngine:
|
||||
|
||||
return AES.new(key, AES.MODE_ECB)
|
||||
|
||||
def create_cmac_object(self, keyslot: int) -> 'CMACObject':
|
||||
def create_cmac_object(self, keyslot: Keyslot) -> 'CMACObject':
|
||||
"""Create a CMAC object with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
@@ -291,6 +298,14 @@ class CryptoEngine:
|
||||
|
||||
return CMAC.new(key, ciphermod=AES)
|
||||
|
||||
def create_ctr_io(self, keyslot: Keyslot, fh: 'BinaryIO', ctr: int):
|
||||
"""Create an AES-CTR read-write file object with the given keyslot."""
|
||||
return CTRFileIO(fh, self, keyslot, ctr)
|
||||
|
||||
def create_cbc_io(self, keyslot: Keyslot, fh: 'BinaryIO', iv: bytes):
|
||||
"""Create an AES-CBC read-only file object with the given keyslot."""
|
||||
return CBCFileIO(fh, self, keyslot, iv)
|
||||
|
||||
@staticmethod
|
||||
def sd_path_to_iv(path: str) -> int:
|
||||
# ensure the path is lowercase
|
||||
@@ -495,6 +510,8 @@ class CryptoEngine:
|
||||
otp_enc = otp
|
||||
otp_dec: bytes = cipher_otp.decrypt(otp)
|
||||
|
||||
self._otp_device_id = int.from_bytes(otp_dec[4:8], 'little')
|
||||
|
||||
otp_hash: bytes = otp_dec[0xE0:0x100]
|
||||
otp_hash_digest: bytes = sha256(otp_dec[0:0xE0]).digest()
|
||||
if otp_hash_digest != otp_hash:
|
||||
@@ -596,3 +613,135 @@ class CryptoEngine:
|
||||
"""Set up the SD key from a movable.sed file."""
|
||||
with open(path, 'rb') as f:
|
||||
self.setup_sd_key(f.read(0x140))
|
||||
|
||||
|
||||
class _CryptoFileBase(BufferedIOBase):
|
||||
"""Base class for CTR and CBC IO classes."""
|
||||
|
||||
closed = False
|
||||
_reader: 'BinaryIO'
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
__del__ = close
|
||||
|
||||
@_raise_if_closed
|
||||
def flush(self):
|
||||
self._reader.flush()
|
||||
|
||||
@_raise_if_closed
|
||||
def tell(self) -> int:
|
||||
return self._reader.tell()
|
||||
|
||||
@_raise_if_closed
|
||||
def readable(self) -> bool:
|
||||
return self._reader.readable()
|
||||
|
||||
@_raise_if_closed
|
||||
def writable(self) -> bool:
|
||||
return self._reader.writable()
|
||||
|
||||
@_raise_if_closed
|
||||
def seekable(self) -> bool:
|
||||
return self._reader.seekable()
|
||||
|
||||
|
||||
class CTRFileIO(_CryptoFileBase):
|
||||
"""Provides transparent read-write AES-CTR encryption as a file-like object."""
|
||||
|
||||
def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, counter: int):
|
||||
self._reader = file
|
||||
self._crypto = crypto
|
||||
self._keyslot = keyslot
|
||||
self._counter = counter
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, counter={self._counter!r})'
|
||||
|
||||
@_raise_if_closed
|
||||
def read(self, size: int = -1) -> bytes:
|
||||
cur_offset = self.tell()
|
||||
data = self._reader.read(size)
|
||||
counter = self._counter + (cur_offset >> 4)
|
||||
cipher = self._crypto.create_ctr_cipher(self._keyslot, counter)
|
||||
# beginning padding
|
||||
cipher.decrypt(b'\0' * (cur_offset % 0x10))
|
||||
return cipher.decrypt(data)
|
||||
|
||||
read1 = read # probably make this act like read1 should, but this for now enables some other things to work
|
||||
|
||||
@_raise_if_closed
|
||||
def write(self, data: bytes) -> int:
|
||||
cur_offset = self.tell()
|
||||
counter = self._counter + (cur_offset >> 4)
|
||||
cipher = self._crypto.create_ctr_cipher(self._keyslot, counter)
|
||||
# beginning padding
|
||||
cipher.encrypt(b'\0' * (cur_offset % 0x10))
|
||||
return self._reader.write(cipher.encrypt(data))
|
||||
|
||||
@_raise_if_closed
|
||||
def seek(self, seek: int, whence: int = 0) -> int:
|
||||
# TODO: if the seek goes past the file, the data between the former EOF and seek point should also be encrypted.
|
||||
return self._reader.seek(seek, whence)
|
||||
|
||||
def truncate(self, size: 'Optional[int]' = None) -> int:
|
||||
return self._reader.truncate(size)
|
||||
|
||||
|
||||
class CBCFileIO(_CryptoFileBase):
|
||||
"""Provides transparent read-only AES-CBC encryption as a file-like object."""
|
||||
|
||||
def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, iv: bytes):
|
||||
self._reader = file
|
||||
self._crypto = crypto
|
||||
self._keyslot = keyslot
|
||||
self._iv = iv
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, iv={self._iv!r})'
|
||||
|
||||
@_raise_if_closed
|
||||
def read(self, size: int = -1):
|
||||
offset = self._reader.tell()
|
||||
|
||||
# if encrypted, the block needs to be decrypted first
|
||||
# CBC requires a full block (0x10 in this case). and the previous
|
||||
# block is used as the IV. so that's quite a bit to read if the
|
||||
# application requires just a few bytes.
|
||||
# thanks Stary2001 for help with random-access crypto
|
||||
|
||||
before = offset % 16
|
||||
if offset - before == 0:
|
||||
iv = self._iv
|
||||
else:
|
||||
# seek back one block to read it as iv
|
||||
self._reader.seek(-0x10 - before, 1)
|
||||
iv = self._reader.read(0x10)
|
||||
# this is done since we may not know the original size of the file
|
||||
# and the caller may have requested -1 to read all the remaining data
|
||||
data_before = self._reader.read(before)
|
||||
data_requested = self._reader.read(size)
|
||||
data_requested_len = len(data_requested)
|
||||
data_total_len = len(data_before) + data_requested_len
|
||||
if data_total_len % 16:
|
||||
data_after = self._reader.read(16 - (data_total_len % 16))
|
||||
self._reader.seek(-len(data_after), 1)
|
||||
else:
|
||||
data_after = b''
|
||||
cipher = self._crypto.create_cbc_cipher(self._keyslot, iv)
|
||||
# decrypt data, and cut off extra bytes
|
||||
return cipher.decrypt(
|
||||
b''.join((data_before, data_requested, data_after))
|
||||
)[before:data_requested_len + before]
|
||||
|
||||
read1 = read # probably make this act like read1 should, but this for now enables some other things to work
|
||||
|
||||
@_raise_if_closed
|
||||
def seek(self, seek: int, whence: int = 0):
|
||||
# even though read re-seeks to read required data, this allows the underlying object to handle seek how it wants
|
||||
return self._reader.seek(seek, whence)
|
||||
|
||||
@_raise_if_closed
|
||||
def writable(self) -> bool:
|
||||
return False
|
||||
|
||||
107
pyctr/fileio.py
Normal file
107
pyctr/fileio.py
Normal file
@@ -0,0 +1,107 @@
|
||||
from io import BufferedIOBase
|
||||
from threading import Lock
|
||||
from weakref import WeakValueDictionary
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .common import _raise_if_closed
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO
|
||||
|
||||
# this prevents two SubsectionIO instances on the same file object from interfering with eachother
|
||||
_lock_objects = WeakValueDictionary()
|
||||
|
||||
|
||||
class SubsectionIO(BufferedIOBase):
|
||||
"""Provides read-write access to a subsection of a file."""
|
||||
|
||||
closed = False
|
||||
_seek = 0
|
||||
|
||||
def __init__(self, file: 'BinaryIO', offset: int, size: int):
|
||||
# get existing Lock object for file, or create a new one
|
||||
file_id = id(file)
|
||||
try:
|
||||
self._lock = _lock_objects[file_id]
|
||||
except KeyError:
|
||||
self._lock = Lock()
|
||||
_lock_objects[file_id] = self._lock
|
||||
|
||||
self._reader = file
|
||||
self._offset = offset
|
||||
self._size = size
|
||||
# subsection end is stored for convenience
|
||||
self._end = offset + size
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(file={self._reader!r}, offset={self._offset!r}, size={self._size!r})'
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
# remove Lock reference, so it can be automatically removed from the WeakValueDictionary once all SubsectionIO
|
||||
# instances for the base file are closed
|
||||
self._lock = None
|
||||
|
||||
__del__ = close
|
||||
|
||||
@_raise_if_closed
|
||||
def read(self, size: int = -1) -> bytes:
|
||||
if size == -1:
|
||||
size = self._size - self._seek
|
||||
if self._offset + self._seek > self._end:
|
||||
# if attempting to read after the section, return nothing
|
||||
return b''
|
||||
if self._seek + size > self._size:
|
||||
size = self._size - self._seek
|
||||
|
||||
with self._lock:
|
||||
self._reader.seek(self._seek + self._offset)
|
||||
data = self._reader.read(size)
|
||||
|
||||
self._seek += len(data)
|
||||
return data
|
||||
|
||||
@_raise_if_closed
|
||||
def seek(self, seek: int, whence: int = 0) -> int:
|
||||
if whence == 0:
|
||||
if seek < 0:
|
||||
raise ValueError(f'negative seek value {seek}')
|
||||
self._seek = min(seek, self._size)
|
||||
elif whence == 1:
|
||||
self._seek = max(self._seek + seek, 0)
|
||||
elif whence == 2:
|
||||
self._seek = max(self._size + seek, 0)
|
||||
else:
|
||||
if not isinstance(whence, int):
|
||||
raise TypeError(f'an integer is required (got type {type(whence).__name__}')
|
||||
raise ValueError(f'invalid whence ({seek}, should be 0, 1 or 2)')
|
||||
return self._seek
|
||||
|
||||
@_raise_if_closed
|
||||
def write(self, data: bytes) -> int:
|
||||
if self._seek > self._size:
|
||||
# attempting to write past subsection
|
||||
return 0
|
||||
data_len = len(data)
|
||||
data_end = data_len + self._seek
|
||||
if data_end > self._size:
|
||||
data = data[:-(data_end - self._size)]
|
||||
|
||||
with self._lock:
|
||||
self._reader.seek(self._seek + self._offset)
|
||||
data_written = self._reader.write(data)
|
||||
|
||||
self._seek += data_written
|
||||
return data_written
|
||||
|
||||
@_raise_if_closed
|
||||
def readable(self) -> bool:
|
||||
return self._reader.readable()
|
||||
|
||||
@_raise_if_closed
|
||||
def writable(self) -> bool:
|
||||
return self._reader.writable()
|
||||
|
||||
@_raise_if_closed
|
||||
def seekable(self) -> bool:
|
||||
return self._reader.seekable()
|
||||
145
pyctr/type/cci.py
Normal file
145
pyctr/type/cci.py
Normal file
@@ -0,0 +1,145 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from enum import IntEnum
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError
|
||||
from ..fileio import SubsectionIO
|
||||
from ..type.ncch import NCCHReader
|
||||
from ..util import readle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Union
|
||||
|
||||
CCI_MEDIA_UNIT = 0x200
|
||||
|
||||
|
||||
class CCIError(PyCTRError):
|
||||
"""Generic error for CCI operations."""
|
||||
|
||||
|
||||
class InvalidCCIError(CCIError):
|
||||
"""Invalid CCI header exception."""
|
||||
|
||||
|
||||
class CCISection(IntEnum):
|
||||
Header = -3
|
||||
CardInfo = -2
|
||||
DevInfo = -1
|
||||
|
||||
Application = 0
|
||||
Manual = 1
|
||||
DownloadPlayChild = 2
|
||||
Unk3 = 3
|
||||
Unk4 = 4
|
||||
Unk5 = 5
|
||||
UpdateOld3DS = 6
|
||||
UpdateNew3DS = 7
|
||||
|
||||
|
||||
class CCIRegion(NamedTuple):
|
||||
section: 'Union[int, CCISection]'
|
||||
offset: int
|
||||
size: int
|
||||
|
||||
|
||||
class CCIReader:
|
||||
"""Class for the 3DS CCI container."""
|
||||
|
||||
closed = False
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, dev: bool = False,
|
||||
load_contents: bool = True, assume_decrypted: bool = False):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
# store the starting offset so the CCI can be read from any point in the base file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
# store case-insensitivity for RomFSReader
|
||||
self._case_insensitive = case_insensitive
|
||||
# threading lock
|
||||
self._lock = Lock()
|
||||
|
||||
# ignore the signature, we don't need it
|
||||
self._fp.seek(0x100, 1)
|
||||
header = fp.read(0x100)
|
||||
if header[0:4] != b'NCSD':
|
||||
raise InvalidCCIError('NCSD magic not found')
|
||||
|
||||
# make sure the Media ID is not 00, which is used for the NAND header
|
||||
self.media_id = header[0x8:0x10][::-1].hex()
|
||||
if self.media_id == '00' * 8:
|
||||
raise InvalidCCIError('Media ID is ' + self.media_id)
|
||||
|
||||
self.image_size = readle(header[4:8]) * CCI_MEDIA_UNIT
|
||||
|
||||
# this contains the location of each section
|
||||
self.sections: Dict[CCISection, CCIRegion] = {}
|
||||
|
||||
# this contains loaded sections
|
||||
self.contents: Dict[CCISection, NCCHReader] = {}
|
||||
|
||||
def add_region(section: 'CCISection', offset: int, size: int):
|
||||
region = CCIRegion(section=section, offset=offset, size=size)
|
||||
self.sections[section] = region
|
||||
|
||||
# add each part of the header
|
||||
add_region(CCISection.Header, 0, 0x200)
|
||||
add_region(CCISection.CardInfo, 0x200, 0x1000)
|
||||
add_region(CCISection.DevInfo, 0x1200, 0x300)
|
||||
|
||||
# use a CCISection value for section keys
|
||||
partition_sections = [x for x in CCISection if x >= 0]
|
||||
|
||||
part_raw = header[0x20:0x60]
|
||||
|
||||
# the first content always starts at 0x4000 but this code makes no assumptions about it
|
||||
for idx, info_offset in enumerate(range(0, 0x40, 0x8)):
|
||||
part_info = part_raw[info_offset:info_offset + 8]
|
||||
part_offset = int.from_bytes(part_info[0:4], 'little') * CCI_MEDIA_UNIT
|
||||
part_size = int.from_bytes(part_info[4:8], 'little') * CCI_MEDIA_UNIT
|
||||
if part_offset:
|
||||
section_id = partition_sections[idx]
|
||||
add_region(section_id, part_offset, part_size)
|
||||
|
||||
if load_contents:
|
||||
content_fp = self.open_raw_section(section_id)
|
||||
self.contents[section_id] = NCCHReader(content_fp, case_insensitive=case_insensitive, dev=dev,
|
||||
assume_decrypted=assume_decrypted)
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
__del__ = close
|
||||
|
||||
def __repr__(self):
|
||||
info = [('media_id', self.media_id)]
|
||||
try:
|
||||
info.append(('title_name',
|
||||
repr(self.contents[CCISection.Application].exefs.icon.get_app_title().short_desc)))
|
||||
except KeyError:
|
||||
info.append(('title_name', 'unknown'))
|
||||
info.append(('partition_count', len(self.contents)))
|
||||
info_final = " ".join(x + ": " + str(y) for x, y in info)
|
||||
return f'<{type(self).__name__} {info_final}>'
|
||||
|
||||
def open_raw_section(self, section: 'CCISection'):
|
||||
"""Open a raw CCI section for reading."""
|
||||
region = self.sections[section]
|
||||
return SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
@@ -9,10 +9,11 @@ from io import BytesIO
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..common import PyCTRError
|
||||
from ..crypto import CryptoEngine, Keyslot
|
||||
from ..types.ncch import NCCHReader
|
||||
from ..types.tmd import TitleMetadataReader
|
||||
from ..fileio import SubsectionIO
|
||||
from ..type.ncch import NCCHReader
|
||||
from ..type.tmd import TitleMetadataReader
|
||||
from ..util import readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -35,6 +36,9 @@ class CIASection(IntEnum):
|
||||
CertificateChain = -3
|
||||
Ticket = -2
|
||||
TitleMetadata = -1
|
||||
Application = 0
|
||||
Manual = 1
|
||||
DownloadPlayChild = 2
|
||||
Meta = -5
|
||||
|
||||
|
||||
@@ -45,14 +49,6 @@ class CIARegion(NamedTuple):
|
||||
iv: bytes # only used for encrypted sections
|
||||
|
||||
|
||||
class _CIASectionFile(_ReaderOpenFileBase):
|
||||
"""Provides a raw CIA section as a file-like object."""
|
||||
|
||||
def __init__(self, reader: 'CIAReader', path: 'CIASection'):
|
||||
super().__init__(reader, path)
|
||||
self._info = reader.sections[path]
|
||||
|
||||
|
||||
class CIAReader:
|
||||
"""Class for the 3DS CIA container."""
|
||||
|
||||
@@ -203,35 +199,8 @@ class CIAReader:
|
||||
|
||||
def open_raw_section(self, section: 'CIASection'):
|
||||
"""Open a raw CIA section for reading."""
|
||||
return _CIASectionFile(self, section)
|
||||
|
||||
def get_data(self, region: 'CIARegion', offset: int, size: int) -> bytes:
|
||||
if offset + size > region.size:
|
||||
# prevent reading past the region
|
||||
size = region.size - offset
|
||||
|
||||
with self._lock:
|
||||
region = self.sections[section]
|
||||
fh = SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
if region.iv:
|
||||
real_size = size
|
||||
# if encrypted, the block needs to be decrypted first
|
||||
# CBC requires a full block (0x10 in this case). and the previous
|
||||
# block is used as the IV. so that's quite a bit to read if the
|
||||
# application requires just a few bytes.
|
||||
# thanks Stary2001 for help with random-access crypto
|
||||
before = offset % 16
|
||||
if size % 16 != 0:
|
||||
size = size + 16 - size % 16
|
||||
if offset - before == 0:
|
||||
iv = region.iv
|
||||
else:
|
||||
self._fp.seek(self._start + region.offset + offset - before - 0x10)
|
||||
iv = self._fp.read(0x10)
|
||||
# read to block size
|
||||
self._fp.seek(self._start + region.offset + offset - before)
|
||||
# adding x10 to the size fixes some kind of decryption bug I think. this needs more testing.
|
||||
return self._crypto.create_cbc_cipher(Keyslot.DecryptedTitlekey,
|
||||
iv).decrypt(self._fp.read(size + 0x10))[before:real_size + before]
|
||||
else:
|
||||
# no encryption
|
||||
self._fp.seek(self._start + region.offset + offset)
|
||||
return self._fp.read(size)
|
||||
fh = self._crypto.create_cbc_io(Keyslot.DecryptedTitlekey, fh, region.iv)
|
||||
return fh
|
||||
@@ -9,8 +9,9 @@ from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import readle
|
||||
from ..types.smdh import SMDH, InvalidSMDHError
|
||||
from ..type.smdh import SMDH, InvalidSMDHError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Union
|
||||
@@ -165,10 +166,7 @@ class _ExeFSOpenFile(_ReaderOpenFileBase):
|
||||
|
||||
def __init__(self, reader: 'ExeFSReader', path: str):
|
||||
super().__init__(reader, path)
|
||||
try:
|
||||
self._info = reader.entries[self._path]
|
||||
except KeyError:
|
||||
raise ExeFSFileNotFoundError(self._path)
|
||||
|
||||
|
||||
class ExeFSReader:
|
||||
@@ -182,7 +180,7 @@ class ExeFSReader:
|
||||
_code_dec = None
|
||||
icon: 'SMDH' = None
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, _load_icon: bool = True):
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, closefd: bool = True, _load_icon: bool = True):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
@@ -190,6 +188,7 @@ class ExeFSReader:
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
self._lock = Lock()
|
||||
self._closefd = closefd
|
||||
|
||||
self.entries: 'Dict[str, ExeFSEntry]' = {}
|
||||
|
||||
@@ -232,7 +231,7 @@ class ExeFSReader:
|
||||
try:
|
||||
with self.open('icon') as f:
|
||||
self.icon = SMDH.load(f)
|
||||
except InvalidSMDHError:
|
||||
except (ExeFSFileNotFoundError, InvalidSMDHError):
|
||||
pass
|
||||
|
||||
def __len__(self) -> int:
|
||||
@@ -241,6 +240,7 @@ class ExeFSReader:
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
if self._closefd:
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
@@ -259,7 +259,15 @@ class ExeFSReader:
|
||||
if normalize:
|
||||
# remove beginning "/" and ending ".bin"
|
||||
path = _normalize_path(path)
|
||||
try:
|
||||
entry = self.entries[path]
|
||||
except KeyError:
|
||||
raise ExeFSFileNotFoundError(path)
|
||||
if entry.offset == -1:
|
||||
# this would be the decompressed .code, if the original .code was compressed
|
||||
return _ExeFSOpenFile(self, path)
|
||||
else:
|
||||
return SubsectionIO(self._fp, self._start + EXEFS_HEADER_SIZE + entry.offset, entry.size)
|
||||
|
||||
def get_data(self, info: ExeFSEntry, offset: int, size: int) -> bytes:
|
||||
if offset + size > info.size:
|
||||
@@ -15,8 +15,9 @@ from typing import TYPE_CHECKING, NamedTuple
|
||||
from .exefs import ExeFSReader, EXEFS_HEADER_SIZE
|
||||
from .romfs import RomFSReader
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..util import config_dirs, readle, roundup
|
||||
from ..crypto import CryptoEngine, Keyslot
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import config_dirs, readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, List, Optional, Tuple, Union
|
||||
@@ -150,7 +151,7 @@ class NCCHReader:
|
||||
romfs: 'Optional[RomFSReader]' = None
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None,
|
||||
dev: bool = False, seeddb: str = None, load_sections: bool = True):
|
||||
dev: bool = False, seeddb: str = None, load_sections: bool = True, assume_decrypted: bool = False):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
@@ -159,6 +160,9 @@ class NCCHReader:
|
||||
else:
|
||||
self._crypto = CryptoEngine(dev=dev)
|
||||
|
||||
# old decryption methods did not fix the flags, so sometimes we have to assume it is decrypted
|
||||
self.assume_decrypted = assume_decrypted
|
||||
|
||||
# store the starting offset so the NCCH can be read from any point in the base file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
@@ -308,7 +312,17 @@ class NCCHReader:
|
||||
|
||||
def open_raw_section(self, section: 'NCCHSection'):
|
||||
"""Open a raw NCCH section for reading."""
|
||||
# check if the region is ExeFS and uses a newer keyslot, or is fulldec, and use a specific file class
|
||||
if (section == NCCHSection.ExeFS and self.extra_keyslot) or (section == NCCHSection.FullDecrypted):
|
||||
return _NCCHSectionFile(self, section)
|
||||
else:
|
||||
region = self.sections[section]
|
||||
fh = SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
# if the region is encrypted (not ExeFS if an extra keyslot is in use), wrap it in CTRFileIO
|
||||
if not (self.assume_decrypted or self.flags.no_crypto or section in NO_ENCRYPTION):
|
||||
keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH
|
||||
fh = self._crypto.create_ctr_io(keyslot, fh, region.iv)
|
||||
return fh
|
||||
|
||||
def get_key_y(self, original: bool = False) -> bytes:
|
||||
if original or not self.flags.uses_seed:
|
||||
@@ -447,7 +461,8 @@ class NCCHReader:
|
||||
|
||||
with self._lock:
|
||||
# check if decryption is really needed
|
||||
if self.flags.no_crypto or region.section in NO_ENCRYPTION:
|
||||
if self.assume_decrypted or self.flags.no_crypto or region.section in NO_ENCRYPTION:
|
||||
# this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO.
|
||||
self._fp.seek(self._start + region.offset + offset)
|
||||
return self._fp.read(size)
|
||||
|
||||
@@ -503,6 +518,8 @@ class NCCHReader:
|
||||
# join all the chunks into one bytes result and return it
|
||||
return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200)))
|
||||
else:
|
||||
# this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO.
|
||||
|
||||
# seek to the real offset of the section + the requested offset
|
||||
self._fp.seek(self._start + region.offset + offset)
|
||||
data = self._fp.read(size)
|
||||
@@ -9,6 +9,7 @@ from threading import Lock
|
||||
from typing import overload, TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -64,16 +65,6 @@ class RomFSFileEntry(NamedTuple):
|
||||
size: int
|
||||
|
||||
|
||||
class _RomFSOpenFile(_ReaderOpenFileBase):
|
||||
"""Class for open RomFS file entries."""
|
||||
|
||||
def __init__(self, reader: 'RomFSReader', path: str):
|
||||
super().__init__(reader, path)
|
||||
self._info: RomFSFileEntry = reader.get_info_from_path(path)
|
||||
if not isinstance(self._info, RomFSFileEntry):
|
||||
raise RomFSIsADirectoryError(path)
|
||||
|
||||
|
||||
class RomFSReader:
|
||||
"""
|
||||
Class for 3DS RomFS Level 3 partition.
|
||||
@@ -206,11 +197,14 @@ class RomFSReader:
|
||||
|
||||
@overload
|
||||
def open(self, path: str, encoding: None = None, errors: 'Optional[str]' = None,
|
||||
newline: 'Optional[str]' = None) -> _RomFSOpenFile: ...
|
||||
newline: 'Optional[str]' = None) -> SubsectionIO: ...
|
||||
|
||||
def open(self, path, encoding=None, errors=None, newline=None):
|
||||
"""Open a file in the RomFS for reading."""
|
||||
f = _RomFSOpenFile(self, path)
|
||||
file_info = self.get_info_from_path(path)
|
||||
if not isinstance(file_info, RomFSFileEntry):
|
||||
raise RomFSIsADirectoryError(path)
|
||||
f = SubsectionIO(self._fp, self._start + self.data_offset + file_info.offset, file_info.size)
|
||||
if encoding is not None:
|
||||
f = TextIOWrapper(f, encoding, errors, newline)
|
||||
return f
|
||||
@@ -237,10 +231,3 @@ class RomFSReader:
|
||||
return RomFSDirectoryEntry(name=curr['name'], type='dir', contents=(*contents,))
|
||||
elif curr['type'] == 'file':
|
||||
return RomFSFileEntry(name=curr['name'], type='file', offset=curr['offset'], size=curr['size'])
|
||||
|
||||
def get_data(self, info: RomFSFileEntry, offset: int, size: int) -> bytes:
|
||||
if offset + size > info.size:
|
||||
size = info.size - offset
|
||||
with self._lock:
|
||||
self._fp.seek(self._start + self.data_offset + info.offset + offset)
|
||||
return self._fp.read(size)
|
||||
@@ -1,9 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
|
||||
class ExtendedHeaderReader:
|
||||
def __init__(self):
|
||||
Reference in New Issue
Block a user