chunks index caching #8403

Merged 1 commit on Sep 24, 2024
16 changes: 14 additions & 2 deletions src/borg/archive.py
@@ -50,7 +50,7 @@
from .item import Item, ArchiveItem, ItemDiff
from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
from .remote import RemoteRepository, cache_if_remote
from .repository import Repository, NoManifestError
from .repository import Repository, NoManifestError, StoreObjectNotFound
from .repoobj import RepoObj

has_link = hasattr(os, "link")
@@ -1644,7 +1644,7 @@ def check(
self.check_all = not any((first, last, match, older, newer, oldest, newest))
self.repair = repair
self.repository = repository
self.chunks = build_chunkindex_from_repo(self.repository)
self.chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, cache_immediately=not repair)
self.key = self.make_key(repository)
self.repo_objs = RepoObj(self.key)
if verify_data:
@@ -2100,6 +2100,18 @@ def valid_item(obj):

def finish(self):
if self.repair:
logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
# we may have deleted chunks, invalidate/remove the chunks index cache!
try:
self.repository.store_delete("cache/chunks_hash")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
pass
try:
self.repository.store_delete("cache/chunks")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
pass
logger.info("Writing Manifest.")
self.manifest.write()

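Context for the archive.py change above: after a repair, finish() drops both repo-side cache objects (cache/chunks and cache/chunks_hash) so every client discards its cached chunks index and rebuilds it on the next access. A minimal sketch, using only the store_load API visible in this diff, of a hypothetical helper (not part of this PR) that probes whether that cache is still present:

```python
from borg.repository import Repository, StoreObjectNotFound


def chunks_cache_present(repository) -> bool:
    """Return True if the repo-side chunks index cache still exists (hypothetical helper)."""
    try:
        # the hash object is tiny, so probing it is cheap
        repository.store_load("cache/chunks_hash")
    except (Repository.ObjectNotFound, StoreObjectNotFound):
        # RemoteRepository appears to raise Repository.ObjectNotFound instead of StoreObjectNotFound
        return False
    return True
```

After borg check --repair finishes, this would return False until something (borg compact, or the next slow chunk index build) re-creates the cache.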
132 changes: 71 additions & 61 deletions src/borg/archiver/compact_cmd.py
@@ -1,9 +1,11 @@
import argparse
from typing import Tuple, Dict
from typing import Tuple, Set

from ._common import with_repository
from ..archive import Archive
from ..cache import write_chunkindex_to_repo_cache
from ..constants import * # NOQA
from ..hashindex import ChunkIndex, ChunkIndexEntry
from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex
from ..helpers import ProgressIndicatorPercent
from ..manifest import Manifest
@@ -20,48 +22,75 @@
self.repository = repository
assert isinstance(repository, (Repository, RemoteRepository))
self.manifest = manifest
self.repository_chunks = None # what we have in the repository, id -> stored_size
self.used_chunks = None # what archives currently reference
self.wanted_chunks = None # chunks that would be nice to have for next borg check --repair
self.chunks = None # a ChunkIndex, here used for: id -> (is_used, stored_size)
self.total_files = None # overall number of source files written to all archives in this repo
self.total_size = None # overall size of source file content data written to all archives
self.archives_count = None # number of archives

@property
def repository_size(self):
if self.repository_chunks is None:
if self.chunks is None:
return None
return sum(self.repository_chunks.values()) # sum of stored sizes
return sum(entry.size for id, entry in self.chunks.iteritems()) # sum of stored sizes

def garbage_collect(self):
"""Removes unused chunks from a repository."""
logger.info("Starting compaction / garbage collection...")
logger.info("Getting object IDs present in the repository...")
self.repository_chunks = self.get_repository_chunks()
self.chunks = self.get_repository_chunks()
logger.info("Computing object IDs used by archives...")
(self.used_chunks, self.wanted_chunks, self.total_files, self.total_size, self.archives_count) = (
(self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = (
self.analyze_archives()
)
self.report_and_delete()
self.save_chunk_index()
logger.info("Finished compaction / garbage collection...")

def get_repository_chunks(self) -> Dict[bytes, int]:
def get_repository_chunks(self) -> ChunkIndex:
"""Build a dict id -> size of all chunks present in the repository"""
repository_chunks = {}
chunks = ChunkIndex()
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
if not result:
break
marker = result[-1][0]
for id, stored_size in result:
repository_chunks[id] = stored_size
return repository_chunks

def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int, int]:
# we add this id to the chunks index, using refcount == 0, because
# we do not know yet whether it is actually referenced from some archives.
# we "abuse" the size field here. usually it holds the plaintext size,
# but we use it for the size of the stored object here.
chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
return chunks

def save_chunk_index(self):
# first clean up:
for id, entry in self.chunks.iteritems():
# we already deleted the unused chunks, so everything left must be used:
assert entry.refcount == ChunkIndex.MAX_VALUE
# as we put the wrong size in there, we need to clean up the size:
self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
# now self.chunks is an up-to-date ChunkIndex, usable for general borg usage!
write_chunkindex_to_repo_cache(self.repository, self.chunks, compact=True, clear=True, force_write=True)
self.chunks = None # nothing there (cleared!)

def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
"""Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks."""
used_chunks = {} # chunks referenced by item.chunks
wanted_chunks = {} # additional "wanted" chunks seen in item.chunks_healthy

def use_it(id, *, wanted=False):
entry = self.chunks.get(id)
if entry is not None:
# the chunk is in the repo, mark it used by setting refcount to max.
self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=entry.size)
if wanted:
# chunk id is from chunks_healthy list: a lost chunk has re-appeared!
reappeared_chunks.add(id)
else:
# we do NOT have this chunk in the repository!
missing_chunks.add(id)

missing_chunks: set[bytes] = set()
reappeared_chunks: set[bytes] = set()
archive_infos = self.manifest.archives.list(sort_by=["ts"])
num_archives = len(archive_infos)
pi = ProgressIndicatorPercent(
@@ -73,79 +102,60 @@
logger.info(f"Analyzing archive {info.name} {info.ts} {bin_to_hex(info.id)} ({i + 1}/{num_archives})")
archive = Archive(self.manifest, info.id)
# archive metadata size unknown, but usually small/irrelevant:
used_chunks[archive.id] = 0
use_it(archive.id)
for id in archive.metadata.item_ptrs:
used_chunks[id] = 0
use_it(id)
for id in archive.metadata.items:
used_chunks[id] = 0
use_it(id)
# archive items content data:
for item in archive.iter_items():
total_files += 1 # every fs object counts, not just regular files
if "chunks" in item:
for id, size in item.chunks:
total_size += size # original, uncompressed file content size
used_chunks[id] = size
use_it(id)
if "chunks_healthy" in item:
# we also consider the chunks_healthy chunks as referenced - do not throw away
# anything that borg check --repair might still need.
for id, size in item.chunks_healthy:
if id not in used_chunks:
wanted_chunks[id] = size
use_it(id, wanted=True)
pi.finish()
return used_chunks, wanted_chunks, total_files, total_size, num_archives
return missing_chunks, reappeared_chunks, total_files, total_size, num_archives

def report_and_delete(self):
run_repair = " Run borg check --repair!"

missing_new = set(self.used_chunks) - set(self.repository_chunks)
if missing_new:
logger.error(f"Repository has {len(missing_new)} new missing objects." + run_repair)
if self.missing_chunks:
logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair)
set_ec(EXIT_ERROR)

missing_known = set(self.wanted_chunks) - set(self.repository_chunks)
if missing_known:
logger.warning(f"Repository has {len(missing_known)} known missing objects.")
set_ec(EXIT_WARNING)

missing_found = set(self.wanted_chunks) & set(self.repository_chunks)
if missing_found:
logger.warning(f"{len(missing_found)} previously missing objects re-appeared!" + run_repair)
if self.reappeared_chunks:
logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair)
set_ec(EXIT_WARNING)

repo_size_before = self.repository_size
referenced_chunks = set(self.used_chunks) | set(self.wanted_chunks)
unused = set(self.repository_chunks) - referenced_chunks
logger.info(f"Repository has {len(unused)} objects to delete.")
if unused:
logger.info(f"Deleting {len(unused)} unused objects...")
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
for i, id in enumerate(unused):
pi.show(i)
self.repository.delete(id)
del self.repository_chunks[id]
pi.finish()
logger.info("Determining unused objects...")
unused = set()
for id, entry in self.chunks.iteritems():
if entry.refcount == 0:
unused.add(id)
logger.info(f"Deleting {len(unused)} unused objects...")
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
for i, id in enumerate(unused):
pi.show(i)
self.repository.delete(id)
del self.chunks[id]
pi.finish()
repo_size_after = self.repository_size

count = len(self.repository_chunks)
count = len(self.chunks)
logger.info(f"Overall statistics, considering all {self.archives_count} archives in this repository:")
logger.info(
f"Source data size was {format_file_size(self.total_size, precision=0)} in {self.total_files} files."
)
dsize = 0
for id in self.repository_chunks:
if id in self.used_chunks:
dsize += self.used_chunks[id]
elif id in self.wanted_chunks:
dsize += self.wanted_chunks[id]
else:
raise KeyError(bin_to_hex(id))
logger.info(f"Repository size is {format_file_size(self.repository_size, precision=0)} in {count} objects.")
if self.total_size != 0:
logger.info(f"Space reduction factor due to deduplication: {dsize / self.total_size:.3f}")
if dsize != 0:
logger.info(f"Space reduction factor due to compression: {self.repository_size / dsize:.3f}")
logger.info(f"Repository size is {format_file_size(repo_size_after, precision=0)} in {count} objects.")
logger.info(f"Compaction saved {format_file_size(repo_size_before - repo_size_after, precision=0)}.")


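The rewritten compact_cmd.py above is essentially a mark-and-sweep over a repo-side ChunkIndex: get_repository_chunks() lists every object with refcount 0 and the stored size parked in the size field, analyze_archives() marks referenced IDs with refcount == ChunkIndex.MAX_VALUE via use_it() (also collecting missing and re-appeared IDs), report_and_delete() sweeps whatever is still at refcount 0, and save_chunk_index() resets the sizes and persists the now-accurate index to the repository cache. A self-contained sketch of the marking logic, with plain dicts standing in for borg's ChunkIndex:

```python
# Illustrative sketch only: plain dicts stand in for borg's ChunkIndex,
# and REFERENCED stands in for ChunkIndex.MAX_VALUE.
REFERENCED = object()


def mark_and_sweep(repo_listing, referenced_ids):
    # repo_listing: iterable of (chunk_id, stored_size), as repository.list() returns them
    chunks = {chunk_id: (0, stored_size) for chunk_id, stored_size in repo_listing}
    missing = set()
    for chunk_id in referenced_ids:
        if chunk_id in chunks:
            _, stored_size = chunks[chunk_id]
            chunks[chunk_id] = (REFERENCED, stored_size)  # mark: used by some archive
        else:
            missing.add(chunk_id)  # referenced by an archive, but gone from the repo
    unused = {chunk_id for chunk_id, (ref, _) in chunks.items() if ref == 0}
    return unused, missing


# b"bb" is present but unreferenced (would be deleted); b"cc" is referenced but missing.
unused, missing = mark_and_sweep([(b"aa", 100), (b"bb", 50)], [b"aa", b"cc"])
assert unused == {b"bb"} and missing == {b"cc"}
```

One design point carried over from the old code: chunks listed in chunks_healthy are marked used as well, so compact never deletes data that borg check --repair might still want to restore.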
75 changes: 72 additions & 3 deletions src/borg/cache.py
@@ -1,4 +1,5 @@
import configparser
import io
import os
import shutil
import stat
@@ -30,7 +31,7 @@
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound

# chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -618,7 +619,64 @@
)


def build_chunkindex_from_repo(repository):
def load_chunks_hash(repository) -> bytes:
try:
hash = repository.store_load("cache/chunks_hash")
logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
hash = b""
logger.debug("cache/chunks_hash missing!")
return hash


def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
cached_hash = load_chunks_hash(repository)
if compact:
# if we don't need the in-memory chunks index anymore:
chunks.compact() # vacuum the hash table
with io.BytesIO() as f:
chunks.write(f)
data = f.getvalue()
if clear:
# if we don't need the in-memory chunks index anymore:
chunks.clear() # free memory, immediately
new_hash = xxh64(data)
if force_write or new_hash != cached_hash:
# when an updated chunks index is stored into the cache, we also store its hash into the cache.
# when a client is loading the chunks index from a cache, it has to compare its xxh64
# hash against cache/chunks_hash in the repository. if it is the same, the cache
# is valid. If it is different, the cache is either corrupted or out of date and
# has to be discarded.
# when some functionality is DELETING chunks from the repository, it has to either update
# both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
# so that all clients will discard any client-local chunks index caches.
logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
repository.store_store("cache/chunks", data)
repository.store_store("cache/chunks_hash", new_hash)
return new_hash


def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
chunks = None
# first, try to load a pre-computed and centrally cached chunks index:
if not disable_caches:
wanted_hash = load_chunks_hash(repository)
logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
try:
chunks_data = repository.store_load("cache/chunks")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
logger.debug("cache/chunks not found in the repository.")
else:
if xxh64(chunks_data) == wanted_hash:
logger.debug("cache/chunks is valid.")
with io.BytesIO(chunks_data) as f:
chunks = ChunkIndex.read(f)
return chunks
else:
logger.debug("cache/chunks is invalid.")

# if we didn't get anything from the cache, compute the ChunkIndex the slow way:
logger.debug("querying the chunk IDs list from the repo...")
chunks = ChunkIndex()
t0 = perf_counter()
@@ -646,6 +704,9 @@
# Protocol overhead is neglected in this calculation.
speed = format_file_size(num_chunks * 34 / duration)
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
if cache_immediately:
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
return chunks


@@ -660,7 +721,7 @@
@property
def chunks(self):
if self._chunks is None:
self._chunks = build_chunkindex_from_repo(self.repository)
self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
return self._chunks

def seen_chunk(self, id, size=None):
@@ -709,6 +770,11 @@
stats.update(size, not exists)
return ChunkListEntry(id, size)

def _write_chunks_cache(self, chunks):
# this is called from .close, so we can clear/compact here:
write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
self._chunks = None # nothing there (cleared!)


class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
"""
@@ -794,6 +860,9 @@
pi.output("Saving files cache")
integrity_data = self._write_files_cache(self._files)
self.cache_config.integrity[self.files_cache_name()] = integrity_data
if self._chunks is not None:
pi.output("Saving chunks cache")
self._write_chunks_cache(self._chunks) # cache/chunks in repo has a different integrity mechanism
pi.output("Saving cache config")
self.cache_config.save(self.manifest)
self.cache_config.close()
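The cache.py changes define the protocol the other two files rely on: the serialized ChunkIndex is stored under cache/chunks and its xxh64 digest under cache/chunks_hash, a reader only trusts the cached index if the digest matches, and the writer skips the upload when the digest is unchanged (unless force_write is set). A minimal sketch of the read-side validity check, assuming borg's xxh64 helper is importable from borg.checksums and leaving out the ObjectNotFound handling shown in the diff:

```python
import io

from borg.checksums import xxh64  # assumed import location of borg's xxh64 helper
from borg.hashindex import ChunkIndex


def load_cached_chunkindex(repository):
    """Return the cached ChunkIndex if it is valid, else None (caller rebuilds it)."""
    wanted_hash = repository.store_load("cache/chunks_hash")
    chunks_data = repository.store_load("cache/chunks")
    if xxh64(chunks_data) != wanted_hash:
        return None  # corrupted or out of date: discard and rebuild the slow way
    with io.BytesIO(chunks_data) as f:
        return ChunkIndex.read(f)
```

Anything that deletes chunks therefore has to either update both keys (as borg compact does) or delete both (as borg check --repair now does); otherwise clients would keep trusting a stale index.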