Skip to content

Commit

Permalink
feat(FileSystem): redirect operations on bad paths to lost+found dir
Browse files Browse the repository at this point in the history
  • Loading branch information
e3krisztian committed Aug 30, 2023
1 parent c2cf49b commit ebe141e
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 53 deletions.
63 changes: 56 additions & 7 deletions tests/test_file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
is_safe_path,
iterate_file,
iterate_patterns,
make_lost_and_found_path,
round_down,
round_up,
)
Expand Down Expand Up @@ -391,10 +392,11 @@ class TestFileSystem:
)
def test_get_extraction_path_success(self, path):
fs = FileSystem(Path("/unblob/sandbox"))
checked_path = fs._get_extraction_path(Path(path), "test") # noqa: SLF001
assert checked_path
extraction_path = fs._get_extraction_path(Path(path), "test") # noqa: SLF001
assert extraction_path
assert os.path.commonpath([extraction_path.resolve(), fs.root]) == str(fs.root)

assert fs.problems == []
assert checked_path.relative_to(fs.root)

@pytest.mark.parametrize(
"path",
Expand All @@ -407,16 +409,19 @@ def test_get_extraction_path_success(self, path):
)
def test_get_extraction_path_path_traversal_is_reported(self, path):
fs = FileSystem(Path("/unblob/sandbox"))
assert not fs._get_extraction_path(Path(path), "test") # noqa: SLF001
extraction_path = fs._get_extraction_path(Path(path), "test") # noqa: SLF001
assert extraction_path
assert os.path.commonpath([extraction_path.resolve(), fs.root]) == str(fs.root)

assert fs.problems

def test_get_extraction_path_path_traversal_reports(self):
fs = FileSystem(Path("/unblob/sandbox"))
op1 = f"test1-{object()}"
op2 = f"test2-{object()}"
assert op1 != op2
assert not fs._get_extraction_path(Path("../file"), op1) # noqa: SLF001
assert not fs._get_extraction_path(Path("../etc/passwd"), op2) # noqa: SLF001
fs._get_extraction_path(Path("../file"), op1) # noqa: SLF001
fs._get_extraction_path(Path("../etc/passwd"), op2) # noqa: SLF001

report1, report2 = fs.problems

Expand Down Expand Up @@ -462,7 +467,15 @@ def test_mkdir(self, sandbox: FileSystem):
assert sandbox.problems == []

def test_mkdir_outside_sandbox(self, sandbox: FileSystem):
sandbox.mkdir(Path("../directory"))
try:
sandbox.mkdir(Path("../directory"))
pytest.fail(
"expected failure, as lost+found directory is not created for mkdir"
)
except FileNotFoundError:
pass

sandbox.mkdir(Path("../directory"), parents=True)

assert not (sandbox.root / "../directory").exists()
assert sandbox.problems
Expand Down Expand Up @@ -558,3 +571,39 @@ def test_create_hardlink_outside_sandbox(self, sandbox: FileSystem):

assert not os.path.lexists(output_path)
assert sandbox.problems


@pytest.mark.parametrize(
    "input_path, expected_path",
    [
        # Every expected directory name ends in a hash of the parent directory,
        # so parents that are reduced to the same slug still end up in
        # different directories.
        pytest.param(
            "file",
            ".unblob-lost+found/_2727e5a04d8acc225b3320799348e34eff9ac515e1130101baab751a/file",
            id="non-traversal",
        ),
        pytest.param(
            "../file",
            ".unblob-lost+found/_e90583b491d2138aab0c8a12478ee050701910fd80c84289ae747e7c/file",
            id="path-traversal",
        ),
        pytest.param(
            "../../file",
            ".unblob-lost+found/_42a75ca4cfdad26e66c560d67ca640c8690ddbe20ba08e5e65d5733e/file",
            id="path-traversal-further-down",
        ),
        pytest.param(
            "/etc/passwd",
            ".unblob-lost+found/etc_feb0ca54f8477feb6210163efa5aa746160c573118847d96422b5dfa/passwd",
            id="absolute-path",
        ),
        pytest.param(
            "../m@u/n,g.e<d>p!a#t%h&t*o/file.md",
            ".unblob-lost+found/m-u-n-g-e-d-p-a-t-h-t-o_20bf817fac07c1c34418fcc37d153571577f9b67c5a0e5f0f63bcacb/file.md",
            id="non-alnum-path-parts",
        ),
    ],
)
def test_make_lost_and_found_path(input_path: str, expected_path: str):
    """Check the lost+found location computed for each sample input path."""
    actual_path = make_lost_and_found_path(Path(input_path))
    assert actual_path == Path(expected_path)
