initial commit

This commit is contained in:
Ian Burgwin
2019-09-06 14:22:13 -07:00
commit af2bb123a7
26 changed files with 3842 additions and 0 deletions

7
pyctr/README.md Normal file
View File

@@ -0,0 +1,7 @@
# PyCTR
Python library to parse several Nintendo 3DS files.
The API is not stable and can significantly change at any point. (If anyone decides to use this, that is)
This will eventually be a separate repo...

6
pyctr/__init__.py Normal file
View File

@@ -0,0 +1,6 @@
from .types.cia import *
from .types.exefs import *
from .types.ncch import *
from .types.romfs import *
from .types.smdh import *
from .types.tmd import *

82
pyctr/common.py Normal file
View File

@@ -0,0 +1,82 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from functools import wraps
from io import BufferedIOBase
from typing import TYPE_CHECKING
if TYPE_CHECKING:
# this is a lazy way to make type checkers stop complaining
from typing import BinaryIO
BufferedIOBase = BinaryIO
class PyCTRError(Exception):
"""Common base class for all PyCTR errors."""
def _raise_if_closed(method):
@wraps(method)
def decorator(self: '_ReaderOpenFileBase', *args, **kwargs):
if self._reader.closed:
self.closed = True
if self.closed:
raise ValueError('I/O operation on closed file')
return method(self, *args, **kwargs)
return decorator
class _ReaderOpenFileBase(BufferedIOBase):
"""Base class for all open files for Reader classes."""
_seek = 0
_info = None
closed = False
def __init__(self, reader, path):
self._reader = reader
self._path = path
def __repr__(self):
return f'<{type(self).__name__} path={self._path!r} info={self._info!r} reader={self._reader!r}>'
@_raise_if_closed
def read(self, size: int = -1) -> bytes:
if size == -1:
size = self._info.size - self._seek
data = self._reader.get_data(self._info, self._seek, size)
self._seek += len(data)
return data
read1 = read # probably make this act like read1 should, but this for now enables some other things to work
@_raise_if_closed
def seek(self, seek: int, whence: int = 0) -> int:
if whence == 0:
if seek < 0:
raise ValueError(f'negative seek value {seek}')
self._seek = min(seek, self._info.size)
elif whence == 1:
self._seek = max(self._seek + seek, 0)
elif whence == 2:
self._seek = max(self._info.size + seek, 0)
return self._seek
@_raise_if_closed
def tell(self) -> int:
return self._seek
@_raise_if_closed
def readable(self) -> bool:
return True
@_raise_if_closed
def writable(self) -> bool:
return False
@_raise_if_closed
def seekable(self) -> bool:
return True

598
pyctr/crypto.py Normal file
View File

