Skip to content

Commit

Permalink
Allow ledger close path to run async during externalize
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Oct 15, 2024
1 parent 1bccbc9 commit 97c9ee3
Show file tree
Hide file tree
Showing 25 changed files with 318 additions and 219 deletions.
4 changes: 4 additions & 0 deletions src/bucket/test/BucketTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ closeLedger(Application& app, std::optional<SecretKey> skToSignValue,
app.getHerder().externalizeValue(TxSetXDRFrame::makeEmpty(lcl), ledgerNum,
lcl.header.scpValue.closeTime, upgrades,
skToSignValue);
testutil::crankUntil(
app,
[&lm, ledgerNum]() { return lm.getLastClosedLedgerNum() == ledgerNum; },
std::chrono::seconds(10));
return lm.getLastClosedLedgerHeader().hash;
}

Expand Down
3 changes: 2 additions & 1 deletion src/catchup/ApplyLedgerWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ BasicWork::State
ApplyLedgerWork::onRun()
{
ZoneScoped;
mApp.getLedgerManager().closeLedger(mLedgerCloseData);
mApp.getLedgerManager().closeLedger(mLedgerCloseData,
/* externalize */ false);
return BasicWork::State::WORK_SUCCESS;
}

Expand Down
7 changes: 6 additions & 1 deletion src/catchup/CatchupManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ class CatchupManager

// Process ledgers that could not be applied, and determine if catchup
// should run
virtual void processLedger(LedgerCloseData const& ledgerData) = 0;

// Return true if the latest ledger was applied and there are no syncing
// ledgers. Return false if ledgers are buffered with gaps and we need to
// start catchup.
virtual bool processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot) = 0;

// Forcibly switch the application into catchup mode, treating `toLedger`
// as the destination ledger number and count as the number of past ledgers
Expand Down
111 changes: 77 additions & 34 deletions src/catchup/CatchupManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,12 @@ CatchupManagerImpl::getCatchupCount()
: mApp.getConfig().CATCHUP_RECENT;
}

void
CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
bool
CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot)
{
maybeUpdateLastQueuedToApply();

ZoneScoped;
if (catchupWorkIsDone())
{
Expand All @@ -119,26 +122,32 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
logAndUpdateCatchupStatus(true);
}

// Always skip old ledgers
uint32_t lastReceivedLedgerSeq = ledgerData.getLedgerSeq();
if (lastReceivedLedgerSeq <= *mLastQueuedToApply)
{
// If LCL is already at-or-ahead of the ledger we just received from the
// network, we're up to date. Return early, nothing to do.
CLOG_INFO(
Ledger,
"Skipping close ledger: local state is {}, more recent than {}",
*mLastQueuedToApply, ledgerData.getLedgerSeq());
return true;
}

// Always add a newer ledger, maybe apply
mSyncingLedgers.emplace(lastReceivedLedgerSeq, ledgerData);
mLargestLedgerSeqHeard =
std::max(mLargestLedgerSeqHeard, lastReceivedLedgerSeq);

// 1. CatchupWork is not running yet
// 2. CatchupManager received ledger that was immediately applied by
// 2. CatchupManager received ledger that should be immediately applied by
// LedgerManager: check if we have any sequential ledgers.
// If so, attempt to apply mSyncingLedgers and possibly get back in sync
if (!mCatchupWork && lastReceivedLedgerSeq ==
mApp.getLedgerManager().getLastClosedLedgerNum())
if (!mCatchupWork && lastReceivedLedgerSeq == *mLastQueuedToApply + 1)
{
tryApplySyncingLedgers();
return;
}
else if (lastReceivedLedgerSeq <=
mApp.getLedgerManager().getLastClosedLedgerNum())
{
// If LCL is already at-or-ahead of the ledger we just received from the
// network, we're up to date. Return early, nothing to do.
return;
return true;
}

// For the rest of this method: we know LCL has fallen behind the network
Expand All @@ -151,6 +160,9 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
// to history and commence catchup, running the (checkpoint-driven) catchup
// state machine to ledger L-1 (the end of the checkpoint covering K) and
// then replay buffered ledgers from L onwards.
CLOG_INFO(Ledger,
"Close of ledger {} buffered. mSyncingLedgers has {} ledgers",
ledgerData.getLedgerSeq(), mSyncingLedgers.size());

// First: if CatchupWork has started, just buffer and return early.
if (mCatchupWork)
Expand All @@ -160,17 +172,17 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
auto const& config = mCatchupWork->getCatchupConfiguration();
if (ledgerData.getLedgerSeq() <= config.toLedger())
{
return;
return false;
}

addAndTrimSyncingLedgers(ledgerData);
trimSyncingLedgers();
logAndUpdateCatchupStatus(true);
return;
return false;
}

