From ccf6b2717d4f49c0df2a6f00532466242f11c3e2 Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Wed, 11 Sep 2024 16:52:45 -0700 Subject: [PATCH] Add tests --- src/history/test/HistoryTests.cpp | 170 ++++++++++++++++++++++++- src/history/test/HistoryTestsUtils.cpp | 146 +++++++++++++-------- src/history/test/HistoryTestsUtils.h | 19 ++- src/util/Timer.h | 6 + 4 files changed, 281 insertions(+), 60 deletions(-) diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp index 27dc84b43d..a843159735 100644 --- a/src/history/test/HistoryTests.cpp +++ b/src/history/test/HistoryTests.cpp @@ -6,9 +6,10 @@ #include "bucket/test/BucketTestUtils.h" #include "catchup/CatchupManagerImpl.h" #include "catchup/test/CatchupWorkTests.h" +#include "history/CheckpointBuilder.h" #include "history/FileTransferInfo.h" #include "history/HistoryArchiveManager.h" -#include "history/HistoryManager.h" +#include "history/HistoryManagerImpl.h" #include "history/test/HistoryTestsUtils.h" #include "historywork/GetHistoryArchiveStateWork.h" #include "historywork/GunzipFileWork.h" @@ -472,6 +473,59 @@ TEST_CASE("History publish", "[history][publish]") catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger); } +TEST_CASE("History publish with restart", "[history][publish]") +{ + auto catchupSimulation = + CatchupSimulation(VirtualClock::VIRTUAL_TIME, + std::make_shared(), true, + Config::TESTDB_ON_DISK_SQLITE); + auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2); + + // Restart at various points in the checkpoint, core should continue + // properly writing checkpoint files + auto ledgerNums = std::vector{ + LedgerManager::GENESIS_LEDGER_SEQ, + 10, + catchupSimulation.getLastCheckpointLedger(1), + catchupSimulation.getApp() + .getHistoryManager() + .firstLedgerInCheckpointContaining(checkpointLedger), + checkpointLedger - 1, + checkpointLedger}; + for (auto ledgerNum : ledgerNums) + { + SECTION("Restart at ledger " + std::to_string(ledgerNum)) + { 
+ SECTION("graceful") + { + catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger, + ledgerNum); + } + SECTION("crash leaves dirty data") + { + auto& hm = static_cast( + catchupSimulation.getApp().getHistoryManager()); + hm.mThrowOnLastAppend = true; + REQUIRE_THROWS_AS( + catchupSimulation.ensureOfflineCatchupPossible( + checkpointLedger), + std::runtime_error); + // Restart app, truncate dirty data in checkpoints, proceed to + // publish + catchupSimulation.restartApp(); + catchupSimulation.ensureOfflineCatchupPossible( + checkpointLedger); + } + + // Now catchup to ensure published checkpoints are valid + auto app = catchupSimulation.createCatchupApplication( + std::numeric_limits::max(), + Config::TESTDB_ON_DISK_SQLITE, "app"); + REQUIRE(catchupSimulation.catchupOffline(app, checkpointLedger)); + } + } +} + TEST_CASE("History publish to multiple archives", "[history]") { Config cfg(getTestConfig()); @@ -1630,3 +1684,117 @@ TEST_CASE("Externalize gap while catchup work is running", "[history][catchup]") REQUIRE(catchupSimulation.catchupOnline(app, lcl + 2, 0, 0, 0, {128, 129, 127})); } + +TEST_CASE("CheckpointBuilder", "[history][publish]") +{ + VirtualClock clock; + auto cfg = getTestConfig(0, Config::TESTDB_ON_DISK_SQLITE); + TmpDirHistoryConfigurator().configure(cfg, true); + + auto app = createTestApplication(clock, cfg); + releaseAssert(app->getLedgerManager().getLastClosedLedgerNum() == + LedgerManager::GENESIS_LEDGER_SEQ); + auto& hm = static_cast(app->getHistoryManager()); + auto& cb = hm.getCheckpointBuilder(); + auto lcl = app->getLedgerManager().getLastClosedLedgerNum(); + + auto generate = [&](uint32_t count, bool appendHeaders = true) { + for (int i = lcl; i < lcl + count; ++i) + { + LedgerHeaderHistoryEntry lh; + lh.header.ledgerSeq = i; + cb.appendTransactionSet(i, TxSetXDRFrame::makeEmpty(lh), + TransactionResultSet{}); + // Do not append last ledger in a checkpoint if `appendHeaders` is + // false + if (!appendHeaders && i == count) + 
{ + continue; + } + cb.appendLedgerHeader(lh.header); + } + }; + + auto validateHdr = [&](std::string path, uint32_t ledger) { + XDRInputFileStream hdrIn; + hdrIn.open(path); + LedgerHeaderHistoryEntry entry; + while (hdrIn && hdrIn.readOne(entry)) + { + REQUIRE(entry.header.ledgerSeq <= ledger); + } + REQUIRE(entry.header.ledgerSeq == ledger); + }; + + auto tmpCheckpointCheck = [&](uint32_t ledger, bool isFinalized) { + auto checkpoint = + app->getHistoryManager().checkpointContainingLedger(ledger); + FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, checkpoint, + cfg); + FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, + checkpoint, cfg); + FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint, + cfg); + if (isFinalized) + { + REQUIRE(fs::exists(res.localPath_nogz())); + REQUIRE(fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(res.localPath_nogz_dirty())); + REQUIRE(!fs::exists(txs.localPath_nogz_dirty())); + REQUIRE(!fs::exists(headers.localPath_nogz_dirty())); + validateHdr(headers.localPath_nogz(), ledger); + } + else + { + REQUIRE(!fs::exists(res.localPath_nogz())); + REQUIRE(!fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(headers.localPath_nogz())); + REQUIRE(fs::exists(res.localPath_nogz_dirty())); + REQUIRE(fs::exists(txs.localPath_nogz_dirty())); + validateHdr(headers.localPath_nogz_dirty(), ledger); + } + }; + + SECTION("truncate") + { + SECTION("truncate transactions, but not headers") + { + generate(10, false); + tmpCheckpointCheck(9, false); + } + SECTION("truncate both") + { + generate(10); + tmpCheckpointCheck(10, false); + } + SECTION("truncate due to partial write") + { + generate(10); + tmpCheckpointCheck(10, false); + FileTransferInfo headers( + FileType::HISTORY_FILE_TYPE_LEDGER, + app->getHistoryManager().checkpointContainingLedger(10), + app->getConfig()); + auto sz = + std::filesystem::file_size(headers.localPath_nogz_dirty()); + 
std::filesystem::resize_file(headers.localPath_nogz_dirty(), + sz - 1); + } + CheckpointBuilder cb2{*app}; + cb2.cleanup(9); + tmpCheckpointCheck(9, false); + } + SECTION("checkpoint complete") + { + auto ledgerSeq = hm.checkpointContainingLedger(1); + // Checkpoint not finalized + generate(ledgerSeq); + tmpCheckpointCheck(ledgerSeq, false); + cb.checkpointComplete(ledgerSeq); + tmpCheckpointCheck(ledgerSeq, true); + REQUIRE(!cb.mOpen); + // any subsequent call to checkpointComplete is a no-op + cb.checkpointComplete(ledgerSeq); + tmpCheckpointCheck(ledgerSeq, true); + } +} diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp index 7f283461fc..9945d11329 100644 --- a/src/history/test/HistoryTestsUtils.cpp +++ b/src/history/test/HistoryTestsUtils.cpp @@ -377,24 +377,23 @@ operator!=(CatchupPerformedWork const& x, CatchupPerformedWork const& y) CatchupSimulation::CatchupSimulation(VirtualClock::Mode mode, std::shared_ptr cg, - bool startApp) - : mClock(mode) + bool startApp, Config::TestDbMode dbMode) + : mClock(std::make_unique(mode)) , mHistoryConfigurator(cg) - , mCfg(getTestConfig()) - , mAppPtr(createTestApplication(mClock, + , mCfg(getTestConfig(0, dbMode)) + , mAppPtr(createTestApplication(*mClock, mHistoryConfigurator->configure(mCfg, true), /*newDB*/ true, /*startApp*/ false)) - , mApp(*mAppPtr) { auto dirName = cg->getArchiveDirName(); if (!dirName.empty()) { - CHECK( - mApp.getHistoryArchiveManager().initializeHistoryArchive(dirName)); + CHECK(getApp().getHistoryArchiveManager().initializeHistoryArchive( + dirName)); } if (startApp) { - mApp.start(); + mAppPtr->start(); } } @@ -405,26 +404,27 @@ CatchupSimulation::~CatchupSimulation() uint32_t CatchupSimulation::getLastCheckpointLedger(uint32_t checkpointIndex) const { - return mApp.getHistoryManager().getCheckpointFrequency() * checkpointIndex - + return getApp().getHistoryManager().getCheckpointFrequency() * + checkpointIndex - 1; } void 
CatchupSimulation::generateRandomLedger(uint32_t version) { - auto& lm = mApp.getLedgerManager(); + auto& lm = getApp().getLedgerManager(); uint32_t ledgerSeq = lm.getLastClosedLedgerNum() + 1; uint64_t minBalance = lm.getLastMinBalance(5); uint64_t big = minBalance + ledgerSeq; uint64_t small = 100 + ledgerSeq; uint64_t closeTime = 60 * 5 * ledgerSeq; - auto root = TestAccount{mApp, getRoot(mApp.getNetworkID())}; - auto alice = TestAccount{mApp, getAccount("alice")}; - auto bob = TestAccount{mApp, getAccount("bob")}; - auto carol = TestAccount{mApp, getAccount("carol")}; - auto eve = TestAccount{mApp, getAccount("eve")}; - auto stroopy = TestAccount{mApp, getAccount("stroopy")}; + auto root = TestAccount{getApp(), getRoot(getApp().getNetworkID())}; + auto alice = TestAccount{getApp(), getAccount("alice")}; + auto bob = TestAccount{getApp(), getAccount("bob")}; + auto carol = TestAccount{getApp(), getAccount("carol")}; + auto eve = TestAccount{getApp(), getAccount("eve")}; + auto stroopy = TestAccount{getApp(), getAccount("stroopy")}; std::vector txs; std::vector sorobanTxs; @@ -485,16 +485,17 @@ CatchupSimulation::generateRandomLedger(uint32_t version) SOROBAN_PROTOCOL_VERSION)) { SorobanResources res; - res.instructions = - mApp.getLedgerManager().maxSorobanTransactionResources().getVal( - Resource::Type::INSTRUCTIONS) / - 10; + res.instructions = getApp() + .getLedgerManager() + .maxSorobanTransactionResources() + .getVal(Resource::Type::INSTRUCTIONS) / + 10; res.writeBytes = 100'000; uint32_t inclusion = 100; sorobanTxs.push_back(createUploadWasmTx( - mApp, stroopy, inclusion, DEFAULT_TEST_RESOURCE_FEE, res)); + getApp(), stroopy, inclusion, DEFAULT_TEST_RESOURCE_FEE, res)); sorobanTxs.push_back(createUploadWasmTx( - mApp, eve, inclusion * 5, DEFAULT_TEST_RESOURCE_FEE, res)); + getApp(), eve, inclusion * 5, DEFAULT_TEST_RESOURCE_FEE, res)); check = true; } } @@ -505,7 +506,7 @@ CatchupSimulation::generateRandomLedger(uint32_t version) ? 
TxSetPhaseTransactions{txs, sorobanTxs} : TxSetPhaseTransactions{txs}; TxSetXDRFrameConstPtr txSet = - makeTxSetFromTransactions(phases, mApp, 0, 0).first; + makeTxSetFromTransactions(phases, getApp(), 0, 0).first; CLOG_INFO(History, "Closing synthetic ledger {} with {} txs (txhash:{})", ledgerSeq, txSet->sizeTxTotal(), @@ -520,14 +521,14 @@ CatchupSimulation::generateRandomLedger(uint32_t version) upgrades.push_back(UpgradeType{v.begin(), v.end()}); } - StellarValue sv = - mApp.getHerder().makeStellarValue(txSet->getContentsHash(), closeTime, - upgrades, mApp.getConfig().NODE_SEED); + StellarValue sv = getApp().getHerder().makeStellarValue( + txSet->getContentsHash(), closeTime, upgrades, + getApp().getConfig().NODE_SEED); mLedgerCloseDatas.emplace_back(ledgerSeq, txSet, sv); auto& txsSucceeded = - mApp.getMetrics().NewCounter({"ledger", "apply", "success"}); + getApp().getMetrics().NewCounter({"ledger", "apply", "success"}); auto lastSucceeded = txsSucceeded.count(); lm.closeLedger(mLedgerCloseDatas.back()); @@ -543,12 +544,14 @@ CatchupSimulation::generateRandomLedger(uint32_t version) mLedgerSeqs.push_back(lclh.header.ledgerSeq); mLedgerHashes.push_back(lclh.hash); mBucketListHashes.push_back(lclh.header.bucketListHash); - mBucket0Hashes.push_back(mApp.getBucketManager() + mBucket0Hashes.push_back(getApp() + .getBucketManager() .getBucketList() .getLevel(0) .getCurr() ->getHash()); - mBucket1Hashes.push_back(mApp.getBucketManager() + mBucket1Hashes.push_back(getApp() + .getBucketManager() .getBucketList() .getLevel(2) .getCurr() @@ -573,19 +576,25 @@ void CatchupSimulation::setUpgradeLedger(uint32_t ledger, ProtocolVersion upgradeProtocolVersion) { - REQUIRE(mApp.getLedgerManager().getLastClosedLedgerNum() < ledger); + REQUIRE(getApp().getLedgerManager().getLastClosedLedgerNum() < ledger); mUpgradeLedgerSeq = ledger; mUpgradeProtocolVersion = upgradeProtocolVersion; } void -CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) 
+CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger, + std::optional restartLedger) { - auto& lm = mApp.getLedgerManager(); - auto& hm = mApp.getHistoryManager(); - while (lm.getLastClosedLedgerNum() < targetLedger) + while (getApp().getLedgerManager().getLastClosedLedgerNum() < targetLedger) { - auto lcl = lm.getLastClosedLedgerNum(); + if (restartLedger && + getApp().getLedgerManager().getLastClosedLedgerNum() == + *restartLedger) + { + REQUIRE(*restartLedger < targetLedger); + restartApp(); + } + auto lcl = getApp().getLedgerManager().getLastClosedLedgerNum(); if (lcl + 1 == mUpgradeLedgerSeq) { // Force protocol upgrade @@ -594,11 +603,13 @@ CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) } else { - generateRandomLedger( - lm.getLastClosedLedgerHeader().header.ledgerVersion); + generateRandomLedger(getApp() + .getLedgerManager() + .getLastClosedLedgerHeader() + .header.ledgerVersion); } - if (hm.publishCheckpointOnLedgerClose(lcl)) + if (getApp().getHistoryManager().publishCheckpointOnLedgerClose(lcl)) { mBucketListAtLastPublish = getApp().getBucketManager().getBucketList(); @@ -609,26 +620,49 @@ CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) void CatchupSimulation::ensurePublishesComplete() { - auto& hm = mApp.getHistoryManager(); - while (!mApp.getWorkScheduler().allChildrenDone() || - (hm.getPublishSuccessCount() < hm.getPublishQueueCount())) + auto& hm = getApp().getHistoryManager(); + while (hm.publishQueueLength() > 0 && hm.getPublishFailureCount() == 0) { - REQUIRE(hm.getPublishFailureCount() == 0); - mApp.getClock().crank(true); + getApp().getClock().crank(true); } REQUIRE(hm.getPublishFailureCount() == 0); // Make sure all references to buckets were released REQUIRE(hm.getBucketsReferencedByPublishQueue().empty()); + + // Make sure all published checkpoint files have been cleaned up + auto lcl = getApp().getLedgerManager().getLastClosedLedgerNum(); + auto firstCheckpoint = + 
hm.checkpointContainingLedger(LedgerManager::GENESIS_LEDGER_SEQ); + auto lastCheckpoint = hm.lastLedgerBeforeCheckpointContaining(lcl); + + for (uint32_t i = firstCheckpoint; i <= lastCheckpoint; + i += hm.getCheckpointFrequency()) + { + FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, i, + getApp().getConfig()); + FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, i, + getApp().getConfig()); + FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, i, + getApp().getConfig()); + REQUIRE(!fs::exists(res.localPath_nogz_dirty())); + REQUIRE(!fs::exists(txs.localPath_nogz_dirty())); + REQUIRE(!fs::exists(headers.localPath_nogz_dirty())); + REQUIRE(!fs::exists(res.localPath_nogz())); + REQUIRE(!fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(headers.localPath_nogz())); + } } void -CatchupSimulation::ensureOfflineCatchupPossible(uint32_t targetLedger) +CatchupSimulation::ensureOfflineCatchupPossible( + uint32_t targetLedger, std::optional restartLedger) { - auto& hm = mApp.getHistoryManager(); - // One additional ledger is needed for publish. - ensureLedgerAvailable(hm.checkpointContainingLedger(targetLedger) + 1); + auto target = + getApp().getHistoryManager().checkpointContainingLedger(targetLedger) + + 1; + ensureLedgerAvailable(target, restartLedger); ensurePublishesComplete(); } @@ -636,7 +670,7 @@ void CatchupSimulation::ensureOnlineCatchupPossible(uint32_t targetLedger, uint32_t bufferLedgers) { - auto& hm = mApp.getHistoryManager(); + auto& hm = getApp().getHistoryManager(); // One additional ledger is needed for publish, one as a trigger ledger for // catchup, one as closing ledger. 
@@ -652,7 +686,7 @@ CatchupSimulation::getAllPublishedCheckpoints() const assert(mLedgerHashes.size() == mLedgerSeqs.size()); auto hi = mLedgerHashes.begin(); auto si = mLedgerSeqs.begin(); - auto const& hm = mApp.getHistoryManager(); + auto const& hm = getApp().getHistoryManager(); while (si != mLedgerSeqs.end()) { if (hm.isLastLedgerInCheckpoint(*si)) @@ -675,7 +709,7 @@ CatchupSimulation::getLastPublishedCheckpoint() const assert(mLedgerHashes.size() == mLedgerSeqs.size()); auto hi = mLedgerHashes.rbegin(); auto si = mLedgerSeqs.rbegin(); - auto const& hm = mApp.getHistoryManager(); + auto const& hm = getApp().getHistoryManager(); while (si != mLedgerSeqs.rend()) { if (hm.isLastLedgerInCheckpoint(*si)) @@ -772,7 +806,7 @@ CatchupSimulation::catchupOffline(Application::pointer app, uint32_t toLedger, CatchupPerformedWork{endCatchupMetrics - startCatchupMetrics}; REQUIRE(catchupPerformedWork == expectedCatchupWork); - if (app->getHistoryArchiveManager().hasAnyWritableHistoryArchive()) + if (app->getHistoryArchiveManager().publishEnabled()) { auto& hm = app->getHistoryManager(); REQUIRE(hm.getPublishQueueCount() - hm.getPublishSuccessCount() <= @@ -934,7 +968,7 @@ CatchupSimulation::validateCatchup(Application::pointer app) size_t i = nextLedger - 3; - auto root = TestAccount{*app, getRoot(mApp.getNetworkID())}; + auto root = TestAccount{*app, getRoot(getApp().getNetworkID())}; auto alice = TestAccount{*app, getAccount("alice")}; auto bob = TestAccount{*app, getAccount("bob")}; auto carol = TestAccount{*app, getAccount("carol")}; @@ -1082,5 +1116,13 @@ CatchupSimulation::computeCatchupPerformedWork( txSetsDownloaded, txSetsApplied}; } + +void +CatchupSimulation::restartApp() +{ + mAppPtr.reset(); + mClock = std::make_unique(mClock->getMode()); + mAppPtr = createTestApplication(*mClock, mCfg, /*newDB*/ false); +} } } diff --git a/src/history/test/HistoryTestsUtils.h b/src/history/test/HistoryTestsUtils.h index 863a0f9fed..03256969fb 100644 --- 
a/src/history/test/HistoryTestsUtils.h +++ b/src/history/test/HistoryTestsUtils.h @@ -178,13 +178,12 @@ struct CatchupPerformedWork class CatchupSimulation { protected: - VirtualClock mClock; + std::unique_ptr mClock; std::list mSpawnedAppsClocks; std::shared_ptr mHistoryConfigurator; Config mCfg; std::vector mCfgs; Application::pointer mAppPtr; - Application& mApp; BucketList mBucketListAtLastPublish; std::vector mLedgerCloseDatas; @@ -217,19 +216,20 @@ class CatchupSimulation VirtualClock::Mode mode = VirtualClock::VIRTUAL_TIME, std::shared_ptr cg = std::make_shared(), - bool startApp = true); + bool startApp = true, + Config::TestDbMode dbMode = Config::TESTDB_IN_MEMORY_SQLITE); ~CatchupSimulation(); Application& getApp() const { - return mApp; + return *mAppPtr; } VirtualClock& getClock() { - return mClock; + return *mClock; } HistoryConfigurator& @@ -249,8 +249,12 @@ class CatchupSimulation void generateRandomLedger(uint32_t version = 0); void ensurePublishesComplete(); - void ensureLedgerAvailable(uint32_t targetLedger); - void ensureOfflineCatchupPossible(uint32_t targetLedger); + void + ensureLedgerAvailable(uint32_t targetLedger, + std::optional restartLedger = std::nullopt); + void ensureOfflineCatchupPossible( + uint32_t targetLedger, + std::optional restartLedger = std::nullopt); void ensureOnlineCatchupPossible(uint32_t targetLedger, uint32_t bufferLedgers = 0); @@ -276,6 +280,7 @@ class CatchupSimulation VirtualClock::duration duration); void setUpgradeLedger(uint32_t ledger, ProtocolVersion upgradeVersion); + void restartApp(); }; } } diff --git a/src/util/Timer.h b/src/util/Timer.h index 6af877207a..bd0130c540 100644 --- a/src/util/Timer.h +++ b/src/util/Timer.h @@ -129,6 +129,12 @@ class VirtualClock void shutdown(); bool isStopped(); + Mode + getMode() const + { + return mMode; + } + private: asio::io_context mIOContext; Mode const mMode;