Skip to content

Commit

Permalink
Build checkpoints directly instead of going to SQL, cleanup outdated …
Browse files Browse the repository at this point in the history
…functionality
  • Loading branch information
marta-lokhova committed Sep 12, 2024
1 parent 2638c51 commit eace482
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 553 deletions.
11 changes: 11 additions & 0 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <set>
#include <thread>

#include "history/FileTransferInfo.h"
#include "medida/counter.h"
#include "medida/meter.h"
#include "medida/metrics_registry.h"
Expand Down Expand Up @@ -133,6 +134,16 @@ BucketManagerImpl::initialize()
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
}
}

// Create persistent publish directories
// Note: HISTORY_FILE_TYPE_BUCKET is already tracked by BucketList in
// BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database
// so create the remaining ledger header, transactions and results
// directories
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig());
}

void
Expand Down
10 changes: 8 additions & 2 deletions src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool Database::gDriversRegistered = false;

// smallest schema version supported
static unsigned long const MIN_SCHEMA_VERSION = 21;
static unsigned long const SCHEMA_VERSION = 22;
static unsigned long const SCHEMA_VERSION = 23;

// These should always match our compiled version precisely, since we are
// using a bundled version to get access to carray(). But in case someone
Expand Down Expand Up @@ -213,7 +213,13 @@ Database::applySchemaUpgrade(unsigned long vers)
switch (vers)
{
case 22:
deprecateTransactionFeeHistory(*this);
dropSupportTransactionFeeHistory(*this);
break;
case 23:
dropSupportTxSetHistory(*this);
CLOG_WARNING(Database,
"`txhistory` SQL table is deprecated and will be "
"removed in the future");
break;
default:
throw std::runtime_error("Unknown DB schema version");
Expand Down
17 changes: 17 additions & 0 deletions src/history/HistoryManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "herder/TxSetFrame.h"
#include "history/HistoryArchive.h"
#include "overlay/StellarXDR.h"
#include "util/GlobalChecks.h"
Expand Down Expand Up @@ -331,6 +332,9 @@ class HistoryManager
// Returns the number of publishes initiated.
virtual size_t publishQueuedHistory() = 0;

// Prepare checkpoint files for publishing
virtual void maybeCheckpointComplete() = 0;

// Return the set of buckets referenced by the persistent (DB) publish
// queue that are not present in the BucketManager. These need to be
// fetched from somewhere before publishing can begin again.
Expand All @@ -355,6 +359,19 @@ class HistoryManager
std::vector<std::string> const& originalBuckets,
bool success) = 0;

virtual void
appendTransactionSet(uint32_t ledgerSeq, TxSetXDRFrameConstPtr const& txSet,
TransactionResultSet const& resultSet) = 0;
virtual void appendLedgerHeader(LedgerHeader const& header) = 0;

// On startup, restore checkpoint files based on the last committed LCL
virtual void restoreCheckpoint(uint32_t lcl) = 0;

// Cleanup published files. If core is reset to genesis, any unpublished
// files will be cleaned by removal of the buckets directory.
virtual void deletePublishedFiles(uint32_t ledgerSeq,
Config const& cfg) = 0;

// clear the publish queue for any ledgers more recent than ledgerSeq
virtual void deleteCheckpointsNewerThan(uint32_t ledgerSeq) = 0;

Expand Down
77 changes: 77 additions & 0 deletions src/history/HistoryManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ HistoryManagerImpl::HistoryManagerImpl(Application& app)
app.getMetrics().NewMeter({"history", "publish", "failure"}, "event"))
, mEnqueueToPublishTimer(
app.getMetrics().NewTimer({"history", "publish", "time"}))
, mCheckpoint(std::make_unique<CheckpointBuilder>(app))
{
}

Expand Down Expand Up @@ -347,6 +348,20 @@ HistoryManagerImpl::publishQueuedHistory()
return 0;
}

void
HistoryManagerImpl::maybeCheckpointComplete()
{
uint32_t lcl = mApp.getLedgerManager().getLastClosedLedgerNum();
if (!publishCheckpointOnLedgerClose(lcl) ||
!mApp.getHistoryArchiveManager().publishEnabled())
{
return;
}

releaseAssert(mCheckpoint);
mCheckpoint->checkpointComplete(lcl);
}

