diff --git a/unblob/handlers/filesystem/fat.py b/unblob/handlers/filesystem/fat.py index 1b0d82f743..3041d79673 100644 --- a/unblob/handlers/filesystem/fat.py +++ b/unblob/handlers/filesystem/fat.py @@ -1,4 +1,7 @@ -from typing import Optional +import io +import struct +from enum import Enum +from typing import List, Optional from dissect.cstruct import Instance from structlog import get_logger @@ -15,6 +18,20 @@ VALID_MEDIAS = [0xF0, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF] +class FsType(Enum): + UNKNOWN = b"FAT " + FAT12 = b"FAT12 " + FAT16 = b"FAT16 " + FAT32 = b"FAT32 " + + +FAT_ENTRY_SIZES = { + FsType.FAT12: int(12 // 8), + FsType.FAT16: int(16 // 8), + FsType.FAT32: int(32 // 8), +} + + class FATHandler(StructHandler): NAME = "fat" @@ -132,10 +149,72 @@ def valid_header(self, header: Instance) -> bool: return self.valid_fat32_header(header) return True + def get_fat_size(self, header: Instance) -> int: + if FsType(header.FileSysType) in (FsType.FAT12, FsType.FAT16): + return header.common.FATSz16 + if FsType(header.FileSysType) == FsType.FAT32: + return header.FATSz32 + raise InvalidInputFormat("Invalid FAT type.") + + def read_fat(self, header: Instance, start_offset: int, file: File) -> List[bytes]: + # we go past the reserved sectors (boot sector, fs information sector, custom) + file.seek(start_offset + (header.common.BytesPerSec * header.common.RsvdSecCnt)) + + fats = [] + fat_size = self.get_fat_size(header) * header.common.BytesPerSec + + for _ in range(header.common.NumFATs): + content = file.read(fat_size) + if len(content) != fat_size: + raise InvalidInputFormat( + "An error occured when reading the File Allocation Table." + ) + fats += [content] + + if len(set(fats)) != 1: + logger.warning( + "There is a mismatch between the main and backup File Allocation Tables, filesystem is probably corrupted." + ) + return fats + + def parse_fat(self, header: Instance, start_offset: int, file: File) -> List[int]: + fat_size = self.get_fat_size(header) * header.common.BytesPerSec + + fats = self.read_fat(header, start_offset, file) + + entry_size = FAT_ENTRY_SIZES[FsType(header.FileSysType)] + total_entries = int(fat_size // entry_size) + fat = [0] * total_entries + + main_file_allocation_table = fats[0] + # backup_file_allocation_table = fats[1] + + if FsType(header.FileSysType) == FsType.FAT32: + for i in range(total_entries): + entry = main_file_allocation_table[ + (i * entry_size) : (i * entry_size) + entry_size + ] + fat[i] = struct.unpack("> ((i & 1) << 2)) & 0xFFF + else: + raise InvalidInputFormat("Invalid FAT type.") + return fat + def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]: header = self.cparser_le.fat12_16_bootsec_t(file) - if header.FileSysType in (b"FAT12 ", b"FAT16 "): + if header.FileSysType in [FsType.FAT12.value, FsType.FAT16.value]: if header.common.TotSectors == 0: sector_count = header.NumSectors else: @@ -155,6 +234,29 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk] size = header.common.BytesPerSec * sector_count + file_size = file.size() + if start_offset + size > file_size: + logger.warning( + "Calculated chunk size overflows, falling back to active cluser based calculation." + ) + file.seek( + start_offset + (header.common.BytesPerSec * header.common.RsvdSecCnt), + io.SEEK_SET, + ) + # now we can parse the File Allocation Table + file_allocation_table = self.parse_fat(header, start_offset, file) + + # header size corresponds to the size of reserved sectors and FAT sectors + headers_size = (header.common.NumFATs * header.common.BytesPerSec) + ( + header.common.BytesPerSec * header.common.RsvdSecCnt + ) + + active_clusters = len( + [cluster for cluster in file_allocation_table if cluster != 0] + ) + cluster_size = header.common.BytesPerSec * header.common.SecPerClus + size = headers_size + (active_clusters * cluster_size) + return ValidChunk( start_offset=start_offset, end_offset=start_offset + size,