// Next, we buffer every out of sync ledger to allow us to get back in sync
// in case the ledgers we're missing are received.
addAndTrimSyncingLedgers(ledgerData);
trimSyncingLedgers();

// Finally we wait some number of ledgers beyond the smallest buffered
// checkpoint ledger before we trigger the CatchupWork. This could be any
Expand Down Expand Up @@ -233,6 +245,7 @@ CatchupManagerImpl::processLedger(LedgerCloseData const& ledgerData)
}
}
logAndUpdateCatchupStatus(true, message);
return false;
}

void
Expand All @@ -241,7 +254,9 @@ CatchupManagerImpl::startCatchup(
std::set<std::shared_ptr<Bucket>> bucketsToRetain)
{
ZoneScoped;
auto lastClosedLedger = mApp.getLedgerManager().getLastClosedLedgerNum();
maybeUpdateLastQueuedToApply();

auto lastClosedLedger = *mLastQueuedToApply;
if ((configuration.toLedger() != CatchupConfiguration::CURRENT) &&
(configuration.toLedger() <= lastClosedLedger))
{
Expand Down Expand Up @@ -329,10 +344,13 @@ CatchupManagerImpl::logAndUpdateCatchupStatus(bool contiguous)
std::optional<LedgerCloseData>
CatchupManagerImpl::maybeGetNextBufferedLedgerToApply()
{
releaseAssert(threadIsMain());
// Since we just applied a ledger, refresh mLastQueuedToApply
maybeUpdateLastQueuedToApply();

trimSyncingLedgers();
if (!mSyncingLedgers.empty() &&
mSyncingLedgers.begin()->first ==
mApp.getLedgerManager().getLastClosedLedgerNum() + 1)
mSyncingLedgers.begin()->first == *mLastQueuedToApply + 1)
{
return std::make_optional<LedgerCloseData>(
mSyncingLedgers.begin()->second);
Expand Down Expand Up @@ -370,14 +388,18 @@ CatchupManagerImpl::syncMetrics()
}

void
CatchupManagerImpl::addAndTrimSyncingLedgers(LedgerCloseData const& ledgerData)
CatchupManagerImpl::maybeUpdateLastQueuedToApply()
{
mSyncingLedgers.emplace(ledgerData.getLedgerSeq(), ledgerData);
trimSyncingLedgers();

CLOG_INFO(Ledger,
"Close of ledger {} buffered. mSyncingLedgers has {} ledgers",
ledgerData.getLedgerSeq(), mSyncingLedgers.size());
// mLastQueuedToApply is an optional: seed it from LCL the first time we get
// here. Dereferencing it while empty would be undefined behaviour, so the
// empty case must be the one that assigns without reading.
if (!mLastQueuedToApply)
{
    mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerNum();
}
else
{
    // Only ever move forward: a ledger may already be queued for apply
    // (and recorded here) before LCL reflects it, so never regress to a
    // smaller LCL value.
    mLastQueuedToApply =
        std::max(*mLastQueuedToApply,
                 mApp.getLedgerManager().getLastClosedLedgerNum());
}
}

void
Expand Down Expand Up @@ -408,7 +430,7 @@ CatchupManagerImpl::trimSyncingLedgers()
// This erases [begin, it).
mSyncingLedgers.erase(mSyncingLedgers.begin(), it);
};
removeLedgersLessThan(mApp.getLedgerManager().getLastClosedLedgerNum() + 1);
removeLedgersLessThan(*mLastQueuedToApply + 1);
auto& hm = mApp.getHistoryManager();
if (!mSyncingLedgers.empty())
{
Expand Down Expand Up @@ -439,8 +461,7 @@ void
CatchupManagerImpl::tryApplySyncingLedgers()
{
ZoneScoped;
auto const& ledgerHeader =
mApp.getLedgerManager().getLastClosedLedgerHeader();
uint32_t nextToClose = *mLastQueuedToApply + 1;

// We can apply multiple ledgers here, which might be slow. This is a rare
// occurrence so we should be fine.
Expand All @@ -450,16 +471,38 @@ CatchupManagerImpl::tryApplySyncingLedgers()
auto const& lcd = it->second;

// we still have a missing ledger
if (ledgerHeader.header.ledgerSeq + 1 != lcd.getLedgerSeq())
if (nextToClose != lcd.getLedgerSeq())
{
break;
}

mApp.getLedgerManager().closeLedger(lcd);
CLOG_INFO(History, "Closed buffered ledger: {}",
LedgerManager::ledgerAbbrev(ledgerHeader));
if (mApp.getConfig()
.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING.count() > 0)
{
// Close a ledger asynchronously, with an added delay
// Useful to test the async externalize flow
mApp.postOnMainThread(
[&app = mApp, lcd]() {
if (app.isStopping())
{
return;
}
std::this_thread::sleep_for(
app.getConfig()
.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING);
app.getLedgerManager().closeLedger(lcd,
/* externalize */ true);
},
"closeLedger");
}
else
{
mApp.getLedgerManager().closeLedger(lcd, /* externalize */ true);
}
mLastQueuedToApply = lcd.getLedgerSeq();

++it;
++nextToClose;
}

mSyncingLedgers.erase(mSyncingLedgers.cbegin(), it);
Expand Down
9 changes: 7 additions & 2 deletions src/catchup/CatchupManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ class CatchupManagerImpl : public CatchupManager
// maintain the invariants above.
std::map<uint32_t, LedgerCloseData> mSyncingLedgers;
medida::Counter& mSyncingLedgersSize;
// Most recent ledger that was queued to be applied by CatchupManager.
// Once applied, and before a new ledger is scheduled, this is equivalent to
// LCL.
std::optional<uint32_t> mLastQueuedToApply;

void addAndTrimSyncingLedgers(LedgerCloseData const& ledgerData);
void maybeUpdateLastQueuedToApply();
void startOnlineCatchup();
void trimSyncingLedgers();
void tryApplySyncingLedgers();
Expand All @@ -61,7 +65,8 @@ class CatchupManagerImpl : public CatchupManager
CatchupManagerImpl(Application& app);
~CatchupManagerImpl() override;

void processLedger(LedgerCloseData const& ledgerData) override;
bool processLedger(LedgerCloseData const& ledgerData,
bool isLatestSlot) override;
void
startCatchup(CatchupConfiguration configuration,
std::shared_ptr<HistoryArchive> archive,
Expand Down
6 changes: 3 additions & 3 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ DownloadApplyTxsWork::DownloadApplyTxsWork(
: BatchWork(app, "download-apply-ledgers")
, mRange(range)
, mDownloadDir(downloadDir)
, mLastApplied(lastApplied)
, mLastQueuedToApply(lastApplied)
, mCheckpointToQueue(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mWaitForPublish(waitForPublish)
Expand Down Expand Up @@ -171,7 +171,7 @@ DownloadApplyTxsWork::resetIter()
mCheckpointToQueue =
mApp.getHistoryManager().checkpointContainingLedger(mRange.mFirst);
mLastYieldedWork.reset();
mLastApplied = mApp.getLedgerManager().getLastClosedLedgerHeader();
mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerHeader();
}

bool
Expand All @@ -189,7 +189,7 @@ DownloadApplyTxsWork::hasNext() const
void
DownloadApplyTxsWork::onSuccess()
{
mLastApplied = mApp.getLedgerManager().getLastClosedLedgerHeader();
mLastQueuedToApply = mApp.getLedgerManager().getLastClosedLedgerHeader();
}

std::string
Expand Down
2 changes: 1 addition & 1 deletion src/catchup/DownloadApplyTxsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DownloadApplyTxsWork : public BatchWork
{
LedgerRange const mRange;
TmpDir const& mDownloadDir;
LedgerHeaderHistoryEntry& mLastApplied;
LedgerHeaderHistoryEntry& mLastQueuedToApply;
uint32_t mCheckpointToQueue;
std::shared_ptr<BasicWork> mLastYieldedWork;
bool const mWaitForPublish;
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/ReplayDebugMetaWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ ReplayDebugMetaWork::applyLastLedger()
if (lcl + 1 == debugTxSet.ledgerSeq)
{
mApp.getLedgerManager().closeLedger(
LedgerCloseData::toLedgerCloseData(debugTxSet));
LedgerCloseData::toLedgerCloseData(debugTxSet),
/* externalize */ false);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion src/herder/Herder.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class Herder
// restores Herder's state from disk
virtual void start() = 0;

virtual void lastClosedLedgerIncreased(bool latest) = 0;
virtual void lastClosedLedgerIncreased(bool latest,
TxSetXDRFrameConstPtr txSet) = 0;

// Setup Herder's state to fully participate in consensus
virtual void setTrackingSCPState(uint64_t index, StellarValue const& value,
Expand Down
21 changes: 11 additions & 10 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "herder/HerderImpl.h"
#include "bucket/BucketListSnapshot.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketSnapshotManager.h"
#include "crypto/Hex.h"
#include "crypto/KeyUtils.h"
#include "crypto/SHA.h"
Expand All @@ -14,9 +17,6 @@
#include "herder/TxSetFrame.h"
#include "herder/TxSetUtils.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTxnEntry.h"
#include "ledger/LedgerTxnHeader.h"
#include "lib/json/json.h"
#include "main/Application.h"
#include "main/Config.h"
Expand Down Expand Up @@ -249,10 +249,6 @@ HerderImpl::newSlotExternalized(bool synchronous, StellarValue const& value)
// start timing next externalize from this point
mLastExternalize = mApp.getClock().now();

// In order to update the transaction queue we need to get the
// applied transactions.
updateTransactionQueue(mPendingEnvelopes.getTxSet(value.txSetHash));

// perform cleanups
// Evict slots that are outside of our ledger validity bracket
auto minSlotToRemember = getMinLedgerSeqToRemember();
Expand Down Expand Up @@ -359,7 +355,7 @@ HerderImpl::processExternalized(uint64 slotIndex, StellarValue const& value,
writeDebugTxSet(ledgerData);
}

mLedgerManager.valueExternalized(ledgerData);
mLedgerManager.valueExternalized(ledgerData, isLatestSlot);
}

void
Expand Down Expand Up @@ -1136,14 +1132,19 @@ HerderImpl::safelyProcessSCPQueue(bool synchronous)
}

void
HerderImpl::lastClosedLedgerIncreased(bool latest)
HerderImpl::lastClosedLedgerIncreased(bool latest, TxSetXDRFrameConstPtr txSet)
{
releaseAssert(threadIsMain());
maybeSetupSorobanQueue(
mLedgerManager.getLastClosedLedgerHeader().header.ledgerVersion);

// Ensure potential upgrades are handled in overlay
maybeHandleUpgrade();

// In order to update the transaction queue we need to get the
// applied transactions.
updateTransactionQueue(txSet);

if (latest)
{
releaseAssert(isTracking());
Expand Down Expand Up @@ -1531,7 +1532,7 @@ HerderImpl::getUpgradesJson()
void
HerderImpl::forceSCPStateIntoSyncWithLastClosedLedger()
{
auto const& header = mLedgerManager.getLastClosedLedgerHeader().header;
auto header = mLedgerManager.getLastClosedLedgerHeader().header;
setTrackingSCPState(header.ledgerSeq, header.scpValue,
/* isTrackingNetwork */ true);
}
Expand Down
Loading

0 comments on commit 97c9ee3

Please sign in to comment.