Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archival bucketlist #4403

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
364 changes: 290 additions & 74 deletions src/bucket/Bucket.cpp

Large diffs are not rendered by default.

162 changes: 118 additions & 44 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketIndex.h"
#include "bucket/BucketSnapshot.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-ledger.h"
Expand Down Expand Up @@ -36,19 +37,28 @@ namespace stellar
* Two buckets can be merged together efficiently (in a single pass): elements
* from the newer bucket overwrite elements from the older bucket, the rest are
* merged in sorted order, and all elements are hashed while being added.
*
* Different types of BucketList vary on the type of entries they contain and by
* extension the merge logic of those entries. Additionally, some types of
* BucketList may have special operations only relevant to that specific type.
* This pure virtual base class provides the core functionality of a BucketList
* container and must be extended for each specific BucketList type. In
* particular, the fresh and merge functions must be defined for the specific
* type, while other functionality can be shared.
*/

class AbstractLedgerTxn;
class Application;
class BucketManager;
class SearchableBucketListSnapshot;
struct EvictionResultEntry;
class EvictionStatistics;
struct BucketEntryCounters;
template <class BucketT> class SearchableBucketListSnapshot;
enum class LedgerEntryTypeAndDurability : uint32_t;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
class Bucket : public NonMovableOrCopyable
{
protected:
std::filesystem::path const mFilename;
Hash const mHash;
size_t mSize{0};
Expand All @@ -62,6 +72,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::string ext);

public:
static constexpr ProtocolVersion
FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION = ProtocolVersion::V_23;

// Create an empty bucket. The empty bucket has hash '000000...' and its
// filename is the empty string.
Bucket();
Expand All @@ -76,10 +89,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::filesystem::path const& getFilename() const;
size_t getSize() const;

// Returns true if a BucketEntry that is key-wise identical to the given
// BucketEntry exists in the bucket. For testing.
bool containsBucketIdentity(BucketEntry const& id) const;

bool isEmpty() const;

// Delete index and close file stream
Expand All @@ -96,6 +105,62 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
// `newBucket`. Entries are inhibited from the fresh bucket by keywise-equal
// entries in any of the buckets in the provided `shadows` vector.
//
// Each bucket is self-describing in terms of the ledger protocol version it
// was constructed under, and the merge algorithm adjusts to the maximum of
// the versions attached to each input or shadow bucket. The provided
// `maxProtocolVersion` bounds this (for error checking) and should usually
// be the protocol of the ledger header at which the merge is starting. An
// exception will be thrown if any provided bucket versions exceed it.
template <class BucketT>
static std::shared_ptr<BucketT>
merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);

static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

#ifdef BUILD_TESTS
BucketIndex const&
getIndexForTesting() const
{
return getIndex();
}

#endif // BUILD_TESTS

virtual uint32_t getBucketVersion() const = 0;

template <class BucketT> friend class BucketSnapshotBase;
};

/*
* Live Buckets are used by the LiveBucketList to store the current canonical
* state of the ledger. They contain entries of type BucketEntry.
*/
class LiveBucket : public Bucket,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Live/Hot buckets/bucket lists would benefit from at least top-level doc comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments and rebased.

public std::enable_shared_from_this<LiveBucket>
{
public:
LiveBucket();
virtual ~LiveBucket()
{
}
LiveBucket(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index);

// Returns true if a BucketEntry that is key-wise identical to the given
// BucketEntry exists in the bucket. For testing.
bool containsBucketIdentity(BucketEntry const& id) const;

// At version 11, we added support for INITENTRY and METAENTRY. Before this
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr ProtocolVersion
Expand All @@ -113,23 +178,13 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries);

static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

#ifdef BUILD_TESTS
// "Applies" the bucket to the database. For each entry in the bucket,
// if the entry is init or live, creates or updates (respectively) the
// corresponding entry in the database; if the entry is dead (a
// tombstone), deletes the corresponding entry in the database.
void apply(Application& app) const;

BucketIndex const&
getIndexForTesting() const
{
return getIndex();
}

#endif // BUILD_TESTS
#endif

// Returns false if eof reached, true otherwise. Modifies iter as the bucket
// is scanned. Also modifies bytesToScan and maxEntriesToEvict such that
Expand All @@ -147,45 +202,64 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
bool scanForEviction(EvictionIterator& iter, uint32_t& bytesToScan,
uint32_t ledgerSeq,
std::list<EvictionResultEntry>& evictableKeys,
SearchableBucketListSnapshot& bl) const;
SearchableBucketListSnapshot<LiveBucket>& bl) const;

// Create a fresh bucket from given vectors of init (created) and live
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
// be sorted, hashed, and adopted in the provided BucketManager.
static std::shared_ptr<Bucket>
static std::shared_ptr<LiveBucket>
fresh(BucketManager& bucketManager, uint32_t protocolVersion,
std::vector<LedgerEntry> const& initEntries,
std::vector<LedgerEntry> const& liveEntries,
std::vector<LedgerKey> const& deadEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
// `newBucket`. Entries are inhibited from the fresh bucket by keywise-equal
// entries in any of the buckets in the provided `shadows` vector.
//
// Each bucket is self-describing in terms of the ledger protocol version it
// was constructed under, and the merge algorithm adjusts to the maximum of
// the versions attached to each input or shadow bucket. The provided
// `maxProtocolVersion` bounds this (for error checking) and should usually
// be the protocol of the ledger header at which the merge is starting. An
// exception will be thrown if any provided bucket versions exceed it.
static std::shared_ptr<Bucket>
merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<Bucket> const& oldBucket,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);