@@ -0,0 +1,598 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from enum import IntEnum
from functools import wraps
from hashlib import sha256
from os import environ
from os.path import getsize, join as pjoin
from struct import pack, unpack
from typing import TYPE_CHECKING
from Cryptodome.Cipher import AES
from Cryptodome.Hash import CMAC
from Cryptodome.Util import Counter
from .common import PyCTRError
from .util import config_dirs, readbe, readle
if TYPE_CHECKING:
# noinspection PyProtectedMember
from Cryptodome.Cipher._mode_cbc import CbcMode
# noinspection PyProtectedMember
from Cryptodome.Cipher._mode_ctr import CtrMode
# noinspection PyProtectedMember
from Cryptodome.Cipher._mode_ecb import EcbMode
from Cryptodome.Hash.CMAC import CMAC as CMACObject
from typing import Dict, List, Union
__all__ = ['CryptoError', 'OTPLengthError', 'CorruptBootromError', 'KeyslotMissingError', 'TicketLengthError',
'BootromNotFoundError', 'CorruptOTPError', 'Keyslot', 'CryptoEngine']
class CryptoError(PyCTRError):
"""Generic exception for cryptography operations."""
class OTPLengthError(CryptoError):
"""OTP is the wrong length."""
class CorruptOTPError(CryptoError):
"""OTP hash does not match."""
class KeyslotMissingError(CryptoError):
"""Normal key is not set up for the keyslot."""
class BadMovableSedError(CryptoError):
"""movable.sed provided is invalid."""
class TicketLengthError(CryptoError):
"""Ticket is too small."""
def __init__(self, length):
super().__init__(length)
def __str__(self):
return f'0x350 expected, {self.args[0]:#x} given'
# wonder if I'm doing this right...
class BootromNotFoundError(CryptoError):
"""ARM9 bootROM was not found. Main argument is a tuple of checked paths."""
class CorruptBootromError(CryptoError):
"""ARM9 bootROM hash does not match."""
class Keyslot(IntEnum):
TWLNAND = 0x03
CTRNANDOld = 0x04
CTRNANDNew = 0x05
FIRM = 0x06
AGB = 0x07
CMACNANDDB = 0x0B
NCCH93 = 0x18
CMACCardSaveNew = 0x19
CardSaveNew = 0x1A
NCCH96 = 0x1B
CMACAGB = 0x24
NCCH70 = 0x25
NCCH = 0x2C
UDSLocalWAN = 0x2D
StreetPass = 0x2E
Save60 = 0x2F
CMACSDNAND = 0x30
CMACCardSave = 0x33
SD = 0x34
CardSave = 0x37
BOSS = 0x38
DownloadPlay = 0x39
DSiWareExport = 0x3A
CommonKey = 0x3D
# anything after 0x3F is custom to PyCTR
DecryptedTitlekey = 0x40
BOOT9_PROT_HASH = '7331f7edece3dd33f2ab4bd0b3a5d607229fd19212c10b734cedcaf78c1a7b98'
DEV_COMMON_KEY_0 = bytes.fromhex('55A3F872BDC80C555A654381139E153B')
common_key_y = (
# eShop
0xD07B337F9CA4385932A2E25723232EB9,
# System
0x0C767230F0998F1C46828202FAACBE4C,
# Unknown
0xC475CB3AB8C788BB575E12A10907B8A4,
# Unknown
0xE486EEE3D0C09C902F6686D4C06F649F,
# Unknown
0xED31BA9C04B067506C4497A35B7804FC,
# Unknown
0x5E66998AB4E8931606850FD7A16DD755
)
base_key_x = {
# New3DS 9.3 NCCH
0x18: (0x82E9C9BEBFB8BDB875ECC0A07D474374, 0x304BF1468372EE64115EBD4093D84276),
# New3DS 9.6 NCCH
0x1B: (0x45AD04953992C7C893724A9A7BCE6182, 0x6C8B2944A0726035F941DFC018524FB6),
# 7x NCCH
0x25: (0xCEE7D8AB30C00DAE850EF5E382AC5AF3, 0x81907A4B6F1B47323A677974CE4AD71B),
}
# global values to be copied to new CryptoEngine instances after the first one
_b9_key_x: 'Dict[int, int]' = {}
_b9_key_y: 'Dict[int, int]' = {}
_b9_key_normal: 'Dict[int, bytes]' = {}
_b9_extdata_otp: bytes = None
_b9_extdata_keygen: bytes = None
_b9_path: str = None
_otp_key: bytes = None
_otp_iv: bytes = None
b9_paths: 'List[str]' = []
for p in config_dirs:
b9_paths.append(pjoin(p, 'boot9.bin'))
b9_paths.append(pjoin(p, 'boot9_prot.bin'))
try:
b9_paths.insert(0, environ['BOOT9_PATH'])
except KeyError:
pass
def _requires_bootrom(method):
@wraps(method)
def wrapper(self, *args, **kwargs):
if not self.b9_keys_set:
raise KeyslotMissingError('bootrom is required to set up keys, see setup_keys_from_boot9')
return method(self, *args, **kwargs)
return wrapper
# used from http://www.falatic.com/index.php/108/python-and-bitwise-rotation
# converted to def because pycodestyle complained to me
def rol(val: int, r_bits: int, max_bits: int) -> int:
return (val << r_bits % max_bits) & (2 ** max_bits - 1) |\
((val & (2 ** max_bits - 1)) >> (max_bits - (r_bits % max_bits)))
class _TWLCryptoWrapper:
def __init__(self, cipher: 'CbcMode'):
self._cipher = cipher
def encrypt(self, data: bytes) -> bytes:
data_len = len(data)
data_rev = bytearray(data_len)
for i in range(0, data_len, 0x10):
data_rev[i:i + 0x10] = data[i:i + 0x10][::-1]
data_out = bytearray(self._cipher.encrypt(bytes(data_rev)))
for i in range(0, data_len, 0x10):
data_out[i:i + 0x10] = data_out[i:i + 0x10][::-1]
return bytes(data_out[0:data_len])
decrypt = encrypt
class CryptoEngine:
"""Class for 3DS crypto operations, including encryption and key generation."""
b9_keys_set: bool = False
b9_path: str = None
_b9_extdata_otp: bytes = None
_b9_extdata_keygen: bytes = None
_otp_key: bytes = None
_otp_iv: bytes = None
_id0: bytes = None
def __init__(self, boot9: str = None, dev: int = 0, setup_b9_keys: bool = True):
self.key_x: Dict[int, int] = {}
self.key_y: Dict[int, int] = {0x03: 0xE1A00005202DDD1DBD4DC4D30AB9DC76,
0x05: 0x4D804F4E9990194613A204AC584460BE}
self.key_normal: Dict[int, bytes] = {}
self.dev = dev
for keyslot, keys in base_key_x.items():
self.key_x[keyslot] = keys[dev]
if setup_b9_keys:
self.setup_keys_from_boot9_file(boot9)
@property
@_requires_bootrom
def b9_extdata_otp(self) -> bytes:
return self._b9_extdata_otp
@property
@_requires_bootrom
def b9_extdata_keygen(self) -> bytes:
return self._b9_extdata_keygen
@property
@_requires_bootrom
def otp_key(self) -> bytes:
return self._otp_key
@property
@_requires_bootrom
def otp_iv(self) -> bytes:
return self._otp_iv
@property
def id0(self) -> bytes:
if not self._id0:
raise KeyslotMissingError('load a movable.sed with setup_sd_key')
return self._id0
def create_cbc_cipher(self, keyslot: int, iv: bytes) -> 'CbcMode':
"""Create AES-CBC cipher with the given keyslot."""
try:
key = self.key_normal[keyslot]
except KeyError:
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
return AES.new(key, AES.MODE_CBC, iv)
def create_ctr_cipher(self, keyslot: int, ctr: int) -> 'Union[CtrMode, _TWLCryptoWrapper]':
"""
Create AES-CTR cipher with the given keyslot.
Normal and DSi crypto will be automatically chosen depending on keyslot.
"""
try:
key = self.key_normal[keyslot]
except KeyError:
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
cipher = AES.new(key, AES.MODE_CTR, counter=Counter.new(128, initial_value=ctr))
if keyslot < 0x04:
return _TWLCryptoWrapper(cipher)
else:
return cipher
def create_ecb_cipher(self, keyslot: int) -> 'EcbMode':
"""Create AES-ECB cipher with the given keyslot."""
try:
key = self.key_normal[keyslot]
except KeyError:
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
return AES.new(key, AES.MODE_ECB)
def create_cmac_object(self, keyslot: int) -> 'CMACObject':
"""Create a CMAC object with the given keyslot."""
try:
key = self.key_normal[keyslot]
except KeyError:
raise KeyslotMissingError(f'normal key for keyslot 0x{keyslot:02x} is not set up')
return CMAC.new(key, ciphermod=AES)
@staticmethod
def sd_path_to_iv(path: str) -> int:
# ensure the path is lowercase
path = path.lower()
# SD Save Data Backup does a copy of the raw, encrypted file from the game's data directory
# so we need to handle this and fake the path
if path.startswith('/backup') and len(path) > 28:
tid_upper = path[12:20]
tid_lower = path[20:28]
path = f'/title/{tid_upper}/{tid_lower}/data' + path[28:]
path_hash = sha256(path.encode('utf-16le') + b'\0\0').digest()
hash_p1 = readbe(path_hash[0:16])
hash_p2 = readbe(path_hash[16:32])
return hash_p1 ^ hash_p2
def load_from_ticket(self, ticket: bytes):
"""Load a titlekey from a ticket and set keyslot 0x40 to the decrypted titlekey."""
ticket_len = len(ticket)
# TODO: probably support other sig types which would be different lengths
# unlikely to happen in practice, but I would still like to
if ticket_len < 0x2AC:
raise TicketLengthError(ticket_len)
titlekey_enc = ticket[0x1BF:0x1CF]
title_id = ticket[0x1DC:0x1E4]
common_key_index = ticket[0x1F1]
if self.dev and common_key_index == 0:
self.set_normal_key(0x3D, DEV_COMMON_KEY_0)
else:
self.set_keyslot('y', 0x3D, common_key_y[common_key_index])
cipher = self.create_cbc_cipher(0x3D, title_id + (b'\0' * 8))
self.set_normal_key(0x40, cipher.decrypt(titlekey_enc))
def set_keyslot(self, xy: str, keyslot: int, key: 'Union[int, bytes]'):
"""Sets a keyslot to the specified key."""
to_use = None
if xy == 'x':
to_use = self.key_x
elif xy == 'y':
to_use = self.key_y
if isinstance(key, bytes):
key = int.from_bytes(key, 'big' if keyslot > 0x03 else 'little')
to_use[keyslot] = key
try:
self.key_normal[keyslot] = self.keygen(keyslot)
except KeyError:
pass
def set_normal_key(self, keyslot: int, key: bytes):
self.key_normal[keyslot] = key
def keygen(self, keyslot: int) -> bytes:
"""Generate a normal key based on the keyslot."""
if keyslot < 0x04:
# DSi
return self.keygen_twl_manual(self.key_x[keyslot], self.key_y[keyslot])
else:
# 3DS
return self.keygen_manual(self.key_x[keyslot], self.key_y[keyslot])
@staticmethod
def keygen_manual(key_x: int, key_y: int) -> bytes:
"""Generate a normal key using the 3DS AES keyscrambler."""
return rol((rol(key_x, 2, 128) ^ key_y) + 0x1FF9E9AAC5FE0408024591DC5D52768A, 87, 128).to_bytes(0x10, 'big')
@staticmethod
def keygen_twl_manual(key_x: int, key_y: int) -> bytes:
"""Generate a normal key using the DSi AES keyscrambler."""
# usually would convert to LE bytes in the end then flip with [::-1], but those just cancel out
return rol((key_x ^ key_y) + 0xFFFEFB4E295902582A680F5F1A4F3E79, 42, 128).to_bytes(0x10, 'big')
def _copy_global_keys(self):
self.key_x.update(_b9_key_x)
self.key_y.update(_b9_key_y)
self.key_normal.update(_b9_key_normal)
self._otp_key = _otp_key
self._otp_iv = _otp_iv
self._b9_extdata_otp = _b9_extdata_otp
self._b9_extdata_keygen = _b9_extdata_keygen
self.b9_keys_set = True
def setup_keys_from_boot9(self, b9: bytes):
"""Set up certain keys from an ARM9 bootROM dump."""
global _otp_key, _otp_iv, _b9_extdata_otp, _b9_extdata_keygen
if self.b9_keys_set:
return
if _b9_key_x:
self._copy_global_keys()
return
b9_len = len(b9)
if b9_len != 0x8000:
raise CorruptBootromError(f'wrong length: {b9_len}')
b9_hash_digest: str = sha256(b9).hexdigest()
if b9_hash_digest != BOOT9_PROT_HASH:
raise CorruptBootromError(f'expected: {BOOT9_PROT_HASH}; returned: {b9_hash_digest}')
keyblob_offset = 0x5860
otp_key_offset = 0x56E0
if self.dev:
keyblob_offset += 0x400
otp_key_offset += 0x20
_otp_key = b9[otp_key_offset:otp_key_offset + 0x10]
_otp_iv = b9[otp_key_offset + 0x10:otp_key_offset + 0x20]
keyblob: bytes = b9[keyblob_offset:keyblob_offset + 0x400]
_b9_extdata_keygen = keyblob[0:0x200]
_b9_extdata_otp = keyblob[0:0x24]
# Original NCCH key, UDS local-WLAN CCMP key, StreetPass key, 6.0 save key
_b9_key_x[0x2C] = _b9_key_x[0x2D] = _b9_key_x[0x2E] = _b9_key_x[0x2F] = readbe(keyblob[0x170:0x180])
# SD/NAND AES-CMAC key, APT wrap key, Unknown, Gamecard savedata AES-CMAC
_b9_key_x[0x30] = _b9_key_x[0x31] = _b9_key_x[0x32] = _b9_key_x[0x33] = readbe(keyblob[0x180:0x190])
# SD key (loaded from movable.sed), movable.sed key, Unknown (used by friends module),
# Gamecard savedata actual key
_b9_key_x[0x34] = _b9_key_x[0x35] = _b9_key_x[0x36] = _b9_key_x[0x37] = readbe(keyblob[0x190:0x1A0])
# BOSS key, Download Play key + actual NFC key for generating retail amiibo keys, CTR-CARD hardware-crypto seed
# decryption key
_b9_key_x[0x38] = _b9_key_x[0x39] = _b9_key_x[0x3A] = _b9_key_x[0x3B] = readbe(keyblob[0x1A0:0x1B0])
# Unused
_b9_key_x[0x3C] = readbe(keyblob[0x1B0:0x1C0])
# Common key (titlekey crypto)
_b9_key_x[0x3D] = readbe(keyblob[0x1C0:0x1D0])
# Unused
_b9_key_x[0x3E] = readbe(keyblob[0x1D0:0x1E0])
# NAND partition keys
_b9_key_y[0x04] = readbe(keyblob[0x1F0:0x200])
# correct 0x05 KeyY not set by boot9.
_b9_key_y[0x06] = readbe(keyblob[0x210:0x220])
_b9_key_y[0x07] = readbe(keyblob[0x220:0x230])
# Unused, Unused, DSiWare export key, NAND dbs/movable.sed AES-CMAC key
_b9_key_y[0x08] = readbe(keyblob[0x230:0x240])
_b9_key_y[0x09] = readbe(keyblob[0x240:0x250])
_b9_key_y[0x0A] = readbe(keyblob[0x250:0x260])
_b9_key_y[0x0B] = readbe(keyblob[0x260:0x270])
_b9_key_normal[0x0D] = keyblob[0x270:0x280]
self._copy_global_keys()
def setup_keys_from_boot9_file(self, path: str = None):
"""Set up certain keys from an ARM9 bootROM file."""
global _b9_path
if self.b9_keys_set:
return
if _b9_key_x:
self.b9_path = _b9_path
self._copy_global_keys()
return
paths = (path,) if path else b9_paths
for p in paths:
try:
b9_size = getsize(p)
if b9_size in {0x8000, 0x10000}:
with open(p, 'rb') as f:
if b9_size == 0x10000:
f.seek(0x8000)
self.setup_keys_from_boot9(f.read(0x8000))
_b9_path = p
self.b9_path = p
return
except FileNotFoundError:
continue
# if keys are not set...
raise BootromNotFoundError(paths)
@_requires_bootrom
def setup_keys_from_otp(self, otp: bytes):
"""Set up console-unique keys from an OTP dump. Encrypted and decrypted are supported."""
otp_len = len(otp)
if otp_len != 0x100:
raise OTPLengthError(otp_len)
cipher_otp = AES.new(self.otp_key, AES.MODE_CBC, self.otp_iv)
if otp[0:4] == b'\x0f\xb0\xad\xde':
# decrypted otp
otp_enc: bytes = cipher_otp.encrypt(otp)
otp_dec = otp
else:
# encrypted otp
otp_enc = otp
otp_dec: bytes = cipher_otp.decrypt(otp)
otp_hash: bytes = otp_dec[0xE0:0x100]
otp_hash_digest: bytes = sha256(otp_dec[0:0xE0]).digest()
if otp_hash_digest != otp_hash:
raise CorruptOTPError(f'expected: {otp_hash.hex()}; result: {otp_hash_digest.hex()}')
otp_keysect_hash: bytes = sha256(otp_enc[0:0x90]).digest()
self.set_keyslot('x', 0x11, otp_keysect_hash[0:0x10])
self.set_keyslot('y', 0x11, otp_keysect_hash[0:0x10])
# most otp code from https://github.com/Stary2001/3ds_tools/blob/master/three_ds/aesengine.py
twl_cid_lo, twl_cid_hi = readle(otp_dec[0x08:0xC]), readle(otp_dec[0xC:0x10])
twl_cid_lo ^= 0xB358A6AF
twl_cid_lo |= 0x80000000
twl_cid_hi ^= 0x08C267B7
twl_cid_lo = twl_cid_lo.to_bytes(4, 'little')
twl_cid_hi = twl_cid_hi.to_bytes(4, 'little')
self.set_keyslot('x', 0x03, twl_cid_lo + b'NINTENDO' + twl_cid_hi)
console_key_xy: bytes = sha256(otp_dec[0x90:0xAC] + self.b9_extdata_otp).digest()
self.set_keyslot('x', 0x3F, console_key_xy[0:0x10])
self.set_keyslot('y', 0x3F, console_key_xy[0x10:0x20])
extdata_off = 0
def gen(n: int) -> bytes:
nonlocal extdata_off
extdata_off += 36
iv = self.b9_extdata_keygen[extdata_off:extdata_off+16]
extdata_off += 16
data = self.create_cbc_cipher(0x3F, iv).encrypt(self.b9_extdata_keygen[extdata_off:extdata_off + 64])
extdata_off += n
return data
a = gen(64)
for i in range(0x4, 0x8):
self.set_keyslot('x', i, a[0:16])
for i in range(0x8, 0xc):
self.set_keyslot('x', i, a[16:32])
for i in range(0xc, 0x10):
self.set_keyslot('x', i, a[32:48])
self.set_keyslot('x', 0x10, a[48:64])
b = gen(16)
off = 0
for i in range(0x14, 0x18):
self.set_keyslot('x', i, b[off:off + 16])
off += 16
c = gen(64)
for i in range(0x18, 0x1c):
self.set_keyslot('x', i, c[0:16])
for i in range(0x1c, 0x20):
self.set_keyslot('x', i, c[16:32])
for i in range(0x20, 0x24):
self.set_keyslot('x', i, c[32:48])
self.set_keyslot('x', 0x24, c[48:64])
d = gen(16)
off = 0
for i in range(0x28, 0x2c):
self.set_keyslot('x', i, d[off:off + 16])
off += 16
@_requires_bootrom
def setup_keys_from_otp_file(self, path: str):
"""Set up console-unique keys from an OTP file. Encrypted and decrypted are supported."""
with open(path, 'rb') as f:
self.setup_keys_from_otp(f.read(0x100))
def setup_sd_key(self, data: bytes):
"""Set up the SD key from movable.sed. Must be 0x10 (only key), 0x120 (no cmac), or 0x140 (with cmac)."""
if len(data) == 0x10:
key = data
elif len(data) in {0x120, 0x140}:
key = data[0x110:0x120]
else:
raise BadMovableSedError(f'invalid length ({len(data):#x}')
self.set_keyslot('y', Keyslot.SD, key)
self.set_keyslot('y', Keyslot.CMACSDNAND, key)
self.set_keyslot('y', Keyslot.DSiWareExport, key)
key_hash = sha256(key).digest()[0:16]
hash_parts = unpack('<IIII', key_hash)
self._id0 = pack('>IIII', *hash_parts)
def setup_sd_key_from_file(self, path: str):
"""Set up the SD key from a movable.sed file."""
with open(path, 'rb') as f:
self.setup_sd_key(f.read(0x140))

