From 36e3d63474ec6463a9681fec5ca3b3eb72901dcb Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Sat, 21 Sep 2024 23:57:23 +0200
Subject: [PATCH] chunks index caching, fixes #8397

borg compact now uses ChunkIndex (a specialized, memory-efficient data
structure), so it needs less memory now. Also, it saves that chunks index
to cache/chunks in the repository.

When the chunks index is needed, borg first tries to load it from
cache/chunks. If that fails, it falls back to building the chunks index
via repository.list(), which can be rather slow, and immediately caches
the resulting ChunkIndex in the repo.

borg check --repair currently just deletes the chunks cache, because it
might have deleted some invalid chunks in the repo.

cache.close now saves the chunks index to cache/chunks in the repo if it
was modified. Thus, borg create will update the cached chunks index with
new chunks.

cache/chunks_hash can be used to validate cache/chunks (and also to
validate / invalidate locally cached copies of it).
---
 src/borg/archive.py              |  16 +++-
 src/borg/archiver/compact_cmd.py | 132 +++++++++++++++++--------------
 src/borg/cache.py                |  75 +++++++++++++++++-
 3 files changed, 157 insertions(+), 66 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 75720ca41f..956c718872 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -50,7 +50,7 @@
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError
+from .repository import Repository, NoManifestError, StoreObjectNotFound
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -1644,7 +1644,7 @@ def check(
         self.check_all = not any((first, last, match, older, newer, oldest, newest))
         self.repair = repair
         self.repository = repository
-        self.chunks = build_chunkindex_from_repo(self.repository)
+        self.chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, cache_immediately=not repair)
         self.key = self.make_key(repository)
         self.repo_objs = RepoObj(self.key)
         if verify_data:
@@ -2100,6 +2100,18 @@ def valid_item(obj):
 
     def finish(self):
         if self.repair:
+            logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
+            # we may have deleted chunks, invalidate/remove the chunks index cache!
+            try:
+                self.repository.store_delete("cache/chunks_hash")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seems like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+            try:
+                self.repository.store_delete("cache/chunks")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seems like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
             logger.info("Writing Manifest.")
             self.manifest.write()
 
diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py
index a9c6dced52..8fa37e90ca 100644
--- a/src/borg/archiver/compact_cmd.py
+++ b/src/borg/archiver/compact_cmd.py
@@ -1,9 +1,11 @@
 import argparse
-from typing import Tuple, Dict
+from typing import Tuple, Set
 
 from ._common import with_repository
 from ..archive import Archive
+from ..cache import write_chunkindex_to_repo_cache
 from ..constants import *  # NOQA
+from ..hashindex import ChunkIndex, ChunkIndexEntry
 from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
@@ -20,34 +22,33 @@ def __init__(self, repository, manifest):
         self.repository = repository
         assert isinstance(repository, (Repository, RemoteRepository))
         self.manifest = manifest
-        self.repository_chunks = None  # what we have in the repository, id -> stored_size
-        self.used_chunks = None  # what archives currently reference
-        self.wanted_chunks = None  # chunks that would be nice to have for next borg check --repair
+        self.chunks = None  # a ChunkIndex, here used for: id -> (is_used, stored_size)
         self.total_files = None  # overall number of source files written to all archives in this repo
         self.total_size = None  # overall size of source file content data written to all archives
         self.archives_count = None  # number of archives
 
     @property
     def repository_size(self):
-        if self.repository_chunks is None:
+        if self.chunks is None:
             return None
-        return sum(self.repository_chunks.values())  # sum of stored sizes
+        return sum(entry.size for id, entry in self.chunks.iteritems())  # sum of stored sizes
 
     def garbage_collect(self):
         """Removes unused chunks from a repository."""
         logger.info("Starting compaction / garbage collection...")
         logger.info("Getting object IDs present in the repository...")
-        self.repository_chunks = self.get_repository_chunks()
+        self.chunks = self.get_repository_chunks()
         logger.info("Computing object IDs used by archives...")