111 changes: 65 additions & 46 deletions unblob/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import enum
import functools
import hashlib
import io
import math
import mmap
import os
import re
import shutil
import struct
import unicodedata
from pathlib import Path
from typing import Iterable, Iterator, List, Optional, Tuple, Union

Expand All @@ -16,6 +19,7 @@
from .report import (
ExtractionProblem,
LinkExtractionProblem,
PathTraversalProblem,
SpecialFileExtractionProblem,
)

Expand Down Expand Up @@ -368,15 +372,38 @@ def chop_root(path: Path):
return Path("/".join(relative_parts))


def make_lost_and_found_path(path: Path) -> Path:
    """Map *path* to a readable, relative location under ``.unblob-lost+found``.

    The result has the shape ``.unblob-lost+found/<slug>_<digest>/<name>``:

    * ``<slug>`` is the parent directory of *path* reduced to lowercase ASCII
      letters and digits, with every other run of characters collapsed into a
      single ``-`` (leading and trailing dashes removed);
    * ``<digest>`` is the SHA-224 hex digest of the parent directory string,
      which keeps different parents apart even when their slugs coincide;
    * ``<name>`` is the final component of *path*, with ``.`` and ``..``
      spelled out as ``dot`` and ``dot-dot``.
    """
    parent_text = str(path.parent)
    digest = hashlib.sha224(parent_text.encode(errors="ignore")).hexdigest()

    # Slugification adapted from
    # https://stackoverflow.com/questions/5574042/string-slugification-in-python
    ascii_parent = (
        unicodedata.normalize("NFKD", parent_text).encode("ascii", "ignore").lower()
    )
    slug_bytes = re.sub(rb"[^a-z0-9]+", b"-", ascii_parent).strip(b"-")
    slug = re.sub(rb"[-]+", b"-", slug_bytes).decode()

    # "." and ".." as a final component would be confusing, so spell them out.
    name = path.name
    if name == ".":
        name = "dot"
    elif name == "..":
        name = "dot-dot"

    return Path(f".unblob-lost+found/{slug}_{digest}/{name}")


class _FSPath:
def __init__(self, *, root: Path, path: Path) -> None:
self.root = root
self.relative_path = chop_root(path)
self.absolute_path = root / self.relative_path
self.is_safe = is_safe_path(self.root, self.absolute_path)
absolute_path = root / self.relative_path
self.is_safe = is_safe_path(root, absolute_path)

def format_path(self) -> str:
return str(self.relative_path)
if self.is_safe:
self.safe_relative_path = self.relative_path
self.absolute_path = absolute_path
else:
self.safe_relative_path = make_lost_and_found_path(path)
self.absolute_path = root / self.safe_relative_path
assert is_safe_path(root, self.absolute_path)