0
pyctr/types/__init__.py Normal file
View File

12
pyctr/types/base/title.py Normal file
View File

@@ -0,0 +1,12 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
class TitleReaderBase:
closed = False

237
pyctr/types/cia.py Normal file
View File

@@ -0,0 +1,237 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from enum import IntEnum
from io import BytesIO
from threading import Lock
from typing import TYPE_CHECKING, NamedTuple
from ..common import PyCTRError, _ReaderOpenFileBase
from ..crypto import CryptoEngine, Keyslot
from ..types.ncch import NCCHReader
from ..types.tmd import TitleMetadataReader
from ..util import readle, roundup
if TYPE_CHECKING:
from typing import BinaryIO, Dict, Optional, Union
ALIGN_SIZE = 64
class CIAError(PyCTRError):
"""Generic error for CIA operations."""
class InvalidCIAError(CIAError):
"""Invalid CIA header exception."""
class CIASection(IntEnum):
# these values as negative, as positive ones are used for contents
ArchiveHeader = -4
CertificateChain = -3
Ticket = -2
TitleMetadata = -1
Meta = -5
class CIARegion(NamedTuple):
section: 'Union[int, CIASection]'
offset: int
size: int
iv: bytes # only used for encrypted sections
class _CIASectionFile(_ReaderOpenFileBase):
"""Provides a raw CIA section as a file-like object."""
def __init__(self, reader: 'CIAReader', path: 'CIASection'):
super().__init__(reader, path)
self._info = reader.sections[path]
class CIAReader:
"""Class for the 3DS CIA container."""
closed = False
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None,
dev: bool = False, seeddb: str = None, load_contents: bool = True):
if isinstance(fp, str):
fp = open(fp, 'rb')
if crypto:
self._crypto = crypto
else:
self._crypto = CryptoEngine(dev=dev)
# store the starting offset so the CIA can be read from any point in the base file
self._start = fp.tell()
self._fp = fp
# store case-insensitivity for RomFSReader
self._case_insensitive = case_insensitive
# threading lock
self._lock = Lock()
header = fp.read(0x20)
archive_header_size = readle(header[0x0:0x4])
if archive_header_size != 0x2020:
raise InvalidCIAError('Archive Header Size is not 0x2020')
# in practice, the certificate chain is the same for all retail titles
cert_chain_size = readle(header[0x8:0xC])
# the ticket size usually never changes from 0x350
# there is one ticket (without an associated title) that is smaller though
ticket_size = readle(header[0xC:0x10])
# tmd contains info about the contents of the title
tmd_size = readle(header[0x10:0x14])
# meta contains info such as the SMDH and Title ID dependency list
meta_size = readle(header[0x14:0x18])
# content size is the total size of the contents
# I'm not sure what happens yet if one of the contents is not aligned to 0x40 bytes.
content_size = readle(header[0x18:0x20])
# the content index determines what contents are in the CIA
# this is not stored as int, so it's faster to parse(?)
content_index = fp.read(archive_header_size - 0x20)
active_contents = set()
for idx, b in enumerate(content_index):
offset = idx * 8
curr = b
for x in range(7, -1, -1):
if curr & 1:
active_contents.add(x + offset)
curr >>= 1
# the header only stores sizes; offsets need to be calculated.
# the sections are aligned to 64(0x40) bytes. for example, if something is 0x78,
# it will take up 0x80, with the remaining 0x8 being padding.
cert_chain_offset = roundup(archive_header_size, ALIGN_SIZE)
ticket_offset = cert_chain_offset + roundup(cert_chain_size, ALIGN_SIZE)
tmd_offset = ticket_offset + roundup(ticket_size, ALIGN_SIZE)
content_offset = tmd_offset + roundup(tmd_size, ALIGN_SIZE)
meta_offset = content_offset + roundup(content_size, ALIGN_SIZE)
# lazy method to get the total size
self.total_size = meta_offset + meta_size
# this contains the location of each section, as well as the IV of encrypted ones
self.sections: Dict[Union[int, CIASection], CIARegion] = {}
def add_region(section: 'Union[int, CIASection]', offset: int, size: int, iv: 'Optional[bytes]'):
region = CIARegion(section=section, offset=offset, size=size, iv=iv)
self.sections[section] = region
# add each part of the header
add_region(CIASection.ArchiveHeader, 0, archive_header_size, None)
add_region(CIASection.CertificateChain, cert_chain_offset, cert_chain_size, None)
add_region(CIASection.Ticket, ticket_offset, ticket_size, None)
add_region(CIASection.TitleMetadata, tmd_offset, tmd_size, None)
if meta_size:
add_region(CIASection.Meta, meta_offset, meta_size, None)
# this will load the titlekey to decrypt the contents
self._fp.seek(self._start + ticket_offset)
ticket = self._fp.read(ticket_size)
self._crypto.load_from_ticket(ticket)
# the tmd describes the contents: ID, index, size, and hash
self._fp.seek(self._start + tmd_offset)
tmd_data = self._fp.read(tmd_size)
self.tmd = TitleMetadataReader.load(BytesIO(tmd_data))
active_contents_tmd = set()
self.content_info = []
# this does a first check to make sure there are no missing contents that are marked active in content_index
for record in self.tmd.chunk_records:
if record.cindex in active_contents:
active_contents_tmd.add(record.cindex)
self.content_info.append(record)
# if the result of this is not an empty set, it means there are contents enabled in content_index
# that are not in the tmd, which is bad
if active_contents ^ active_contents_tmd:
raise InvalidCIAError('Missing active contents in the TMD')
self.contents = {}
# this goes through the contents and figures out their regions, then creates an NCCHReader
curr_offset = content_offset
for record in self.content_info:
iv = None
if record.type.encrypted:
iv = record.cindex.to_bytes(2, 'big') + (b'\0' * 14)
add_region(record.cindex, curr_offset, record.size, iv)
if load_contents:
# check if the content is a Nintendo DS ROM (SRL) first
is_srl = record.cindex == 0 and self.tmd.title_id[3:5] == '48'
if not is_srl:
content_fp = self.open_raw_section(record.cindex)
self.contents[record.cindex] = NCCHReader(content_fp, case_insensitive=case_insensitive,
dev=dev, seeddb=seeddb)
curr_offset += record.size
def close(self):
self.closed = True
try:
self._fp.close()
except AttributeError:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
__del__ = close
def __repr__(self):
info = [('title_id', self.tmd.title_id)]
try:
info.append(('title_name', repr(self.contents[0].exefs.icon.get_app_title().short_desc)))
except KeyError:
info.append(('title_name', 'unknown'))
info.append(('content_count', len(self.contents)))
info_final = " ".join(x + ": " + str(y) for x, y in info)
return f'<{type(self).__name__} {info_final}>'
def open_raw_section(self, section: 'CIASection'):
"""Open a raw CIA section for reading."""
return _CIASectionFile(self, section)
def get_data(self, region: 'CIARegion', offset: int, size: int) -> bytes:
if offset + size > region.size:
# prevent reading past the region
size = region.size - offset
with self._lock:
if region.iv:
real_size = size
# if encrypted, the block needs to be decrypted first
# CBC requires a full block (0x10 in this case). and the previous
# block is used as the IV. so that's quite a bit to read if the
# application requires just a few bytes.
# thanks Stary2001 for help with random-access crypto
before = offset % 16
if size % 16 != 0:
size = size + 16 - size % 16
if offset - before == 0:
iv = region.iv
else:
self._fp.seek(self._start + region.offset + offset - before - 0x10)
iv = self._fp.read(0x10)
# read to block size
self._fp.seek(self._start + region.offset + offset - before)
# adding x10 to the size fixes some kind of decryption bug I think. this needs more testing.
return self._crypto.create_cbc_cipher(Keyslot.DecryptedTitlekey,
iv).decrypt(self._fp.read(size + 0x10))[before:real_size + before]
else:
# no encryption
self._fp.seek(self._start + region.offset + offset)
return self._fp.read(size)

308
pyctr/types/exefs.py Normal file
View File