-        (self.used_chunks, self.wanted_chunks, self.total_files, self.total_size, self.archives_count) = (
+        (self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = (
             self.analyze_archives()
         )
         self.report_and_delete()
+        self.save_chunk_index()
         logger.info("Finished compaction / garbage collection...")
 
-    def get_repository_chunks(self) -> Dict[bytes, int]:
-        """Build a dict id -> size of all chunks present in the repository"""
-        repository_chunks = {}
+    def get_repository_chunks(self) -> ChunkIndex:
+        """Build a ChunkIndex (id -> stored size) of all chunks present in the repository."""
+        chunks = ChunkIndex()
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
@@ -55,13 +56,41 @@ def get_repository_chunks(self) -> ChunkIndex:
                 break
             marker = result[-1][0]
             for id, stored_size in result:
-                repository_chunks[id] = stored_size
-        return repository_chunks
-
-    def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int, int]:
+                # we add this id to the chunks index, using refcount == 0, because
+                # we do not know yet whether it is actually referenced from some archives.
+                # we "abuse" the size field here. usually there is the plaintext size,
+                # but we use it for the size of the stored object here.
+                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        return chunks
+
+    def save_chunk_index(self):
+        # first clean up:
+        for id, entry in self.chunks.iteritems():
+            # we already deleted the unused chunks, so everything left must be used:
+            assert entry.refcount == ChunkIndex.MAX_VALUE
+            # as we put the wrong size in there, we need to clean up the size:
+            self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+        # now self.chunks is an up-to-date ChunkIndex, usable for general borg usage!
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, compact=True, clear=True, force_write=True)
+        self.chunks = None  # nothing there (cleared!)
+
+    def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
-        """Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks."""
-        used_chunks = {}  # chunks referenced by item.chunks
-        wanted_chunks = {}  # additional "wanted" chunks seen in item.chunks_healthy
+        """Iterate over all items in all archives, marking used chunks and collecting missing/re-appeared chunk IDs."""
+
+        def use_it(id, *, wanted=False):
+            entry = self.chunks.get(id)
+            if entry is not None:
+                # the chunk is in the repo, mark it used by setting refcount to max.
+                self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=entry.size)
+                if wanted:
+                    # chunk id is from chunks_healthy list: a lost chunk has re-appeared!
+                    reappeared_chunks.add(id)
+            else:
+                # we do NOT have this chunk in the repository!
+                missing_chunks.add(id)
+
+        missing_chunks: set[bytes] = set()
+        reappeared_chunks: set[bytes] = set()
         archive_infos = self.manifest.archives.list(sort_by=["ts"])
         num_archives = len(archive_infos)
         pi = ProgressIndicatorPercent(
@@ -73,79 +102,60 @@ def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int
             logger.info(f"Analyzing archive {info.name} {info.ts} {bin_to_hex(info.id)} ({i + 1}/{num_archives})")
             archive = Archive(self.manifest, info.id)
             # archive metadata size unknown, but usually small/irrelevant:
-            used_chunks[archive.id] = 0
+            use_it(archive.id)
             for id in archive.metadata.item_ptrs:
-                used_chunks[id] = 0
+                use_it(id)
             for id in archive.metadata.items:
-                used_chunks[id] = 0
+                use_it(id)
             # archive items content data:
             for item in archive.iter_items():
                 total_files += 1  # every fs object counts, not just regular files
                 if "chunks" in item:
                     for id, size in item.chunks:
                         total_size += size  # original, uncompressed file content size
-                        used_chunks[id] = size
+                        use_it(id)
                 if "chunks_healthy" in item:
                     # we also consider the chunks_healthy chunks as referenced - do not throw away
                     # anything that borg check --repair might still need.
                     for id, size in item.chunks_healthy:
-                        if id not in used_chunks:
-                            wanted_chunks[id] = size
+                        use_it(id, wanted=True)
         pi.finish()
-        return used_chunks, wanted_chunks, total_files, total_size, num_archives
+        return missing_chunks, reappeared_chunks, total_files, total_size, num_archives
 
     def report_and_delete(self):
         run_repair = " Run borg check --repair!"
 
-        missing_new = set(self.used_chunks) - set(self.repository_chunks)
-        if missing_new:
-            logger.error(f"Repository has {len(missing_new)} new missing objects." + run_repair)
+        if self.missing_chunks:
+            logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair)
             set_ec(EXIT_ERROR)
 
-        missing_known = set(self.wanted_chunks) - set(self.repository_chunks)
-        if missing_known:
-            logger.warning(f"Repository has {len(missing_known)} known missing objects.")
-            set_ec(EXIT_WARNING)
-
-        missing_found = set(self.wanted_chunks) & set(self.repository_chunks)
-        if missing_found:
-            logger.warning(f"{len(missing_found)} previously missing objects re-appeared!" + run_repair)
+        if self.reappeared_chunks:
+            logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair)
             set_ec(EXIT_WARNING)
 
         repo_size_before = self.repository_size
-        referenced_chunks = set(self.used_chunks) | set(self.wanted_chunks)
-        unused = set(self.repository_chunks) - referenced_chunks
-        logger.info(f"Repository has {len(unused)} objects to delete.")
-        if unused:
-            logger.info(f"Deleting {len(unused)} unused objects...")
-            pi = ProgressIndicatorPercent(
-                total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
-            )
-            for i, id in enumerate(unused):
-                pi.show(i)
-                self.repository.delete(id)
-                del self.repository_chunks[id]
-            pi.finish()
+        logger.info("Determining unused objects...")
+        unused = set()
+        for id, entry in self.chunks.iteritems():
+            if entry.refcount == 0:
+                unused.add(id)
+        logger.info(f"Deleting {len(unused)} unused objects...")
+        pi = ProgressIndicatorPercent(
+            total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
+        )
+        for i, id in enumerate(unused):
+            pi.show(i)
+            self.repository.delete(id)
+            del self.chunks[id]
+        pi.finish()
         repo_size_after = self.repository_size
