Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 151 additions & 2 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ readme = "README.md"
packages = [{include = "ubireader"}]

[tool.poetry.dependencies]
python = ">=3.9"
python = ">=3.9.2"
lzallright = "^0.2.1"
cryptography = "^44.0.2"

[build-system]
requires = ["poetry-core"]
Expand Down
20 changes: 18 additions & 2 deletions ubireader/scripts/ubireader_extract_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def main():
parser.add_argument('-o', '--output-dir', dest='outpath',
help='Specify output directory path.')

parser.add_argument('-K', '--master-key', dest='master_key',
help='Master key file, given with fscryptctl e.g. to encrypt the UBIFS (support limited to fscrypt v1 policies)')

parser.add_argument('filepath', help='File to extract contents of.')

if len(sys.argv) == 1:
Expand All @@ -104,6 +107,19 @@ def main():

settings.uboot_fix = args.uboot_fix

if args.master_key:
path = args.master_key
if not os.path.exists(path):
parser.error("File path doesn't exist.")
else :
with open(path, "rb") as file:
if os.stat(path).st_size != 64:
parser.error("Master key file size is not 64 bytes.")
else:
master_key = file.read(64)
else:
master_key = None

if args.filepath:
path = args.filepath
if not os.path.exists(path):
Expand Down Expand Up @@ -179,14 +195,14 @@ def main():
lebv_file = leb_virtual_file(ubi_obj, vol_blocks)

# Extract files from UBI image.
ubifs_obj = ubifs(lebv_file)
ubifs_obj = ubifs(lebv_file, master_key=master_key)
print('Extracting files to: %s' % vol_outpath)
extract_files(ubifs_obj, vol_outpath, perms)


elif filetype == UBIFS_NODE_MAGIC:
# Create UBIFS object
ubifs_obj = ubifs(ufile_obj)
ubifs_obj = ubifs(ufile_obj, master_key=master_key)

# Create directory for files.
create_output_dir(outpath)
Expand Down
20 changes: 18 additions & 2 deletions ubireader/scripts/ubireader_list_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ def main():
parser.add_argument('-D', '--copy-dest', dest='copyfiledest',
help='Copy Destination.')

parser.add_argument('-K', '--master-key', dest='master_key',
help='Master key file, given with fscryptctl e.g. to encrypt the UBIFS (support limited to fscrypt v1 policies)')

parser.add_argument('filepath', help='UBI/UBIFS image file.')

if len(sys.argv) == 1:
Expand All @@ -96,6 +99,19 @@ def main():

settings.uboot_fix = args.uboot_fix

if args.master_key:
path = args.master_key
if not os.path.exists(path):
parser.error("File path doesn't exist.")
else :
with open(path, "rb") as file:
if os.stat(path).st_size != 64:
parser.error("Master key file size is not 64 bytes.")
else:
master_key = file.read(64)
else:
master_key = None

if args.filepath:
path = args.filepath
if not os.path.exists(path):
Expand Down Expand Up @@ -154,7 +170,7 @@ def main():
lebv_file = leb_virtual_file(ubi_obj, vol_blocks)

# Create UBIFS object.
ubifs_obj = ubifs(lebv_file)
ubifs_obj = ubifs(lebv_file, master_key=master_key)

if args.listpath:
list_files(ubifs_obj, args.listpath)
Expand All @@ -163,7 +179,7 @@ def main():

elif filetype == UBIFS_NODE_MAGIC:
# Create UBIFS object
ubifs_obj = ubifs(ufile_obj)
ubifs_obj = ubifs(ufile_obj, master_key=master_key)

if args.listpath:
list_files(ubifs_obj, args.listpath)
Expand Down
4 changes: 3 additions & 1 deletion ubireader/ubifs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ubireader.debug import error, log, verbose_display
from ubireader.ubifs.defines import *
from ubireader.ubifs import nodes, display
from typing import Optional

class ubifs():
"""UBIFS object
Expand All @@ -35,9 +36,10 @@ class ubifs():
Obj:mst_node -- Master Node of UBIFS image LEB1
Obj:mst_node2 -- Master Node 2 of UBIFS image LEB2
"""
def __init__(self, ubifs_file):
def __init__(self, ubifs_file, master_key: Optional[bytes] = None):
self.__name__ = 'UBIFS'
self._file = ubifs_file
self.master_key = master_key
try:
self.file.reset()
sb_chdr = nodes.common_hdr(self.file.read(UBIFS_COMMON_HDR_SZ))
Expand Down
102 changes: 102 additions & 0 deletions ubireader/ubifs/decrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from ubireader.ubifs.defines import UBIFS_XATTR_NAME_ENCRYPTION_CONTEXT
from ubireader.debug import error
from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes
)

AES_BLOCK_SIZE = algorithms.AES.block_size // 8

def lookup_inode_nonce(inodes: dict, inode: dict) -> bytes:
# get the extended attribute 'xent' of the inode
if 'xent' not in inode or not inode['xent']:
raise ValueError(f"No xent found for inode {inode}")