@@ -0,0 +1,308 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from hashlib import sha256
from threading import Lock
from typing import TYPE_CHECKING, NamedTuple
from ..common import PyCTRError, _ReaderOpenFileBase
from ..util import readle
from ..types.smdh import SMDH, InvalidSMDHError
if TYPE_CHECKING:
from typing import BinaryIO, Dict, Union
__all__ = ['EXEFS_EMPTY_ENTRY', 'EXEFS_ENTRY_SIZE', 'EXEFS_ENTRY_COUNT', 'EXEFS_HEADER_SIZE', 'ExeFSError',
'ExeFSFileNotFoundError', 'InvalidExeFSError', 'ExeFSNameError', 'BadOffsetError', 'CodeDecompressionError',
'decompress_code', 'ExeFSReader']
EXEFS_ENTRY_SIZE = 0x10
EXEFS_ENTRY_COUNT = 10
EXEFS_EMPTY_ENTRY = b'\0' * EXEFS_ENTRY_SIZE
EXEFS_HEADER_SIZE = 0x200
CODE_DECOMPRESSED_NAME = '.code-decompressed'
class ExeFSError(PyCTRError):
"""Generic exception for ExeFS operations."""
class ExeFSFileNotFoundError(ExeFSError):
"""File not found in the ExeFS."""
class InvalidExeFSError(ExeFSError):
"""Invalid ExeFS header."""
class ExeFSNameError(InvalidExeFSError):
"""Name could not be decoded, likely making the file not a valid ExeFS."""
def __str__(self):
return f'could not decode from ascii: {self.args[0]!r}'
class BadOffsetError(InvalidExeFSError):
"""Offset is not a multiple of 0x200. This kind of ExeFS will not work on a 3DS."""
def __str__(self):
return f'offset is not a multiple of 0x200: {self.args[0]:#x}'
class CodeDecompressionError(ExeFSError):
"""Exception when attempting to decompress ExeFS .code."""
# lazy check
CODE_MAX_SIZE = 0x2300000
def decompress_code(code: bytes) -> bytes:
# remade from C code, this could probably be done better
# https://github.com/d0k3/GodMode9/blob/689f6f7cf4280bf15885cbbf848d8dce81def36b/arm9/source/game/codelzss.c#L25-L93
off_size_comp = int.from_bytes(code[-8:-4], 'little')
add_size = int.from_bytes(code[-4:], 'little')
comp_start = 0
code_len = len(code)
code_comp_size = off_size_comp & 0xFFFFFF
code_comp_end = code_comp_size - ((off_size_comp >> 24) % 0xFF)
code_dec_size = code_len + add_size
if code_len < 8:
raise CodeDecompressionError('code_len < 8')
if code_len > CODE_MAX_SIZE:
raise CodeDecompressionError('code_len > CODE_MAX_SIZE')
if code_comp_size <= code_len:
comp_start = code_len - code_comp_size
if code_comp_end < 0:
raise CodeDecompressionError('code_comp_end < 0')
if code_dec_size > CODE_MAX_SIZE:
raise CodeDecompressionError('code_dec_size > CODE_MAX_SIZE')
dec = bytearray(code)
dec.extend(b'\0' * add_size)
data_end = comp_start + code_dec_size
ptr_in = comp_start + code_comp_end
ptr_out = code_dec_size
while ptr_in > comp_start and ptr_out > comp_start:
if ptr_out < ptr_in:
raise CodeDecompressionError('ptr_out < ptr_in')
ptr_in -= 1
ctrl_byte = dec[ptr_in]
for i in range(7, -1, -1):
if ptr_in <= comp_start or ptr_out <= comp_start:
break
if (ctrl_byte >> i) & 1:
ptr_in -= 2
seg_code = int.from_bytes(dec[ptr_in:ptr_in + 2], 'little')
if ptr_in < comp_start:
raise CodeDecompressionError('ptr_in < comp_start')
seg_off = (seg_code & 0x0FFF) + 2
seg_len = ((seg_code >> 12) & 0xF) + 3
if ptr_out - seg_len < comp_start:
raise CodeDecompressionError('ptr_out - seg_len < comp_start')
if ptr_out + seg_off >= data_end:
raise CodeDecompressionError('ptr_out + seg_off >= data_end')
c = 0
while c < seg_len:
byte = dec[ptr_out + seg_off]
ptr_out -= 1
dec[ptr_out] = byte
c += 1
else:
if ptr_out == comp_start:
raise CodeDecompressionError('ptr_out == comp_start')
if ptr_in == comp_start:
raise CodeDecompressionError('ptr_in == comp_start')
ptr_out -= 1
ptr_in -= 1
dec[ptr_out] = dec[ptr_in]
if ptr_in != comp_start:
raise CodeDecompressionError('ptr_in != comp_start')
if ptr_out != comp_start:
raise CodeDecompressionError('ptr_out != comp_start')
return bytes(dec)
class ExeFSEntry(NamedTuple):
name: str
offset: int
size: int
hash: bytes
def _normalize_path(p: str):
"""Fix a given path to work with ExeFS filenames."""
if p.startswith('/'):
p = p[1:]
# while it is technically possible for an ExeFS entry to contain ".bin",
# this would not happen in practice.
# even so, normalization can be disabled by passing normalize=False to
# ExeFSReader.open
if p.lower().endswith('.bin'):
p = p[:4]
return p
class _ExeFSOpenFile(_ReaderOpenFileBase):
"""Class for open ExeFS file entries."""
def __init__(self, reader: 'ExeFSReader', path: str):
super().__init__(reader, path)
try:
self._info = reader.entries[self._path]
except KeyError:
raise ExeFSFileNotFoundError(self._path)
class ExeFSReader:
"""
Class to read the 3DS ExeFS container.
http://3dbrew.org/wiki/ExeFS
"""
closed = False
_code_dec = None
icon: 'SMDH' = None
def __init__(self, fp: 'Union[str, BinaryIO]', *, _load_icon: bool = True):
if isinstance(fp, str):
fp = open(fp, 'rb')
# storing the starting offset lets it work from anywhere in the file
self._start = fp.tell()
self._fp = fp
self._lock = Lock()
self.entries: 'Dict[str, ExeFSEntry]' = {}
header = fp.read(EXEFS_HEADER_SIZE)
# ExeFS entries can fit up to 10 names. hashes are stored in reverse order
# (e.g. the first entry would have the hash at the very end - 0x1E0)
for entry_n, hash_n in zip(range(0, EXEFS_ENTRY_COUNT * EXEFS_ENTRY_SIZE, EXEFS_ENTRY_SIZE),
range(0x1E0, 0xA0, -0x20)):
entry_raw = header[entry_n:entry_n + 0x10]
entry_hash = header[hash_n:hash_n + 0x20]
if entry_raw == EXEFS_EMPTY_ENTRY:
continue
try:
# ascii is used since only a-z would be used in practice
name = entry_raw[0:8].rstrip(b'\0').decode('ascii')
except UnicodeDecodeError:
raise ExeFSNameError(entry_raw[0:8])
entry = ExeFSEntry(name=name,
offset=readle(entry_raw[8:12]),
size=readle(entry_raw[12:16]),
hash=entry_hash)
# the 3DS fails to parse an ExeFS with an offset that isn't a multiple of 0x200
# so we should do the same here
if entry.offset % 0x200:
raise BadOffsetError(entry.offset)
self.entries[name] = entry
# this sometimes needs to be loaded outside, since reading it here may cause encryption problems
# when the NCCH has not fully initialized yet and needs to figure out what ExeFS regions need
# to be decrypted with the Original NCCH key
if _load_icon:
self._load_icon()
def _load_icon(self):
try:
with self.open('icon') as f:
self.icon = SMDH.load(f)
except InvalidSMDHError:
pass
def __len__(self) -> int:
"""Return the amount of entries in the ExeFS."""
return len(self.entries)
def close(self):
self.closed = True
try:
self._fp.close()
except AttributeError:
pass
__del__ = close
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def open(self, path: str, *, normalize: bool = True):
"""Open a file in the ExeFS for reading."""
if normalize:
# remove beginning "/" and ending ".bin"
path = _normalize_path(path)
return _ExeFSOpenFile(self, path)
def get_data(self, info: ExeFSEntry, offset: int, size: int) -> bytes:
if offset + size > info.size:
size = info.size - offset
with self._lock:
if info.offset == -1:
# return the decompressed code instead
return self._code_dec[offset:offset + size]
else:
# data for ExeFS entries start relative to the end of the header
self._fp.seek(self._start + EXEFS_HEADER_SIZE + info.offset + offset)
return self._fp.read(size)
def decompress_code(self) -> bool:
"""
Decompress '.code' in the container. The result will be available as '.code-decompressed'.
The return value is if '.code' was actually decompressed.
"""
with self.open('.code') as f:
code = f.read()
# if it's already decompressed, this would return the code unmodified
code_dec = decompress_code(code)
decompressed = code_dec != code
if decompressed:
code_dec_hash = sha256(code_dec)
entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME,
offset=-1,
size=len(code_dec),
hash=code_dec_hash.digest())
self._code_dec = code_dec
else:
# if the code was already decompressed, don't store a second copy in memory
code_entry = self.entries['.code']
entry = ExeFSEntry(name=CODE_DECOMPRESSED_NAME,
offset=code_entry.offset,
size=code_entry.size,
hash=code_entry.hash)
self.entries[CODE_DECOMPRESSED_NAME] = entry
# returns if the code was actually decompressed or not
return decompressed

9
pyctr/types/extheader.py Normal file
View File

@@ -0,0 +1,9 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
class ExtendedHeaderReader:
def __init__(self):

521
pyctr/types/ncch.py Normal file
View File

