From d0dff2f2be6ec676d10f279d151766273ca74c1c Mon Sep 17 00:00:00 2001 From: Quentin Kaiser Date: Thu, 31 Aug 2023 08:46:10 +0200 Subject: [PATCH] fix(handlers): replace CPIO extractor (7z) by unblob based extractor. Multiple issues were identified in how 7z handles CPIO: - processing of CPIO archives depends on the file name, and the rules are not explicit (name.cpio is OK, name.cpio.truncated is OK, but name.cpio.ext is not) - special device files (e.g. block devices, character devices) are extracted as empty files rather than created using mknod We therefore decided to implement a CPIO extractor based on unblob's Extractor. Since CPIO has so many different formats, each format has its own extractor: - binary CPIO (bin) - BinaryCPIOExtractor - portable ASCII CPIO (odc) - PortableOldASCIIExtractor - portable ASCII CPIO (newc) - PortableASCIIExtractor - portable ASCII with CRC (crc) - PortableASCIIWithCRCExtractor Each extractor uses a different parser, adapted to the format. --- .../dev/{console => .gitkeep} | 0 .../dev/{console => .gitkeep} | 0 unblob/handlers/archive/cpio.py | 455 +++++++++++++----- 3 files changed, 326 insertions(+), 129 deletions(-) rename tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.cpio-newc_extract/0-512.cpio_portable_ascii_extract/dev/{console => .gitkeep} (100%) rename tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.unaligned.cpio-newc_extract/1-513.cpio_portable_ascii_extract/dev/{console => .gitkeep} (100%) diff --git a/tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.cpio-newc_extract/0-512.cpio_portable_ascii_extract/dev/console b/tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.cpio-newc_extract/0-512.cpio_portable_ascii_extract/dev/.gitkeep similarity index 100% rename from 
tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.cpio-newc_extract/0-512.cpio_portable_ascii_extract/dev/console rename to tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.cpio-newc_extract/0-512.cpio_portable_ascii_extract/dev/.gitkeep diff --git a/tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.unaligned.cpio-newc_extract/1-513.cpio_portable_ascii_extract/dev/console b/tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.unaligned.cpio-newc_extract/1-513.cpio_portable_ascii_extract/dev/.gitkeep similarity index 100% rename from tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.unaligned.cpio-newc_extract/1-513.cpio_portable_ascii_extract/dev/console rename to tests/integration/archive/cpio/cpio_portable_ascii/__output__/sample.nofilepad.unaligned.cpio-newc_extract/1-513.cpio_portable_ascii_extract/dev/.gitkeep diff --git a/unblob/handlers/archive/cpio.py b/unblob/handlers/archive/cpio.py index 30c3c34067..fa10f9cc5a 100644 --- a/unblob/handlers/archive/cpio.py +++ b/unblob/handlers/archive/cpio.py @@ -1,15 +1,35 @@ import io -from typing import Optional +import os +import stat +from pathlib import Path +from typing import Optional, Type +import attr +from dissect.cstruct import Instance from structlog import get_logger -from ...extractors import Command -from ...file_utils import OffsetFile, decode_int, round_up, snull -from ...models import File, HexString, StructHandler, ValidChunk +from ...file_utils import ( + Endian, + FileSystem, + InvalidInputFormat, + StructParser, + decode_int, + iterate_file, + round_up, + snull, +) +from ...models import ( + Extractor, + ExtractResult, + File, + Handler, + HexString, + ValidChunk, +) logger = get_logger() -CPIO_TRAILER_NAME = b"TRAILER!!!" +CPIO_TRAILER_NAME = "TRAILER!!!" 
MAX_LINUX_PATH_LENGTH = 0x1000 C_ISBLK = 0o60000 @@ -40,28 +60,87 @@ C_STICKY_BITS = (C_NONE, C_ISUID, C_ISGID, C_ISVTX, C_ISUID_ISGID) - -class _CPIOHandlerBase(StructHandler): - """A common base for all CPIO formats. - - The format should be parsed the same, there are small differences how to calculate - file and filename sizes padding and conversion from octal / hex. - """ - - EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-o{outdir}") - +C_DEFINITIONS = r""" + typedef struct old_cpio_header + { + uint16 c_magic; + uint16 c_dev; + uint16 c_ino; + uint16 c_mode; + uint16 c_uid; + uint16 c_gid; + uint16 c_nlink; + uint16 c_rdev; + uint16 c_mtimes[2]; + uint16 c_namesize; + uint16 c_filesize[2]; + } old_cpio_header_t; + + typedef struct old_ascii_header + { + char c_magic[6]; + char c_dev[6]; + char c_ino[6]; + char c_mode[6]; + char c_uid[6]; + char c_gid[6]; + char c_nlink[6]; + char c_rdev[6]; + char c_mtime[11]; + char c_namesize[6]; + char c_filesize[11]; + } old_ascii_header_t; + + typedef struct new_ascii_header + { + char c_magic[6]; + char c_ino[8]; + char c_mode[8]; + char c_uid[8]; + char c_gid[8]; + char c_nlink[8]; + char c_mtime[8]; + char c_filesize[8]; + char c_dev_maj[8]; + char c_dev_min[8]; + char c_rdev_maj[8]; + char c_rdev_min[8]; + char c_namesize[8]; + char c_chksum[8]; + } new_ascii_header_t; +""" + + +@attr.define +class CPIOEntry: + start_offset: int + size: int + dev: int + mode: int + rdev: int + path: Path + + +class CPIOParserBase: _PAD_ALIGN: int _FILE_PAD_ALIGN: int = 512 + HEADER_STRUCT: str - def calculate_chunk( # noqa: C901 - self, file: File, start_offset: int - ) -> Optional[ValidChunk]: - file_with_offset = OffsetFile(file, start_offset) - current_offset = start_offset + def __init__(self, file: File, start_offset: int): + self.file = file + self.start_offset = start_offset + self.end_offset = -1 + self.entries = [] + self.struct_parser = StructParser(C_DEFINITIONS) + + def parse(self): # noqa: C901 + current_offset = 
self.start_offset while True: - file.seek(current_offset) + self.file.seek(current_offset, io.SEEK_SET) try: - header = self.parse_header(file) + header = self.struct_parser.parse( + self.HEADER_STRUCT, self.file, Endian.LITTLE + ) except EOFError: break @@ -70,19 +149,27 @@ def calculate_chunk( # noqa: C901 # heuristics 1: check the filename if c_namesize > MAX_LINUX_PATH_LENGTH: - return None + raise InvalidInputFormat("CPIO entry filename is too long.") + + if c_namesize == 0: + raise InvalidInputFormat("CPIO entry filename empty.") - if c_namesize > 0: - tmp_filename = file_with_offset.read(c_namesize) + padded_header_size = self._pad_header(header, c_namesize) + current_offset += padded_header_size - # heuristics 2: check that filename is null-byte terminated - if not tmp_filename.endswith(b"\x00"): - return None + tmp_filename = self.file.read(c_namesize) - filename = snull(tmp_filename) - if filename == CPIO_TRAILER_NAME: - current_offset += self._pad_content(header, c_filesize, c_namesize) - break + # heuristics 2: check that filename is null-byte terminated + if not tmp_filename.endswith(b"\x00"): + raise InvalidInputFormat( + "CPIO entry filename is not null-byte terminated" + ) + + filename = snull(tmp_filename).decode("utf-8") + + if filename == CPIO_TRAILER_NAME: + current_offset += self._pad_content(c_filesize) + break c_mode = self._calculate_mode(header) @@ -92,23 +179,59 @@ def calculate_chunk( # noqa: C901 # heuristics 3: check mode field is_valid = file_type in C_FILE_TYPES and sticky_bit in C_STICKY_BITS if not is_valid: - return None - - file_with_offset.seek(c_filesize, io.SEEK_CUR) - current_offset += self._pad_content(header, c_filesize, c_namesize) - - end_offset = start_offset + self._pad_file( - file_with_offset, current_offset - start_offset - ) - if start_offset == end_offset: - return None - return ValidChunk( - start_offset=start_offset, - end_offset=end_offset, - ) + raise InvalidInputFormat("CPIO entry mode is invalid.") + + if 
self.valid_checksum(header, current_offset): + self.entries.append( + CPIOEntry( + start_offset=current_offset, + size=c_filesize, + dev=self._calculate_dev(header), + mode=c_mode, + rdev=self._calculate_rdev(header), + path=Path(filename), + ) + ) + else: + logger.warning("Invalid CRC for CPIO entry, skipping.", header=header) + + current_offset += self._pad_content(c_filesize) + + self.end_offset = self.start_offset + self._pad_file(self.file, current_offset) + if self.start_offset == self.end_offset: + raise InvalidInputFormat("Invalid CPIO archive.") + + def dump_entries(self, fs: FileSystem): + for entry in self.entries: + # skip entries with "." as filename + if entry.path.name in ("", "."): + continue + + if stat.S_ISREG(entry.mode): + fs.carve(entry.path, self.file, entry.start_offset, entry.size) + elif stat.S_ISDIR(entry.mode): + fs.mkdir( + entry.path, mode=entry.mode & 0o777, parents=True, exist_ok=True + ) + elif stat.S_ISLNK(entry.mode): + link_path = Path( + snull( + self.file[entry.start_offset : entry.start_offset + entry.size] + ).decode("utf-8") + ) + fs.create_symlink(src=link_path, dst=entry.path) + elif ( + stat.S_ISCHR(entry.mode) + or stat.S_ISBLK(entry.mode) + or stat.S_ISSOCK(entry.mode) + or stat.S_ISSOCK(entry.mode) + ): + fs.mknod(entry.path, mode=entry.mode & 0o777, device=entry.rdev) + else: + logger.warning("unknown file type in CPIO archive") @classmethod - def _pad_file(cls, file: OffsetFile, end_offset: int) -> int: + def _pad_file(cls, file: File, end_offset: int) -> int: """CPIO archives can have a 512 bytes block padding at the end.""" file.seek(end_offset, io.SEEK_SET) padded_end_offset = round_up(end_offset, cls._FILE_PAD_ALIGN) @@ -118,11 +241,13 @@ def _pad_file(cls, file: OffsetFile, end_offset: int) -> int: return end_offset @classmethod - def _pad_content(cls, header, c_filesize: int, c_namesize: int) -> int: + def _pad_header(cls, header: Instance, c_namesize: int) -> int: + return round_up(len(header) + c_namesize, 
cls._PAD_ALIGN) + + @classmethod + def _pad_content(cls, c_filesize: int) -> int: """Pad header and content with _PAD_ALIGN bytes.""" - padded_header = round_up(len(header) + c_namesize, cls._PAD_ALIGN) - padded_content = round_up(c_filesize, cls._PAD_ALIGN) - return padded_header + padded_content + return round_up(c_filesize, cls._PAD_ALIGN) @staticmethod def _calculate_file_size(header) -> int: @@ -136,31 +261,23 @@ def _calculate_name_size(header) -> int: def _calculate_mode(header) -> int: raise NotImplementedError + @staticmethod + def _calculate_dev(header) -> int: + raise NotImplementedError -class BinaryHandler(_CPIOHandlerBase): - NAME = "cpio_binary" - PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")] + @staticmethod + def _calculate_rdev(header) -> int: + raise NotImplementedError - C_DEFINITIONS = r""" - typedef struct old_cpio_header - { - uint16 c_magic; - uint16 c_dev; - uint16 c_ino; - uint16 c_mode; - uint16 c_uid; - uint16 c_gid; - uint16 c_nlink; - uint16 c_rdev; - uint16 c_mtimes[2]; - uint16 c_namesize; - uint16 c_filesize[2]; - } old_cpio_header_t; - """ - HEADER_STRUCT = "old_cpio_header_t" + def valid_checksum(self, header, start_offset: int) -> bool: # noqa: ARG002 + return True + +class BinaryCPIOParser(CPIOParserBase): _PAD_ALIGN = 2 + HEADER_STRUCT = "old_cpio_header_t" + @staticmethod def _calculate_file_size(header) -> int: return header.c_filesize[0] << 16 | header.c_filesize[1] @@ -173,32 +290,20 @@ def _calculate_name_size(header) -> int: def _calculate_mode(header) -> int: return header.c_mode + @staticmethod + def _calculate_dev(header) -> int: + return header.c_dev -class PortableOldASCIIHandler(_CPIOHandlerBase): - NAME = "cpio_portable_old_ascii" - - PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")] + @staticmethod + def _calculate_rdev(header) -> int: + return header.c_rdev - C_DEFINITIONS = r""" - typedef struct old_ascii_header - { - char c_magic[6]; - char c_dev[6]; - char c_ino[6]; - char c_mode[6]; - char 
c_uid[6]; - char c_gid[6]; - char c_nlink[6]; - char c_rdev[6]; - char c_mtime[11]; - char c_namesize[6]; - char c_filesize[11]; - } old_ascii_header_t; - """ - HEADER_STRUCT = "old_ascii_header_t" +class PortableOldASCIIParser(CPIOParserBase): _PAD_ALIGN = 1 + HEADER_STRUCT = "old_ascii_header_t" + @staticmethod def _calculate_file_size(header) -> int: return decode_int(header.c_filesize, 8) @@ -211,35 +316,18 @@ def _calculate_name_size(header) -> int: def _calculate_mode(header) -> int: return decode_int(header.c_mode, 8) + @staticmethod + def _calculate_dev(header) -> int: + return decode_int(header.c_dev, 8) -class _NewASCIICommon(StructHandler): - C_DEFINITIONS = r""" - typedef struct new_ascii_header - { - char c_magic[6]; - char c_ino[8]; - char c_mode[8]; - char c_uid[8]; - char c_gid[8]; - char c_nlink[8]; - char c_mtime[8]; - char c_filesize[8]; - char c_dev_maj[8]; - char c_dev_min[8]; - char c_rdev_maj[8]; - char c_rdev_min[8]; - char c_namesize[8]; - char c_chksum[8]; - } new_ascii_header_t; - """ - HEADER_STRUCT = "new_ascii_header_t" - - _PAD_ALIGN = 4 + @staticmethod + def _calculate_rdev(header) -> int: + return decode_int(header.c_rdev, 8) -class PortableASCIIHandler(_NewASCIICommon, _CPIOHandlerBase): - NAME = "cpio_portable_ascii" - PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")] +class PortableASCIIParser(CPIOParserBase): + _PAD_ALIGN = 4 + HEADER_STRUCT = "new_ascii_header_t" @staticmethod def _calculate_file_size(header) -> int: @@ -253,19 +341,128 @@ def _calculate_name_size(header) -> int: def _calculate_mode(header) -> int: return decode_int(header.c_mode, 16) + @staticmethod + def _calculate_dev(header) -> int: + return os.makedev( + decode_int(header.c_dev_maj, 16), decode_int(header.c_dev_min, 16) + ) + + @staticmethod + def _calculate_rdev(header) -> int: + return os.makedev( + decode_int(header.c_rdev_maj, 16), decode_int(header.c_rdev_min, 16) + ) + + +class PortableASCIIWithCRCParser(PortableASCIIParser): + def 
valid_checksum(self, header, start_offset: int) -> bool: + header_checksum = decode_int(header.c_chksum, 16) + calculated_checksum = 0 + file_size = self._calculate_file_size(header) + + for chunk in iterate_file(self.file, start_offset, file_size): + calculated_checksum += sum(bytearray(chunk)) + + return header_checksum == calculated_checksum -class PortableASCIIWithCRCHandler(_NewASCIICommon, _CPIOHandlerBase): + +class _CPIOExtractorBase(Extractor): + PARSER: Type[CPIOParserBase] + + def extract(self, inpath: Path, outdir: Path) -> Optional[ExtractResult]: + fs = FileSystem(outdir) + + with File.from_path(inpath) as file: + parser = self.PARSER(file, 0) + parser.parse() + parser.dump_entries(fs) + + +class BinaryCPIOExtractor(_CPIOExtractorBase): + PARSER = BinaryCPIOParser + + +class PortableOldASCIIExtractor(_CPIOExtractorBase): + PARSER = PortableOldASCIIParser + + +class PortableASCIIExtractor(_CPIOExtractorBase): + PARSER = PortableASCIIParser + + +class PortableASCIIWithCRCExtractor(_CPIOExtractorBase): + PARSER = PortableASCIIWithCRCParser + + +class _CPIOHandlerBase(Handler): + """A common base for all CPIO formats. + + The format should be parsed the same, there are small differences how to calculate + file and filename sizes padding and conversion from octal / hex. 
+ """ + + EXTRACTOR = None + + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: + raise NotImplementedError + + +class BinaryHandler(_CPIOHandlerBase): + NAME = "cpio_binary" + PATTERNS = [HexString("c7 71 // (default, bin, hpbin)")] + + EXTRACTOR = BinaryCPIOExtractor() + + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: + parser = BinaryCPIOParser(file, start_offset) + parser.parse() + return ValidChunk( + start_offset=start_offset, + end_offset=parser.end_offset, + ) + + +class PortableOldASCIIHandler(_CPIOHandlerBase): + NAME = "cpio_portable_old_ascii" + + PATTERNS = [HexString("30 37 30 37 30 37 // 07 07 07")] + + EXTRACTOR = PortableOldASCIIExtractor() + + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: + parser = PortableOldASCIIParser(file, start_offset) + parser.parse() + return ValidChunk( + start_offset=start_offset, + end_offset=parser.end_offset, + ) + + +class PortableASCIIHandler(_CPIOHandlerBase): + NAME = "cpio_portable_ascii" + PATTERNS = [HexString("30 37 30 37 30 31 // 07 07 01 (newc)")] + + EXTRACTOR = PortableASCIIExtractor() + + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: + parser = PortableASCIIParser(file, start_offset) + parser.parse() + return ValidChunk( + start_offset=start_offset, + end_offset=parser.end_offset, + ) + + +class PortableASCIIWithCRCHandler(_CPIOHandlerBase): NAME = "cpio_portable_ascii_crc" PATTERNS = [HexString("30 37 30 37 30 32 // 07 07 02")] - @staticmethod - def _calculate_file_size(header): - return decode_int(header.c_filesize, 16) - - @staticmethod - def _calculate_name_size(header): - return decode_int(header.c_namesize, 16) + EXTRACTOR = PortableASCIIWithCRCExtractor() - @staticmethod - def _calculate_mode(header) -> int: - return decode_int(header.c_mode, 16) + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: + parser = 
PortableASCIIWithCRCParser(file, start_offset) + parser.parse() + return ValidChunk( + start_offset=start_offset, + end_offset=parser.end_offset, + )