std::vector<HistoryArchiveState>
HistoryManagerImpl::getPublishQueueStates()
{
Expand Down Expand Up @@ -419,6 +434,26 @@ HistoryManagerImpl::getMissingBucketsReferencedByPublishQueue()
return std::vector<std::string>(buckets.begin(), buckets.end());
}

void
HistoryManagerImpl::deletePublishedFiles(uint32_t ledgerSeq, Config const& cfg)
{
releaseAssert(isLastLedgerInCheckpoint(ledgerSeq));
FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, ledgerSeq,
mApp.getConfig());
FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, ledgerSeq,
mApp.getConfig());
FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, ledgerSeq,
mApp.getConfig());
// Dirty files shouldn't exist, but cleanup just in case
std::remove(res.localPath_nogz_dirty().c_str());
std::remove(txs.localPath_nogz_dirty().c_str());
std::remove(headers.localPath_nogz_dirty().c_str());
// Remove published files
std::remove(res.localPath_nogz().c_str());
std::remove(txs.localPath_nogz().c_str());
std::remove(headers.localPath_nogz().c_str());
}

void
HistoryManagerImpl::historyPublished(
uint32_t ledgerSeq, std::vector<std::string> const& originalBuckets,
Expand Down Expand Up @@ -451,6 +486,7 @@ HistoryManagerImpl::historyPublished(
}

mPublishQueueBuckets.removeBuckets(originalBuckets);
deletePublishedFiles(ledgerSeq, mApp.getConfig());
}
else
{
Expand All @@ -461,6 +497,47 @@ HistoryManagerImpl::historyPublished(
"HistoryManagerImpl: publishQueuedHistory");
}

void
HistoryManagerImpl::appendTransactionSet(uint32_t ledgerSeq,
TxSetXDRFrameConstPtr const& txSet,
TransactionResultSet const& resultSet)
{
if (mApp.getHistoryArchiveManager().publishEnabled())
{
releaseAssert(mCheckpoint);
mCheckpoint->appendTransactionSet(ledgerSeq, txSet, resultSet);
}
}

void
HistoryManagerImpl::appendLedgerHeader(LedgerHeader const& header)
{
if (mApp.getHistoryArchiveManager().publishEnabled())
{
releaseAssert(mCheckpoint);
mCheckpoint->appendLedgerHeader(header);
#ifdef BUILD_TESTS
if (mThrowOnLastAppend && isLastLedgerInCheckpoint(header.ledgerSeq))
{
throw std::runtime_error("Throwing for testing");
}
#endif
}
}

void
HistoryManagerImpl::restoreCheckpoint(uint32_t lcl)
{
if (mApp.getHistoryArchiveManager().publishEnabled())
{
releaseAssert(mCheckpoint);
mCheckpoint->cleanup(lcl);
// Maybe finalize checkpoint if we're at a checkpoint boundary and
// haven't rotated yet. No-op if checkpoint has been rotated already
maybeCheckpointComplete();
}
}