@@ -0,0 +1,521 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from hashlib import sha256
from enum import IntEnum
from math import ceil
from os import environ
from os.path import join as pjoin
from threading import Lock
from typing import TYPE_CHECKING, NamedTuple
from .exefs import ExeFSReader, EXEFS_HEADER_SIZE
from .romfs import RomFSReader
from ..common import PyCTRError, _ReaderOpenFileBase
from ..util import config_dirs, readle, roundup
from ..crypto import CryptoEngine, Keyslot
if TYPE_CHECKING:
from typing import BinaryIO, Dict, List, Optional, Tuple, Union
__all__ = ['NCCH_MEDIA_UNIT', 'NO_ENCRYPTION', 'EXEFS_NORMAL_CRYPTO_FILES', 'FIXED_SYSTEM_KEY', 'NCCHError',
'InvalidNCCHError', 'NCCHSeedError', 'MissingSeedError', 'SeedDBNotFoundError', 'get_seed',
'extra_cryptoflags', 'NCCHSection', 'NCCHRegion', 'NCCHFlags', 'NCCHReader']
class NCCHError(PyCTRError):
"""Generic exception for NCCH operations."""
class InvalidNCCHError(NCCHError):
"""Invalid NCCH header exception."""
class NCCHSeedError(NCCHError):
"""NCCH seed is not set up, or attempted to set up seed when seed crypto is not used."""
class MissingSeedError(NCCHSeedError):
"""Seed could not be found."""
class SeedDBNotFoundError(NCCHSeedError):
"""SeedDB was not found. Main argument is a tuple of checked paths."""
def get_seed(f: 'BinaryIO', program_id: int) -> bytes:
"""Get a seed in a seeddb.bin from an I/O stream."""
# convert the Program ID to little-endian bytes, as the TID is stored in seeddb.bin this way
tid_bytes = program_id.to_bytes(0x8, 'little')
f.seek(0)
# get the amount of seeds
seed_count = readle(f.read(4))
f.seek(0x10)
for _ in range(seed_count):
entry = f.read(0x20)
if entry[0:8] == tid_bytes:
return entry[0x8:0x18]
raise NCCHSeedError(f'missing seed for {program_id:016X} from seeddb.bin')
seeddb_paths = [pjoin(x, 'seeddb.bin') for x in config_dirs]
try:
# try to insert the path in the SEEDDB_PATH environment variable
seeddb_paths.insert(0, environ['SEEDDB_PATH'])
except KeyError:
pass
# NCCH sections are stored in media units
# for example, ExeFS may be stored in 13 media units, which is 0x1A00 bytes (13 * 0x200)
NCCH_MEDIA_UNIT = 0x200
# depending on the crypto_method flag, a different keyslot may be used for RomFS and parts of ExeFS.
extra_cryptoflags = {0x00: Keyslot.NCCH, 0x01: Keyslot.NCCH70, 0x0A: Keyslot.NCCH93, 0x0B: Keyslot.NCCH96}
# if fixed_crypto_key is enabled, the normal key is normally all zeros.
# however is (program_id & (0x10 << 32)) is true, this key is used instead.
FIXED_SYSTEM_KEY = 0x527CE630A9CA305F3696F3CDE954194B
# this is IntEnum to make generating the IV easier
class NCCHSection(IntEnum):
ExtendedHeader = 1
ExeFS = 2
RomFS = 3
# no crypto
Header = 4
Logo = 5
Plain = 6
# special
FullDecrypted = 7
Raw = 8
# these sections don't use encryption at all
NO_ENCRYPTION = {NCCHSection.Header, NCCHSection.Logo, NCCHSection.Plain, NCCHSection.Raw}
# the contents of these files in the ExeFS, plus the header, will always use the Original NCCH keyslot
# therefore these regions need to be stored to check what keyslot is used to decrypt
EXEFS_NORMAL_CRYPTO_FILES = {'icon', 'banner'}
class NCCHRegion(NamedTuple):
section: 'NCCHSection'
offset: int
size: int
end: int # this is just offset + size, stored to avoid re-calculation later on
# not all sections will actually use this (see NCCHSection), so some have a useless value
iv: int
class NCCHFlags(NamedTuple):
# determines the extra keyslot used for RomFS and parts of ExeFS
crypto_method: int
# if this is a CXI (CTR Executable Image) or CFA (CTR File Archive)
# in the raw flags, "Data" has to be set for it to be a CFA, while "Executable" is unset.
executable: bool
# if the content is encrypted using a fixed normal key.
fixed_crypto_key: bool
# if RomFS is to be ignored
no_romfs: bool
# if the NCCH has no encryption
no_crypto: bool
# if a seed must be loaded to load RomFS and parts of ExeFS
uses_seed: bool
class _NCCHSectionFile(_ReaderOpenFileBase):
"""Provides a raw, decrypted NCCH section as a file-like object."""
def __init__(self, reader: 'NCCHReader', path: 'NCCHSection'):
super().__init__(reader, path)
self._info = reader.sections[path]
class NCCHReader:
"""Class for 3DS NCCH container."""
seed_set_up = False
seed: 'Optional[bytes]' = None
# this is the KeyY when generated using the seed
_seeded_key_y = None
closed = False
# this lists the ranges of the ExeFS to decrypt with Original NCCH (see load_sections)
_exefs_keyslot_normal_range: 'List[Tuple[int, int]]'
exefs: 'Optional[ExeFSReader]' = None
romfs: 'Optional[RomFSReader]' = None
def __init__(self, fp: 'Union[str, BinaryIO]', *, case_insensitive: bool = True, crypto: CryptoEngine = None,
dev: bool = False, seeddb: str = None, load_sections: bool = True):
if isinstance(fp, str):
fp = open(fp, 'rb')
if crypto:
self._crypto = crypto
else:
self._crypto = CryptoEngine(dev=dev)
# store the starting offset so the NCCH can be read from any point in the base file
self._start = fp.tell()
self._fp = fp
# store case-insensitivity for RomFSReader
self._case_insensitive = case_insensitive
# threaing lock
self._lock = Lock()
header = fp.read(0x200)
# load the Key Y from the first 0x10 of the signature
self._key_y = header[0x0:0x10]
# store the ncch version
self.version = readle(header[0x112:0x114])
# get the total size of the NCCH container, and store it in bytes
self.content_size = readle(header[0x104:0x108]) * NCCH_MEDIA_UNIT
# get the Partition ID, which is used in the encryption
# this is generally different for each content in a title, except for DLC
self.partition_id = readle(header[0x108:0x110])
# load the seed verify field, which is part of an sha256 hash to verify if
# a seed is correct for this title
self._seed_verify = header[0x114:0x118]
# load the Product Code store it as a unicode string
self.product_code = header[0x150:0x160].decode('ascii').strip('\0')
# load the Program ID
# this is the Title ID, and
self.program_id = readle(header[0x118:0x120])
# load the extheader size, but this code only uses it to determine if it exists
extheader_size = readle(header[0x180:0x184])
# each section is stored with the section ID, then the region information (offset, size, IV)
self.sections: 'Dict[NCCHSection, NCCHRegion]' = {}
# same as above, but includes non-existant regions too, for the full-decrypted handler
self._all_sections: 'Dict[NCCHSection, NCCHRegion]' = {}
def add_region(section: 'NCCHSection', starting_unit: int, units: int):
offset = starting_unit * NCCH_MEDIA_UNIT
size = units * NCCH_MEDIA_UNIT
region = NCCHRegion(section=section,
offset=offset,
size=size,
end=offset + size,
iv=self.partition_id << 64 | (section << 56))
self._all_sections[section] = region
if units != 0: # only add existing regions
self.sections[section] = region
# add the header as the first region
add_region(NCCHSection.Header, 0, 1)
# add the full decrypted content, which when read, simulates a fully decrypted NCCH container
add_region(NCCHSection.FullDecrypted, 0, self.content_size // NCCH_MEDIA_UNIT)
# add the full raw content
add_region(NCCHSection.Raw, 0, self.content_size // NCCH_MEDIA_UNIT)
# only care about the exheader if it's the expected size
if extheader_size == 0x400:
add_region(NCCHSection.ExtendedHeader, 1, 4)
else:
add_region(NCCHSection.ExtendedHeader, 0, 0)
# add the remaining NCCH regions
# some of these may not exist, and won't be added if units (second value) is 0
add_region(NCCHSection.Logo, readle(header[0x198:0x19C]), readle(header[0x19C:0x1A0]))
add_region(NCCHSection.Plain, readle(header[0x190:0x194]), readle(header[0x194:0x198]))
add_region(NCCHSection.ExeFS, readle(header[0x1A0:0x1A4]), readle(header[0x1A4:0x1A8]))
add_region(NCCHSection.RomFS, readle(header[0x1B0:0x1B4]), readle(header[0x1B4:0x1B8]))
# parse flags
flags_raw = header[0x188:0x190]
self.flags = NCCHFlags(crypto_method=flags_raw[3], executable=bool(flags_raw[5] & 0x2),
fixed_crypto_key=bool(flags_raw[7] & 0x1), no_romfs=bool(flags_raw[7] & 0x2),
no_crypto=bool(flags_raw[7] & 0x4), uses_seed=bool(flags_raw[7] & 0x20))
# load the original (non-seeded) KeyY into the Original NCCH slot
self._crypto.set_keyslot('y', Keyslot.NCCH, self.get_key_y(original=True))
# load the seed if needed
if self.flags.uses_seed:
self.load_seed_from_seeddb(seeddb)
# load the (seeded, if needed) key into the extra keyslot
self._crypto.set_keyslot('y', self.extra_keyslot, self.get_key_y())
# load the sections using their specific readers
if load_sections:
self.load_sections()
def close(self):
self.closed = True
try:
self._fp.close()
except AttributeError:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
__del__ = close
def load_sections(self):
"""Load the sections of the NCCH (Extended Header, ExeFS, and RomFS)."""
# try to load the ExeFS
try:
self._fp.seek(self._start + self.sections[NCCHSection.ExeFS].offset)
except KeyError:
pass # no ExeFS
else:
# this is to generate what regions should be decrypted with the Original NCCH keyslot
# technically, it's not actually 0x200 chunks or units. the actual space of the file
# is encrypted with the different key. for example, if .code is 0x300 bytes, that
# means the first 0x300 are encrypted with the NCCH 7.x key, and the remaining
# 0x100 uses Original NCCH. however this would be quite a pain to implement properly
# with random access, so I only work with 0x200 chunks here. after all, the space
# after the file is effectively unused. it makes no difference, except for
# perfectionists who want it perfectly decrypted. GodMode9 does it properly I think,
# if that is what you want. or you can fix the empty space yourself with a hex editor.
self._exefs_keyslot_normal_range = [(0, 0x200)]
exefs_fp = self.open_raw_section(NCCHSection.ExeFS)
# load the RomFS reader
self.exefs = ExeFSReader(exefs_fp, _load_icon=False)
for entry in self.exefs.entries.values():
if entry.name in EXEFS_NORMAL_CRYPTO_FILES:
# this will add the offset (relative to ExeFS start), with the size
# rounded up to 0x200 chunks
r = (entry.offset + EXEFS_HEADER_SIZE,
entry.offset + EXEFS_HEADER_SIZE + roundup(entry.size, NCCH_MEDIA_UNIT))
self._exefs_keyslot_normal_range.append(r)
self.exefs._load_icon()
# try to load RomFS
if not self.flags.no_romfs:
try:
self._fp.seek(self._start + self.sections[NCCHSection.RomFS].offset)
except KeyError:
pass # no RomFS
else:
romfs_fp = self.open_raw_section(NCCHSection.RomFS)
# load the RomFS reader
self.romfs = RomFSReader(romfs_fp, case_insensitive=self._case_insensitive)
def open_raw_section(self, section: 'NCCHSection'):
"""Open a raw NCCH section for reading."""
return _NCCHSectionFile(self, section)
def get_key_y(self, original: bool = False) -> bytes:
if original or not self.flags.uses_seed:
return self._key_y
if self.flags.uses_seed and not self.seed_set_up:
raise MissingSeedError('NCCH uses seed crypto, but seed is not set up')
else:
return self._seeded_key_y
@property
def extra_keyslot(self) -> int:
return extra_cryptoflags[self.flags.crypto_method]
def check_for_extheader(self) -> bool:
return NCCHSection.ExtendedHeader in self.sections
def setup_seed(self, seed: bytes):
if not self.flags.uses_seed:
raise NCCHSeedError('NCCH does not use seed crypto')
seed_verify_hash = sha256(seed + self.program_id.to_bytes(0x8, 'little')).digest()
if seed_verify_hash[0x0:0x4] != self._seed_verify:
raise NCCHSeedError('given seed does not match with seed verify hash in header')
self.seed = seed
self._seeded_key_y = sha256(self._key_y + seed).digest()[0:16]
self.seed_set_up = True
def load_seed_from_seeddb(self, path: str = None):
if not self.flags.uses_seed:
raise NCCHSeedError('NCCH does not use seed crypto')
if path:
# if a path was provided, use only that
paths = (path,)
else:
# use the fixed set of paths
paths = seeddb_paths
for fn in paths:
try:
with open(fn, 'rb') as f:
# try to load the seed from the file
self.setup_seed(get_seed(f, self.program_id))
return
except FileNotFoundError:
continue
# if keys are not set...
raise InvalidNCCHError(paths)
def get_data(self, section: 'Union[NCCHRegion, NCCHSection]', offset: int, size: int) -> bytes:
try:
region = self._all_sections[section]
except KeyError:
region = section
if offset + size > region.size:
# prevent reading past the region
size = region.size - offset
# the full-decrypted handler is done outside of the thread lock
if region.section == NCCHSection.FullDecrypted:
before = offset % 0x200
aligned_offset = offset - before
aligned_size = size + before
def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int):
# get the offset of the end of the last chunk
end = al_offset + (ceil(al_size / 0x200) * 0x200)
# store the sections to read
# dict is ordered by default in CPython since 3.6.0, and part of the language spec since 3.7.0
to_read: Dict[Tuple[NCCHSection, int], List[int]] = {}
# get each section to a local variable for easier access
header = self._all_sections[NCCHSection.Header]
extheader = self._all_sections[NCCHSection.ExtendedHeader]
logo = self._all_sections[NCCHSection.Logo]
plain = self._all_sections[NCCHSection.Plain]
exefs = self._all_sections[NCCHSection.ExeFS]
romfs = self._all_sections[NCCHSection.RomFS]
last_region = False
# this is somewhat hardcoded for performance reasons. this may be optimized better later.
for chunk_offset in range(al_offset, end, 0x200):
# RomFS check first, since it might be faster
if romfs.offset <= chunk_offset < romfs.end:
region = (NCCHSection.RomFS, 0)
curr_offset = romfs.offset
# ExeFS check second, since it might be faster
elif exefs.offset <= chunk_offset < exefs.end:
region = (NCCHSection.ExeFS, 0)
curr_offset = exefs.offset
elif header.offset <= chunk_offset < header.end:
region = (NCCHSection.Header, 0)
curr_offset = header.offset
elif extheader.offset <= chunk_offset < extheader.end:
region = (NCCHSection.ExtendedHeader, 0)
curr_offset = extheader.offset
elif logo.offset <= chunk_offset < logo.end:
region = (NCCHSection.Logo, 0)
curr_offset = logo.offset
elif plain.offset <= chunk_offset < plain.end:
region = (NCCHSection.Plain, 0)
curr_offset = plain.offset
else:
region = (NCCHSection.Raw, chunk_offset)
curr_offset = 0
if region not in to_read:
to_read[region] = [chunk_offset - curr_offset, 0]
to_read[region][1] += 0x200
last_region = region
is_start = True
for region, info in to_read.items():
new_data = self.get_data(region[0], info[0], info[1])
if region[0] == NCCHSection.Header:
# fix crypto flags
ncch_array = bytearray(new_data)
ncch_array[0x18B] = 0
ncch_array[0x18F] = 4
new_data = bytes(ncch_array)
if is_start:
new_data = new_data[cut_start:]
is_start = False
if region == last_region and cut_end != 0x200:
new_data = new_data[:-cut_end]
yield new_data
return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200)))
with self._lock:
# check if decryption is really needed
if self.flags.no_crypto or region.section in NO_ENCRYPTION:
self._fp.seek(self._start + region.offset + offset)
return self._fp.read(size)
# thanks Stary2001 for help with random-access crypto
# if the region is ExeFS and extra crypto is being used, special handling is required
# because different parts use different encryption methods
if region.section == NCCHSection.ExeFS and self.flags.crypto_method != 0x00:
# get the amount to cut off at the beginning
before = offset % 0x200
# get the offset of the starting chunk
aligned_offset = offset - before
# get the real offset of the starting chunk
aligned_real_offset = self._start + region.offset + aligned_offset
# get the aligned total size of the requested size
aligned_size = size + before
self._fp.seek(aligned_real_offset)
def do_thing(al_offset: int, al_size: int, cut_start: int, cut_end: int):
# get the offset of the end of the last chunk
end = al_offset + (ceil(al_size / 0x200) * 0x200)
# get the offset to the last chunk
last_chunk_offset = end - 0x200
# noinspection PyTypeChecker
for chunk in range(al_offset, end, 0x200):
# generate the IV for this chunk
iv = region.iv + (chunk >> 4)
# get the extra keyslot
keyslot = self.extra_keyslot
for r in self._exefs_keyslot_normal_range:
if r[0] <= self._fp.tell() - region.offset < r[1]:
# if the chunk is within the "normal keyslot" ranges,
# use the Original NCCH keyslot instead
keyslot = Keyslot.NCCH
# decrypt the data
out = self._crypto.create_ctr_cipher(keyslot, iv).decrypt(self._fp.read(0x200))
if chunk == al_offset:
# cut off the beginning if it's the first chunk
out = out[cut_start:]
if chunk == last_chunk_offset and cut_end != 0x200:
# cut off the end of it's the last chunk
out = out[:-cut_end]
yield out
# join all the chunks into one bytes result and return it
return b''.join(do_thing(aligned_offset, aligned_size, before, 0x200 - ((size + before) % 0x200)))
else:
# seek to the real offset of the section + the requested offset
self._fp.seek(self._start + region.offset + offset)
data = self._fp.read(size)
# choose the extra keyslot only for RomFS here
# ExeFS needs special handling if a newer keyslot is used, therefore it's not checked here
keyslot = self.extra_keyslot if region.section == NCCHSection.RomFS else Keyslot.NCCH
# get the amount of padding required at the beginning
before = offset % 16
# pad the beginning of the data if needed (the ending part doesn't need padding)
data = (b'\0' * before) + data
# decrypt the data, then cut off the padding
return self._crypto.create_ctr_cipher(keyslot, region.iv + (offset >> 4)).decrypt(data)[before:]

