Mirror of https://github.com/ihaveamac/custom-install.git (synced 2025-12-06 06:41:45 +00:00)
Commit: remove pyctr copy, add to requirements.txt
BIN pyctr/.DS_Store (vendored): binary file not shown.
@@ -1,7 +0,0 @@
# PyCTR

Python library to parse several Nintendo 3DS files.

The API is not stable and can significantly change at any point. (If anyone decides to use this, that is)

This will eventually be a separate repo...
@@ -1,82 +0,0 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.

from functools import wraps
from io import BufferedIOBase
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # this is a lazy way to make type checkers stop complaining
    from typing import BinaryIO
    BufferedIOBase = BinaryIO


class PyCTRError(Exception):
    """Common base class for all PyCTR errors."""


def _raise_if_closed(method):
    @wraps(method)
    def decorator(self: '_ReaderOpenFileBase', *args, **kwargs):
        if self._reader.closed:
            self.closed = True
        if self.closed:
            raise ValueError('I/O operation on closed file')
        return method(self, *args, **kwargs)
    return decorator


class _ReaderOpenFileBase(BufferedIOBase):
    """Base class for all open files for Reader classes."""

    _seek = 0
    _info = None
    closed = False

    def __init__(self, reader, path):
        self._reader = reader
        self._path = path

    def __repr__(self):
        return f'<{type(self).__name__} path={self._path!r} info={self._info!r} reader={self._reader!r}>'

    @_raise_if_closed
    def read(self, size: int = -1) -> bytes:
        if size == -1:
            size = self._info.size - self._seek
        data = self._reader.get_data(self._info, self._seek, size)
        self._seek += len(data)
        return data

    read1 = read  # probably make this act like read1 should, but this for now enables some other things to work

    @_raise_if_closed
    def seek(self, seek: int, whence: int = 0) -> int:
        if whence == 0:
            if seek < 0:
                raise ValueError(f'negative seek value {seek}')
            self._seek = min(seek, self._info.size)
        elif whence == 1:
            self._seek = max(self._seek + seek, 0)
        elif whence == 2:
            self._seek = max(self._info.size + seek, 0)
        return self._seek

    @_raise_if_closed
    def tell(self) -> int:
        return self._seek

    @_raise_if_closed
    def readable(self) -> bool:
        return True

    @_raise_if_closed
    def writable(self) -> bool:
        return False

    @_raise_if_closed
    def seekable(self) -> bool:
        return True
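A minimal sketch of how a Reader pairs with _ReaderOpenFileBase; MemoryReader and _FakeEntry below are hypothetical stand-ins, not part of pyctr, for a real reader that exposes get_data(info, offset, size):

from typing import NamedTuple


class _FakeEntry(NamedTuple):   # hypothetical; real readers define their own entry types
    size: int
    data: bytes


class MemoryReader:
    """Toy reader keeping blobs in memory, mimicking the get_data() contract."""
    closed = False

    def __init__(self, blobs):
        self.entries = {name: _FakeEntry(size=len(d), data=d) for name, d in blobs.items()}

    def get_data(self, info, offset, size):
        return info.data[offset:offset + size]

    def open(self, name):
        f = _ReaderOpenFileBase(self, name)
        f._info = self.entries[name]
        return f


reader = MemoryReader({'icon': b'SMDH' + bytes(12)})
f = reader.open('icon')
assert f.read(4) == b'SMDH'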
747 pyctr/crypto.py
@@ -1,747 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from enum import IntEnum
|
||||
from functools import wraps
|
||||
from hashlib import sha256
|
||||
from io import BufferedIOBase
|
||||
from os import environ
|
||||
from os.path import getsize, join as pjoin
|
||||
from struct import pack, unpack
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from Cryptodome.Cipher import AES
|
||||
from Cryptodome.Hash import CMAC
|
||||
from Cryptodome.Util import Counter
|
||||
|
||||
from .common import PyCTRError, _raise_if_closed
|
||||
from .util import config_dirs, readbe, readle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# noinspection PyProtectedMember
|
||||
from Cryptodome.Cipher._mode_cbc import CbcMode
|
||||
# noinspection PyProtectedMember
|
||||
from Cryptodome.Cipher._mode_ctr import CtrMode
|
||||
# noinspection PyProtectedMember
|
||||
from Cryptodome.Cipher._mode_ecb import EcbMode
|
||||
from Cryptodome.Hash.CMAC import CMAC as CMACObject
|
||||
from typing import BinaryIO, Dict, List, Optional, Union
|
||||
|
||||
__all__ = ['CryptoError', 'OTPLengthError', 'CorruptBootromError', 'KeyslotMissingError', 'TicketLengthError',
|
||||
'BootromNotFoundError', 'CorruptOTPError', 'Keyslot', 'CryptoEngine']
|
||||
|
||||
|
||||
class CryptoError(PyCTRError):
|
||||
"""Generic exception for cryptography operations."""
|
||||
|
||||
|
||||
class OTPLengthError(CryptoError):
|
||||
"""OTP is the wrong length."""
|
||||
|
||||
|
||||
class CorruptOTPError(CryptoError):
|
||||
"""OTP hash does not match."""
|
||||
|
||||
|
||||
class KeyslotMissingError(CryptoError):
|
||||
"""Normal key is not set up for the keyslot."""
|
||||
|
||||
|
||||
class BadMovableSedError(CryptoError):
|
||||
"""movable.sed provided is invalid."""
|
||||
|
||||
|
||||
class TicketLengthError(CryptoError):
|
||||
"""Ticket is too small."""
|
||||
def __init__(self, length):
|
||||
super().__init__(length)
|
||||
|
||||
def __str__(self):
|
||||
return f'0x350 expected, {self.args[0]:#x} given'
|
||||
|
||||
|
||||
# wonder if I'm doing this right...
|
||||
class BootromNotFoundError(CryptoError):
|
||||
"""ARM9 bootROM was not found. Main argument is a tuple of checked paths."""
|
||||
|
||||
|
||||
class CorruptBootromError(CryptoError):
|
||||
"""ARM9 bootROM hash does not match."""
|
||||
|
||||
|
||||
class Keyslot(IntEnum):
|
||||
TWLNAND = 0x03
|
||||
CTRNANDOld = 0x04
|
||||
CTRNANDNew = 0x05
|
||||
FIRM = 0x06
|
||||
AGB = 0x07
|
||||
|
||||
CMACNANDDB = 0x0B
|
||||
|
||||
NCCH93 = 0x18
|
||||
CMACCardSaveNew = 0x19
|
||||
CardSaveNew = 0x1A
|
||||
NCCH96 = 0x1B
|
||||
|
||||
CMACAGB = 0x24
|
||||
NCCH70 = 0x25
|
||||
|
||||
NCCH = 0x2C
|
||||
UDSLocalWAN = 0x2D
|
||||
StreetPass = 0x2E
|
||||
Save60 = 0x2F
|
||||
CMACSDNAND = 0x30
|
||||
|
||||
CMACCardSave = 0x33
|
||||
SD = 0x34
|
||||
|
||||
CardSave = 0x37
|
||||
BOSS = 0x38
|
||||
DownloadPlay = 0x39
|
||||
|
||||
DSiWareExport = 0x3A
|
||||
|
||||
CommonKey = 0x3D
|
||||
|
||||
# anything after 0x3F is custom to PyCTR
|
||||
DecryptedTitlekey = 0x40
|
||||
|
||||
|
||||
BOOT9_PROT_HASH = '7331f7edece3dd33f2ab4bd0b3a5d607229fd19212c10b734cedcaf78c1a7b98'
|
||||
|
||||
DEV_COMMON_KEY_0 = bytes.fromhex('55A3F872BDC80C555A654381139E153B')
|
||||
|
||||
common_key_y = (
|
||||
# eShop
|
||||
0xD07B337F9CA4385932A2E25723232EB9,
|
||||
# System
|
||||
0x0C767230F0998F1C46828202FAACBE4C,
|
||||
# Unknown
|
||||
0xC475CB3AB8C788BB575E12A10907B8A4,
|
||||
# Unknown
|
||||
0xE486EEE3D0C09C902F6686D4C06F649F,
|
||||
# Unknown
|
||||
0xED31BA9C04B067506C4497A35B7804FC,
|
||||
# Unknown
|
||||
0x5E66998AB4E8931606850FD7A16DD755
|
||||
)
|
||||
|
||||
base_key_x = {
|
||||
# New3DS 9.3 NCCH
|
||||
0x18: (0x82E9C9BEBFB8BDB875ECC0A07D474374, 0x304BF1468372EE64115EBD4093D84276),
|
||||
# New3DS 9.6 NCCH
|
||||
0x1B: (0x45AD04953992C7C893724A9A7BCE6182, 0x6C8B2944A0726035F941DFC018524FB6),
|
||||
# 7x NCCH
|
||||
0x25: (0xCEE7D8AB30C00DAE850EF5E382AC5AF3, 0x81907A4B6F1B47323A677974CE4AD71B),
|
||||
}
|
||||
|
||||
# global values to be copied to new CryptoEngine instances after the first one
|
||||
_b9_key_x: 'Dict[int, int]' = {}
|
||||
_b9_key_y: 'Dict[int, int]' = {}
|
||||
_b9_key_normal: 'Dict[int, bytes]' = {}
|
||||
_b9_extdata_otp: bytes = None
|
||||
_b9_extdata_keygen: bytes = None
|
||||
_b9_path: str = None
|
||||
_otp_key: bytes = None
|
||||
_otp_iv: bytes = None
|
||||
|
||||
b9_paths: 'List[str]' = []
|
||||
for p in config_dirs:
|
||||
b9_paths.append(pjoin(p, 'boot9.bin'))
|
||||
b9_paths.append(pjoin(p, 'boot9_prot.bin'))
|
||||
try:
|
||||
b9_paths.insert(0, environ['BOOT9_PATH'])
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
def _requires_bootrom(method):
|
||||
@wraps(method)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
if not self.b9_keys_set:
|
||||
raise KeyslotMissingError('bootrom is required to set up keys, see setup_keys_from_boot9')
|
||||
return method(self, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
# used from http://www.falatic.com/index.php/108/python-and-bitwise-rotation
|
||||
# converted to def because pycodestyle complained to me
|
||||
def rol(val: int, r_bits: int, max_bits: int) -> int:
|
||||
return (val << r_bits % max_bits) & (2 ** max_bits - 1) |\
|
||||
((val & (2 ** max_bits - 1)) >> (max_bits - (r_bits % max_bits)))
|
||||
|
||||
|
||||
class _TWLCryptoWrapper:
|
||||
def __init__(self, cipher: 'CbcMode'):
|
||||
self._cipher = cipher
|
||||
|
||||
def encrypt(self, data: bytes) -> bytes:
|
||||
data_len = len(data)
|
||||
data_rev = bytearray(data_len)
|
||||
for i in range(0, data_len, 0x10):
|
||||
data_rev[i:i + 0x10] = data[i:i + 0x10][::-1]
|
||||
|
||||
data_out = bytearray(self._cipher.encrypt(bytes(data_rev)))
|
||||
|
||||
for i in range(0, data_len, 0x10):
|
||||
data_out[i:i + 0x10] = data_out[i:i + 0x10][::-1]
|
||||
return bytes(data_out[0:data_len])
|
||||
|
||||
decrypt = encrypt
|
||||
|
||||
|
||||
class CryptoEngine:
|
||||
"""Class for 3DS crypto operations, including encryption and key generation."""
|
||||
|
||||
b9_keys_set: bool = False
|
||||
b9_path: str = None
|
||||
|
||||
_b9_extdata_otp: bytes = None
|
||||
_b9_extdata_keygen: bytes = None
|
||||
|
||||
_otp_device_id: int = None
|
||||
_otp_key: bytes = None
|
||||
_otp_iv: bytes = None
|
||||
|
||||
_id0: bytes = None
|
||||
|
||||
def __init__(self, boot9: str = None, dev: int = 0, setup_b9_keys: bool = True):
|
||||
self.key_x: Dict[int, int] = {}
|
||||
self.key_y: Dict[int, int] = {0x03: 0xE1A00005202DDD1DBD4DC4D30AB9DC76,
|
||||
0x05: 0x4D804F4E9990194613A204AC584460BE}
|
||||
self.key_normal: Dict[int, bytes] = {}
|
||||
|
||||
self.dev = dev
|
||||
|
||||
for keyslot, keys in base_key_x.items():
|
||||
self.key_x[keyslot] = keys[dev]
|
||||
|
||||
if setup_b9_keys:
|
||||
self.setup_keys_from_boot9_file(boot9)
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def b9_extdata_otp(self) -> bytes:
|
||||
return self._b9_extdata_otp
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def b9_extdata_keygen(self) -> bytes:
|
||||
return self._b9_extdata_keygen
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def otp_device_id(self) -> int:
|
||||
return self._otp_device_id
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def otp_key(self) -> bytes:
|
||||
return self._otp_key
|
||||
|
||||
@property
|
||||
@_requires_bootrom
|
||||
def otp_iv(self) -> bytes:
|
||||
return self._otp_iv
|
||||
|
||||
@property
|
||||
def id0(self) -> bytes:
|
||||
if not self._id0:
|
||||
raise KeyslotMissingError('load a movable.sed with setup_sd_key')
|
||||
return self._id0
|
||||
|
||||
def create_cbc_cipher(self, keyslot: Keyslot, iv: bytes) -> 'CbcMode':
|
||||
"""Create AES-CBC cipher with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
except KeyError:
|
||||
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
|
||||
|
||||
return AES.new(key, AES.MODE_CBC, iv)
|
||||
|
||||
def create_ctr_cipher(self, keyslot: Keyslot, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]':
|
||||
"""
|
||||
Create an AES-CTR cipher with the given keyslot.
|
||||
|
||||
Normal and DSi crypto will be automatically chosen depending on keyslot.
|
||||
"""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
except KeyError:
|
||||
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
|
||||
|
||||
cipher = AES.new(key, AES.MODE_CTR, counter=Counter.new(128, initial_value=ctr))
|
||||
|
||||
if keyslot < 0x04:
|
||||
return _TWLCryptoWrapper(cipher)
|
||||
else:
|
||||
return cipher
|
||||
|
||||
def create_ecb_cipher(self, keyslot: Keyslot) -> 'EcbMode':
|
||||
"""Create an AES-ECB cipher with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
except KeyError:
|
||||
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
|
||||
|
||||
return AES.new(key, AES.MODE_ECB)
|
||||
|
||||
def create_cmac_object(self, keyslot: Keyslot) -> 'CMACObject':
|
||||
"""Create a CMAC object with the given keyslot."""
|
||||
try:
|
||||
key = self.key_normal[keyslot]
|
||||
except KeyError:
|
||||
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
|
||||
|
||||
return CMAC.new(key, ciphermod=AES)
|
||||
|
||||
def create_ctr_io(self, keyslot: Keyslot, fh: 'BinaryIO', ctr: int):
|
||||
"""Create an AES-CTR read-write file object with the given keyslot."""
|
||||
return CTRFileIO(fh, self, keyslot, ctr)
|
||||
|
||||
def create_cbc_io(self, keyslot: Keyslot, fh: 'BinaryIO', iv: bytes):
|
||||
"""Create an AES-CBC read-only file object with the given keyslot."""
|
||||
return CBCFileIO(fh, self, keyslot, iv)
|
||||
|
||||
@staticmethod
|
||||
def sd_path_to_iv(path: str) -> int:
|
||||
# ensure the path is lowercase
|
||||
path = path.lower()
|
||||
|
||||
# SD Save Data Backup does a copy of the raw, encrypted file from the game's data directory
|
||||
# so we need to handle this and fake the path
|
||||
if path.startswith('/backup') and len(path) > 28:
|
||||
tid_upper = path[12:20]
|
||||
tid_lower = path[20:28]
|
||||
path = f'/title/{tid_upper}/{tid_lower}/data' + path[28:]
|
||||
|
||||
path_hash = sha256(path.encode('utf-16le') + b'\0\0').digest()
|
||||
hash_p1 = readbe(path_hash[0:16])
|
||||
hash_p2 = readbe(path_hash[16:32])
|
||||
return hash_p1 ^ hash_p2
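    # Illustration (not part of the original file): typical use of the scheme above,
    # with placeholder paths. boot9 and a movable.sed are required for the key setup.
    #     crypto = CryptoEngine()
    #     crypto.setup_sd_key_from_file('movable.sed')
    #     ctr = CryptoEngine.sd_path_to_iv('/title/00040000/0f700e00/data/00000001.sav')
    #     cipher = crypto.create_ctr_cipher(Keyslot.SD, ctr)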
|
||||
|
||||
def load_from_ticket(self, ticket: bytes):
|
||||
"""Load a titlekey from a ticket and set keyslot 0x40 to the decrypted titlekey."""
|
||||
ticket_len = len(ticket)
|
||||
# TODO: probably support other sig types which would be different lengths
|
||||
# unlikely to happen in practice, but I would still like to
|
||||
if ticket_len < 0x2AC:
|
||||
raise TicketLengthError(ticket_len)
|
||||
|
||||
titlekey_enc = ticket[0x1BF:0x1CF]
|
||||
title_id = ticket[0x1DC:0x1E4]
|
||||
common_key_index = ticket[0x1F1]
|
||||
|
||||
if self.dev and common_key_index == 0:
|
||||
self.set_normal_key(0x3D, DEV_COMMON_KEY_0)
|
||||
else:
|
||||
self.set_keyslot('y', 0x3D, common_key_y[common_key_index])
|
||||
|
||||
cipher = self.create_cbc_cipher(0x3D, title_id + (b'\0' * 8))
|
||||
self.set_normal_key(0x40, cipher.decrypt(titlekey_enc))
|
||||
|
||||
def set_keyslot(self, xy: str, keyslot: int, key: 'Union[int, bytes]'):
|
||||
"""Sets a keyslot to the specified key."""
|
||||
to_use = None
|
||||
if xy == 'x':
|
||||
to_use = self.key_x
|
||||
elif xy == 'y':
|
||||
to_use = self.key_y
|
||||
if isinstance(key, bytes):
|
||||
key = int.from_bytes(key, 'big' if keyslot > 0x03 else 'little')
|
||||
to_use[keyslot] = key
|
||||
try:
|
||||
self.key_normal[keyslot] = self.keygen(keyslot)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def set_normal_key(self, keyslot: int, key: bytes):
|
||||
self.key_normal[keyslot] = key
|
||||
|
||||
def keygen(self, keyslot: int) -> bytes:
|
||||
"""Generate a normal key based on the keyslot."""
|
||||
if keyslot < 0x04:
|
||||
# DSi
|
||||
return self.keygen_twl_manual(self.key_x[keyslot], self.key_y[keyslot])
|
||||
else:
|
||||
# 3DS
|
||||
return self.keygen_manual(self.key_x[keyslot], self.key_y[keyslot])
|
||||
|
||||
@staticmethod
|
||||
def keygen_manual(key_x: int, key_y: int) -> bytes:
|
||||
"""Generate a normal key using the 3DS AES keyscrambler."""
|
||||
return rol((rol(key_x, 2, 128) ^ key_y) + 0x1FF9E9AAC5FE0408024591DC5D52768A, 87, 128).to_bytes(0x10, 'big')
|
||||
|
||||
@staticmethod
|
||||
def keygen_twl_manual(key_x: int, key_y: int) -> bytes:
|
||||
"""Generate a normal key using the DSi AES keyscrambler."""
|
||||
# usually would convert to LE bytes in the end then flip with [::-1], but those just cancel out
|
||||
return rol((key_x ^ key_y) + 0xFFFEFB4E295902582A680F5F1A4F3E79, 42, 128).to_bytes(0x10, 'big')
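    # Illustration (not part of the original file): both scramblers return a 16-byte
    # AES normal key. With arbitrary, made-up KeyX/KeyY values (real ones come from
    # boot9, the OTP, or title data):
    #     key_x = 0x000102030405060708090A0B0C0D0E0F
    #     key_y = 0x101112131415161718191A1B1C1D1E1F
    #     normal_key = CryptoEngine.keygen_manual(key_x, key_y)   # 16 bytes, big-endian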
|
||||
|
||||
def _copy_global_keys(self):
|
||||
self.key_x.update(_b9_key_x)
|
||||
self.key_y.update(_b9_key_y)
|
||||
self.key_normal.update(_b9_key_normal)
|
||||
self._otp_key = _otp_key
|
||||
self._otp_iv = _otp_iv
|
||||
self._b9_extdata_otp = _b9_extdata_otp
|
||||
self._b9_extdata_keygen = _b9_extdata_keygen
|
||||
|
||||
self.b9_keys_set = True
|
||||
|
||||
def setup_keys_from_boot9(self, b9: bytes):
|
||||
"""Set up certain keys from an ARM9 bootROM dump."""
|
||||
global _otp_key, _otp_iv, _b9_extdata_otp, _b9_extdata_keygen
|
||||
if self.b9_keys_set:
|
||||
return
|
||||
|
||||
if _b9_key_x:
|
||||
self._copy_global_keys()
|
||||
return
|
||||
|
||||
b9_len = len(b9)
|
||||
if b9_len != 0x8000:
|
||||
raise CorruptBootromError(f'wrong length: {b9_len}')
|
||||
|
||||
b9_hash_digest: str = sha256(b9).hexdigest()
|
||||
if b9_hash_digest != BOOT9_PROT_HASH:
|
||||
raise CorruptBootromError(f'expected: {BOOT9_PROT_HASH}; returned: {b9_hash_digest}')
|
||||
|
||||
keyblob_offset = 0x5860
|
||||
otp_key_offset = 0x56E0
|
||||
if self.dev:
|
||||
keyblob_offset += 0x400
|
||||
otp_key_offset += 0x20
|
||||
|
||||
_otp_key = b9[otp_key_offset:otp_key_offset + 0x10]
|
||||
_otp_iv = b9[otp_key_offset + 0x10:otp_key_offset + 0x20]
|
||||
|
||||
keyblob: bytes = b9[keyblob_offset:keyblob_offset + 0x400]
|
||||
|
||||
_b9_extdata_keygen = keyblob[0:0x200]
|
||||
_b9_extdata_otp = keyblob[0:0x24]
|
||||
|
||||
# Original NCCH key, UDS local-WLAN CCMP key, StreetPass key, 6.0 save key
|
||||
_b9_key_x[0x2C] = _b9_key_x[0x2D] = _b9_key_x[0x2E] = _b9_key_x[0x2F] = readbe(keyblob[0x170:0x180])
|
||||
|
||||
# SD/NAND AES-CMAC key, APT wrap key, Unknown, Gamecard savedata AES-CMAC
|
||||
_b9_key_x[0x30] = _b9_key_x[0x31] = _b9_key_x[0x32] = _b9_key_x[0x33] = readbe(keyblob[0x180:0x190])
|
||||
|
||||
# SD key (loaded from movable.sed), movable.sed key, Unknown (used by friends module),
|
||||
# Gamecard savedata actual key
|
||||
_b9_key_x[0x34] = _b9_key_x[0x35] = _b9_key_x[0x36] = _b9_key_x[0x37] = readbe(keyblob[0x190:0x1A0])
|
||||
|
||||
# BOSS key, Download Play key + actual NFC key for generating retail amiibo keys, CTR-CARD hardware-crypto seed
|
||||
# decryption key
|
||||
_b9_key_x[0x38] = _b9_key_x[0x39] = _b9_key_x[0x3A] = _b9_key_x[0x3B] = readbe(keyblob[0x1A0:0x1B0])
|
||||
|
||||
# Unused
|
||||
_b9_key_x[0x3C] = readbe(keyblob[0x1B0:0x1C0])
|
||||
|
||||
# Common key (titlekey crypto)
|
||||
_b9_key_x[0x3D] = readbe(keyblob[0x1C0:0x1D0])
|
||||
|
||||
# Unused
|
||||
_b9_key_x[0x3E] = readbe(keyblob[0x1D0:0x1E0])
|
||||
|
||||
# NAND partition keys
|
||||
_b9_key_y[0x04] = readbe(keyblob[0x1F0:0x200])
|
||||
# correct 0x05 KeyY not set by boot9.
|
||||
_b9_key_y[0x06] = readbe(keyblob[0x210:0x220])
|
||||
_b9_key_y[0x07] = readbe(keyblob[0x220:0x230])
|
||||
|
||||
# Unused, Unused, DSiWare export key, NAND dbs/movable.sed AES-CMAC key
|
||||
_b9_key_y[0x08] = readbe(keyblob[0x230:0x240])
|
||||
_b9_key_y[0x09] = readbe(keyblob[0x240:0x250])
|
||||
_b9_key_y[0x0A] = readbe(keyblob[0x250:0x260])
|
||||
_b9_key_y[0x0B] = readbe(keyblob[0x260:0x270])
|
||||
|
||||
_b9_key_normal[0x0D] = keyblob[0x270:0x280]
|
||||
|
||||
self._copy_global_keys()
|
||||
|
||||
def setup_keys_from_boot9_file(self, path: str = None):
|
||||
"""Set up certain keys from an ARM9 bootROM file."""
|
||||
global _b9_path
|
||||
if self.b9_keys_set:
|
||||
return
|
||||
|
||||
if _b9_key_x:
|
||||
self.b9_path = _b9_path
|
||||
self._copy_global_keys()
|
||||
return
|
||||
|
||||
paths = (path,) if path else b9_paths
|
||||
|
||||
for p in paths:
|
||||
try:
|
||||
b9_size = getsize(p)
|
||||
if b9_size in {0x8000, 0x10000}:
|
||||
with open(p, 'rb') as f:
|
||||
if b9_size == 0x10000:
|
||||
f.seek(0x8000)
|
||||
self.setup_keys_from_boot9(f.read(0x8000))
|
||||
_b9_path = p
|
||||
self.b9_path = p
|
||||
return
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
# if keys are not set...
|
||||
raise BootromNotFoundError(paths)
|
||||
|
||||
@_requires_bootrom
|
||||
def setup_keys_from_otp(self, otp: bytes):
|
||||
"""Set up console-unique keys from an OTP dump. Encrypted and decrypted are supported."""
|
||||
otp_len = len(otp)
|
||||
if otp_len != 0x100:
|
||||
raise OTPLengthError(otp_len)
|
||||
|
||||
cipher_otp = AES.new(self.otp_key, AES.MODE_CBC, self.otp_iv)
|
||||
if otp[0:4] == b'\x0f\xb0\xad\xde':
|
||||
# decrypted otp
|
||||
otp_enc: bytes = cipher_otp.encrypt(otp)
|
||||
otp_dec = otp
|
||||
else:
|
||||
# encrypted otp
|
||||
otp_enc = otp
|
||||
otp_dec: bytes = cipher_otp.decrypt(otp)
|
||||
|
||||
self._otp_device_id = int.from_bytes(otp_dec[4:8], 'little')
|
||||
|
||||
otp_hash: bytes = otp_dec[0xE0:0x100]
|
||||
otp_hash_digest: bytes = sha256(otp_dec[0:0xE0]).digest()
|
||||
if otp_hash_digest != otp_hash:
|
||||
raise CorruptOTPError(f'expected: {otp_hash.hex()}; result: {otp_hash_digest.hex()}')
|
||||
|
||||
otp_keysect_hash: bytes = sha256(otp_enc[0:0x90]).digest()
|
||||
|
||||
self.set_keyslot('x', 0x11, otp_keysect_hash[0:0x10])
|
||||
self.set_keyslot('y', 0x11, otp_keysect_hash[0:0x10])
|
||||
|
||||
# most otp code from https://github.com/Stary2001/3ds_tools/blob/master/three_ds/aesengine.py
|
||||
|
||||
twl_cid_lo, twl_cid_hi = readle(otp_dec[0x08:0xC]), readle(otp_dec[0xC:0x10])
|
||||
twl_cid_lo ^= 0xB358A6AF
|
||||
twl_cid_lo |= 0x80000000
|
||||
twl_cid_hi ^= 0x08C267B7
|
||||
twl_cid_lo = twl_cid_lo.to_bytes(4, 'little')
|
||||
twl_cid_hi = twl_cid_hi.to_bytes(4, 'little')
|
||||
self.set_keyslot('x', 0x03, twl_cid_lo + b'NINTENDO' + twl_cid_hi)
|
||||
|
||||
console_key_xy: bytes = sha256(otp_dec[0x90:0xAC] + self.b9_extdata_otp).digest()
|
||||
self.set_keyslot('x', 0x3F, console_key_xy[0:0x10])
|
||||
self.set_keyslot('y', 0x3F, console_key_xy[0x10:0x20])
|
||||
|
||||
extdata_off = 0
|
||||
|
||||
def gen(n: int) -> bytes:
|
||||
nonlocal extdata_off
|
||||
extdata_off += 36
|
||||
iv = self.b9_extdata_keygen[extdata_off:extdata_off+16]
|
||||
extdata_off += 16
|
||||
|
||||
data = self.create_cbc_cipher(0x3F, iv).encrypt(self.b9_extdata_keygen[extdata_off:extdata_off + 64])
|
||||
|
||||
extdata_off += n
|
||||
return data
|
||||
|
||||
a = gen(64)
|
||||
for i in range(0x4, 0x8):
|
||||
self.set_keyslot('x', i, a[0:16])
|
||||
|
||||
for i in range(0x8, 0xc):
|
||||
self.set_keyslot('x', i, a[16:32])
|
||||
|
||||
for i in range(0xc, 0x10):
|
||||
self.set_keyslot('x', i, a[32:48])
|
||||
|
||||
self.set_keyslot('x', 0x10, a[48:64])
|
||||
|
||||
b = gen(16)
|
||||
off = 0
|
||||
for i in range(0x14, 0x18):
|
||||
self.set_keyslot('x', i, b[off:off + 16])
|
||||
off += 16
|
||||
|
||||
c = gen(64)
|
||||
for i in range(0x18, 0x1c):
|
||||
self.set_keyslot('x', i, c[0:16])
|
||||
|
||||
for i in range(0x1c, 0x20):
|
||||
self.set_keyslot('x', i, c[16:32])
|
||||
|
||||
for i in range(0x20, 0x24):
|
||||
self.set_keyslot('x', i, c[32:48])
|
||||
|
||||
self.set_keyslot('x', 0x24, c[48:64])
|
||||
|
||||
d = gen(16)
|
||||
off = 0
|
||||
|
||||
for i in range(0x28, 0x2c):
|
||||
self.set_keyslot('x', i, d[off:off + 16])
|
||||
off += 16
|
||||
|
||||
@_requires_bootrom
|
||||
def setup_keys_from_otp_file(self, path: str):
|
||||
"""Set up console-unique keys from an OTP file. Encrypted and decrypted are supported."""
|
||||
with open(path, 'rb') as f:
|
||||
self.setup_keys_from_otp(f.read(0x100))
|
||||
|
||||
def setup_sd_key(self, data: bytes):
|
||||
"""Set up the SD key from movable.sed. Must be 0x10 (only key), 0x120 (no cmac), or 0x140 (with cmac)."""
|
||||
if len(data) == 0x10:
|
||||
key = data
|
||||
elif len(data) in {0x120, 0x140}:
|
||||
key = data[0x110:0x120]
|
||||
else:
|
||||
raise BadMovableSedError(f'invalid length ({len(data):#x})')
|
||||
|
||||
self.set_keyslot('y', Keyslot.SD, key)
|
||||
self.set_keyslot('y', Keyslot.CMACSDNAND, key)
|
||||
self.set_keyslot('y', Keyslot.DSiWareExport, key)
|
||||
|
||||
key_hash = sha256(key).digest()[0:16]
|
||||
hash_parts = unpack('<IIII', key_hash)
|
||||
self._id0 = pack('>IIII', *hash_parts)
|
||||
|
||||
def setup_sd_key_from_file(self, path: str):
|
||||
"""Set up the SD key from a movable.sed file."""
|
||||
with open(path, 'rb') as f:
|
||||
self.setup_sd_key(f.read(0x140))
|
||||
|
||||
|
||||
class _CryptoFileBase(BufferedIOBase):
|
||||
"""Base class for CTR and CBC IO classes."""
|
||||
|
||||
closed = False
|
||||
_reader: 'BinaryIO'
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
__del__ = close
|
||||
|
||||
@_raise_if_closed
|
||||
def flush(self):
|
||||
self._reader.flush()
|
||||
|
||||
@_raise_if_closed
|
||||
def tell(self) -> int:
|
||||
return self._reader.tell()
|
||||
|
||||
@_raise_if_closed
|
||||
def readable(self) -> bool:
|
||||
return self._reader.readable()
|
||||
|
||||
@_raise_if_closed
|
||||
def writable(self) -> bool:
|
||||
return self._reader.writable()
|
||||
|
||||
@_raise_if_closed
|
||||
def seekable(self) -> bool:
|
||||
return self._reader.seekable()
|
||||
|
||||
|
||||
class CTRFileIO(_CryptoFileBase):
|
||||
"""Provides transparent read-write AES-CTR encryption as a file-like object."""
|
||||
|
||||
def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, counter: int):
|
||||
self._reader = file
|
||||
self._crypto = crypto
|
||||
self._keyslot = keyslot
|
||||
self._counter = counter
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, counter={self._counter!r})'
|
||||
|
||||
@_raise_if_closed
|
||||
def read(self, size: int = -1) -> bytes:
|
||||
cur_offset = self.tell()
|
||||
data = self._reader.read(size)
|
||||
counter = self._counter + (cur_offset >> 4)
|
||||
cipher = self._crypto.create_ctr_cipher(self._keyslot, counter)
|
||||
# beginning padding
|
||||
cipher.decrypt(b'\0' * (cur_offset % 0x10))
|
||||
return cipher.decrypt(data)
|
||||
|
||||
read1 = read # probably make this act like read1 should, but this for now enables some other things to work
|
||||
|
||||
@_raise_if_closed
|
||||
def write(self, data: bytes) -> int:
|
||||
cur_offset = self.tell()
|
||||
counter = self._counter + (cur_offset >> 4)
|
||||
cipher = self._crypto.create_ctr_cipher(self._keyslot, counter)
|
||||
# beginning padding
|
||||
cipher.encrypt(b'\0' * (cur_offset % 0x10))
|
||||
return self._reader.write(cipher.encrypt(data))
|
||||
|
||||
@_raise_if_closed
|
||||
def seek(self, seek: int, whence: int = 0) -> int:
|
||||
# TODO: if the seek goes past the file, the data between the former EOF and seek point should also be encrypted.
|
||||
return self._reader.seek(seek, whence)
|
||||
|
||||
def truncate(self, size: 'Optional[int]' = None) -> int:
|
||||
return self._reader.truncate(size)
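# Illustration (not part of the original file): a self-contained round trip through
# CTRFileIO. The all-zero key and the use of keyslot 0x40 are arbitrary choices for
# this sketch only.
from io import BytesIO

_demo_crypto = CryptoEngine(setup_b9_keys=False)                 # skip the bootrom lookup here
_demo_crypto.set_normal_key(Keyslot.DecryptedTitlekey, bytes(16))
_demo_raw = BytesIO(bytearray(0x200))
_demo_enc = _demo_crypto.create_ctr_io(Keyslot.DecryptedTitlekey, _demo_raw, 0)
_demo_enc.write(b'hello world')                # stored encrypted in _demo_raw
_demo_enc.seek(0)
assert _demo_enc.read(11) == b'hello world'    # reads decrypt transparently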
|
||||
|
||||
|
||||
class CBCFileIO(_CryptoFileBase):
|
||||
"""Provides transparent read-only AES-CBC encryption as a file-like object."""
|
||||
|
||||
def __init__(self, file: 'BinaryIO', crypto: 'CryptoEngine', keyslot: Keyslot, iv: bytes):
|
||||
self._reader = file
|
||||
self._crypto = crypto
|
||||
self._keyslot = keyslot
|
||||
self._iv = iv
|
||||
|
||||
def __repr__(self):
|
||||
return f'{type(self).__name__}(file={self._reader!r}, keyslot={self._keyslot:#04x}, iv={self._iv!r})'
|
||||
|
||||
@_raise_if_closed
|
||||
def read(self, size: int = -1):
|
||||
offset = self._reader.tell()
|
||||
|
||||
# if encrypted, the block needs to be decrypted first
|
||||
# CBC requires a full block (0x10 in this case). and the previous
|
||||
# block is used as the IV. so that's quite a bit to read if the
|
||||
# application requires just a few bytes.
|
||||
# thanks Stary2001 for help with random-access crypto
|
||||
|
||||
before = offset % 16
|
||||
if offset - before == 0:
|
||||
iv = self._iv
|
||||
else:
|
||||
# seek back one block to read it as iv
|
||||
self._reader.seek(-0x10 - before, 1)
|
||||
iv = self._reader.read(0x10)
|
||||
# this is done since we may not know the original size of the file
|
||||
# and the caller may have requested -1 to read all the remaining data
|
||||
data_before = self._reader.read(before)
|
||||
data_requested = self._reader.read(size)
|
||||
data_requested_len = len(data_requested)
|
||||
data_total_len = len(data_before) + data_requested_len
|
||||
if data_total_len % 16:
|
||||
data_after = self._reader.read(16 - (data_total_len % 16))
|
||||
self._reader.seek(-len(data_after), 1)
|
||||
else:
|
||||
data_after = b''
|
||||
cipher = self._crypto.create_cbc_cipher(self._keyslot, iv)
|
||||
# decrypt data, and cut off extra bytes
|
||||
return cipher.decrypt(
|
||||
b''.join((data_before, data_requested, data_after))
|
||||
)[before:data_requested_len + before]
|
||||
|
||||
read1 = read # probably make this act like read1 should, but this for now enables some other things to work
|
||||
|
||||
@_raise_if_closed
|
||||
def seek(self, seek: int, whence: int = 0):
|
||||
# even though read re-seeks to read required data, this allows the underlying object to handle seek how it wants
|
||||
return self._reader.seek(seek, whence)
|
||||
|
||||
@_raise_if_closed
|
||||
def writable(self) -> bool:
|
||||
return False
|
||||
107 pyctr/fileio.py
@@ -1,107 +0,0 @@
from io import BufferedIOBase
from threading import Lock
from weakref import WeakValueDictionary
from typing import TYPE_CHECKING

from .common import _raise_if_closed

if TYPE_CHECKING:
    from typing import BinaryIO

# this prevents two SubsectionIO instances on the same file object from interfering with each other
_lock_objects = WeakValueDictionary()


class SubsectionIO(BufferedIOBase):
    """Provides read-write access to a subsection of a file."""

    closed = False
    _seek = 0

    def __init__(self, file: 'BinaryIO', offset: int, size: int):
        # get existing Lock object for file, or create a new one
        file_id = id(file)
        try:
            self._lock = _lock_objects[file_id]
        except KeyError:
            self._lock = Lock()
            _lock_objects[file_id] = self._lock

        self._reader = file
        self._offset = offset
        self._size = size
        # subsection end is stored for convenience
        self._end = offset + size

    def __repr__(self):
        return f'{type(self).__name__}(file={self._reader!r}, offset={self._offset!r}, size={self._size!r})'

    def close(self):
        self.closed = True
        # remove Lock reference, so it can be automatically removed from the WeakValueDictionary once all
        # SubsectionIO instances for the base file are closed
        self._lock = None

    __del__ = close

    @_raise_if_closed
    def read(self, size: int = -1) -> bytes:
        if size == -1:
            size = self._size - self._seek
        if self._offset + self._seek > self._end:
            # if attempting to read after the section, return nothing
            return b''
        if self._seek + size > self._size:
            size = self._size - self._seek

        with self._lock:
            self._reader.seek(self._seek + self._offset)
            data = self._reader.read(size)

        self._seek += len(data)
        return data

    @_raise_if_closed
    def seek(self, seek: int, whence: int = 0) -> int:
        if whence == 0:
            if seek < 0:
                raise ValueError(f'negative seek value {seek}')
            self._seek = min(seek, self._size)
        elif whence == 1:
            self._seek = max(self._seek + seek, 0)
        elif whence == 2:
            self._seek = max(self._size + seek, 0)
        else:
            if not isinstance(whence, int):
                raise TypeError(f'an integer is required (got type {type(whence).__name__})')
            raise ValueError(f'invalid whence ({whence}, should be 0, 1 or 2)')
        return self._seek

    @_raise_if_closed
    def write(self, data: bytes) -> int:
        if self._seek > self._size:
            # attempting to write past subsection
            return 0
        data_len = len(data)
        data_end = data_len + self._seek
        if data_end > self._size:
            data = data[:-(data_end - self._size)]

        with self._lock:
            self._reader.seek(self._seek + self._offset)
            data_written = self._reader.write(data)

        self._seek += data_written
        return data_written

    @_raise_if_closed
    def readable(self) -> bool:
        return self._reader.readable()

    @_raise_if_closed
    def writable(self) -> bool:
        return self._reader.writable()

    @_raise_if_closed
    def seekable(self) -> bool:
        return self._reader.seekable()
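A short self-contained sketch of SubsectionIO over an in-memory file (the byte values are arbitrary, for illustration):

from io import BytesIO

base = BytesIO(b'AAAABBBBCCCC')
sub = SubsectionIO(base, 4, 4)        # a 4-byte window starting at offset 4 of `base`
assert sub.read() == b'BBBB'
sub.seek(0)
sub.write(b'XY')                      # writes land at offset 4 of the underlying file
assert base.getvalue() == b'AAAAXYBBCCCC'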
@@ -1,12 +0,0 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.


class TitleReaderBase:

    closed = False
@@ -1,145 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from enum import IntEnum
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError
|
||||
from ..fileio import SubsectionIO
|
||||
from ..type.ncch import NCCHReader
|
||||
from ..util import readle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Union
|
||||
|
||||
CCI_MEDIA_UNIT = 0x200
|
||||
|
||||
|
||||
class CCIError(PyCTRError):
|
||||
"""Generic error for CCI operations."""
|
||||
|
||||
|
||||
class InvalidCCIError(CCIError):
|
||||
"""Invalid CCI header exception."""
|
||||
|
||||
|
||||
class CCISection(IntEnum):
|
||||
Header = -3
|
||||
CardInfo = -2
|
||||
DevInfo = -1
|
||||
|
||||
Application = 0
|
||||
Manual = 1
|
||||
DownloadPlayChild = 2
|
||||
Unk3 = 3
|
||||
Unk4 = 4
|
||||
Unk5 = 5
|
||||
UpdateOld3DS = 6
|
||||
UpdateNew3DS = 7
|
||||
|
||||
|
||||
class CCIRegion(NamedTuple):
|
||||
section: 'Union[int, CCISection]'
|
||||
offset: int
|
||||
size: int
|
||||
|
||||
|
||||
class CCIReader:
|
||||
"""Class for the 3DS CCI container."""
|
||||
|
||||
closed = False
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, dev: bool = False,
|
||||
load_contents: bool = True, assume_decrypted: bool = False):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
# store the starting offset so the CCI can be read from any point in the base file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
# store case-insensitivity for RomFSReader
|
||||
self._case_insensitive = case_insensitive
|
||||
# threading lock
|
||||
self._lock = Lock()
|
||||
|
||||
# ignore the signature, we don't need it
|
||||
self._fp.seek(0x100, 1)
|
||||
header = fp.read(0x100)
|
||||
if header[0:4] != b'NCSD':
|
||||
raise InvalidCCIError('NCSD magic not found')
|
||||
|
||||
# make sure the Media ID is not 00, which is used for the NAND header
|
||||
self.media_id = header[0x8:0x10][::-1].hex()
|
||||
if self.media_id == '00' * 8:
|
||||
raise InvalidCCIError('Media ID is ' + self.media_id)
|
||||
|
||||
self.image_size = readle(header[4:8]) * CCI_MEDIA_UNIT
|
||||
|
||||
# this contains the location of each section
|
||||
self.sections: Dict[CCISection, CCIRegion] = {}
|
||||
|
||||
# this contains loaded sections
|
||||
self.contents: Dict[CCISection, NCCHReader] = {}
|
||||
|
||||
def add_region(section: 'CCISection', offset: int, size: int):
|
||||
region = CCIRegion(section=section, offset=offset, size=size)
|
||||
self.sections[section] = region
|
||||
|
||||
# add each part of the header
|
||||
add_region(CCISection.Header, 0, 0x200)
|
||||
add_region(CCISection.CardInfo, 0x200, 0x1000)
|
||||
add_region(CCISection.DevInfo, 0x1200, 0x300)
|
||||
|
||||
# use a CCISection value for section keys
|
||||
partition_sections = [x for x in CCISection if x >= 0]
|
||||
|
||||
part_raw = header[0x20:0x60]
|
||||
|
||||
# the first content always starts at 0x4000 but this code makes no assumptions about it
|
||||
for idx, info_offset in enumerate(range(0, 0x40, 0x8)):
|
||||
part_info = part_raw[info_offset:info_offset + 8]
|
||||
part_offset = int.from_bytes(part_info[0:4], 'little') * CCI_MEDIA_UNIT
|
||||
part_size = int.from_bytes(part_info[4:8], 'little') * CCI_MEDIA_UNIT
|
||||
if part_offset:
|
||||
section_id = partition_sections[idx]
|
||||
add_region(section_id, part_offset, part_size)
|
||||
|
||||
if load_contents:
|
||||
content_fp = self.open_raw_section(section_id)
|
||||
self.contents[section_id] = NCCHReader(content_fp, case_insensitive=case_insensitive, dev=dev,
|
||||
assume_decrypted=assume_decrypted)
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
__del__ = close
|
||||
|
||||
def __repr__(self):
|
||||
info = [('media_id', self.media_id)]
|
||||
try:
|
||||
info.append(('title_name',
|
||||
repr(self.contents[CCISection.Application].exefs.icon.get_app_title().short_desc)))
|
||||
except KeyError:
|
||||
info.append(('title_name', 'unknown'))
|
||||
info.append(('partition_count', len(self.contents)))
|
||||
info_final = " ".join(x + ": " + str(y) for x, y in info)
|
||||
return f'<{type(self).__name__} {info_final}>'
|
||||
|
||||
def open_raw_section(self, section: 'CCISection'):
|
||||
"""Open a raw CCI section for reading."""
|
||||
region = self.sections[section]
|
||||
return SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
@@ -1,206 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from enum import IntEnum
|
||||
from io import BytesIO
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError
|
||||
from ..crypto import CryptoEngine, Keyslot
|
||||
from ..fileio import SubsectionIO
|
||||
from ..type.ncch import NCCHReader
|
||||
from ..type.tmd import TitleMetadataReader
|
||||
from ..util import readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Optional, Union
|
||||
|
||||
ALIGN_SIZE = 64
|
||||
|
||||
|
||||
class CIAError(PyCTRError):
|
||||
"""Generic error for CIA operations."""
|
||||
|
||||
|
||||
class InvalidCIAError(CIAError):
|
||||
"""Invalid CIA header exception."""
|
||||
|
||||
|
||||
class CIASection(IntEnum):
|
||||
# these values are negative; positive ones are used for contents
|
||||
ArchiveHeader = -4
|
||||
CertificateChain = -3
|
||||
Ticket = -2
|
||||
TitleMetadata = -1
|
||||
Application = 0
|
||||
Manual = 1
|
||||
DownloadPlayChild = 2
|
||||
Meta = -5
|
||||
|
||||
|
||||
class CIARegion(NamedTuple):
|
||||
section: 'Union[int, CIASection]'
|
||||
offset: int
|
||||
size: int
|
||||
iv: bytes # only used for encrypted sections
|
||||
|
||||
|
||||
class CIAReader:
|
||||
"""Class for the 3DS CIA container."""
|
||||
|
||||
closed = False
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None,
|
||||
dev: bool = False, seeddb: str = None, load_contents: bool = True):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
if crypto:
|
||||
self._crypto = crypto
|
||||
else:
|
||||
self._crypto = CryptoEngine(dev=dev)
|
||||
|
||||
# store the starting offset so the CIA can be read from any point in the base file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
# store case-insensitivity for RomFSReader
|
||||
self._case_insensitive = case_insensitive
|
||||
# threading lock
|
||||
self._lock = Lock()
|
||||
|
||||
header = fp.read(0x20)
|
||||
|
||||
archive_header_size = readle(header[0x0:0x4])
|
||||
if archive_header_size != 0x2020:
|
||||
raise InvalidCIAError('Archive Header Size is not 0x2020')
|
||||
# in practice, the certificate chain is the same for all retail titles
|
||||
cert_chain_size = readle(header[0x8:0xC])
|
||||
# the ticket size usually never changes from 0x350
|
||||
# there is one ticket (without an associated title) that is smaller though
|
||||
ticket_size = readle(header[0xC:0x10])
|
||||
# tmd contains info about the contents of the title
|
||||
tmd_size = readle(header[0x10:0x14])
|
||||
# meta contains info such as the SMDH and Title ID dependency list
|
||||
meta_size = readle(header[0x14:0x18])
|
||||
# content size is the total size of the contents
|
||||
# I'm not sure what happens yet if one of the contents is not aligned to 0x40 bytes.
|
||||
content_size = readle(header[0x18:0x20])
|
||||
# the content index determines what contents are in the CIA
|
||||
# this is not stored as int, so it's faster to parse(?)
|
||||
content_index = fp.read(archive_header_size - 0x20)
|
||||
|
||||
active_contents = set()
|
||||
for idx, b in enumerate(content_index):
|
||||
offset = idx * 8
|
||||
curr = b
|
||||
for x in range(7, -1, -1):
|
||||
if curr & 1:
|
||||
active_contents.add(x + offset)
|
||||
curr >>= 1
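        # Worked example (not part of the original file): if the first content_index
        # byte is 0b11000000, the loop above adds indices 0 and 1; the most significant
        # bit of each byte maps to the lowest content index in that group of eight.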
|
||||
|
||||
# the header only stores sizes; offsets need to be calculated.
|
||||
# the sections are aligned to 64(0x40) bytes. for example, if something is 0x78,
|
||||
# it will take up 0x80, with the remaining 0x8 being padding.
|
||||
cert_chain_offset = roundup(archive_header_size, ALIGN_SIZE)
|
||||
ticket_offset = cert_chain_offset + roundup(cert_chain_size, ALIGN_SIZE)
|
||||
tmd_offset = ticket_offset + roundup(ticket_size, ALIGN_SIZE)
|
||||
content_offset = tmd_offset + roundup(tmd_size, ALIGN_SIZE)
|
||||
meta_offset = content_offset + roundup(content_size, ALIGN_SIZE)
|
||||
|
||||
# lazy method to get the total size
|
||||
self.total_size = meta_offset + meta_size
|
||||
|
||||
# this contains the location of each section, as well as the IV of encrypted ones
|
||||
self.sections: Dict[Union[int, CIASection], CIARegion] = {}
|
||||
|
||||
def add_region(section: 'Union[int, CIASection]', offset: int, size: int, iv: 'Optional[bytes]'):
|
||||
region = CIARegion(section=section, offset=offset, size=size, iv=iv)
|
||||
self.sections[section] = region
|
||||
|
||||
# add each part of the header
|
||||
add_region(CIASection.ArchiveHeader, 0, archive_header_size, None)
|
||||
add_region(CIASection.CertificateChain, cert_chain_offset, cert_chain_size, None)
|
||||
add_region(CIASection.Ticket, ticket_offset, ticket_size, None)
|
||||
add_region(CIASection.TitleMetadata, tmd_offset, tmd_size, None)
|
||||
if meta_size:
|
||||
add_region(CIASection.Meta, meta_offset, meta_size, None)
|
||||
|
||||
# this will load the titlekey to decrypt the contents
|
||||
self._fp.seek(self._start + ticket_offset)
|
||||
ticket = self._fp.read(ticket_size)
|
||||
self._crypto.load_from_ticket(ticket)
|
||||
|
||||
# the tmd describes the contents: ID, index, size, and hash
|
||||
self._fp.seek(self._start + tmd_offset)
|
||||
tmd_data = self._fp.read(tmd_size)
|
||||
self.tmd = TitleMetadataReader.load(BytesIO(tmd_data))
|
||||
|
||||
active_contents_tmd = set()
|
||||
self.content_info = []
|
||||
|
||||
# this does a first check to make sure there are no missing contents that are marked active in content_index
|
||||
for record in self.tmd.chunk_records:
|
||||
if record.cindex in active_contents:
|
||||
active_contents_tmd.add(record.cindex)
|
||||
self.content_info.append(record)
|
||||
|
||||
# if the result of this is not an empty set, it means there are contents enabled in content_index
|
||||
# that are not in the tmd, which is bad
|
||||
if active_contents ^ active_contents_tmd:
|
||||
raise InvalidCIAError('Missing active contents in the TMD')
|
||||
|
||||
self.contents = {}
|
||||
|
||||
# this goes through the contents and figures out their regions, then creates an NCCHReader
|
||||
curr_offset = content_offset
|
||||
for record in self.content_info:
|
||||
iv = None
|
||||
if record.type.encrypted:
|
||||
iv = record.cindex.to_bytes(2, 'big') + (b'\0' * 14)
|
||||
add_region(record.cindex, curr_offset, record.size, iv)
|
||||
if load_contents:
|
||||
# check if the content is a Nintendo DS ROM (SRL) first
|
||||
is_srl = record.cindex == 0 and self.tmd.title_id[3:5] == '48'
|
||||
if not is_srl:
|
||||
content_fp = self.open_raw_section(record.cindex)
|
||||
self.contents[record.cindex] = NCCHReader(content_fp, case_insensitive=case_insensitive,
|
||||
dev=dev, seeddb=seeddb)
|
||||
|
||||
curr_offset += record.size
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
__del__ = close
|
||||
|
||||
def __repr__(self):
|
||||
info = [('title_id', self.tmd.title_id)]
|
||||
try:
|
||||
info.append(('title_name', repr(self.contents[0].exefs.icon.get_app_title().short_desc)))
|
||||
except KeyError:
|
||||
info.append(('title_name', 'unknown'))
|
||||
info.append(('content_count', len(self.contents)))
|
||||
info_final = " ".join(x + ": " + str(y) for x, y in info)
|
||||
return f'<{type(self).__name__} {info_final}>'
|
||||
|
||||
def open_raw_section(self, section: 'CIASection'):
|
||||
"""Open a raw CIA section for reading."""
|
||||
region = self.sections[section]
|
||||
fh = SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
if region.iv:
|
||||
fh = self._crypto.create_cbc_io(Keyslot.DecryptedTitlekey, fh, region.iv)
|
||||
return fh
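A hedged usage sketch of CIAReader; the file name is a placeholder, and decrypting contents needs boot9 available for the common-key setup:

with CIAReader('title.cia') as cia:
    print(cia.tmd.title_id)
    for record in cia.content_info:
        print(f'content {record.cindex}: {record.size:#x} bytes')
    app_ncch = cia.contents[0]    # NCCHReader for content 0, when it is not a DSiWare SRL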
|
||||
@@ -1,316 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from hashlib import sha256
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import readle
|
||||
from ..type.smdh import SMDH, InvalidSMDHError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Union
|
||||
|
||||
__all__ = ['EXEFS_EMPTY_ENTRY', 'EXEFS_ENTRY_SIZE', 'EXEFS_ENTRY_COUNT', 'EXEFS_HEADER_SIZE', 'ExeFSError',
|
||||
'ExeFSFileNotFoundError', 'InvalidExeFSError', 'ExeFSNameError', 'BadOffsetError', 'CodeDecompressionError',
|
||||
'decompress_code', 'ExeFSReader']
|
||||
|
||||
EXEFS_ENTRY_SIZE = 0x10
|
||||
EXEFS_ENTRY_COUNT = 10
|
||||
EXEFS_EMPTY_ENTRY = b'\0' * EXEFS_ENTRY_SIZE
|
||||
EXEFS_HEADER_SIZE = 0x200
|
||||
|
||||
CODE_DECOMPRESSED_NAME = '.code-decompressed'
|
||||
|
||||
|
||||
class ExeFSError(PyCTRError):
|
||||
"""Generic exception for ExeFS operations."""
|
||||
|
||||
|
||||
class ExeFSFileNotFoundError(ExeFSError):
|
||||
"""File not found in the ExeFS."""
|
||||
|
||||
|
||||
class InvalidExeFSError(ExeFSError):
|
||||
"""Invalid ExeFS header."""
|
||||
|
||||
|
||||
class ExeFSNameError(InvalidExeFSError):
|
||||
"""Name could not be decoded, likely making the file not a valid ExeFS."""
|
||||
|
||||
def __str__(self):
|
||||
return f'could not decode from ascii: {self.args[0]!r}'
|
||||
|
||||
|
||||
class BadOffsetError(InvalidExeFSError):
|
||||
"""Offset is not a multiple of 0x200. This kind of ExeFS will not work on a 3DS."""
|
||||
|
||||
def __str__(self):
|
||||
return f'offset is not a multiple of 0x200: {self.args[0]:#x}'
|
||||
|
||||
|
||||
class CodeDecompressionError(ExeFSError):
|
||||
"""Exception when attempting to decompress ExeFS .code."""
|
||||
|
||||
|
||||
# lazy check
|
||||
CODE_MAX_SIZE = 0x2300000
|
||||
|
||||
|
||||
def decompress_code(code: bytes) -> bytes:
|
||||
# remade from C code, this could probably be done better
|
||||
# https://github.com/d0k3/GodMode9/blob/689f6f7cf4280bf15885cbbf848d8dce81def36b/arm9/source/game/codelzss.c#L25-L93
|
||||
off_size_comp = int.from_bytes(code[-8:-4], 'little')
|
||||
add_size = int.from_bytes(code[-4:], 'little')
|
||||
comp_start = 0
|
||||
code_len = len(code)
|
||||
|
||||
code_comp_size = off_size_comp & 0xFFFFFF
|
||||
code_comp_end = code_comp_size - ((off_size_comp >> 24) % 0xFF)
|
||||
code_dec_size = code_len + add_size
|
||||
|
||||
if code_len < 8:
|
||||
raise CodeDecompressionError('code_len < 8')
|
||||
if code_len > CODE_MAX_SIZE:
|
||||
raise CodeDecompressionError('code_len > CODE_MAX_SIZE')
|
||||
|
||||
if code_comp_size <= code_len:
|
||||
comp_start = code_len - code_comp_size
|
||||
|
||||
if code_comp_end < 0:
|
||||
raise CodeDecompressionError('code_comp_end < 0')
|
||||
if code_dec_size > CODE_MAX_SIZE:
|
||||
raise CodeDecompressionError('code_dec_size > CODE_MAX_SIZE')
|
||||
|
||||
dec = bytearray(code)
|
||||
dec.extend(b'\0' * add_size)
|
||||
|
||||
data_end = comp_start + code_dec_size
|
||||
ptr_in = comp_start + code_comp_end
|
||||
ptr_out = code_dec_size
|
||||
|
||||
while ptr_in > comp_start and ptr_out > comp_start:
|
||||
if ptr_out < ptr_in:
|
||||
raise CodeDecompressionError('ptr_out < ptr_in')
|
||||
|
||||
ptr_in -= 1
|
||||
ctrl_byte = dec[ptr_in]
|
||||
for i in range(7, -1, -1):
|
||||
if ptr_in <= comp_start or ptr_out <= comp_start:
|
||||
break
|
||||
|
||||
if (ctrl_byte >> i) & 1:
|
||||
ptr_in -= 2
|
||||
seg_code = int.from_bytes(dec[ptr_in:ptr_in + 2], 'little')
|
||||
if ptr_in < comp_start:
|
||||
raise CodeDecompressionError('ptr_in < comp_start')
|
||||
seg_off = (seg_code & 0x0FFF) + 2
|
||||
seg_len = ((seg_code >> 12) & 0xF) + 3
|
||||
|
||||
if ptr_out - seg_len < comp_start:
|
||||
raise CodeDecompressionError('ptr_out - seg_len < comp_start')
|
||||
if ptr_out + seg_off >= data_end:
|
||||
raise CodeDecompressionError('ptr_out + seg_off >= data_end')
|
||||
|
||||
c = 0
|
||||
while c < seg_len:
|
||||
byte = dec[ptr_out + seg_off]
|
||||
ptr_out -= 1
|
||||
dec[ptr_out] = byte
|
||||
c += 1
|
||||
else:
|
||||
if ptr_out == comp_start:
|
||||
raise CodeDecompressionError('ptr_out == comp_start')
|
||||
if ptr_in == comp_start:
|
||||
raise CodeDecompressionError('ptr_in == comp_start')
|
||||
|
||||
ptr_out -= 1
|
||||
ptr_in -= 1
|
||||
dec[ptr_out] = dec[ptr_in]
|
||||
|
||||
if ptr_in != comp_start:
|
||||
raise CodeDecompressionError('ptr_in != comp_start')
|
||||
if ptr_out != comp_start:
|
||||
raise CodeDecompressionError('ptr_out != comp_start')
|
||||
|
||||
return bytes(dec)
|
||||
|
||||
|
||||
class ExeFSEntry(NamedTuple):
|
||||
name: str
|
||||
offset: int
|
||||
size: int
|
||||
hash: bytes
|
||||
|
||||
|
||||
def _normalize_path(p: str):
|
||||
"""Fix a given path to work with ExeFS filenames."""
|
||||
if p.startswith('/'):
|
||||
p = p[1:]
|
||||
# while it is technically possible for an ExeFS entry to contain ".bin",
|
||||
# this would not happen in practice.
|
||||
# even so, normalization can be disabled by passing normalize=False to
|
||||
# ExeFSReader.open
|
||||
if p.lower().endswith('.bin'):
|
||||
p = p[:-4]  # strip the '.bin' suffix
|
||||
return p
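    # Illustration (not part of the original file): '/icon.bin' -> 'icon',
    # 'banner.bin' -> 'banner', '.code' -> '.code' (nothing to strip).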
|
||||
|
||||
|
||||
class _ExeFSOpenFile(_ReaderOpenFileBase):
|
||||
"""Class for open ExeFS file entries."""
|
||||
|
||||
def __init__(self, reader: 'ExeFSReader', path: str):
|
||||
super().__init__(reader, path)
|
||||
self._info = reader.entries[self._path]
|
||||
|
||||
|
||||
class ExeFSReader:
|
||||
"""
|
||||
Class to read the 3DS ExeFS container.
|
||||
|
||||
http://3dbrew.org/wiki/ExeFS
|
||||
"""
|
||||
|
||||
closed = False
|
||||
_code_dec = None
|
||||
icon: 'SMDH' = None
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, closefd: bool = True, _load_icon: bool = True):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
# storing the starting offset lets it work from anywhere in the file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
self._lock = Lock()
|
||||
self._closefd = closefd
|
||||
|
||||
self.entries: 'Dict[str, ExeFSEntry]' = {}
|
||||
|
||||
header = fp.read(EXEFS_HEADER_SIZE)
|
||||
|
||||
# ExeFS entries can fit up to 10 names. hashes are stored in reverse order
|
||||
# (e.g. the first entry would have the hash at the very end - 0x1E0)
|
||||
for entry_n, hash_n in zip(range(0, EXEFS_ENTRY_COUNT * EXEFS_ENTRY_SIZE, EXEFS_ENTRY_SIZE),
|
||||
range(0x1E0, 0xA0, -0x20)):
|
||||
entry_raw = header[entry_n:entry_n + 0x10]
|
||||
entry_hash = header[hash_n:hash_n + 0x20]
|
||||
if entry_raw == EXEFS_EMPTY_ENTRY:
|
||||
continue
|
||||
|
||||
try:
|
||||
# ascii is used since only a-z would be used in practice
|
||||
name = entry_raw[0:8].rstrip(b'\0').decode('ascii')
|
||||
except UnicodeDecodeError:
|
||||
raise ExeFSNameError(entry_raw[0:8])
|
||||
|
||||
entry = ExeFSEntry(name=name,
|
||||
offset=readle(entry_raw[8:12]),
|
||||
size=readle(entry_raw[12:16]),
|
||||
hash=entry_hash)
|
||||
|
||||
# the 3DS fails to parse an ExeFS with an offset that isn't a multiple of 0x200
|
||||
# so we should do the same here
|
||||
if entry.offset % 0x200:
|
||||
raise BadOffsetError(entry.offset)
|
||||
|
||||
self.entries[name] = entry
|
||||
|
||||
# this sometimes needs to be loaded outside, since reading it here may cause encryption problems
|
||||
# when the NCCH has not fully initialized yet and needs to figure out what ExeFS regions need
|
||||
# to be decrypted with the Original NCCH key
|
||||
if _load_icon:
|
||||
self._load_icon()
|
||||
|
||||
def _load_icon(self):
|
||||
try:
|
||||
with self.open('icon') as f:
|
||||
self.icon = SMDH.load(f)
|
||||
except (ExeFSFileNotFoundError, InvalidSMDHError):
|
||||
pass
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""Return the amount of entries in the ExeFS."""
|
||||
return len(self.entries)
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
if self._closefd:
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
__del__ = close
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
def open(self, path: str, *, normalize: bool = True):
|
||||
"""Open a file in the ExeFS for reading."""
|
||||
if normalize:
|
||||
# remove beginning "/" and ending ".bin"
|
||||
path = _normalize_path(path)
|
||||
try:
|
||||
entry = self.entries[path]
|
||||
except KeyError:
|
||||
raise ExeFSFileNotFoundError(path)
|
||||
if entry.offset == -1:
|
||||
# this would be the decompressed .code, if the original .code was compressed
|
||||
return _ExeFSOpenFile(self, path)
|
||||
else:
|
||||
return SubsectionIO(self._fp, self._start + EXEFS_HEADER_SIZE + entry.offset, entry.size)
|
||||
|
||||
def get_data(self, info: ExeFSEntry, offset: int, size: int) -> bytes:
|
||||
if offset + size > info.size:
|
||||
size = info.size - offset
|
||||
with self._lock:
|
||||
if info.offset == -1:
|
||||
# return the decompressed code instead
|
||||
return self._code_dec[offset:offset + size]
|
||||
else:
|
||||
# data for ExeFS entries start relative to the end of the header
|
||||
self._fp.seek(self._start + EXEFS_HEADER_SIZE + info.offset + offset)
|
||||
return self._fp.read(size)
|
||||
|
||||
def decompress_code(self) -> bool:
|
||||
"""
|
||||
Decompress '.code' in the container. The result will be available as '.code-decompressed'.
|
||||
|
||||
The return value is if '.code' was actually decompressed.
|
||||
"""
|
||||
with self.open('.code') as f:
|
||||
code = f.read()
|
||||
|
||||
# if it's already decompressed, this would return the code unmodified
|
||||
code_dec = decompress_code(code)
|
||||
|
||||
decompressed = code_dec != code
|
||||
|
||||
if decompressed:
|
||||
code_dec_hash = sha256(code_dec)
|
||||
entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME,
|
||||
offset=-1,
|
||||
size=len(code_dec),
|
||||
hash=code_dec_hash.digest())
|
||||
self._code_dec = code_dec
|
||||
else:
|
||||
# if the code was already decompressed, don't store a second copy in memory
|
||||
code_entry = self.entries['.code']
|
||||
entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME,
|
||||
offset=code_entry.offset,
|
||||
size=code_entry.size,
|
||||
hash=code_entry.hash)
|
||||
|
||||
self.entries[CODE_DECOMPRESSED_NAME] = entry
|
||||
|
||||
# returns if the code was actually decompressed or not
|
||||
return decompressed
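# A minimal usage sketch, assuming a hypothetical 'exefs.bin' dumped from an NCCH;
# the reader is constructed from an open binary file object, the same way the NCCH
# reader uses it.
if __name__ == '__main__':
    with open('exefs.bin', 'rb') as f:
        exefs = ExeFSReader(f)
        print(f'{len(exefs)} entries:', ', '.join(exefs.entries))
        if exefs.decompress_code():
            code = exefs.open(CODE_DECOMPRESSED_NAME)
            print(f'.code decompressed to {len(code.read()):#x} bytes')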
|
||||
@@ -1,538 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from hashlib import sha256
|
||||
from enum import IntEnum
|
||||
from math import ceil
|
||||
from os import environ
|
||||
from os.path import join as pjoin
|
||||
from threading import Lock
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from .exefs import ExeFSReader, EXEFS_HEADER_SIZE
|
||||
from .romfs import RomFSReader
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..crypto import CryptoEngine, Keyslot
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import config_dirs, readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, List, Optional, Tuple, Union
|
||||
|
||||
__all__ = ['NCCH_MEDIA_UNIT', 'NO_ENCRYPTION', 'EXEFS_NORMAL_CRYPTO_FILES', 'FIXED_SYSTEM_KEY', 'NCCHError',
|
||||
'InvalidNCCHError', 'NCCHSeedError', 'MissingSeedError', 'SeedDBNotFoundError', 'get_seed',
|
||||
'extra_cryptoflags', 'NCCHSection', 'NCCHRegion', 'NCCHFlags', 'NCCHReader']
|
||||
|
||||
|
||||
class NCCHError(PyCTRError):
|
||||
"""Generic exception for NCCH operations."""
|
||||
|
||||
|
||||
class InvalidNCCHError(NCCHError):
|
||||
"""Invalid NCCH header exception."""
|
||||
|
||||
|
||||
class NCCHSeedError(NCCHError):
|
||||
"""NCCH seed is not set up, or attempted to set up seed when seed crypto is not used."""
|
||||
|
||||
|
||||
class MissingSeedError(NCCHSeedError):
|
||||
"""Seed could not be found."""
|
||||
|
||||
|
||||
class SeedDBNotFoundError(NCCHSeedError):
|
||||
"""SeedDB was not found. Main argument is a tuple of checked paths."""
|
||||
|
||||
|
||||
def get_seed(f: 'BinaryIO', program_id: int) -> bytes:
|
||||
"""Get a seed in a seeddb.bin from an I/O stream."""
|
||||
# convert the Program ID to little-endian bytes, as the TID is stored in seeddb.bin this way
|
||||
tid_bytes = program_id.to_bytes(0x8, 'little')
|
||||
f.seek(0)
|
||||
# get the amount of seeds
|
||||
seed_count = readle(f.read(4))
|
||||
f.seek(0x10)
|
||||
for _ in range(seed_count):
|
||||
entry = f.read(0x20)
|
||||
if entry[0:8] == tid_bytes:
|
||||
return entry[0x8:0x18]
|
||||
raise NCCHSeedError(f'missing seed for {program_id:016X} from seeddb.bin')
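# Usage sketch, assuming a seeddb.bin collected by an external tool and a purely
# hypothetical Program ID:
#     with open('seeddb.bin', 'rb') as f:
#         seed = get_seed(f, 0x0004000000012345)
# A missing entry raises NCCHSeedError; load_seed_from_seeddb below only catches
# FileNotFoundError, so that error propagates to the caller.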
|
||||
|
||||
|
||||
seeddb_paths = [pjoin(x, 'seeddb.bin') for x in config_dirs]
|
||||
try:
|
||||
# try to insert the path in the SEEDDB_PATH environment variable
|
||||
seeddb_paths.insert(0, environ['SEEDDB_PATH'])
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# NCCH sections are stored in media units
|
||||
# for example, ExeFS may be stored in 13 media units, which is 0x1A00 bytes (13 * 0x200)
|
||||
NCCH_MEDIA_UNIT = 0x200
|
||||
# depending on the crypto_method flag, a different keyslot may be used for RomFS and parts of ExeFS.
|
||||
extra_cryptoflags = {0x00: Keyslot.NCCH, 0x01: Keyslot.NCCH70, 0x0A: Keyslot.NCCH93, 0x0B: Keyslot.NCCH96}
|
||||
|
||||
# if fixed_crypto_key is enabled, the normal key is normally all zeros.
|
||||
# however, if (program_id & (0x10 << 32)) is true, this key is used instead.
|
||||
FIXED_SYSTEM_KEY = 0x527CE630A9CA305F3696F3CDE954194B
|
||||
|
||||
|
||||
# this is IntEnum to make generating the IV easier
|
||||
class NCCHSection(IntEnum):
|
||||
ExtendedHeader = 1
|
||||
ExeFS = 2
|
||||
RomFS = 3
|
||||
|
||||
# no crypto
|
||||
Header = 4
|
||||
Logo = 5
|
||||
Plain = 6
|
||||
|
||||
# special
|
||||
FullDecrypted = 7
|
||||
Raw = 8
|
||||
|
||||
|
||||
# these sections don't use encryption at all
|
||||
NO_ENCRYPTION = {NCCHSection.Header, NCCHSection.Logo, NCCHSection.Plain, NCCHSection.Raw}
|
||||
# the contents of these files in the ExeFS, plus the header, will always use the Original NCCH keyslot
|
||||
# therefore these regions need to be stored to check what keyslot is used to decrypt
|
||||
EXEFS_NORMAL_CRYPTO_FILES = {'icon', 'banner'}
|
||||
|
||||
|
||||
class NCCHRegion(NamedTuple):
|
||||
section: 'NCCHSection'
|
||||
offset: int
|
||||
size: int
|
||||
end: int # this is just offset + size, stored to avoid re-calculation later on
|
||||
# not all sections will actually use this (see NCCHSection), so some have a useless value
|
||||
iv: int
|
||||
|
||||
|
||||
class NCCHFlags(NamedTuple):
|
||||
# determines the extra keyslot used for RomFS and parts of ExeFS
|
||||
crypto_method: int
|
||||
# if this is a CXI (CTR Executable Image) or CFA (CTR File Archive)
|
||||
# in the raw flags, "Data" has to be set for it to be a CFA, while "Executable" is unset.
|
||||
executable: bool
|
||||
# if the content is encrypted using a fixed normal key.
|
||||
fixed_crypto_key: bool
|
||||
# if RomFS is to be ignored
|
||||
no_romfs: bool
|
||||
# if the NCCH has no encryption
|
||||
no_crypto: bool
|
||||
# if a seed must be loaded to load RomFS and parts of ExeFS
|
||||
uses_seed: bool
|
||||
|
||||
|
||||
class _NCCHSectionFile(_ReaderOpenFileBase):
|
||||
"""Provides a raw, decrypted NCCH section as a file-like object."""
|
||||
|
||||
def __init__(self, reader: 'NCCHReader', path: 'NCCHSection'):
|
||||
super().__init__(reader, path)
|
||||
self._info = reader.sections[path]
|
||||
|
||||
|
||||
class NCCHReader:
|
||||
"""Class for 3DS NCCH container."""
|
||||
|
||||
seed_set_up = False
|
||||
seed: 'Optional[bytes]' = None
|
||||
# this is the KeyY when generated using the seed
|
||||
_seeded_key_y = None
|
||||
closed = False
|
||||
|
||||
# this lists the ranges of the ExeFS to decrypt with Original NCCH (see load_sections)
|
||||
_exefs_keyslot_normal_range: 'List[Tuple[int, int]]'
|
||||
exefs: 'Optional[ExeFSReader]' = None
|
||||
romfs: 'Optional[RomFSReader]' = None
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None,
|
||||
dev: bool = False, seeddb: str = None, load_sections: bool = True, assume_decrypted: bool = False):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
if crypto:
|
||||
self._crypto = crypto
|
||||
else:
|
||||
self._crypto = CryptoEngine(dev=dev)
|
||||
|
||||
# old decryption methods did not fix the flags, so sometimes we have to assume it is decrypted
|
||||
self.assume_decrypted = assume_decrypted
|
||||
|
||||
# store the starting offset so the NCCH can be read from any point in the base file
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
# store case-insensitivity for RomFSReader
|
||||
self._case_insensitive = case_insensitive
|
||||
# threading lock
|
||||
self._lock = Lock()
|
||||
|
||||
header = fp.read(0x200)
|
||||
|
||||
# load the Key Y from the first 0x10 of the signature
|
||||
self._key_y = header[0x0:0x10]
|
||||
# store the ncch version
|
||||
self.version = readle(header[0x112:0x114])
|
||||
# get the total size of the NCCH container, and store it in bytes
|
||||
self.content_size = readle(header[0x104:0x108]) * NCCH_MEDIA_UNIT
|
||||
# get the Partition ID, which is used in the encryption
|
||||
# this is generally different for each content in a title, except for DLC
|
||||
self.partition_id = readle(header[0x108:0x110])
|
||||
# load the seed verify field, which is part of a SHA-256 hash used to verify whether
|
||||
# a seed is correct for this title
|
||||
self._seed_verify = header[0x114:0x118]
|
||||
# load the Product Code and store it as a unicode string
|
||||
self.product_code = header[0x150:0x160].decode('ascii').strip('\0')
|
||||
# load the Program ID
|
||||
# this is the Title ID
|
||||
self.program_id = readle(header[0x118:0x120])
|
||||
# load the extheader size, but this code only uses it to determine if it exists
|
||||
extheader_size = readle(header[0x180:0x184])
|
||||
|
||||
# each section is stored with the section ID, then the region information (offset, size, IV)
|
||||
self.sections: 'Dict[NCCHSection, NCCHRegion]' = {}
|
||||
# same as above, but includes non-existent regions too, for the full-decrypted handler
|
||||
self._all_sections: 'Dict[NCCHSection, NCCHRegion]' = {}
|
||||
|
||||
def add_region(section: 'NCCHSection', starting_unit: int, units: int):
|
||||
offset = starting_unit * NCCH_MEDIA_UNIT
|
||||
size = units * NCCH_MEDIA_UNIT
|
||||
region = NCCHRegion(section=section,
|
||||
offset=offset,
|
||||
size=size,
|
||||
end=offset + size,
|
||||
iv=self.partition_id << 64 | (section << 56))
|
||||
self._all_sections[section] = region
|
||||
if units != 0: # only add existing regions
|
||||
self.sections[section] = region
|
||||
|
||||
# add the header as the first region
|
||||
add_region(NCCHSection.Header, 0, 1)
|
||||
|
||||
# add the full decrypted content, which when read, simulates a fully decrypted NCCH container
|
||||
add_region(NCCHSection.FullDecrypted, 0, self.content_size // NCCH_MEDIA_UNIT)
|
||||
# add the full raw content
|
||||
add_region(NCCHSection.Raw, 0, self.content_size // NCCH_MEDIA_UNIT)
|
||||
|
||||
# only care about the exheader if it's the expected size
|
||||
if extheader_size == 0x400:
|
||||
add_region(NCCHSection.ExtendedHeader, 1, 4)
|
||||
else:
|
||||
add_region(NCCHSection.ExtendedHeader, 0, 0)
|
||||
|
||||
# add the remaining NCCH regions
|
||||
# some of these may not exist, and won't be added if units (second value) is 0
|
||||
add_region(NCCHSection.Logo, readle(header[0x198:0x19C]), readle(header[0x19C:0x1A0]))
|
||||
add_region(NCCHSection.Plain, readle(header[0x190:0x194]), readle(header[0x194:0x198]))
|
||||
add_region(NCCHSection.ExeFS, readle(header[0x1A0:0x1A4]), readle(header[0x1A4:0x1A8]))
|
||||
add_region(NCCHSection.RomFS, readle(header[0x1B0:0x1B4]), readle(header[0x1B4:0x1B8]))
|
||||
|
||||
# parse flags
|
||||
flags_raw = header[0x188:0x190]
|
||||
self.flags = NCCHFlags(crypto_method=flags_raw[3], executable=bool(flags_raw[5] & 0x2),
|
||||
fixed_crypto_key=bool(flags_raw[7] & 0x1), no_romfs=bool(flags_raw[7] & 0x2),
|
||||
no_crypto=bool(flags_raw[7] & 0x4), uses_seed=bool(flags_raw[7] & 0x20))
|
||||
|
||||
# load the original (non-seeded) KeyY into the Original NCCH slot
|
||||
self._crypto.set_keyslot('y', Keyslot.NCCH, self.get_key_y(original=True))
|
||||
|
||||
# load the seed if needed
|
||||
if self.flags.uses_seed:
|
||||
self.load_seed_from_seeddb(seeddb)
|
||||
|
||||
# load the (seeded, if needed) key into the extra keyslot
|
||||
self._crypto.set_keyslot('y', self.extra_keyslot, self.get_key_y())
|
||||
|
||||
# load the sections using their specific readers
|
||||
if load_sections:
|
||||
self.load_sections()
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
__del__ = close
|
||||
|
||||
def load_sections(self):
|
||||
"""Load the sections of the NCCH (Extended Header, ExeFS, and RomFS)."""
|
||||
|
||||
# try to load the ExeFS
|
||||
try:
|
||||
self._fp.seek(self._start + self.sections[NCCHSection.ExeFS].offset)
|
||||
except KeyError:
|
||||
pass # no ExeFS
|
||||
else:
|
||||
# this is to generate what regions should be decrypted with the Original NCCH keyslot
|
||||
# technically, it's not actually 0x200 chunks or units. the actual space of the file
|
||||
# is encrypted with the different key. for example, if .code is 0x300 bytes, that
|
||||
# means the first 0x300 are encrypted with the NCCH 7.x key, and the remaining
|
||||
# 0x100 uses Original NCCH. however this would be quite a pain to implement properly
|
||||
# with random access, so I only work with 0x200 chunks here. after all, the space
|
||||
# after the file is effectively unused. it makes no difference, except for
|
||||
# perfectionists who want it perfectly decrypted. GodMode9 does it properly I think,
|
||||
# if that is what you want. or you can fix the empty space yourself with a hex editor.
|
||||
self._exefs_keyslot_normal_range = [(0, 0x200)]
|
||||
exefs_fp = self.open_raw_section(NCCHSection.ExeFS)
|
||||
# load the ExeFS reader
|
||||
self.exefs = ExeFSReader(exefs_fp, _load_icon=False)
|
||||
|
||||
for entry in self.exefs.entries.values():
|
||||
if entry.name in EXEFS_NORMAL_CRYPTO_FILES:
|
||||
# this will add the offset (relative to ExeFS start), with the size
|
||||
# rounded up to 0x200 chunks
|
||||
r = (entry.offset + EXEFS_HEADER_SIZE,
|
||||
entry.offset + EXEFS_HEADER_SIZE + roundup(entry.size, NCCH_MEDIA_UNIT))
|
||||
self._exefs_keyslot_normal_range.append(r)
|
||||
|
||||
self.exefs._load_icon()
|
||||
|
||||
# try to load RomFS
|
||||
if not self.flags.no_romfs:
|
||||
try:
|
||||
self._fp.seek(self._start + self.sections[NCCHSection.RomFS].offset)
|
||||
except KeyError:
|
||||
pass # no RomFS
|
||||
else:
|
||||
romfs_fp = self.open_raw_section(NCCHSection.RomFS)
|
||||
# load the RomFS reader
|
||||
self.romfs = RomFSReader(romfs_fp, case_insensitive=self._case_insensitive)
|
||||
|
||||
def open_raw_section(self, section: 'NCCHSection'):
|
||||
"""Open a raw NCCH section for reading."""
|
||||
# check if the region is ExeFS and uses a newer keyslot, or is fulldec, and use a specific file class
|
||||
if (section == NCCHSection.ExeFS and self.extra_keyslot) or (section == NCCHSection.FullDecrypted):
|
||||
return _NCCHSectionFile(self, section)
|
||||
else:
|
||||
region = self.sections[section]
|
||||
fh = SubsectionIO(self._fp, self._start + region.offset, region.size)
|
||||
# if the region is encrypted, wrap it in CTRFileIO (ExeFS with an extra keyslot was already handled above)
|
||||
if not (self.assume_decrypted or self.flags.no_crypto or section in NO_ENCRYPTION):
|
||||
keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH
|
||||
fh = self._crypto.create_ctr_io(keyslot, fh, region.iv)
|
||||
return fh
|
||||
|
||||
def get_key_y(self, original: bool = False) -> bytes:
|
||||
if original or not self.flags.uses_seed:
|
||||
return self._key_y
|
||||
if self.flags.uses_seed and not self.seed_set_up:
|
||||
raise MissingSeedError('NCCH uses seed crypto, but seed is not set up')
|
||||
else:
|
||||
return self._seeded_key_y
|
||||
|
||||
@property
|
||||
def extra_keyslot(self) -> int:
|
||||
return extra_cryptoflags[self.flags.crypto_method]
|
||||
|
||||
def check_for_extheader(self) -> bool:
|
||||
return NCCHSection.ExtendedHeader in self.sections
|
||||
|
||||
def setup_seed(self, seed: bytes):
|
||||
if not self.flags.uses_seed:
|
||||
raise NCCHSeedError('NCCH does not use seed crypto')
|
||||
seed_verify_hash = sha256(seed + self.program_id.to_bytes(0x8, 'little')).digest()
|
||||
if seed_verify_hash[0x0:0x4] != self._seed_verify:
|
||||
raise NCCHSeedError('given seed does not match with seed verify hash in header')
|
||||
self.seed = seed
|
||||
self._seeded_key_y = sha256(self._key_y + seed).digest()[0:16]
|
||||
self.seed_set_up = True
|
||||
|
||||
def load_seed_from_seeddb(self, path: str = None):
|
||||
if not self.flags.uses_seed:
|
||||
raise NCCHSeedError('NCCH does not use seed crypto')
|
||||
if path:
|
||||
# if a path was provided, use only that
|
||||
paths = (path,)
|
||||
else:
|
||||
# use the fixed set of paths
|
||||
paths = seeddb_paths
|
||||
for fn in paths:
|
||||
try:
|
||||
with open(fn, 'rb') as f:
|
||||
# try to load the seed from the file
|
||||
self.setup_seed(get_seed(f, self.program_id))
|
||||
return
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
# if no seeddb.bin was found in any of the checked paths...
|
||||
raise SeedDBNotFoundError(paths)
|
||||
|
||||
def get_data(self, section: 'Union[NCCHRegion, NCCHSection]', offset: int, size: int) -> bytes:
|
||||
try:
|
||||
region = self._all_sections[section]
|
||||
except KeyError:
|
||||
region = section
|
||||
if offset + size > region.size:
|
||||
# prevent reading past the region
|
||||
size = region.size - offset
|
||||
|
||||
# the full-decrypted handler is done outside of the thread lock
|
||||
if region.section == NCCHSection.FullDecrypted:
|
||||
before = offset % 0x200
|
||||
aligned_offset = offset - before
|
||||
aligned_size = size + before
|
||||
|
||||
def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int):
|
||||
# get the offset of the end of the last chunk
|
||||
end = al_offset + (ceil(al_size / 0x200) * 0x200)
|
||||
|
||||
# store the sections to read
|
||||
# dict is ordered by default in CPython since 3.6.0, and part of the language spec since 3.7.0
|
||||
to_read: Dict[Tuple[NCCHSection, int], List[int]] = {}
|
||||
|
||||
# get each section to a local variable for easier access
|
||||
header = self._all_sections[NCCHSection.Header]
|
||||
extheader = self._all_sections[NCCHSection.ExtendedHeader]
|
||||
logo = self._all_sections[NCCHSection.Logo]
|
||||
plain = self._all_sections[NCCHSection.Plain]
|
||||
exefs = self._all_sections[NCCHSection.ExeFS]
|
||||
romfs = self._all_sections[NCCHSection.RomFS]
|
||||
|
||||
last_region = False
|
||||
|
||||
# this is somewhat hardcoded for performance reasons. this may be optimized better later.
|
||||
for chunk_offset in range(al_offset, end, 0x200):
|
||||
# RomFS check first, since it might be faster
|
||||
if romfs.offset <= chunk_offset < romfs.end:
|
||||
region = (NCCHSection.RomFS, 0)
|
||||
curr_offset = romfs.offset
|
||||
|
||||
# ExeFS check second, since it might be faster
|
||||
elif exefs.offset <= chunk_offset < exefs.end:
|
||||
region = (NCCHSection.ExeFS, 0)
|
||||
curr_offset = exefs.offset
|
||||
|
||||
elif header.offset <= chunk_offset < header.end:
|
||||
region = (NCCHSection.Header, 0)
|
||||
curr_offset = header.offset
|
||||
|
||||
elif extheader.offset <= chunk_offset < extheader.end:
|
||||
region = (NCCHSection.ExtendedHeader, 0)
|
||||
curr_offset = extheader.offset
|
||||
|
||||
elif logo.offset <= chunk_offset < logo.end:
|
||||
region = (NCCHSection.Logo, 0)
|
||||
curr_offset = logo.offset
|
||||
|
||||
elif plain.offset <= chunk_offset < plain.end:
|
||||
region = (NCCHSection.Plain, 0)
|
||||
curr_offset = plain.offset
|
||||
|
||||
else:
|
||||
region = (NCCHSection.Raw, chunk_offset)
|
||||
curr_offset = 0
|
||||
|
||||
if region not in to_read:
|
||||
to_read[region] = [chunk_offset - curr_offset, 0]
|
||||
to_read[region][1] += 0x200
|
||||
last_region = region
|
||||
|
||||
is_start = True
|
||||
for region, info in to_read.items():
|
||||
new_data = self.get_data(region[0], info[0], info[1])
|
||||
if region[0] == NCCHSection.Header:
|
||||
# fix crypto flags
|
||||
ncch_array = bytearray(new_data)
|
||||
ncch_array[0x18B] = 0
|
||||
ncch_array[0x18F] = 4
|
||||
new_data = bytes(ncch_array)
|
||||
if is_start:
|
||||
new_data = new_data[cut_start:]
|
||||
is_start = False
|
||||
if region == last_region and cut_end != 0x200:
|
||||
new_data = new_data[:-cut_end]
|
||||
|
||||
yield new_data
|
||||
|
||||
return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200)))
|
||||
|
||||
with self._lock:
|
||||
# check if decryption is really needed
|
||||
if self.assume_decrypted or self.flags.no_crypto or region.section in NO_ENCRYPTION:
|
||||
# this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO.
|
||||
self._fp.seek(self._start + region.offset + offset)
|
||||
return self._fp.read(size)
|
||||
|
||||
# thanks Stary2001 for help with random-access crypto
|
||||
|
||||
# if the region is ExeFS and extra crypto is being used, special handling is required
|
||||
# because different parts use different encryption methods
|
||||
if region.section == NCCHSection.ExeFS and self.flags.crypto_method != 0x00:
|
||||
# get the amount to cut off at the beginning
|
||||
before = offset % 0x200
|
||||
|
||||
# get the offset of the starting chunk
|
||||
aligned_offset = offset - before
|
||||
|
||||
# get the real offset of the starting chunk
|
||||
aligned_real_offset = self._start + region.offset + aligned_offset
|
||||
|
||||
# get the aligned total size of the requested size
|
||||
aligned_size = size + before
|
||||
self._fp.seek(aligned_real_offset)
|
||||
|
||||
def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int):
|
||||
# get the offset of the end of the last chunk
|
||||
end = al_offset + (ceil(al_size / 0x200) * 0x200)
|
||||
|
||||
# get the offset to the last chunk
|
||||
last_chunk_offset = end - 0x200
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
for chunk in range(al_offset, end, 0x200):
|
||||
# generate the IV for this chunk
|
||||
iv = region.iv + (chunk >> 4)
|
||||
|
||||
# get the extra keyslot
|
||||
keyslot = self.extra_keyslot
|
||||
|
||||
for r in self._exefs_keyslot_normal_range:
|
||||
if r[0] <= self._fp.tell() - self._start - region.offset < r[1]:
|
||||
# if the chunk is within the "normal keyslot" ranges,
|
||||
# use the Original NCCH keyslot instead
|
||||
keyslot = Keyslot.NCCH
|
||||
|
||||
# decrypt the data
|
||||
out = self._crypto.create_ctr_cipher(keyslot, iv).decrypt(self._fp.read(0x200))
|
||||
if chunk == al_offset:
|
||||
# cut off the beginning if it's the first chunk
|
||||
out = out[cut_start:]
|
||||
if chunk == last_chunk_offset and cut_end != 0x200:
|
||||
# cut off the end if it's the last chunk
|
||||
out = out[:-cut_end]
|
||||
yield out
|
||||
|
||||
# join all the chunks into one bytes result and return it
|
||||
return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200)))
|
||||
else:
|
||||
# this is currently used to support FullDecrypted. other sections use SubsectionIO + CTRFileIO.
|
||||
|
||||
# seek to the real offset of the section + the requested offset
|
||||
self._fp.seek(self._start + region.offset + offset)
|
||||
data = self._fp.read(size)
|
||||
|
||||
# choose the extra keyslot only for RomFS here
|
||||
# ExeFS needs special handling if a newer keyslot is used, therefore it's not checked here
|
||||
keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH
|
||||
|
||||
# get the amount of padding required at the beginning
|
||||
before = offset % 16
|
||||
|
||||
# pad the beginning of the data if needed (the ending part doesn't need padding)
|
||||
data = (b'\0' * before) + data
|
||||
|
||||
# decrypt the data, then cut off the padding
|
||||
return self._crypto.create_ctr_cipher(keyslot, region.iv + (offset >> 4)).decrypt(data)[before:]
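# A minimal usage sketch, assuming a hypothetical 'game.cxi' and that the usual
# bootROM keys are available to CryptoEngine; section names follow the NCCHSection
# enum above.
if __name__ == '__main__':
    with NCCHReader('game.cxi') as ncch:
        print(f'Title ID {ncch.program_id:016x}, product code {ncch.product_code}')
        if ncch.check_for_extheader():
            exh = ncch.open_raw_section(NCCHSection.ExtendedHeader)
            print('ExtHeader starts with:', exh.read(0x8).hex())
        if ncch.exefs:
            print('ExeFS entries:', ', '.join(ncch.exefs.entries))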
|
||||
@@ -1,233 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from io import TextIOWrapper
|
||||
from threading import Lock
|
||||
from typing import overload, TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError, _ReaderOpenFileBase
|
||||
from ..fileio import SubsectionIO
|
||||
from ..util import readle, roundup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Optional, Tuple, Union
|
||||
|
||||
__all__ = ['IVFC_HEADER_SIZE', 'IVFC_ROMFS_MAGIC_NUM', 'ROMFS_LV3_HEADER_SIZE', 'RomFSError', 'InvalidIVFCError',
|
||||
'InvalidRomFSHeaderError', 'RomFSEntryError', 'RomFSFileNotFoundError', 'RomFSReader']
|
||||
|
||||
IVFC_HEADER_SIZE = 0x5C
|
||||
IVFC_ROMFS_MAGIC_NUM = 0x10000
|
||||
ROMFS_LV3_HEADER_SIZE = 0x28
|
||||
|
||||
|
||||
class RomFSError(PyCTRError):
|
||||
"""Generic exception for RomFS operations."""
|
||||
|
||||
|
||||
class InvalidIVFCError(RomFSError):
|
||||
"""Invalid IVFC header exception."""
|
||||
|
||||
|
||||
class InvalidRomFSHeaderError(RomFSError):
|
||||
"""Invalid RomFS Level 3 header."""
|
||||
|
||||
|
||||
class RomFSEntryError(RomFSError):
|
||||
"""Error with RomFS Directory or File entry."""
|
||||
|
||||
|
||||
class RomFSFileNotFoundError(RomFSEntryError):
|
||||
"""Invalid file path in RomFS Level 3."""
|
||||
|
||||
|
||||
class RomFSIsADirectoryError(RomFSEntryError):
|
||||
"""Attempted to open a directory as a file."""
|
||||
|
||||
|
||||
class RomFSRegion(NamedTuple):
|
||||
offset: int
|
||||
size: int
|
||||
|
||||
|
||||
class RomFSDirectoryEntry(NamedTuple):
|
||||
name: str
|
||||
type: str
|
||||
contents: 'Tuple[str, ...]'
|
||||
|
||||
|
||||
class RomFSFileEntry(NamedTuple):
|
||||
name: str
|
||||
type: str
|
||||
offset: int
|
||||
size: int
|
||||
|
||||
|
||||
class RomFSReader:
|
||||
"""
|
||||
Class for 3DS RomFS Level 3 partition.
|
||||
|
||||
https://www.3dbrew.org/wiki/RomFS
|
||||
"""
|
||||
|
||||
closed = False
|
||||
lv3_offset = 0
|
||||
data_offset = 0
|
||||
|
||||
def __init__(self, fp: 'Union[str, BinaryIO]', case_insensitive: bool = False):
|
||||
if isinstance(fp, str):
|
||||
fp = open(fp, 'rb')
|
||||
|
||||
self._start = fp.tell()
|
||||
self._fp = fp
|
||||
self.case_insensitive = case_insensitive
|
||||
self._lock = Lock()
|
||||
|
||||
lv3_offset = fp.tell()
|
||||
magic = fp.read(4)
|
||||
|
||||
# detect ivfc and get the lv3 offset
|
||||
if magic == b'IVFC':
|
||||
ivfc = magic + fp.read(0x54)  # enough of the IVFC header for the fields read below (full header is IVFC_HEADER_SIZE)
|
||||
ivfc_magic_num = readle(ivfc[0x4:0x8])
|
||||
if ivfc_magic_num != IVFC_ROMFS_MAGIC_NUM:
|
||||
raise InvalidIVFCError(f'IVFC magic number is invalid '
|
||||
f'({ivfc_magic_num:#X} instead of {IVFC_ROMFS_MAGIC_NUM:#X})')
|
||||
master_hash_size = readle(ivfc[0x8:0xC])
|
||||
lv3_block_size = readle(ivfc[0x4C:0x50])
|
||||
lv3_hash_block_size = 1 << lv3_block_size
|
||||
lv3_offset += roundup(0x60 + master_hash_size, lv3_hash_block_size)
|
||||
fp.seek(self._start + lv3_offset)
|
||||
magic = fp.read(4)
|
||||
self.lv3_offset = lv3_offset
|
||||
|
||||
lv3_header = magic + fp.read(0x24) # ROMFS_LV3_HEADER_SIZE - 4
|
||||
|
||||
# get offsets and sizes from lv3 header
|
||||
lv3_header_size = readle(magic)
|
||||
lv3_dirhash = RomFSRegion(offset=readle(lv3_header[0x4:0x8]), size=readle(lv3_header[0x8:0xC]))
|
||||
lv3_dirmeta = RomFSRegion(offset=readle(lv3_header[0xC:0x10]), size=readle(lv3_header[0x10:0x14]))
|
||||
lv3_filehash = RomFSRegion(offset=readle(lv3_header[0x14:0x18]), size=readle(lv3_header[0x18:0x1C]))
|
||||
lv3_filemeta = RomFSRegion(offset=readle(lv3_header[0x1C:0x20]), size=readle(lv3_header[0x20:0x24]))
|
||||
lv3_filedata_offset = readle(lv3_header[0x24:0x28])
|
||||
self.data_offset = lv3_offset + lv3_filedata_offset
|
||||
|
||||
# verify lv3 header
|
||||
if lv3_header_size != ROMFS_LV3_HEADER_SIZE:
|
||||
raise InvalidRomFSHeaderError('Length in RomFS Lv3 header is not 0x28')
|
||||
if lv3_dirhash.offset < lv3_header_size:
|
||||
raise InvalidRomFSHeaderError('Directory Hash offset is before the end of the Lv3 header')
|
||||
if lv3_dirmeta.offset < lv3_dirhash.offset + lv3_dirhash.size:
|
||||
raise InvalidRomFSHeaderError('Directory Metadata offset is before the end of the Directory Hash region')
|
||||
if lv3_filehash.offset < lv3_dirmeta.offset + lv3_dirmeta.size:
|
||||
raise InvalidRomFSHeaderError('File Hash offset is before the end of the Directory Metadata region')
|
||||
if lv3_filemeta.offset < lv3_filehash.offset + lv3_filehash.size:
|
||||
raise InvalidRomFSHeaderError('File Metadata offset is before the end of the File Hash region')
|
||||
if lv3_filedata_offset < lv3_filemeta.offset + lv3_filemeta.size:
|
||||
raise InvalidRomFSHeaderError('File Data offset is before the end of the File Metadata region')
|
||||
|
||||
# get entries from dirmeta and filemeta
|
||||
def iterate_dir(out: dict, raw: bytes, current_path: str):
|
||||
first_child_dir = readle(raw[0x8:0xC])
|
||||
first_file = readle(raw[0xC:0x10])
|
||||
|
||||
out['type'] = 'dir'
|
||||
out['contents'] = {}
|
||||
|
||||
# iterate through all child directories
|
||||
if first_child_dir != 0xFFFFFFFF:
|
||||
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + first_child_dir)
|
||||
while True:
|
||||
child_dir_meta = fp.read(0x18)
|
||||
next_sibling_dir = readle(child_dir_meta[0x4:0x8])
|
||||
child_dir_name = fp.read(readle(child_dir_meta[0x14:0x18])).decode('utf-16le')
|
||||
child_dir_name_meta = child_dir_name.lower() if case_insensitive else child_dir_name
|
||||
if child_dir_name_meta in out['contents']:
|
||||
print(f'WARNING: Dirname collision! {current_path}{child_dir_name}')
|
||||
out['contents'][child_dir_name_meta] = {'name': child_dir_name}
|
||||
|
||||
iterate_dir(out['contents'][child_dir_name_meta], child_dir_meta,
|
||||
f'{current_path}{child_dir_name}/')
|
||||
if next_sibling_dir == 0xFFFFFFFF:
|
||||
break
|
||||
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + next_sibling_dir)
|
||||
|
||||
if first_file != 0xFFFFFFFF:
|
||||
fp.seek(self._start + lv3_offset + lv3_filemeta.offset + first_file)
|
||||
while True:
|
||||
child_file_meta = fp.read(0x20)
|
||||
next_sibling_file = readle(child_file_meta[0x4:0x8])
|
||||
child_file_offset = readle(child_file_meta[0x8:0x10])
|
||||
child_file_size = readle(child_file_meta[0x10:0x18])
|
||||
child_file_name = fp.read(readle(child_file_meta[0x1C:0x20])).decode('utf-16le')
|
||||
child_file_name_meta = child_file_name.lower() if self.case_insensitive else child_file_name
|
||||
if child_file_name_meta in out['contents']:
|
||||
print(f'WARNING: Filename collision! {current_path}{child_file_name}')
|
||||
out['contents'][child_file_name_meta] = {'name': child_file_name, 'type': 'file',
|
||||
'offset': child_file_offset, 'size': child_file_size}
|
||||
|
||||
self.total_size += child_file_size
|
||||
if next_sibling_file == 0xFFFFFFFF:
|
||||
break
|
||||
fp.seek(self._start + lv3_offset + lv3_filemeta.offset + next_sibling_file)
|
||||
|
||||
self._tree_root = {'name': 'ROOT'}
|
||||
self.total_size = 0
|
||||
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset)
|
||||
iterate_dir(self._tree_root, fp.read(0x18), '/')
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
try:
|
||||
self._fp.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.close()
|
||||
|
||||
@overload
|
||||
def open(self, path: str, encoding: str, errors: 'Optional[str]' = None,
|
||||
newline: 'Optional[str]' = None) -> TextIOWrapper: ...
|
||||
|
||||
@overload
|
||||
def open(self, path: str, encoding: None = None, errors: 'Optional[str]' = None,
|
||||
newline: 'Optional[str]' = None) -> SubsectionIO: ...
|
||||
|
||||
def open(self, path, encoding=None, errors=None, newline=None):
|
||||
"""Open a file in the RomFS for reading."""
|
||||
file_info = self.get_info_from_path(path)
|
||||
if not isinstance(file_info, RomFSFileEntry):
|
||||
raise RomFSIsADirectoryError(path)
|
||||
f = SubsectionIO(self._fp, self._start + self.data_offset + file_info.offset, file_info.size)
|
||||
if encoding is not None:
|
||||
f = TextIOWrapper(f, encoding, errors, newline)
|
||||
return f
|
||||
|
||||
__del__ = close
|
||||
|
||||
def get_info_from_path(self, path: str) -> 'Union[RomFSDirectoryEntry, RomFSFileEntry]':
|
||||
"""Get a directory or file entry"""
|
||||
curr = self._tree_root
|
||||
if self.case_insensitive:
|
||||
path = path.lower()
|
||||
if path[0] == '/':
|
||||
path = path[1:]
|
||||
for part in path.split('/'):
|
||||
if part == '':
|
||||
break
|
||||
try:
|
||||
# noinspection PyTypeChecker
|
||||
curr = curr['contents'][part]
|
||||
except KeyError:
|
||||
raise RomFSFileNotFoundError(path)
|
||||
if curr['type'] == 'dir':
|
||||
contents = (k['name'] for k in curr['contents'].values())
|
||||
return RomFSDirectoryEntry(name=curr['name'], type='dir', contents=(*contents,))
|
||||
elif curr['type'] == 'file':
|
||||
return RomFSFileEntry(name=curr['name'], type='file', offset=curr['offset'], size=curr['size'])
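# A minimal usage sketch, assuming a hypothetical 'romfs.bin' (either a raw Level 3
# partition or one with an IVFC header; both are handled above).
if __name__ == '__main__':
    with RomFSReader('romfs.bin', case_insensitive=True) as romfs:
        root = romfs.get_info_from_path('/')
        print('root entries:', ', '.join(root.contents))
        print(f'total file data: {romfs.total_size:#x} bytes')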
|
||||
@@ -1,111 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from types import MappingProxyType
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Dict, Mapping, Optional, Tuple, Union
|
||||
|
||||
SMDH_SIZE = 0x36C0
|
||||
|
||||
region_names = (
|
||||
'Japanese',
|
||||
'English',
|
||||
'French',
|
||||
'German',
|
||||
'Italian',
|
||||
'Spanish',
|
||||
'Simplified Chinese',
|
||||
'Korean',
|
||||
'Dutch',
|
||||
'Portuguese',
|
||||
'Russian',
|
||||
'Traditional Chinese',
|
||||
)
|
||||
|
||||
# the order of the SMDH names to check. the difference here is that English is put before Japanese.
|
||||
_region_order_check = (
|
||||
'English',
|
||||
'Japanese',
|
||||
'French',
|
||||
'German',
|
||||
'Italian',
|
||||
'Spanish',
|
||||
'Simplified Chinese',
|
||||
'Korean',
|
||||
'Dutch',
|
||||
'Portuguese',
|
||||
'Russian',
|
||||
'Traditional Chinese',
|
||||
)
|
||||
|
||||
|
||||
class SMDHError(PyCTRError):
|
||||
"""Generic exception for SMDH operations."""
|
||||
|
||||
|
||||
class InvalidSMDHError(SMDHError):
|
||||
"""Invalid SMDH contents."""
|
||||
|
||||
|
||||
class AppTitle(NamedTuple):
|
||||
short_desc: str
|
||||
long_desc: str
|
||||
publisher: str
|
||||
|
||||
|
||||
class SMDH:
|
||||
"""
|
||||
Class for 3DS SMDH. Icon data is currently not supported.
|
||||
|
||||
https://www.3dbrew.org/wiki/SMDH
|
||||
"""
|
||||
|
||||
# TODO: support other settings
|
||||
|
||||
def __init__(self, names: 'Dict[str, AppTitle]'):
|
||||
self.names: 'Mapping[str, AppTitle]' = MappingProxyType({n: names.get(n, None) for n in region_names})
|
||||
|
||||
def __repr__(self):
|
||||
return f'<{type(self).__name__} title: {self.get_app_title().short_desc}>'
|
||||
|
||||
def get_app_title(self, language: 'Union[str, Tuple[str, ...]]' = _region_order_check) -> 'Optional[AppTitle]':
|
||||
if isinstance(language, str):
|
||||
language = (language,)
|
||||
|
||||
for l in language:
|
||||
apptitle = self.names[l]
|
||||
if apptitle:
|
||||
return apptitle
|
||||
|
||||
# if, for some reason, it fails to return...
|
||||
return AppTitle('unknown', 'unknown', 'unknown')
|
||||
|
||||
@classmethod
|
||||
def load(cls, fp: 'BinaryIO') -> 'SMDH':
|
||||
"""Load an SMDH from a file-like object."""
|
||||
smdh = fp.read(SMDH_SIZE)
|
||||
if len(smdh) != SMDH_SIZE:
|
||||
raise InvalidSMDHError(f'invalid size (expected: {SMDH_SIZE:#6x}, got: {len(smdh):#6x})')
|
||||
if smdh[0:4] != b'SMDH':
|
||||
raise InvalidSMDHError('SMDH magic not found')
|
||||
|
||||
app_structs = smdh[8:0x2008]
|
||||
names: Dict[str, AppTitle] = {}
|
||||
# due to region_names only being 12 elements, this will only process 12. the other 4 are unused.
|
||||
for app_title, region in zip((app_structs[x:x + 0x200] for x in range(0, 0x2000, 0x200)), region_names):
|
||||
names[region] = AppTitle(app_title[0:0x80].decode('utf-16le').strip('\0'),
|
||||
app_title[0x80:0x180].decode('utf-16le').strip('\0'),
|
||||
app_title[0x180:0x200].decode('utf-16le').strip('\0'))
|
||||
return cls(names)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, fn: str) -> 'SMDH':
|
||||
with open(fn, 'rb') as f:
|
||||
return cls.load(f)
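# A minimal usage sketch, assuming a hypothetical 'icon.bin' extracted from an ExeFS.
if __name__ == '__main__':
    smdh = SMDH.from_file('icon.bin')
    title = smdh.get_app_title(('English', 'Japanese'))
    print(f'{title.short_desc} by {title.publisher}')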
|
||||
@@ -1,316 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
from hashlib import sha256
|
||||
from struct import pack
|
||||
from typing import TYPE_CHECKING, NamedTuple
|
||||
|
||||
from ..common import PyCTRError
|
||||
from ..util import readbe, readle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import BinaryIO, Iterable
|
||||
|
||||
__all__ = ['CHUNK_RECORD_SIZE', 'TitleMetadataError', 'InvalidSignatureTypeError', 'InvalidHashError',
|
||||
'ContentInfoRecord', 'ContentChunkRecord', 'ContentTypeFlags', 'TitleVersion', 'TitleMetadataReader']
|
||||
|
||||
CHUNK_RECORD_SIZE = 0x30
|
||||
|
||||
# sig-type: (sig-size, padding)
|
||||
signature_types = {
|
||||
# RSA_4096 SHA1 (unused on 3DS)
|
||||
0x00010000: (0x200, 0x3C),
|
||||
# RSA_2048 SHA1 (unused on 3DS)
|
||||
0x00010001: (0x100, 0x3C),
|
||||
# Elliptic Curve with SHA1 (unused on 3DS)
|
||||
0x00010002: (0x3C, 0x40),
|
||||
# RSA_4096 SHA256
|
||||
0x00010003: (0x200, 0x3C),
|
||||
# RSA_2048 SHA256
|
||||
0x00010004: (0x100, 0x3C),
|
||||
# ECDSA with SHA256
|
||||
0x00010005: (0x3C, 0x40),
|
||||
}
|
||||
|
||||
BLANK_SIG_PAIR = (0x00010004, b'\xFF' * signature_types[0x00010004][0])
|
||||
|
||||
|
||||
class TitleMetadataError(PyCTRError):
|
||||
"""Generic exception for TitleMetadata operations."""
|
||||
|
||||
|
||||
class InvalidTMDError(TitleMetadataError):
|
||||
"""Title Metadata is invalid."""
|
||||
|
||||
|
||||
class InvalidSignatureTypeError(InvalidTMDError):
|
||||
"""Invalid signature type was used."""
|
||||
|
||||
def __init__(self, sig_type):
|
||||
super().__init__(sig_type)
|
||||
|
||||
def __str__(self):
|
||||
return f'{self.args[0]:#010x}'
|
||||
|
||||
|
||||
class InvalidHashError(InvalidTMDError):
|
||||
"""Hash mismatch in the Title Metadata."""
|
||||
|
||||
|
||||
class InvalidInfoRecordError(InvalidHashError):
|
||||
"""Hash mismatch in the Content Info Records."""
|
||||
|
||||
def __init__(self, info_record):
|
||||
super().__init__(info_record)
|
||||
|
||||
def __str__(self):
|
||||
return f'Invalid info record: {self.args[0]}'
|
||||
|
||||
|
||||
class UnusualInfoRecordError(InvalidTMDError):
|
||||
"""Encountered Content Info Record that attempts to hash a Content Chunk Record that has already been hashed."""
|
||||
|
||||
def __init__(self, info_record, chunk_record):
|
||||
super().__init__(info_record, chunk_record)
|
||||
|
||||
def __str__(self):
|
||||
return f'Attempted to hash twice: {self.args[0]}, {self.args[1]}'
|
||||
|
||||
|
||||
class ContentTypeFlags(NamedTuple):
|
||||
encrypted: bool
|
||||
disc: bool
|
||||
cfm: bool
|
||||
optional: bool
|
||||
shared: bool
|
||||
|
||||
def __index__(self) -> int:
|
||||
return self.encrypted | (self.disc << 1) | (self.cfm << 2) | (self.optional << 14) | (self.shared << 15)
|
||||
|
||||
__int__ = __index__
|
||||
|
||||
def __format__(self, format_spec: str) -> str:
|
||||
return self.__int__().__format__(format_spec)
|
||||
|
||||
@classmethod
|
||||
def from_int(cls, flags: int) -> 'ContentTypeFlags':
|
||||
# noinspection PyArgumentList
|
||||
return cls(bool(flags & 1), bool(flags & 2), bool(flags & 4), bool(flags & 0x4000), bool(flags & 0x8000))
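# Round-trip example: flags 0x8001 mark content that is encrypted and shared, so
# int(ContentTypeFlags.from_int(0x8001)) == 0x8001 and .shared is True.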
|
||||
|
||||
|
||||
class ContentInfoRecord(NamedTuple):
|
||||
index_offset: int
|
||||
command_count: int
|
||||
hash: bytes
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return b''.join((self.index_offset.to_bytes(2, 'big'), self.command_count.to_bytes(2, 'big'), self.hash))
|
||||
|
||||
|
||||
class ContentChunkRecord(NamedTuple):
|
||||
id: str
|
||||
cindex: int
|
||||
type: ContentTypeFlags
|
||||
size: int
|
||||
hash: bytes
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return b''.join((bytes.fromhex(self.id), self.cindex.to_bytes(2, 'big'), int(self.type).to_bytes(2, 'big'),
|
||||
self.size.to_bytes(8, 'big'), self.hash))
|
||||
|
||||
|
||||
class TitleVersion(NamedTuple):
|
||||
major: int
|
||||
minor: int
|
||||
micro: int
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'{self.major}.{self.minor}.{self.micro}'
|
||||
|
||||
def __index__(self) -> int:
|
||||
return (self.major << 10) | (self.minor << 4) | self.micro
|
||||
|
||||
__int__ = __index__
|
||||
|
||||
def __format__(self, format_spec: str) -> str:
|
||||
return self.__int__().__format__(format_spec)
|
||||
|
||||
@classmethod
|
||||
def from_int(cls, ver: int) -> 'TitleVersion':
|
||||
# noinspection PyArgumentList
|
||||
return cls((ver >> 10) & 0x3F, (ver >> 4) & 0x3F, ver & 0xF)
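# Example: version 3.0.0 is stored as (3 << 10) == 3072, so
# TitleVersion.from_int(3072) == TitleVersion(3, 0, 0) and str() gives '3.0.0'.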
|
||||
|
||||
|
||||
class TitleMetadataReader:
|
||||
"""
|
||||
Class for 3DS Title Metadata.
|
||||
|
||||
https://www.3dbrew.org/wiki/Title_metadata
|
||||
"""
|
||||
|
||||
__slots__ = ('title_id', 'save_size', 'srl_save_size', 'title_version', 'info_records',
|
||||
'chunk_records', 'content_count', 'signature', '_u_issuer', '_u_version', '_u_ca_crl_version',
|
||||
'_u_signer_crl_version', '_u_reserved1', '_u_system_version', '_u_title_type', '_u_group_id',
|
||||
'_u_reserved2', '_u_srl_flag', '_u_reserved3', '_u_access_rights', '_u_boot_count', '_u_padding')
|
||||
|
||||
# arguments prefixed with _u_ are values unused by the 3DS and/or are only kept around to generate the final tmd
|
||||
def __init__(self, *, title_id: str, save_size: int, srl_save_size: int, title_version: TitleVersion,
|
||||
info_records: 'Iterable[ContentInfoRecord]', chunk_records: 'Iterable[ContentChunkRecord]',
|
||||
signature=BLANK_SIG_PAIR, _u_issuer='Root-CA00000003-CP0000000b', _u_version=1, _u_ca_crl_version=0,
|
||||
_u_signer_crl_version=0, _u_reserved1=0, _u_system_version=b'\0' * 8, _u_title_type=b'\0\0\0@',
|
||||
_u_group_id=b'\0\0', _u_reserved2=b'\0\0\0\0', _u_srl_flag=0, _u_reserved3=b'\0' * 0x31,
|
||||
_u_access_rights=b'\0' * 4, _u_boot_count=b'\0\0', _u_padding=b'\0\0'):
|
||||
# TODO: add checks
|
||||
self.title_id = title_id.lower()
|
||||
self.save_size = save_size
|
||||
self.srl_save_size = srl_save_size
|
||||
self.title_version = title_version
|
||||
self.info_records = tuple(info_records)
|
||||
self.chunk_records = tuple(chunk_records)
|
||||
self.content_count = len(self.chunk_records)
|
||||
self.signature = signature # TODO: store this differently
|
||||
|
||||
# unused values
|
||||
self._u_issuer = _u_issuer
|
||||
self._u_version = _u_version
|
||||
self._u_ca_crl_version = _u_ca_crl_version
|
||||
self._u_signer_crl_version = _u_signer_crl_version
|
||||
self._u_reserved1 = _u_reserved1
|
||||
self._u_system_version = _u_system_version
|
||||
self._u_title_type = _u_title_type
|
||||
self._u_group_id = _u_group_id
|
||||
self._u_reserved2 = _u_reserved2
|
||||
self._u_srl_flag = _u_srl_flag
|
||||
self._u_reserved3 = _u_reserved3
|
||||
self._u_access_rights = _u_access_rights
|
||||
self._u_boot_count = _u_boot_count
|
||||
self._u_padding = _u_padding
|
||||
|
||||
def __hash__(self) -> int:
|
||||
return hash((self.title_id, self.save_size, self.srl_save_size, self.title_version,
|
||||
self.info_records, self.chunk_records))
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (f'<TitleMetadataReader title_id={self.title_id!r} title_version={self.title_version!r} '
|
||||
f'content_count={self.content_count!r}>')
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
sig_data = pack(f'>I {signature_types[self.signature[0]][0]}s {signature_types[self.signature[0]][1]}x',
|
||||
self.signature[0], self.signature[1])
|
||||
|
||||
info_records = b''.join(bytes(x) for x in self.info_records).ljust(0x900, b'\0')
|
||||
|
||||
header = pack('>64s b b b b 8s 8s 4s 2s I I 4s b 49s 4s H H 2s 2s 32s', self._u_issuer.encode('ascii'),
|
||||
self._u_version, self._u_ca_crl_version, self._u_signer_crl_version, self._u_reserved1,
|
||||
self._u_system_version, bytes.fromhex(self.title_id), self._u_title_type, self._u_group_id,
|
||||
self.save_size, self.srl_save_size, self._u_reserved2, self._u_srl_flag, self._u_reserved3,
|
||||
self._u_access_rights, self.title_version, self.content_count, self._u_boot_count,
|
||||
self._u_padding, sha256(info_records).digest())
|
||||
|
||||
chunk_records = b''.join(bytes(x) for x in self.chunk_records)
|
||||
|
||||
return sig_data + header + info_records + chunk_records
|
||||
|
||||
@classmethod
|
||||
def load(cls, fp: 'BinaryIO', verify_hashes: bool = True) -> 'TitleMetadataReader':
|
||||
"""Load a tmd from a file-like object."""
|
||||
sig_type = readbe(fp.read(4))
|
||||
try:
|
||||
sig_size, sig_padding = signature_types[sig_type]
|
||||
except KeyError:
|
||||
raise InvalidSignatureTypeError(sig_type)
|
||||
|
||||
signature = fp.read(sig_size)
|
||||
try:
|
||||
fp.seek(sig_padding, 1)
|
||||
except Exception:
|
||||
# most streams are probably seekable, but for some that aren't...
|
||||
fp.read(sig_padding)
|
||||
|
||||
header = fp.read(0xC4)
|
||||
if len(header) != 0xC4:
|
||||
raise InvalidTMDError('Header length is not 0xC4')
|
||||
|
||||
# only values that actually have a use are loaded here. (currently)
|
||||
# several fields were left in from the Wii tmd and have no function on 3DS.
|
||||
title_id = header[0x4C:0x54].hex()
|
||||
save_size = readle(header[0x5A:0x5E])
|
||||
srl_save_size = readle(header[0x5E:0x62])
|
||||
title_version = TitleVersion.from_int(readbe(header[0x9C:0x9E]))
|
||||
content_count = readbe(header[0x9E:0xA0])
|
||||
|
||||
content_info_records_hash = header[0xA4:0xC4]
|
||||
|
||||
content_info_records_raw = fp.read(0x900)
|
||||
if len(content_info_records_raw) != 0x900:
|
||||
raise InvalidTMDError('Content info records length is not 0x900')
|
||||
|
||||
if verify_hashes:
|
||||
real_hash = sha256(content_info_records_raw)
|
||||
if content_info_records_hash != real_hash.digest():
|
||||
raise InvalidHashError('Content Info Records hash is invalid')
|
||||
|
||||
content_chunk_records_raw = fp.read(content_count * CHUNK_RECORD_SIZE)
|
||||
|
||||
chunk_records = []
|
||||
for cr_raw in (content_chunk_records_raw[i:i + CHUNK_RECORD_SIZE] for i in
|
||||
range(0, content_count * CHUNK_RECORD_SIZE, CHUNK_RECORD_SIZE)):
|
||||
chunk_records.append(ContentChunkRecord(id=cr_raw[0:4].hex(),
|
||||
cindex=readbe(cr_raw[4:6]),
|
||||
type=ContentTypeFlags.from_int(readbe(cr_raw[6:8])),
|
||||
size=readbe(cr_raw[8:16]),
|
||||
hash=cr_raw[16:48]))
|
||||
|
||||
info_records = []
|
||||
for ir_raw in (content_info_records_raw[i:i + 0x24] for i in range(0, 0x900, 0x24)):
|
||||
if ir_raw != b'\0' * 0x24:
|
||||
info_records.append(ContentInfoRecord(index_offset=readbe(ir_raw[0:2]),
|
||||
command_count=readbe(ir_raw[2:4]),
|
||||
hash=ir_raw[4:36]))
|
||||
|
||||
if verify_hashes:
|
||||
chunk_records_hashed = set()
|
||||
for ir in info_records:
|
||||
to_hash = []
|
||||
for cr in chunk_records[ir.index_offset:ir.index_offset + ir.command_count]:
|
||||
if cr in chunk_records_hashed:
|
||||
raise UnusualInfoRecordError(ir, cr)
|
||||
|
||||
chunk_records_hashed.add(cr)
|
||||
to_hash.append(cr)
|
||||
|
||||
hashed = sha256(b''.join(bytes(x) for x in to_hash))
|
||||
if hashed.digest() != ir.hash:
|
||||
raise InvalidInfoRecordError(ir)
|
||||
|
||||
# unused values are loaded only for use when re-building the binary tmd
|
||||
u_issuer = header[0:0x40].decode('ascii').rstrip('\0')
|
||||
u_version = header[0x40]
|
||||
u_ca_crl_version = header[0x41]
|
||||
u_signer_crl_version = header[0x42]
|
||||
u_reserved1 = header[0x43]
|
||||
u_system_version = header[0x44:0x4C]
|
||||
u_title_type = header[0x54:0x58]
|
||||
u_group_id = header[0x58:0x5A]
|
||||
u_reserved2 = header[0x62:0x66]
|
||||
u_srl_flag = header[0x66] # is this one used for anything?
|
||||
u_reserved3 = header[0x67:0x98]
|
||||
u_access_rights = header[0x98:0x9C]
|
||||
u_boot_count = header[0xA0:0xA2]
|
||||
u_padding = header[0xA2:0xA4]
|
||||
|
||||
return cls(title_id=title_id, save_size=save_size, srl_save_size=srl_save_size, title_version=title_version,
|
||||
info_records=info_records, chunk_records=chunk_records, signature=(sig_type, signature),
|
||||
_u_issuer=u_issuer, _u_version=u_version, _u_ca_crl_version=u_ca_crl_version,
|
||||
_u_signer_crl_version=u_signer_crl_version, _u_reserved1=u_reserved1,
|
||||
_u_system_version=u_system_version, _u_title_type=u_title_type, _u_group_id=u_group_id,
|
||||
_u_reserved2=u_reserved2, _u_srl_flag=u_srl_flag, _u_reserved3=u_reserved3,
|
||||
_u_access_rights=u_access_rights, _u_boot_count=u_boot_count, _u_padding=u_padding)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, fn: str, *, verify_hashes: bool = True) -> 'TitleMetadataReader':
|
||||
with open(fn, 'rb') as f:
|
||||
return cls.load(f, verify_hashes=verify_hashes)
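# A minimal usage sketch, assuming a hypothetical 'title.tmd' (e.g. extracted from a CIA).
if __name__ == '__main__':
    tmd = TitleMetadataReader.from_file('title.tmd')
    print(f'{tmd.title_id} v{tmd.title_version!s} ({int(tmd.title_version)})')
    for chunk in tmd.chunk_records:
        print(f'  {chunk.id}: index {chunk.cindex}, {chunk.size:#x} bytes')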
|
||||
@@ -1,41 +0,0 @@
|
||||
# This file is a part of ninfs.
|
||||
#
|
||||
# Copyright (c) 2017-2019 Ian Burgwin
|
||||
# This file is licensed under The MIT License (MIT).
|
||||
# You can find the full license text in LICENSE.md in the root of this project.
|
||||
|
||||
import os
|
||||
from math import ceil
|
||||
from sys import platform
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import List
|
||||
|
||||
__all__ = ['windows', 'macos', 'readle', 'readbe', 'roundup', 'config_dirs']
|
||||
|
||||
windows = platform in {'win32', 'cygwin'}
|
||||
macos = platform == 'darwin'
|
||||
|
||||
|
||||
def readle(b: bytes) -> int:
|
||||
"""Convert little-endian bytes to an int."""
|
||||
return int.from_bytes(b, 'little')
|
||||
|
||||
|
||||
def readbe(b: bytes) -> int:
|
||||
"""Convert big-endian bytes to an int."""
|
||||
return int.from_bytes(b, 'big')
|
||||
|
||||
|
||||
def roundup(offset: int, alignment: int) -> int:
|
||||
"""Round up a number to a provided alignment."""
|
||||
return int(ceil(offset / alignment) * alignment)
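# Examples: readle(b'\x00\x02') == 0x200, readbe(b'\x00\x02') == 2, and
# roundup(0x1A1, 0x200) == 0x200 (rounded up to the next multiple of the alignment).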
|
||||
|
||||
|
||||
_home = os.path.expanduser('~')
|
||||
config_dirs: 'List[str]' = [os.path.join(_home, '.3ds'), os.path.join(_home, '3ds')]
|
||||
if windows:
|
||||
config_dirs.insert(0, os.path.join(os.environ.get('APPDATA'), '3ds'))
|
||||
elif macos:
|
||||
config_dirs.insert(0, os.path.join(_home, 'Library', 'Application Support', '3ds'))
|
||||
@@ -1,2 +1,3 @@
|
||||
pycryptodomex==3.9.4
|
||||
events==0.3
|
||||
pyctr==0.1.0
|
||||
|
||||