-        count = len(self.repository_chunks)
+        count = len(self.chunks)
         logger.info(f"Overall statistics, considering all {self.archives_count} archives in this repository:")
         logger.info(
             f"Source data size was {format_file_size(self.total_size, precision=0)} in {self.total_files} files."
         )
-        dsize = 0
-        for id in self.repository_chunks:
-            if id in self.used_chunks:
-                dsize += self.used_chunks[id]
-            elif id in self.wanted_chunks:
-                dsize += self.wanted_chunks[id]
-            else:
-                raise KeyError(bin_to_hex(id))
-        logger.info(f"Repository size is {format_file_size(self.repository_size, precision=0)} in {count} objects.")
-        if self.total_size != 0:
-            logger.info(f"Space reduction factor due to deduplication: {dsize / self.total_size:.3f}")
-        if dsize != 0:
-            logger.info(f"Space reduction factor due to compression: {self.repository_size / dsize:.3f}")
+        logger.info(f"Repository size is {format_file_size(repo_size_after, precision=0)} in {count} objects.")
         logger.info(f"Compaction saved {format_file_size(repo_size_before - repo_size_after, precision=0)}.")
diff --git a/src/borg/cache.py b/src/borg/cache.py
index 5276b66bba..28d4951ae0 100644
--- a/src/borg/cache.py
+++ b/src/borg/cache.py
@@ -1,4 +1,5 @@
 import configparser
+import io
 import os
 import shutil
 import stat
@@ -30,7 +31,7 @@
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import RemoteRepository
-from .repository import LIST_SCAN_LIMIT, Repository
+from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound
 
 # chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -618,7 +619,64 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
         )
 
 
-def build_chunkindex_from_repo(repository):
+def load_chunks_hash(repository) -> bytes:
+    try:
+        hash = repository.store_load("cache/chunks_hash")
+        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seems like RemoteRepository raises Repository.ONF instead of StoreONF
+        hash = b""
+        logger.debug("cache/chunks_hash missing!")
+    return hash
+
+
+def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
+    cached_hash = load_chunks_hash(repository)
+    if compact:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.compact()  # vacuum the hash table
+    with io.BytesIO() as f:
+        chunks.write(f)
+        data = f.getvalue()
+    if clear:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.clear()  # free memory, immediately
+    new_hash = xxh64(data)
+    if force_write or new_hash != cached_hash:
+        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+        # when a client is loading the chunks index from a cache, it has to compare its xxh64
+        # hash against cache/chunks_hash in the repository. if it is the same, the cache
+        # is valid. If it is different, the cache is either corrupted or out of date and
+        # has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to either update
+        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # so that all clients will discard any client-local chunks index caches.
+ logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...") + repository.store_store("cache/chunks", data) + repository.store_store("cache/chunks_hash", new_hash) + return new_hash + + +def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False): + chunks = None + # first, try to load a pre-computed and centrally cached chunks index: + if not disable_caches: + wanted_hash = load_chunks_hash(repository) + logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...") + try: + chunks_data = repository.store_load("cache/chunks") + except (Repository.ObjectNotFound, StoreObjectNotFound): + # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF + logger.debug("cache/chunks not found in the repository.") + else: + if xxh64(chunks_data) == wanted_hash: + logger.debug("cache/chunks is valid.") + with io.BytesIO(chunks_data) as f: + chunks = ChunkIndex.read(f) + return chunks + else: + logger.debug("cache/chunks is invalid.") + # if we didn't get anything from the cache, compute the ChunkIndex the slow way: logger.debug("querying the chunk IDs list from the repo...") chunks = ChunkIndex() t0 = perf_counter() @@ -646,6 +704,9 @@ def build_chunkindex_from_repo(repository): # Protocol overhead is neglected in this calculation. speed = format_file_size(num_chunks * 34 / duration) logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s") + if cache_immediately: + # immediately update cache/chunks, so we only rarely have to do it the slow way: + write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True) return chunks @@ -660,7 +721,7 @@ def __init__(self): @property def chunks(self): if self._chunks is None: - self._chunks = build_chunkindex_from_repo(self.repository) + self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True) return self._chunks def seen_chunk(self, id, size=None): @@ -709,6 +770,11 @@ def add_chunk( stats.update(size, not exists) return ChunkListEntry(id, size) + def _write_chunks_cache(self, chunks): + # this is called from .close, so we can clear/compact here: + write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True) + self._chunks = None # nothing there (cleared!) + class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin): """ @@ -794,6 +860,9 @@ def close(self): pi.output("Saving files cache") integrity_data = self._write_files_cache(self._files) self.cache_config.integrity[self.files_cache_name()] = integrity_data + if self._chunks is not None: + pi.output("Saving chunks cache") + self._write_chunks_cache(self._chunks) # cache/chunks in repo has a different integrity mechanism pi.output("Saving cache config") self.cache_config.save(self.manifest) self.cache_config.close()