246
pyctr/types/romfs.py Normal file
View File

@@ -0,0 +1,246 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from io import TextIOWrapper
from threading import Lock
from typing import overload, TYPE_CHECKING, NamedTuple
from ..common import PyCTRError, _ReaderOpenFileBase
from ..util import readle, roundup
if TYPE_CHECKING:
from typing import BinaryIO, Optional, Tuple, Union
__all__ = ['IVFC_HEADER_SIZE', 'IVFC_ROMFS_MAGIC_NUM', 'ROMFS_LV3_HEADER_SIZE', 'RomFSError', 'InvalidIVFCError',
'InvalidRomFSHeaderError', 'RomFSEntryError', 'RomFSFileNotFoundError', 'RomFSReader']
IVFC_HEADER_SIZE = 0x5C
IVFC_ROMFS_MAGIC_NUM = 0x10000
ROMFS_LV3_HEADER_SIZE = 0x28
class RomFSError(PyCTRError):
"""Generic exception for RomFS operations."""
class InvalidIVFCError(RomFSError):
"""Invalid IVFC header exception."""
class InvalidRomFSHeaderError(RomFSError):
"""Invalid RomFS Level 3 header."""
class RomFSEntryError(RomFSError):
"""Error with RomFS Directory or File entry."""
class RomFSFileNotFoundError(RomFSEntryError):
"""Invalid file path in RomFS Level 3."""
class RomFSIsADirectoryError(RomFSEntryError):
"""Attempted to open a directory as a file."""
class RomFSRegion(NamedTuple):
offset: int
size: int
class RomFSDirectoryEntry(NamedTuple):
name: str
type: str
contents: 'Tuple[str, ...]'
class RomFSFileEntry(NamedTuple):
name: str
type: str
offset: int
size: int
class _RomFSOpenFile(_ReaderOpenFileBase):
"""Class for open RomFS file entries."""
def __init__(self, reader: 'RomFSReader', path: str):
super().__init__(reader, path)
self._info: RomFSFileEntry = reader.get_info_from_path(path)
if not isinstance(self._info, RomFSFileEntry):
raise RomFSIsADirectoryError(path)
class RomFSReader:
"""
Class for 3DS RomFS Level 3 partition.
https://www.3dbrew.org/wiki/RomFS
"""
closed = False
lv3_offset = 0
data_offset = 0
def __init__(self, fp: 'Union[str, BinaryIO]', case_insensitive: bool = False):
if isinstance(fp, str):
fp = open(fp, 'rb')
self._start = fp.tell()
self._fp = fp
self.case_insensitive = case_insensitive
self._lock = Lock()
lv3_offset = fp.tell()
magic = fp.read(4)
# detect ivfc and get the lv3 offset
if magic == b'IVFC':
ivfc = magic + fp.read(0x54) # IVFC_HEADER_SIZE - 4
ivfc_magic_num = readle(ivfc[0x4:0x8])
if ivfc_magic_num != IVFC_ROMFS_MAGIC_NUM:
raise InvalidIVFCError(f'IVFC magic number is invalid '
f'({ivfc_magic_num:#X} instead of {IVFC_ROMFS_MAGIC_NUM:#X})')
master_hash_size = readle(ivfc[0x8:0xC])
lv3_block_size = readle(ivfc[0x4C:0x50])
lv3_hash_block_size = 1 << lv3_block_size
lv3_offset += roundup(0x60 + master_hash_size, lv3_hash_block_size)
fp.seek(self._start + lv3_offset)
magic = fp.read(4)
self.lv3_offset = lv3_offset
lv3_header = magic + fp.read(0x24) # ROMFS_LV3_HEADER_SIZE - 4
# get offsets and sizes from lv3 header
lv3_header_size = readle(magic)
lv3_dirhash = RomFSRegion(offset=readle(lv3_header[0x4:0x8]), size=readle(lv3_header[0x8:0xC]))
lv3_dirmeta = RomFSRegion(offset=readle(lv3_header[0xC:0x10]), size=readle(lv3_header[0x10:0x14]))
lv3_filehash = RomFSRegion(offset=readle(lv3_header[0x14:0x18]), size=readle(lv3_header[0x18:0x1C]))
lv3_filemeta = RomFSRegion(offset=readle(lv3_header[0x1C:0x20]), size=readle(lv3_header[0x20:0x24]))
lv3_filedata_offset = readle(lv3_header[0x24:0x28])
self.data_offset = lv3_offset + lv3_filedata_offset
# verify lv3 header
if lv3_header_size != ROMFS_LV3_HEADER_SIZE:
raise InvalidRomFSHeaderError('Length in RomFS Lv3 header is not 0x28')
if lv3_dirhash.offset < lv3_header_size:
raise InvalidRomFSHeaderError('Directory Hash offset is before the end of the Lv3 header')
if lv3_dirmeta.offset < lv3_dirhash.offset + lv3_dirhash.size:
raise InvalidRomFSHeaderError('Directory Metadata offset is before the end of the Directory Hash region')
if lv3_filehash.offset < lv3_dirmeta.offset + lv3_dirmeta.size:
raise InvalidRomFSHeaderError('File Hash offset is before the end of the Directory Metadata region')
if lv3_filemeta.offset < lv3_filehash.offset + lv3_filehash.size:
raise InvalidRomFSHeaderError('File Metadata offset is before the end of the File Hash region')
if lv3_filedata_offset < lv3_filemeta.offset + lv3_filemeta.size:
raise InvalidRomFSHeaderError('File Data offset is before the end of the File Metadata region')
# get entries from dirmeta and filemeta
def iterate_dir(out: dict, raw: bytes, current_path: str):
first_child_dir = readle(raw[0x8:0xC])
first_file = readle(raw[0xC:0x10])
out['type'] = 'dir'
out['contents'] = {}
# iterate through all child directories
if first_child_dir != 0xFFFFFFFF:
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + first_child_dir)
while True:
child_dir_meta = fp.read(0x18)
next_sibling_dir = readle(child_dir_meta[0x4:0x8])
child_dir_name = fp.read(readle(child_dir_meta[0x14:0x18])).decode('utf-16le')
child_dir_name_meta = child_dir_name.lower() if case_insensitive else child_dir_name
if child_dir_name_meta in out['contents']:
print(f'WARNING: Dirname collision! {current_path}{child_dir_name}')
out['contents'][child_dir_name_meta] = {'name': child_dir_name}
iterate_dir(out['contents'][child_dir_name_meta], child_dir_meta,
f'{current_path}{child_dir_name}/')
if next_sibling_dir == 0xFFFFFFFF:
break
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset + next_sibling_dir)
if first_file != 0xFFFFFFFF:
fp.seek(self._start + lv3_offset + lv3_filemeta.offset + first_file)
while True:
child_file_meta = fp.read(0x20)
next_sibling_file = readle(child_file_meta[0x4:0x8])
child_file_offset = readle(child_file_meta[0x8:0x10])
child_file_size = readle(child_file_meta[0x10:0x18])
child_file_name = fp.read(readle(child_file_meta[0x1C:0x20])).decode('utf-16le')
child_file_name_meta = child_file_name.lower() if self.case_insensitive else child_file_name
if child_file_name_meta in out['contents']:
print(f'WARNING: Filename collision! {current_path}{child_file_name}')
out['contents'][child_file_name_meta] = {'name': child_file_name, 'type': 'file',
'offset': child_file_offset, 'size': child_file_size}
self.total_size += child_file_size
if next_sibling_file == 0xFFFFFFFF:
break
fp.seek(self._start + lv3_offset + lv3_filemeta.offset + next_sibling_file)
self._tree_root = {'name': 'ROOT'}
self.total_size = 0
fp.seek(self._start + lv3_offset + lv3_dirmeta.offset)
iterate_dir(self._tree_root, fp.read(0x18), '/')
def close(self):
self.closed = True
try:
self._fp.close()
except AttributeError:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@overload
def open(self, path: str, encoding: str, errors: 'Optional[str]' = None,
newline: 'Optional[str]' = None) -> TextIOWrapper: ...
@overload
def open(self, path: str, encoding: None = None, errors: 'Optional[str]' = None,
newline: 'Optional[str]' = None) -> _RomFSOpenFile: ...
def open(self, path, encoding=None, errors=None, newline=None):
"""Open a file in the RomFS for reading."""
f = _RomFSOpenFile(self, path)
if encoding is not None:
f = TextIOWrapper(f, encoding, errors, newline)
return f
__del__ = close
def get_info_from_path(self, path: str) -> 'Union[RomFSDirectoryEntry, RomFSFileEntry]':
"""Get a directory or file entry"""
curr = self._tree_root
if self.case_insensitive:
path = path.lower()
if path[0] == '/':
path = path[1:]
for part in path.split('/'):
if part == '':
break
try:
# noinspection PyTypeChecker
curr = curr['contents'][part]
except KeyError:
raise RomFSFileNotFoundError(path)
if curr['type'] == 'dir':
contents = (k['name'] for k in curr['contents'].values())
return RomFSDirectoryEntry(name=curr['name'], type='dir', contents=(*contents,))
elif curr['type'] == 'file':
return RomFSFileEntry(name=curr['name'], type='file', offset=curr['offset'], size=curr['size'])
def get_data(self, info: RomFSFileEntry, offset: int, size: int) -> bytes:
if offset + size > info.size:
size = info.size - offset
with self._lock:
self._fp.seek(self._start + self.data_offset + info.offset + offset)
return self._fp.read(size)