for xattr_inode in inode['xent']:
if (xattr_inode.name == UBIFS_XATTR_NAME_ENCRYPTION_CONTEXT):
nonce_ino = inodes[xattr_inode.inum]['ino']
nonce = nonce_ino.data[-16:]
if len(nonce) != 16:
raise ValueError(f"Invalid nonce length for inode {inode}")
return nonce


def derive_key_from_nonce(master_key: bytes, nonce: bytes) -> bytes:
encryptor = Cipher(
algorithms.AES(nonce),
modes.ECB(),
).encryptor()
derived_key = encryptor.update(master_key) + encryptor.finalize()
return derived_key


def filename_decrypt(key: bytes, ciphertext: bytes):

# using AES CTS-CBC mode not supported by pyca cryptography
if len(ciphertext) > AES_BLOCK_SIZE:
# Cipher Text Stealing Step
pad = AES_BLOCK_SIZE - len(ciphertext) % AES_BLOCK_SIZE

if pad > 0: # Steal ciphertext only if needed (CTS)
decryptor = Cipher(
algorithms.AES(key[:32]),
modes.ECB(),
).decryptor()
second_to_last = ciphertext[-2*AES_BLOCK_SIZE+pad:-AES_BLOCK_SIZE+pad]
plaintext = decryptor.update(second_to_last) + decryptor.finalize()
# Apply padding
ciphertext += plaintext[-pad:]
# Swap the last two blocks
ciphertext = ciphertext[:-2*AES_BLOCK_SIZE] + ciphertext[-AES_BLOCK_SIZE:] + ciphertext[-2*AES_BLOCK_SIZE:-AES_BLOCK_SIZE]

# AES-CBC step
NULL_IV = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

decryptor = Cipher(
algorithms.AES(key[:32]),
modes.CBC(NULL_IV),
).decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
return plaintext.rstrip(b'\x00')


def datablock_decrypt(block_key: bytes, block_iv: bytes, block_data: bytes):
decryptor = Cipher(
algorithms.AES(block_key),
modes.XTS(block_iv),
).decryptor()
return decryptor.update(block_data) + decryptor.finalize()


def decrypt_filenames(ubifs, inodes):
if ubifs.master_key is None:
for inode in inodes.values():
for dent in inode['dent']:
dent.name = dent.raw_name.decode()
return
try:
# for every node holding a cryptographic xattr, lookup the
# nonce inode from the xattr 'inum' attr
for inode in inodes.values():
if "dent" not in inode:
continue
nonce = lookup_inode_nonce(inodes, inode)
dec_key = derive_key_from_nonce(ubifs.master_key, nonce)
for dent in inode['dent']:
dent.name = filename_decrypt(dec_key, dent.raw_name).decode()
except Exception as e:
error(decrypt_filenames, 'Error', str(e))


def decrypt_symlink_target(ubifs, inodes, dent_node) -> str:
if ubifs.master_key is None:
return inodes[dent_node.inum]['ino'].data.decode()
inode = inodes[dent_node.inum]
ino = inode['ino']
nonce = lookup_inode_nonce(inodes, inode)
# the first two bytes is just header 0x10 0x00 all the time
# the second byte is a null byte (0x00) added, need to be removed
# before decryption
encrypted_name = ino.data[2:-1]
dec_key = derive_key_from_nonce(ubifs.master_key, nonce)
lnkname = filename_decrypt(dec_key, encrypted_name)
return lnkname.decode()
19 changes: 17 additions & 2 deletions ubireader/ubifs/defines.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,20 @@
# 'data', no size.
UBIFS_INO_NODE_SZ = struct.calcsize(UBIFS_INO_NODE_FORMAT)

# Extended attribute node are identical to DENT_NODES
UBIFS_XENT_NODE_FORMAT = '<%ssQBBHI' % (UBIFS_MAX_KEY_LEN)
UBIFS_XENT_NODE_FIELDS = ['key', # Node key.
'inum', # Target inode number.
'padding1', # Reserved for future, zeros.
'type', # Type of target inode.
'nlen', # Name length.
'cookie', # 32bit random number, used to
# construct a 64bit identifier.
]
# 'name', no size.
UBIFS_XENT_NODE_SZ = struct.calcsize(UBIFS_XENT_NODE_FORMAT)

UBIFS_XATTR_NAME_ENCRYPTION_CONTEXT = "c"

# Directory entry node
UBIFS_DENT_NODE_FORMAT = '<%ssQBBHI' % (UBIFS_MAX_KEY_LEN)
Expand All @@ -282,8 +296,9 @@
UBIFS_DATA_NODE_FIELDS = ['key', # Node key.
'size', # Uncompressed data size.
'compr_type', # Compression type UBIFS_COMPR_*.
'compr_size', # Compressed data size in bytes
# only valid when data is encrypted.
'plaintext_size', # Compressed data size in bytes
# before encryption only valid
# when data is encrypted.
]
# 'data', no size.
UBIFS_DATA_NODE_SZ = struct.calcsize(UBIFS_DATA_NODE_FORMAT)
Expand Down
31 changes: 24 additions & 7 deletions ubireader/ubifs/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import os
import time
import struct
from ubireader.ubifs.decrypt import lookup_inode_nonce, derive_key_from_nonce, datablock_decrypt, decrypt_symlink_target
from ubireader.ubifs.defines import *
from ubireader.ubifs import walk
from ubireader.ubifs.misc import decompress
Expand Down Expand Up @@ -86,7 +88,7 @@ def copy_file(ubifs, filepath, destpath):

