Commit 67b62b5
Merge pull request #8411 from ThomasWaldmann/optimize-repo-list-usage
bugfix: remove superfluous repository.list() call
ThomasWaldmann authored Sep 25, 2024
2 parents 7d02fe2 + 1436bbb commit 67b62b5
Showing 5 changed files with 74 additions and 94 deletions.
20 changes: 7 additions & 13 deletions src/borg/archiver/compact_cmd.py
@@ -10,7 +10,7 @@
from ..helpers import ProgressIndicatorPercent
from ..manifest import Manifest
from ..remote import RemoteRepository
from ..repository import Repository
from ..repository import Repository, repo_lister

from ..logger import create_logger

@@ -49,18 +49,12 @@ def garbage_collect(self):
def get_repository_chunks(self) -> ChunkIndex:
"""Build a dict id -> size of all chunks present in the repository"""
chunks = ChunkIndex()
marker = None
while True:
result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
if not result:
break
marker = result[-1][0]
for id, stored_size in result:
# we add this id to the chunks index, using refcount == 0, because
# we do not know yet whether it is actually referenced from some archives.
# we "abuse" the size field here. usually there is the plaintext size,
# but we use it for the size of the stored object here.
chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
for id, stored_size in repo_lister(self.repository, limit=LIST_SCAN_LIMIT):
# we add this id to the chunks index, using refcount == 0, because
# we do not know yet whether it is actually referenced from some archives.
# we "abuse" the size field here. usually there is the plaintext size,
# but we use it for the size of the stored object here.
chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
return chunks

def save_chunk_index(self):
66 changes: 27 additions & 39 deletions src/borg/archiver/debug_cmd.py
@@ -15,7 +15,7 @@
from ..helpers import CommandError, RTError
from ..manifest import Manifest
from ..platform import get_process_id
from ..repository import Repository, LIST_SCAN_LIMIT
from ..repository import Repository, LIST_SCAN_LIMIT, repo_lister
from ..repoobj import RepoObj

from ._common import with_repository, Highlander
@@ -130,15 +130,9 @@ def decrypt_dump(id, cdata):
cdata = repository.get(id)
key = key_factory(repository, cdata)
repo_objs = RepoObj(key)
marker = None
while True:
result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
if not result:
break
marker = result[-1][0]
for id, stored_size in result:
cdata = repository.get(id)
decrypt_dump(id, cdata)
for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
cdata = repository.get(id)
decrypt_dump(id, cdata)
print("Done.")

@with_repository(manifest=False)
@@ -177,38 +171,32 @@ def print_finding(info, wanted, data, offset):
key = key_factory(repository, cdata)
repo_objs = RepoObj(key)

marker = None
last_data = b""
last_id = None
i = 0
while True:
result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
if not result:
break
marker = result[-1][0]
for id, stored_size in result:
cdata = repository.get(id)
_, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)

# try to locate wanted sequence crossing the border of last_data and data
boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
if wanted in boundary_data:
boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
offset = boundary_data.find(wanted)
info = "%d %s | %s" % (i, last_id.hex(), id.hex())
print_finding(info, wanted, boundary_data, offset)

# try to locate wanted sequence in data
count = data.count(wanted)
if count:
offset = data.find(wanted) # only determine first occurrence's offset
info = "%d %s #%d" % (i, id.hex(), count)
print_finding(info, wanted, data, offset)

last_id, last_data = id, data
i += 1
if i % 10000 == 0:
print("%d objects processed." % i)
for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
cdata = repository.get(id)
_, data = repo_objs.parse(id, cdata, ro_type=ROBJ_DONTCARE)

# try to locate wanted sequence crossing the border of last_data and data
boundary_data = last_data[-(len(wanted) - 1) :] + data[: len(wanted) - 1]
if wanted in boundary_data:
boundary_data = last_data[-(len(wanted) - 1 + context) :] + data[: len(wanted) - 1 + context]
offset = boundary_data.find(wanted)
info = "%d %s | %s" % (i, last_id.hex(), id.hex())
print_finding(info, wanted, boundary_data, offset)

# try to locate wanted sequence in data
count = data.count(wanted)
if count:
offset = data.find(wanted) # only determine first occurrence's offset
info = "%d %s #%d" % (i, id.hex(), count)
print_finding(info, wanted, data, offset)

last_id, last_data = id, data
i += 1
if i % 10000 == 0:
print("%d objects processed." % i)
print("Done.")

@with_repository(manifest=False)
26 changes: 9 additions & 17 deletions src/borg/cache.py
@@ -31,7 +31,7 @@
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister

# chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -680,30 +680,22 @@ def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immedi
logger.debug("querying the chunk IDs list from the repo...")
chunks = ChunkIndex()
t0 = perf_counter()
num_requests = 0
num_chunks = 0
marker = None
while True:
result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
num_requests += 1
if not result:
break
marker = result[-1][0]
# The repo says it has these chunks, so we assume they are referenced chunks.
# We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
# We do not know the plaintext size (!= stored_size), thus we set size = 0.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id, stored_size in result:
num_chunks += 1
chunks[id] = init_entry
# The repo says it has these chunks, so we assume they are referenced chunks.
# We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
# We do not know the plaintext size (!= stored_size), thus we set size = 0.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
num_chunks += 1
chunks[id] = init_entry
# Cache does not contain the manifest.
if not isinstance(repository, (Repository, RemoteRepository)):
del chunks[Manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.001
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
speed = format_file_size(num_chunks * 34 / duration)
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
if cache_immediately:
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
12 changes: 12 additions & 0 deletions src/borg/repository.py
@@ -18,6 +18,18 @@
logger = create_logger(__name__)


def repo_lister(repository, *, limit=None):
marker = None
finished = False
while not finished:
result = repository.list(limit=limit, marker=marker)
finished = (len(result) < limit) if limit is not None else (len(result) == 0)
if not finished:
marker = result[-1][0]
for id, stored_size in result:
yield id, stored_size


class Repository:
"""borgstore based key value store"""

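For context only, a minimal, self-contained sketch of how call sites drive the new repo_lister generator. The generator body is copied from the src/borg/repository.py hunk above; FakeRepository is an invented toy stand-in whose list(limit=..., marker=...) method merely mimics the paging behaviour of the real Repository API.

def repo_lister(repository, *, limit=None):
    # generator added in src/borg/repository.py (copied from the hunk above):
    # it pages through repository.list() and stops as soon as a page comes back
    # shorter than `limit`, so no trailing empty list() call is needed.
    marker = None
    finished = False
    while not finished:
        result = repository.list(limit=limit, marker=marker)
        finished = (len(result) < limit) if limit is not None else (len(result) == 0)
        if not finished:
            marker = result[-1][0]
        for id, stored_size in result:
            yield id, stored_size


class FakeRepository:
    """Toy stand-in (not borg code): returns (id, stored_size) pairs page by page."""

    def __init__(self, ids):
        self.items = [(id, 100 + i) for i, id in enumerate(ids)]

    def list(self, *, limit=None, marker=None):
        start = 0
        if marker is not None:
            # resume after the last id of the previous page
            start = next(i for i, (id, _) in enumerate(self.items) if id == marker) + 1
        end = len(self.items) if limit is None else start + limit
        return self.items[start:end]


repo = FakeRepository([b"id-%03d" % i for i in range(7)])
for id, stored_size in repo_lister(repo, limit=3):
    print(id, stored_size)

With 7 objects and limit=3, the old per-call-site loop needed a fourth, empty list() call just to detect the end of the listing; repo_lister stops after the third call because the last page is shorter than the limit.
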
44 changes: 19 additions & 25 deletions src/borg/testsuite/archiver/repo_compress_cmd.py
@@ -1,7 +1,7 @@
import os

from ...constants import * # NOQA
from ...repository import Repository
from ...repository import Repository, repo_lister
from ...manifest import Manifest
from ...compress import ZSTD, ZLIB, LZ4, CNONE
from ...helpers import bin_to_hex
@@ -15,30 +15,24 @@ def check_compression(ctype, clevel, olevel):
repository = Repository(archiver.repository_path, exclusive=True)
with repository:
manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
marker = None
while True:
result = repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
if not result:
break
marker = result[-1][0]
for id, _ in result:
chunk = repository.get(id, read_data=True)
meta, data = manifest.repo_objs.parse(
id, chunk, ro_type=ROBJ_DONTCARE
) # will also decompress according to metadata
m_olevel = meta.get("olevel", -1)
m_psize = meta.get("psize", -1)
print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
# this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
# (desired compressed, lz4 compressed, not compressed).
assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255
if olevel != -1: # we expect obfuscation
assert "psize" in meta
assert m_olevel == olevel
else:
assert "psize" not in meta
assert "olevel" not in meta
for id, _ in repo_lister(repository, limit=LIST_SCAN_LIMIT):
chunk = repository.get(id, read_data=True)
meta, data = manifest.repo_objs.parse(
id, chunk, ro_type=ROBJ_DONTCARE
) # will also decompress according to metadata
m_olevel = meta.get("olevel", -1)
m_psize = meta.get("psize", -1)
print(bin_to_hex(id), meta["ctype"], meta["clevel"], meta["csize"], meta["size"], m_olevel, m_psize)
# this is not as easy as one thinks due to the DecidingCompressor choosing the smallest of
# (desired compressed, lz4 compressed, not compressed).
assert meta["ctype"] in (ctype, LZ4.ID, CNONE.ID)
assert meta["clevel"] in (clevel, 255) # LZ4 and CNONE has level 255
if olevel != -1: # we expect obfuscation
assert "psize" in meta
assert m_olevel == olevel
else:
assert "psize" not in meta
assert "olevel" not in meta

create_regular_file(archiver.input_path, "file1", size=1024 * 10)
create_regular_file(archiver.input_path, "file2", contents=os.urandom(1024 * 10))