111
pyctr/types/smdh.py Normal file
View File

@@ -0,0 +1,111 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from types import MappingProxyType
from typing import TYPE_CHECKING, NamedTuple
from ..common import PyCTRError
if TYPE_CHECKING:
from typing import BinaryIO, Dict, Mapping, Optional, Tuple, Union
SMDH_SIZE = 0x36C0
region_names = (
'Japanese',
'English',
'French',
'German',
'Italian',
'Spanish',
'Simplified Chinese',
'Korean',
'Dutch',
'Portuguese',
'Russian',
'Traditional Chinese',
)
# the order of the SMDH names to check. the difference here is that English is put before Japanese.
_region_order_check = (
'English',
'Japanese',
'French',
'German',
'Italian',
'Spanish',
'Simplified Chinese',
'Korean',
'Dutch',
'Portuguese',
'Russian',
'Traditional Chinese',
)
class SMDHError(PyCTRError):
"""Generic exception for SMDH operations."""
class InvalidSMDHError(SMDHError):
"""Invalid SMDH contents."""
class AppTitle(NamedTuple):
short_desc: str
long_desc: str
publisher: str
class SMDH:
"""
Class for 3DS SMDH. Icon data is currently not supported.
https://www.3dbrew.org/wiki/SMDH
"""
# TODO: support other settings
def __init__(self, names: 'Dict[str, AppTitle]'):
self.names: Mapping[str, AppTitle] = MappingProxyType({n: names.get(n, None) for n in region_names})
def __repr__(self):
return f'<{type(self).__name__} title: {self.get_app_title().short_desc}>'
def get_app_title(self, language: 'Union[str, Tuple[str, ...]]' = _region_order_check) -> 'Optional[AppTitle]':
if isinstance(language, str):
language = (language,)
for l in language:
apptitle = self.names[l]
if apptitle:
return apptitle
# if, for some reason, it fails to return...
return AppTitle('unknown', 'unknown', 'unknown')
@classmethod
def load(cls, fp: 'BinaryIO') -> 'SMDH':
"""Load an SMDH from a file-like object."""
smdh = fp.read(SMDH_SIZE)
if len(smdh) != SMDH_SIZE:
raise InvalidSMDHError(f'invalid size (expected: {SMDH_SIZE:#6x}, got: {len(smdh):#6x}')
if smdh[0:4] != b'SMDH':
raise InvalidSMDHError('SMDH magic not found')
app_structs = smdh[8:0x2008]
names: Dict[str, AppTitle] = {}
# due to region_names only being 12 elements, this will only process 12. the other 4 are unused.
for app_title, region in zip((app_structs[x:x + 0x200] for x in range(0, 0x2000, 0x200)), region_names):
names[region] = AppTitle(app_title[0:0x80].decode('utf-16le').strip('\0'),
app_title[0x80:0x180].decode('utf-16le').strip('\0'),
app_title[0x180:0x200].decode('utf-16le').strip('\0'))
return cls(names)
@classmethod
def from_file(cls, fn: str) -> 'SMDH':
with open(fn, 'rb') as f:
return cls.load(f)

316
pyctr/types/tmd.py Normal file
View File