static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
static uint32_t
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);
// Returns true if the given BucketEntry should be dropped in the bottom
// level bucket (i.e. DEADENTRY)
static bool isTombstoneEntry(BucketEntry const& e);

uint32_t getBucketVersion() const override;

BucketEntryCounters const& getBucketEntryCounters() const;
friend class BucketSnapshot;

friend class LiveBucketSnapshot;
};

/*
* Hot Archive Buckets are used by the HotBucketList to store recently evicted
* entries. They contain entries of type HotArchiveBucketEntry.
*/
class HotArchiveBucket : public Bucket,
public std::enable_shared_from_this<HotArchiveBucket>
{
// Private static helper: converts the three input sets (archived entries,
// restored keys, deleted keys) into a single vector of
// HotArchiveBucketEntry.
// NOTE(review): which entry type each input set maps to, and the ordering
// of the returned vector, are defined in the implementation file and are
// not visible from this header -- confirm there.
static std::vector<HotArchiveBucketEntry>
convertToBucketEntry(std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries);

public:
// Default constructor.
// NOTE(review): presumably creates an empty bucket, as documented for
// Bucket() -- confirm against the implementation.
HotArchiveBucket();
virtual ~HotArchiveBucket()
{
}
// Constructs a bucket from a filename, its hash, and an index. The index
// is handed over as an rvalue unique_ptr.
HotArchiveBucket(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index);
// Implements the pure virtual Bucket::getBucketVersion().
uint32_t getBucketVersion() const override;

// Creates a new HotArchiveBucket from the given archived entries, restored
// keys and deleted keys.
// NOTE(review): presumably the result is sorted, hashed, and adopted by
// the provided BucketManager, mirroring LiveBucket::fresh -- confirm.
static std::shared_ptr<HotArchiveBucket>
fresh(BucketManager& bucketManager, uint32_t protocolVersion,
std::vector<LedgerEntry> const& archivedEntries,
std::vector<LedgerKey> const& restoredEntries,
std::vector<LedgerKey> const& deletedEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync);

// Returns true if the given HotArchiveBucketEntry should be dropped in the
// bottom level bucket (i.e. HOT_ARCHIVE_LIVE)
static bool isTombstoneEntry(HotArchiveBucketEntry const& e);

// Grants HotArchiveBucketSnapshot access to non-public members.
friend class HotArchiveBucketSnapshot;
};

enum class LedgerEntryTypeAndDurability : uint32_t;
struct BucketEntryCounters
{
std::map<LedgerEntryTypeAndDurability, size_t> entryTypeCounts;
Expand Down
10 changes: 5 additions & 5 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ BucketApplicator::BucketApplicator(Application& app,
uint32_t maxProtocolVersion,
uint32_t minProtocolVersionSeen,
uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::shared_ptr<LiveBucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
std::unordered_set<LedgerKey>& seenKeys)
: mApp(app)
Expand Down Expand Up @@ -135,7 +135,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
}

BucketEntry const& e = *mBucketIter;
Bucket::checkProtocolLegality(e, mMaxProtocolVersion);
LiveBucket::checkProtocolLegality(e, mMaxProtocolVersion);

if (shouldApplyEntry(mEntryTypeFilter, e))
{
Expand Down Expand Up @@ -167,15 +167,15 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
// The last level can have live entries, but at that point we
// know that they are actually init entries because the earliest
// state of all entries is init, so we mark them as such here
if (mLevel == BucketList::kNumLevels - 1 &&
if (mLevel == LiveBucketList::kNumLevels - 1 &&
e.type() == LIVEENTRY)
{
ltx->createWithoutLoading(e.liveEntry());
}
else if (
protocolVersionIsBefore(
mMinProtocolVersionSeen,
Bucket::
LiveBucket::
FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// Prior to protocol 11, INITENTRY didn't exist, so we need
Expand Down Expand Up @@ -207,7 +207,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters)
releaseAssertOrThrow(!isUsingBucketListDB);
if (protocolVersionIsBefore(
mMinProtocolVersionSeen,
Bucket::
LiveBucket::
FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY))
{
// Prior to protocol 11, DEAD entries could exist
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketApplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BucketApplicator
uint32_t mMaxProtocolVersion;
uint32_t mMinProtocolVersionSeen;
uint32_t mLevel;
BucketInputIterator mBucketIter;
LiveBucketInputIterator mBucketIter;
size_t mCount{0};
std::function<bool(LedgerEntryType)> mEntryTypeFilter;
std::unordered_set<LedgerKey>& mSeenKeys;
Expand Down Expand Up @@ -72,7 +72,7 @@ class BucketApplicator
// When this flag is set, each offer key read is added to seenKeys
BucketApplicator(Application& app, uint32_t maxProtocolVersion,
uint32_t minProtocolVersionSeen, uint32_t level,
std::shared_ptr<Bucket const> bucket,
std::shared_ptr<LiveBucket const> bucket,
std::function<bool(LedgerEntryType)> filter,
std::unordered_set<LedgerKey>& seenKeys);
operator bool() const;
Expand Down
1 change: 1 addition & 0 deletions src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class BucketIndex : public NonMovableOrCopyable
// the largest buckets) and should only be called once. If pageSize == 0 or
// if file size is less than the cutoff, individual key index is used.
// Otherwise range index is used, with the range defined by pageSize.
template <class BucketEntryT>
static std::unique_ptr<BucketIndex const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash);
Expand Down
Loading
Loading