for dent in inodes[inum]['dent']:
if dent.name == filename:
filedata = _process_reg_file(ubifs, inodes[dent.inum], filepath)
filedata = _process_reg_file(ubifs, inodes[dent.inum], filepath, inodes)
if os.path.isdir(destpath):
destpath = os.path.join(destpath, filename)
with open(destpath, 'wb') as f:
Expand Down Expand Up @@ -114,7 +116,7 @@ def print_dent(ubifs, inodes, dent_node, long=True, longts=False):

lnk = ""
if dent_node.type == UBIFS_ITYPE_LNK:
lnk = " -> " + inode['ino'].data.decode('utf-8')
lnk = " -> " + decrypt_symlink_target(ubifs, inodes, dent_node)

if longts:
mtime = inode['ino'].mtime_sec
Expand Down Expand Up @@ -143,32 +145,47 @@ def file_leng(ubifs, inode):
return 0


def _process_reg_file(ubifs, inode, path):
def _process_reg_file(ubifs, inode, path, inodes):
try:
buf = bytearray()
start_key = (UBIFS_DATA_KEY << UBIFS_S_KEY_BLOCK_BITS)
if 'data' in inode:
compr_type = 0
sorted_data = sorted(inode['data'], key=lambda x: x.key['khash'])
last_khash = sorted_data[0].key['khash']-1
last_khash = start_key - 1

for data in sorted_data:

# If data nodes are missing in sequence, fill in blanks
# with \x00 * UBIFS_BLOCK_SIZE
if data.key['khash'] - last_khash != 1:
while 1 != (data.key['khash'] - last_khash):
buf += b'\x00'*UBIFS_BLOCK_SIZE
buf += b'\x00' * UBIFS_BLOCK_SIZE
last_khash += 1

compr_type = data.compr_type
ubifs.file.seek(data.offset)
d = ubifs.file.read(data.compr_len)

if ubifs.master_key is not None:
nonce = lookup_inode_nonce(inodes, inode)
block_key = derive_key_from_nonce(ubifs.master_key, nonce)
# block_id is based on the current hash
# there could be empty blocks
block_id = data.key['khash']-start_key
block_iv = struct.pack("<QQ", block_id, 0)
d = datablock_decrypt(block_key, block_iv, d)
# if unpading is needed the plaintext_size is valid and set to the
# original size of current block, so we can use this to get the amout
# of bytes to unpad
d = d[:data.plaintext_size]

buf += decompress(compr_type, data.size, d)
last_khash = data.key['khash']
verbose_log(_process_reg_file, 'ino num: %s, compression: %s, path: %s' % (inode['ino'].key['ino_num'], compr_type, path))

except Exception as e:
error(_process_reg_file, 'Warn', 'inode num:%s :%s' % (inode['ino'].key['ino_num'], e))
error(_process_reg_file, 'Warn', 'inode num:%s path:%s :%s' % (inode['ino'].key['ino_num'], path, e))

# Pad end of file with \x00 if needed.
if inode['ino'].size > len(buf):
Expand Down
32 changes: 30 additions & 2 deletions ubireader/ubifs/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,34 @@ def __iter__(self):
def display(self, tab=''):
return display.ino_node(self, tab)

class xent_node(object):
"""Get xattr entry node at given LEB number + offset.

Arguments:
Bin:buf -- Raw data to extract header information from.

See ubifs/defines.py for object attributes.
"""
def __init__(self, buf):
fields = dict(zip(UBIFS_XENT_NODE_FIELDS, struct.unpack(UBIFS_XENT_NODE_FORMAT, buf[0:UBIFS_XENT_NODE_SZ])))
for key in fields:
if key == 'key':
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])
setattr(self, 'name', buf[-self.nlen-1:-1].decode())
setattr(self, 'errors', [])

def __repr__(self):
return 'UBIFS XATTR Entry Node'

def __iter__(self):
for key in dir(self):
if not key.startswith('_'):
yield key, getattr(self, key)

def display(self, tab=''):
return display.dent_node(self, tab)

class dent_node(object):
"""Get dir entry node at given LEB number + offset.
Expand All @@ -96,8 +124,8 @@ def __init__(self, buf):
setattr(self, key, parse_key(fields[key]))
else:
setattr(self, key, fields[key])

setattr(self, 'name', '%s' % buf[-self.nlen-1:-1].decode('utf-8'))
setattr(self, 'raw_name', buf[-self.nlen-1:-1])
setattr(self, 'name', "")
setattr(self, 'errors', [])

def __repr__(self):
Expand Down
Loading