@@ -0,0 +1,316 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
from hashlib import sha256
from struct import pack
from typing import TYPE_CHECKING, NamedTuple
from ..common import PyCTRError
from ..util import readbe, readle
if TYPE_CHECKING:
from typing import BinaryIO, Iterable
__all__ = ['CHUNK_RECORD_SIZE', 'TitleMetadataError', 'InvalidSignatureTypeError', 'InvalidHashError',
'ContentInfoRecord', 'ContentChunkRecord', 'ContentTypeFlags', 'TitleVersion', 'TitleMetadataReader']
CHUNK_RECORD_SIZE = 0x30
# sig-type: (sig-size, padding)
signature_types = {
# RSA_4096 SHA1 (unused on 3DS)
0x00010000: (0x200, 0x3C),
# RSA_2048 SHA1 (unused on 3DS)
0x00010001: (0x100, 0x3C),
# Elliptic Curve with SHA1 (unused on 3DS)
0x00010002: (0x3C, 0x40),
# RSA_4096 SHA256
0x00010003: (0x200, 0x3C),
# RSA_2048 SHA256
0x00010004: (0x100, 0x3C),
# ECDSA with SHA256
0x00010005: (0x3C, 0x40),
}
BLANK_SIG_PAIR = (0x00010004, b'\xFF' * signature_types[0x00010004][0])
class TitleMetadataError(PyCTRError):
"""Generic exception for TitleMetadata operations."""
class InvalidTMDError(TitleMetadataError):
"""Title Metadata is invalid."""
class InvalidSignatureTypeError(InvalidTMDError):
"""Invalid signature type was used."""
def __init__(self, sig_type):
super().__init__(sig_type)
def __str__(self):
return f'{self.args[0]:#010x}'
class InvalidHashError(InvalidTMDError):
"""Hash mismatch in the Title Metadata."""
class InvalidInfoRecordError(InvalidHashError):
"""Hash mismatch in the Content Info Records."""
def __init__(self, info_record):
super().__init__(info_record)
def __str__(self):
return f'Invalid info record: {self.args[0]}'
class UnusualInfoRecordError(InvalidTMDError):
"""Encountered Content Info Record that attempts to hash a Content Chunk Record that has already been hashed."""
def __init__(self, info_record, chunk_record):
super().__init__(info_record, chunk_record)
def __str__(self):
return f'Attempted to hash twice: {self.args[0]}, {self.args[1]}'
class ContentTypeFlags(NamedTuple):
encrypted: bool
disc: bool
cfm: bool
optional: bool
shared: bool
def __index__(self) -> int:
return self.encrypted | (self.disc << 1) | (self.cfm << 2) | (self.optional << 14) | (self.shared << 15)
__int__ = __index__
def __format__(self, format_spec: str) -> str:
return self.__int__().__format__(format_spec)
@classmethod
def from_int(cls, flags: int) -> 'ContentTypeFlags':
# noinspection PyArgumentList
return cls(bool(flags & 1), bool(flags & 2), bool(flags & 4), bool(flags & 0x4000), bool(flags & 0x8000))
class ContentInfoRecord(NamedTuple):
index_offset: int
command_count: int
hash: bytes
def __bytes__(self) -> bytes:
return b''.join((self.index_offset.to_bytes(2, 'big'), self.command_count.to_bytes(2, 'big'), self.hash))
class ContentChunkRecord(NamedTuple):
id: str
cindex: int
type: ContentTypeFlags
size: int
hash: bytes
def __bytes__(self) -> bytes:
return b''.join((bytes.fromhex(self.id), self.cindex.to_bytes(2, 'big'), int(self.type).to_bytes(2, 'big'),
self.size.to_bytes(8, 'big'), self.hash))
class TitleVersion(NamedTuple):
major: int
minor: int
micro: int
def __str__(self) -> str:
return f'{self.major}.{self.minor}.{self.micro}'
def __index__(self) -> int:
return (self.major << 10) | (self.minor << 4) | self.micro
__int__ = __index__
def __format__(self, format_spec: str) -> str:
return self.__int__().__format__(format_spec)
@classmethod
def from_int(cls, ver: int) -> 'TitleVersion':
# noinspection PyArgumentList
return cls((ver >> 10) & 0x3F, (ver >> 4) & 0x3F, ver & 0xF)
class TitleMetadataReader:
"""
Class for 3DS Title Metadata.
https://www.3dbrew.org/wiki/Title_metadata
"""
__slots__ = ('title_id', 'save_size', 'srl_save_size', 'title_version', 'info_records',
'chunk_records', 'content_count', 'signature', '_u_issuer', '_u_version', '_u_ca_crl_version',
'_u_signer_crl_version', '_u_reserved1', '_u_system_version', '_u_title_type', '_u_group_id',
'_u_reserved2', '_u_srl_flag', '_u_reserved3', '_u_access_rights', '_u_boot_count', '_u_padding')
# arguments prefixed with _u_ are values unused by the 3DS and/or are only kept around to generate the final tmd
def __init__(self, *, title_id: str, save_size: int, srl_save_size: int, title_version: TitleVersion,
info_records: 'Iterable[ContentInfoRecord]', chunk_records: 'Iterable[ContentChunkRecord]',
signature=BLANK_SIG_PAIR, _u_issuer='Root-CA00000003-CP0000000b', _u_version=1, _u_ca_crl_version=0,
_u_signer_crl_version=0, _u_reserved1=0, _u_system_version=b'\0' * 8, _u_title_type=b'\0\0\0@',
_u_group_id=b'\0\0', _u_reserved2=b'\0\0\0\0', _u_srl_flag=0, _u_reserved3=b'\0' * 0x31,
_u_access_rights=b'\0' * 4, _u_boot_count=b'\0\0', _u_padding=b'\0\0'):
# TODO: add checks
self.title_id = title_id.lower()
self.save_size = save_size
self.srl_save_size = srl_save_size
self.title_version = title_version
self.info_records = tuple(info_records)
self.chunk_records = tuple(chunk_records)
self.content_count = len(self.chunk_records)
self.signature = signature # TODO: store this differently
# unused values
self._u_issuer = _u_issuer
self._u_version = _u_version
self._u_ca_crl_version = _u_ca_crl_version
self._u_signer_crl_version = _u_signer_crl_version
self._u_reserved1 = _u_reserved1
self._u_system_version = _u_system_version
self._u_title_type = _u_title_type
self._u_group_id = _u_group_id
self._u_reserved2 = _u_reserved2
self._u_srl_flag = _u_srl_flag
self._u_reserved3 = _u_reserved3
self._u_access_rights = _u_access_rights
self._u_boot_count = _u_boot_count
self._u_padding = _u_padding
def __hash__(self) -> int:
return hash((self.title_id, self.save_size, self.srl_save_size, self.title_version,
self.info_records, self.chunk_records))
def __repr__(self) -> str:
return (f'<TitleMetadataReader title_id={self.title_id!r} title_version={self.title_version!r} '
f'content_count={self.content_count!r}>')
def __bytes__(self) -> bytes:
sig_data = pack(f'>I {signature_types[self.signature[0]][0]}s {signature_types[self.signature[0]][1]}x',
self.signature[0], self.signature[1])
info_records = b''.join(bytes(x) for x in self.info_records).ljust(0x900, b'\0')
header = pack('>64s b b b b 8s 8s 4s 2s I I 4s b 49s 4s H H 2s 2s 32s', self._u_issuer.encode('ascii'),
self._u_version, self._u_ca_crl_version, self._u_signer_crl_version, self._u_reserved1,
self._u_system_version, bytes.fromhex(self.title_id), self._u_title_type, self._u_group_id,
self.save_size, self.srl_save_size, self._u_reserved2, self._u_srl_flag, self._u_reserved3,
self._u_access_rights, self.title_version, self.content_count, self._u_boot_count,
self._u_padding, sha256(info_records).digest())
chunk_records = b''.join(bytes(x) for x in self.chunk_records)
return sig_data + header + info_records + chunk_records
@classmethod
def load(cls, fp: 'BinaryIO', verify_hashes: bool = True) -> 'TitleMetadataReader':
"""Load a tmd from a file-like object."""
sig_type = readbe(fp.read(4))
try:
sig_size, sig_padding = signature_types[sig_type]
except KeyError:
raise InvalidSignatureTypeError(sig_type)
signature = fp.read(sig_size)
try:
fp.seek(sig_padding, 1)
except Exception:
# most streams are probably seekable, but for some that aren't...
fp.read(sig_padding)
header = fp.read(0xC4)
if len(header) != 0xC4:
raise InvalidTMDError('Header length is not 0xC4')
# only values that actually have a use are loaded here. (currently)
# several fields in were left in from the Wii tmd and have no function on 3DS.
title_id = header[0x4C:0x54].hex()
save_size = readle(header[0x5A:0x5E])
srl_save_size = readle(header[0x5E:0x62])
title_version = TitleVersion.from_int(readbe(header[0x9C:0x9E]))
content_count = readbe(header[0x9E:0xA0])
content_info_records_hash = header[0xA4:0xC4]
content_info_records_raw = fp.read(0x900)
if len(content_info_records_raw) != 0x900:
raise InvalidTMDError('Content info records length is not 0x900')
if verify_hashes:
real_hash = sha256(content_info_records_raw)
if content_info_records_hash != real_hash.digest():
raise InvalidHashError('Content Info Records hash is invalid')
content_chunk_records_raw = fp.read(content_count * CHUNK_RECORD_SIZE)
chunk_records = []
for cr_raw in (content_chunk_records_raw[i:i + CHUNK_RECORD_SIZE] for i in
range(0, content_count * CHUNK_RECORD_SIZE, CHUNK_RECORD_SIZE)):
chunk_records.append(ContentChunkRecord(id=cr_raw[0:4].hex(),
cindex=readbe(cr_raw[4:6]),
type=ContentTypeFlags.from_int(readbe(cr_raw[6:8])),
size=readbe(cr_raw[8:16]),
hash=cr_raw[16:48]))
info_records = []
for ir_raw in (content_info_records_raw[i:i + 0x24] for i in range(0, 0x900, 0x24)):
if ir_raw != b'\0' * 0x24:
info_records.append(ContentInfoRecord(index_offset=readbe(ir_raw[0:2]),
command_count=readbe(ir_raw[2:4]),
hash=ir_raw[4:36]))
if verify_hashes:
chunk_records_hashed = set()
for ir in info_records:
to_hash = []
for cr in chunk_records[ir.index_offset:ir.index_offset + ir.command_count]:
if cr in chunk_records_hashed:
raise InvalidTMDError('attempting to hash chunk record twice')
chunk_records_hashed.add(cr)
to_hash.append(cr)
hashed = sha256(b''.join(bytes(x) for x in to_hash))
if hashed.digest() != ir.hash:
raise InvalidInfoRecordError(ir)
# unused vales are loaded only for use when re-building the binary tmd
u_issuer = header[0:0x40].decode('ascii').rstrip('\0')
u_version = header[0x40]
u_ca_crl_version = header[0x41]
u_signer_crl_version = header[0x42]
u_reserved1 = header[0x43]
u_system_version = header[0x44:0x4C]
u_title_type = header[0x54:0x58]
u_group_id = header[0x58:0x5A]
u_reserved2 = header[0x62:0x66]
u_srl_flag = header[0x66] # is this one used for anything?
u_reserved3 = header[0x67:0x98]
u_access_rights = header[0x98:0x9C]
u_boot_count = header[0xA0:0xA2]
u_padding = header[0xA2:0xA4]
return cls(title_id=title_id, save_size=save_size, srl_save_size=srl_save_size, title_version=title_version,
info_records=info_records, chunk_records=chunk_records, signature=(sig_type, signature),
_u_issuer=u_issuer, _u_version=u_version, _u_ca_crl_version=u_ca_crl_version,
_u_signer_crl_version=u_signer_crl_version, _u_reserved1=u_reserved1,
_u_system_version=u_system_version, _u_title_type=u_title_type, _u_group_id=u_group_id,
_u_reserved2=u_reserved2, _u_srl_flag=u_srl_flag, _u_reserved3=u_reserved3,
_u_access_rights=u_access_rights, _u_boot_count=u_boot_count, _u_padding=u_padding)
@classmethod
def from_file(cls, fn: str, *, verify_hashes: bool = True) -> 'TitleMetadataReader':
with open(fn, 'rb') as f:
return cls.load(f, verify_hashes=verify_hashes)

41
pyctr/util.py Normal file
View File

@@ -0,0 +1,41 @@
# This file is a part of ninfs.
#
# Copyright (c) 2017-2019 Ian Burgwin
# This file is licensed under The MIT License (MIT).
# You can find the full license text in LICENSE.md in the root of this project.
import os
from math import ceil
from sys import platform
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import List
__all__ = ['windows', 'macos', 'readle', 'readbe', 'roundup', 'config_dirs']
windows = platform in {'win32', 'cygwin'}
macos = platform == 'darwin'
def readle(b: bytes) -> int:
"""Convert little-endian bytes to an int."""
return int.from_bytes(b, 'little')
def readbe(b: bytes) -> int:
"""Convert big-endian bytes to an int."""
return int.from_bytes(b, 'big')
def roundup(offset: int, alignment: int) -> int:
"""Round up a number to a provided alignment."""
return int(ceil(offset / alignment) * alignment)
_home = os.path.expanduser('~')
config_dirs: 'List[str]' = [os.path.join(_home, '.3ds'), os.path.join(_home, '3ds')]
if windows:
config_dirs.insert(0, os.path.join(os.environ.get('APPDATA'), '3ds'))
elif macos:
config_dirs.insert(0, os.path.join(_home, 'Library', 'Application Support', '3ds'))