Skip to content

Commit

Permalink
fix(handler): improve FAT handler by adding a fallback mechanism for …
Browse files Browse the repository at this point in the history
…truncated images.

Some FAT samples that we have are truncated to match exactly the size of
reserved sectors + FAT sectors + active cluster sectors. It's like the
initial image minus the free clusters.

Another approach to compute the size of a FAT chunk is to parse the File
Allocation Table (FAT) and consider that the FAT chunk size is equal to
the amount of active clusters multiplied by cluster size + size of
reserved sectors + size of FAT sectors.

This approach works great with truncated images but not so much with
valid images as it triggers 'Unexpected end of archive' in 7z, even if
the aforementioned end of archive that got stripped is full of null
bytes.

Therefore, we only apply it as a fallback mechanism when the chunk size
overflows the file.
  • Loading branch information
qkaiser committed Aug 24, 2023
1 parent 0d3c2bb commit c204b86
Showing 1 changed file with 104 additions and 2 deletions.
106 changes: 104 additions & 2 deletions unblob/handlers/filesystem/fat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Optional
import io
import struct
from enum import Enum
from typing import List, Optional

from dissect.cstruct import Instance
from structlog import get_logger
Expand All @@ -15,6 +18,20 @@
VALID_MEDIAS = [0xF0, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF]


class FsType(Enum):
UNKNOWN = b"FAT "
FAT12 = b"FAT12 "
FAT16 = b"FAT16 "
FAT32 = b"FAT32 "


FAT_ENTRY_SIZES = {
FsType.FAT12: int(12 // 8),
FsType.FAT16: int(16 // 8),
FsType.FAT32: int(32 // 8),
}


class FATHandler(StructHandler):
NAME = "fat"

Expand Down Expand Up @@ -132,10 +149,72 @@ def valid_header(self, header: Instance) -> bool:
return self.valid_fat32_header(header)
return True

def get_fat_size(self, header: Instance) -> int:
if FsType(header.FileSysType) in (FsType.FAT12, FsType.FAT16):
return header.common.FATSz16
if FsType(header.FileSysType) == FsType.FAT32:
return header.FATSz32
raise InvalidInputFormat("Invalid FAT type.")

def read_fat(self, header: Instance, start_offset: int, file: File) -> List[bytes]:
# we go past the reserved sectors (boot sector, fs information sector, custom)
file.seek(start_offset + (header.common.BytesPerSec * header.common.RsvdSecCnt))

fats = []
fat_size = self.get_fat_size(header) * header.common.BytesPerSec

for _ in range(header.common.NumFATs):
content = file.read(fat_size)
if len(content) != fat_size:
raise InvalidInputFormat(
"An error occured when reading the File Allocation Table."
)
fats += [content]

if len(set(fats)) != 1:
logger.warning(
"There is a mismatch between the main and backup File Allocation Tables, filesystem is probably corrupted."
)
return fats

def parse_fat(self, header: Instance, start_offset: int, file: File) -> List[int]:
fat_size = self.get_fat_size(header) * header.common.BytesPerSec

fats = self.read_fat(header, start_offset, file)

entry_size = FAT_ENTRY_SIZES[FsType(header.FileSysType)]
total_entries = int(fat_size // entry_size)
fat = [0] * total_entries

main_file_allocation_table = fats[0]
# backup_file_allocation_table = fats[1]

if FsType(header.FileSysType) == FsType.FAT32:
for i in range(total_entries):
entry = main_file_allocation_table[
(i * entry_size) : (i * entry_size) + entry_size
]
fat[i] = struct.unpack("<L", entry)[0] & 0x0F_FF_FF_FF
elif FsType(header.FileSysType) == FsType.FAT16:
for i in range(total_entries):
entry = main_file_allocation_table[
(i * entry_size) : (i * entry_size) + entry_size
]
fat[i] = struct.unpack("<H", entry)[0]
elif FsType(header.FileSysType) == FsType.FAT12:
for i in range(total_entries - 1):
entry = main_file_allocation_table[
(i * entry_size) : (i * entry_size) + 2
]
fat[i] = (struct.unpack("<H", entry)[0] >> ((i & 1) << 2)) & 0xFFF
else:
raise InvalidInputFormat("Invalid FAT type.")
return fat

def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
header = self.cparser_le.fat12_16_bootsec_t(file)

if header.FileSysType in (b"FAT12 ", b"FAT16 "):
if header.FileSysType in [FsType.FAT12.value, FsType.FAT16.value]:
if header.common.TotSectors == 0:
sector_count = header.NumSectors
else:
Expand All @@ -155,6 +234,29 @@ def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]

size = header.common.BytesPerSec * sector_count

file_size = file.size()
if start_offset + size > file_size:
logger.warning(
"Calculated chunk size overflows, falling back to active cluser based calculation."
)
file.seek(
start_offset + (header.common.BytesPerSec * header.common.RsvdSecCnt),
io.SEEK_SET,
)
# now we can parse the File Allocation Table
file_allocation_table = self.parse_fat(header, start_offset, file)

# header size corresponds to the size of reserved sectors and FAT sectors
headers_size = (header.common.NumFATs * header.common.BytesPerSec) + (
header.common.BytesPerSec * header.common.RsvdSecCnt
)

active_clusters = len(
[cluster for cluster in file_allocation_table if cluster != 0]
)
cluster_size = header.common.BytesPerSec * header.common.SecPerClus
size = headers_size + (active_clusters * cluster_size)

return ValidChunk(
start_offset=start_offset,
end_offset=start_offset + size,
Expand Down

0 comments on commit c204b86

Please sign in to comment.