class _FSLink:
Expand Down Expand Up @@ -429,80 +456,72 @@ def _fs_path(self, path: Path) -> _FSPath:
def _ensure_parent_dir(self, path: Path):
    """Create the directory containing *path*, including any missing ancestors.

    An already existing directory is accepted as is.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

def _get_extraction_path(
self, path: Path, path_use_description: str
) -> Optional[Path]:
def _get_extraction_path(self, path: Path, path_use_description: str) -> Path:
fs_path = self._fs_path(path)
if fs_path.is_safe:
return fs_path.absolute_path

report = ExtractionProblem(
path=fs_path.format_path(),
problem=f"Potential path traversal through {path_use_description}",
resolution="Skipped.",
)
self.record_problem(report)
return None
if not fs_path.is_safe:
report = PathTraversalProblem(
path=str(fs_path.relative_path),
extraction_path=str(fs_path.safe_relative_path),
problem=f"Potential path traversal through {path_use_description}",
resolution="Redirected.",
)
self.record_problem(report)

return fs_path.absolute_path

def write_bytes(self, path: Path, content: bytes):
logger.debug("creating file", file_path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "write_bytes")

if safe_path:
self._ensure_parent_dir(safe_path)
safe_path.write_bytes(content)
self._ensure_parent_dir(safe_path)
safe_path.write_bytes(content)

def write_chunks(self, path: Path, chunks: Iterable[bytes]):
logger.debug("creating file", file_path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "write_chunks")

if safe_path:
self._ensure_parent_dir(safe_path)
with safe_path.open("wb") as f:
for chunk in chunks:
f.write(chunk)
self._ensure_parent_dir(safe_path)
with safe_path.open("wb") as f:
for chunk in chunks:
f.write(chunk)

def carve(self, path: Path, file: File, start_offset: int, size: int):
logger.debug("carving file", path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "carve")

if safe_path:
self._ensure_parent_dir(safe_path)
carve(safe_path, file, start_offset, size)
self._ensure_parent_dir(safe_path)
carve(safe_path, file, start_offset, size)

def mkdir(self, path: Path, *, mode=0o777, parents=False, exist_ok=False):
logger.debug("creating directory", dir_path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "mkdir")

if safe_path:
self._ensure_parent_dir(safe_path)
safe_path.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
safe_path.mkdir(mode=mode, parents=parents, exist_ok=exist_ok)

def mkfifo(self, path: Path, mode=0o666):
logger.debug("creating fifo", path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "mkfifo")

if safe_path:
self._ensure_parent_dir(safe_path)
os.mkfifo(safe_path, mode=mode)
self._ensure_parent_dir(safe_path)
os.mkfifo(safe_path, mode=mode)

def mknod(self, path: Path, mode=0o600, device=0):
logger.debug("creating special file", special_path=path, _verbosity=3)
safe_path = self._get_extraction_path(path, "mknod")

if safe_path:
if self.has_root_permissions:
self._ensure_parent_dir(safe_path)
os.mknod(safe_path, mode=mode, device=device)
else:
problem = SpecialFileExtractionProblem(
problem="Root privileges are required to create block and char devices.",
resolution="Skipped.",
path=str(path),
mode=mode,
device=device,
)
self.record_problem(problem)
if self.has_root_permissions:
self._ensure_parent_dir(safe_path)
os.mknod(safe_path, mode=mode, device=device)
else:
problem = SpecialFileExtractionProblem(
problem="Root privileges are required to create block and char devices.",
resolution="Skipped.",
path=str(path),
mode=mode,
device=device,
)
self.record_problem(problem)

def _get_checked_link(self, src: Path, dst: Path) -> Optional[_FSLink]:
link = _FSLink(root=self.root, src=src, dst=dst)
Expand Down
12 changes: 12 additions & 0 deletions unblob/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,18 @@ def log_with(self, logger):
logger.warning(self.log_msg, path=self.path)


@attr.define(kw_only=True, frozen=True)
class PathTraversalProblem(ExtractionProblem):
    """Extraction problem that additionally records an extraction path."""

    # Reported alongside ``path`` when the problem is logged.
    extraction_path: str

    def log_with(self, logger):
        """Log the problem as a warning with ``path`` and ``extraction_path``."""
        details = {
            "path": self.path,
            "extraction_path": self.extraction_path,
        }
        logger.warning(self.log_msg, **details)


@attr.define(kw_only=True, frozen=True)
class LinkExtractionProblem(ExtractionProblem):
link_path: str
Expand Down

0 comments on commit ebe141e

Please sign in to comment.