void
HistoryManagerImpl::deleteCheckpointsNewerThan(uint32_t ledgerSeq)
{
Expand Down
19 changes: 18 additions & 1 deletion src/history/HistoryManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/PublishQueueBuckets.h"
#include "history/CheckpointBuilder.h"
#include "history/HistoryManager.h"
#include "util/TmpDir.h"
#include "work/Work.h"
Expand All @@ -13,6 +14,7 @@
namespace medida
{
class Meter;
class Timer;
}

namespace stellar
Expand All @@ -36,6 +38,7 @@ class HistoryManagerImpl : public HistoryManager

medida::Timer& mEnqueueToPublishTimer;
UnorderedMap<uint32_t, std::chrono::steady_clock::time_point> mEnqueueTimes;
std::unique_ptr<CheckpointBuilder> mCheckpoint;

PublishQueueBuckets::BucketCount loadBucketsReferencedByPublishQueue();
#ifdef BUILD_TESTS
Expand Down Expand Up @@ -64,6 +67,8 @@ class HistoryManagerImpl : public HistoryManager

size_t publishQueuedHistory() override;

void maybeCheckpointComplete() override;

std::vector<std::string>
getMissingBucketsReferencedByPublishQueue() override;

Expand All @@ -74,7 +79,12 @@ class HistoryManagerImpl : public HistoryManager
void historyPublished(uint32_t ledgerSeq,
std::vector<std::string> const& originalBuckets,
bool success) override;

void appendTransactionSet(uint32_t ledgerSeq,
TxSetXDRFrameConstPtr const& txSet,
TransactionResultSet const& resultSet) override;
void appendLedgerHeader(LedgerHeader const& header) override;
void restoreCheckpoint(uint32_t lcl) override;
void deletePublishedFiles(uint32_t ledgerSeq, Config const& cfg) override;
void deleteCheckpointsNewerThan(uint32_t ledgerSeq) override;

std::string const& getTmpDir() override;
Expand All @@ -87,6 +97,13 @@ class HistoryManagerImpl : public HistoryManager

#ifdef BUILD_TESTS
void setPublicationEnabled(bool enabled) override;
bool mThrowOnLastAppend{false};
CheckpointBuilder&
getCheckpointBuilder() const
{
releaseAssert(mCheckpoint);
return *mCheckpoint;
}
#endif
};
}
36 changes: 3 additions & 33 deletions src/history/StateSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,12 @@ StateSnapshot::writeHistoryBlocks() const
// All files are streamed out of the database, entry-by-entry.
size_t nbSCPMessages;
uint32_t begin, count;
size_t nHeaders;
{
bool doFsync = !mApp.getConfig().DISABLE_XDR_FSYNC;
asio::io_context& ctx = mApp.getClock().getIOContext();
XDROutputFileStream ledgerOut(ctx, doFsync), txOut(ctx, doFsync),
txResultOut(ctx, doFsync), scpHistory(ctx, doFsync);
ledgerOut.open(mLedgerSnapFile->localPath_nogz());
txOut.open(mTransactionSnapFile->localPath_nogz());
txResultOut.open(mTransactionResultSnapFile->localPath_nogz());

// Extract SCP messages from the database
XDROutputFileStream scpHistory(ctx, doFsync);
scpHistory.open(mSCPHistorySnapFile->localPath_nogz());

auto& hm = mApp.getHistoryManager();
Expand All @@ -85,17 +82,6 @@ StateSnapshot::writeHistoryBlocks() const
CLOG_DEBUG(History, "Streaming {} ledgers worth of history, from {}",
count, begin);

nHeaders = LedgerHeaderUtils::copyToStream(mApp.getDatabase(), sess,
begin, count, ledgerOut);

size_t nTxs = copyTransactionsToStream(mApp, sess, begin, count, txOut,
txResultOut);
CLOG_DEBUG(History, "Wrote {} ledger headers to {}", nHeaders,
mLedgerSnapFile->localPath_nogz());
CLOG_DEBUG(History, "Wrote {} transactions to {} and {}", nTxs,
mTransactionSnapFile->localPath_nogz(),
mTransactionResultSnapFile->localPath_nogz());

nbSCPMessages = HerderPersistence::copySCPHistoryToStream(
mApp.getDatabase(), sess, begin, count, scpHistory);

Expand All @@ -112,22 +98,6 @@ StateSnapshot::writeHistoryBlocks() const
// When writing checkpoint 0x3f (63) we will have written 63 headers because
// header 0 doesn't exist, ledger 1 is the first. For all later checkpoints
// we will write 64 headers; any less and something went wrong[1].
//
// [1]: Probably our read transaction was serialized ahead of the write
// transaction composing the history itself, despite occurring in the
// opposite wall-clock order, this is legal behavior in SERIALIZABLE
// transaction-isolation level -- the highest offered! -- as txns only have
// to be applied in isolation and in _some_ order, not the wall-clock order
// we issued them. Anyway this is transient and should go away upon retry.
if (nHeaders != count)
{
CLOG_WARNING(
History,
"Only wrote {} ledger headers for {}, expecting {}, will retry",
nHeaders, mLedgerSnapFile->localPath_nogz(), count);
return false;
}

return true;
}

Expand Down
Loading

0 comments on commit eace482

Please sign in to comment.