diff --git a/src/herder/Herder.h b/src/herder/Herder.h index 4e9c2ee1b6..367752b793 100644 --- a/src/herder/Herder.h +++ b/src/herder/Herder.h @@ -172,6 +172,11 @@ class Herder // sender in the pending or recent tx sets. virtual SequenceNumber getMaxSeqInPendingTxs(AccountID const&) = 0; + // If the node has asked about a tx set with the given hash, return the + // highest slot asked about for that tx set, otherwise return 0. Used for + // deciding what get tx set requests to accept. + virtual uint64 getLastSeenSlotIndexForTxSet(Hash const& hash) = 0; + // Returns sequence number for most recent completed checkpoint that the // node knows about, as derived from // trackingConsensusLedgerIndex diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp index 3cb55c3f79..410c23ca4f 100644 --- a/src/herder/HerderImpl.cpp +++ b/src/herder/HerderImpl.cpp @@ -795,6 +795,17 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope) { std::string txt("FETCHING"); ZoneText(txt.c_str(), txt.size()); + // If nomination, we should still process this. 
+ if (envelope.statement.pledges.type() == SCP_ST_NOMINATE) + { + auto qSetHash = Slot::getCompanionQuorumSetHashFromStatement( + envelope.statement); + auto qSet = mApp.getHerder().getQSet(qSetHash); + if (qSet != nullptr) + { + processSCPQueue(); + } + } } else if (status == Herder::ENVELOPE_STATUS_PROCESSED) { @@ -1220,6 +1231,12 @@ HerderImpl::getMaxSeqInPendingTxs(AccountID const& acc) return mTransactionQueue.getAccountTransactionQueueInfo(acc).mMaxSeq; } +uint64 +HerderImpl::getLastSeenSlotIndexForTxSet(Hash const& hash) +{ + return mPendingEnvelopes.getLastSeenSlotIndexForTxSet(hash); +} + uint32_t HerderImpl::getMostRecentCheckpointSeq() { diff --git a/src/herder/HerderImpl.h b/src/herder/HerderImpl.h index 6a9aea5f94..08ae795a3b 100644 --- a/src/herder/HerderImpl.h +++ b/src/herder/HerderImpl.h @@ -150,6 +150,7 @@ class HerderImpl : public Herder SCPStatement const& newSt) override; SequenceNumber getMaxSeqInPendingTxs(AccountID const&) override; + uint64 getLastSeenSlotIndexForTxSet(Hash const& hash) override; uint32_t getMostRecentCheckpointSeq() override; diff --git a/src/herder/HerderSCPDriver.cpp b/src/herder/HerderSCPDriver.cpp index 81dff31f4a..84f7a185c1 100644 --- a/src/herder/HerderSCPDriver.cpp +++ b/src/herder/HerderSCPDriver.cpp @@ -137,10 +137,9 @@ class SCPHerderEnvelopeWrapper : public SCPEnvelopeWrapper } else { - throw std::runtime_error(fmt::format( - FMT_STRING("SCPHerderEnvelopeWrapper: Wrapping an unknown " - "tx set {} from envelope"), - hexAbbrev(txSetH))); + CLOG_TRACE(Herder, + "unknown tx set {}, we might still able to vote", + hexAbbrev(txSetH)); } } } @@ -314,10 +313,10 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b, if (!txSet) { - CLOG_ERROR(Herder, "validateValue i:{} unknown txSet {}", slotIndex, + CLOG_TRACE(Herder, "validateValue i:{} unknown txSet {}", slotIndex, hexAbbrev(txSetHash)); - - res = SCPDriver::kInvalidValue; + res = + nomination ? 
SCPDriver::kVoteToNominate : SCPDriver::kInvalidValue; } else if (!checkAndCacheTxSetValid(txSet, closeTimeOffset)) { @@ -439,7 +438,7 @@ HerderSCPDriver::extractValidValue(uint64_t slotIndex, Value const& value) return res; } -// value marshaling +// value marshalling std::string HerderSCPDriver::toShortString(NodeID const& pk) const @@ -1178,10 +1177,8 @@ class SCPHerderValueWrapper : public ValueWrapper mTxSet = mHerder.getTxSet(sv.txSetHash); if (!mTxSet) { - throw std::runtime_error(fmt::format( - FMT_STRING( - "SCPHerderValueWrapper tried to bind an unknown tx set {}"), - hexAbbrev(sv.txSetHash))); + CLOG_TRACE(Herder, "unknown tx set {}, we might vote to nominate", + hexAbbrev(sv.txSetHash)); } } }; diff --git a/src/herder/PendingEnvelopes.cpp b/src/herder/PendingEnvelopes.cpp index 65efd75c54..63cffb3c3c 100644 --- a/src/herder/PendingEnvelopes.cpp +++ b/src/herder/PendingEnvelopes.cpp @@ -234,13 +234,21 @@ PendingEnvelopes::addTxSet(Hash const& hash, uint64 lastSeenSlotIndex, mTxSetFetcher.recv(hash, mFetchTxSetTimer); } +uint64 + +PendingEnvelopes::getLastSeenSlotIndexForTxSet(Hash const& hash) +{ + auto lastSeenSlotIndex = mTxSetFetcher.getLastSeenSlotIndex(hash); + return lastSeenSlotIndex; +} + bool PendingEnvelopes::recvTxSet(Hash const& hash, TxSetFrameConstPtr txset) { ZoneScoped; CLOG_TRACE(Herder, "Got TxSet {}", hexAbbrev(hash)); - auto lastSeenSlotIndex = mTxSetFetcher.getLastSeenSlotIndex(hash); + auto lastSeenSlotIndex = getLastSeenSlotIndexForTxSet(hash); if (lastSeenSlotIndex == 0) { return false; @@ -361,9 +369,26 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope) } else { - // else just keep waiting for it to come in - // and refresh fetchers as needed - startFetch(envelope); + if (envelope.statement.pledges.type() == SCP_ST_NOMINATE) + { + auto qSetH = Slot::getCompanionQuorumSetHashFromStatement( + envelope.statement); + auto mQSet = mApp.getHerder().getQSet(qSetH); + if (mQSet != nullptr) + { + // The qset has been 
fetched. + // Not all tx sets have been fetched, + // but we should process this nomination statement + // in case we can vote for some/all of them. + envelopeReady(envelope); + } + } + else + { + // else just keep waiting for it to come in + // and refresh fetchers as needed + startFetch(envelope); + } } return Herder::ENVELOPE_STATUS_FETCHING; diff --git a/src/herder/PendingEnvelopes.h b/src/herder/PendingEnvelopes.h index 83a40a083a..94f6311980 100644 --- a/src/herder/PendingEnvelopes.h +++ b/src/herder/PendingEnvelopes.h @@ -164,6 +164,8 @@ class PendingEnvelopes TxSetFrameConstPtr putTxSet(Hash const& hash, uint64 slot, TxSetFrameConstPtr txset); + uint64 getLastSeenSlotIndexForTxSet(Hash const& hash); + /** * Check if @p txset identified by @p hash was requested before from peers. * If not, ignores that @p txset. If it was requested, calls diff --git a/src/herder/TxSetFrame.h b/src/herder/TxSetFrame.h index f09d7a6d8f..ac77b77ebb 100644 --- a/src/herder/TxSetFrame.h +++ b/src/herder/TxSetFrame.h @@ -159,6 +159,7 @@ class TxSetFrame : public NonMovableOrCopyable // Test helper that only checks the XDR structure validitiy without // validating internal transactions. 
virtual bool checkValidStructure() const; + void computeContentsHashForTesting(); static TxSetFrameConstPtr makeFromTransactions(Transactions txs, Application& app, uint64_t lowerBoundCloseTimeOffset, diff --git a/src/herder/test/HerderTests.cpp b/src/herder/test/HerderTests.cpp index 187f35e03f..c3006d671a 100644 --- a/src/herder/test/HerderTests.cpp +++ b/src/herder/test/HerderTests.cpp @@ -4346,6 +4346,7 @@ externalize(SecretKey const& sk, LedgerManager& lm, HerderImpl& herder, txsPhases.emplace_back(sorobanTxs); auto txSet = TxSetFrame::makeFromTransactions(txsPhases, app, 0, 0); + herder.getPendingEnvelopes().putTxSet(txSet->getContentsHash(), ledgerSeq, txSet); @@ -5339,3 +5340,159 @@ TEST_CASE("exclude transactions by operation type", "[herder]") TransactionQueue::AddResult::ADD_STATUS_PENDING); } } + +TEST_CASE("delay sending DONT_HAVE", "[herder]") +{ + VirtualClock clock; + auto const numNodes = 2; + std::vector> apps; + std::chrono::milliseconds const epsilon{1}; + auto s = SecretKey::pseudoRandomForTesting(); + + for (auto i = 0; i < numNodes; i++) + { + Config cfg = getTestConfig(i); + cfg.FLOOD_DEMAND_BACKOFF_DELAY_MS = std::chrono::milliseconds(200); + cfg.FLOOD_DEMAND_PERIOD_MS = std::chrono::milliseconds(200); + // Using a small tq set size such as 50 may lead to an unexpectedly + // small advert/demand size limit. + cfg.MANUAL_CLOSE = false; + cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 1000; + cfg.SEND_DONT_HAVE_DELAY = std::chrono::milliseconds{300}; + // Setting validators + cfg.QUORUM_SET.validators.emplace_back(s.getPublicKey()); + // cfg.LEDGER_PROTOCOL_VERSION + apps.push_back(createTestApplication(clock, cfg)); + } + + auto connection = + std::make_shared(*apps[0], *apps[1]); + testutil::crankFor(clock, std::chrono::seconds(5)); + REQUIRE(connection->getInitiator()->isAuthenticated()); + REQUIRE(connection->getAcceptor()->isAuthenticated()); + + // TODO: maybe replace these lambdas with static definitions? 
+ auto createTxn = [](auto n) { + StellarMessage txn; + txn.type(TRANSACTION); + Memo memo(MEMO_TEXT); + memo.text() = "tx" + std::to_string(n); + txn.transaction().v0().tx.memo = memo; + + return std::make_shared(txn); + }; + + auto createGetTxSetMessage = [](uint256 const& setID) { + StellarMessage getTxSet; + getTxSet.type(GET_TX_SET); + getTxSet.txSetHash() = setID; + + return std::make_shared(getTxSet); + }; + + auto createTxSetMessage = [](TxSetFrameConstPtr txSet) { + StellarMessage msg; + msg.type(GENERALIZED_TX_SET); + txSet->toXDR(msg.generalizedTxSet()); + return std::make_shared(msg); + }; + + using TxPair = std::pair; + using SVUpgrades = decltype(StellarValue::upgrades); + auto root = TestAccount::createRoot(*apps[0]); + + auto makeTxUpgradePair = [&](TxSetFrameConstPtr txSet, uint64_t closeTime, + SVUpgrades const& upgrades) { + StellarValue sv = apps[0]->getHerder().makeStellarValue( + txSet->getContentsHash(), closeTime, upgrades, root.getSecretKey()); + auto v = xdr::xdr_to_opaque(sv); + return TxPair{v, txSet}; + }; + + auto makeTxPair = [&](TxSetFrameConstPtr txSet, uint64_t closeTime) { + return makeTxUpgradePair(txSet, closeTime, emptyUpgradeSteps); + }; + + auto makeNominationEnvelope = [&s, &apps](TxPair const& p, Hash qSetHash, + uint64_t slotIndex) { + // herder must want the TxSet before receiving it, so we are sending it + // a fake envelope + auto envelope = SCPEnvelope{}; + envelope.statement.slotIndex = slotIndex; + envelope.statement.pledges.type(SCP_ST_NOMINATE); + envelope.statement.pledges.nominate().votes.push_back(p.first); + envelope.statement.pledges.nominate().quorumSetHash = qSetHash; + envelope.statement.nodeID = s.getPublicKey(); + static_cast(apps[0]->getHerder()) + .signEnvelope(s, envelope); + return envelope; + }; + + // Create txn set. 
+ auto tx = createTxn(1); + std::vector txs = { + TransactionFrameBase::makeTransactionFromWire(apps[0]->getNetworkID(), + tx->transaction()), + }; + auto txnSetFrame = TxSetFrame::makeFromTransactions(txs, *apps[0], 0, 0); + + // Sending get tx set message. + auto txSetHash = txnSetFrame->getContentsHash(); + auto getTxSet = createGetTxSetMessage(txSetHash); + connection->getAcceptor()->sendMessage(getTxSet, false); + + REQUIRE(!apps[0]->getHerder().getTxSet(txSetHash)); + REQUIRE(!apps[1]->getHerder().getTxSet(txSetHash)); + + // Should not add to pending getTxSet requests while we wait before + // retrying. + testutil::crankFor(clock, epsilon); + + auto closedTime = apps[0] + ->getLedgerManager() + .getLastClosedLedgerHeader() + .header.scpValue.closeTime + + 1; + + auto p = makeTxPair(txnSetFrame, closedTime); + + auto recvTxPairEnvelope = [&apps, makeNominationEnvelope](TxPair p) { + auto envelope = makeNominationEnvelope( + p, {}, apps[0]->getHerder().trackingConsensusLedgerIndex() + 1); + + REQUIRE(apps[0]->getHerder().recvSCPEnvelope(envelope) == + Herder::ENVELOPE_STATUS_FETCHING); + REQUIRE(apps[1]->getHerder().recvSCPEnvelope(envelope) == + Herder::ENVELOPE_STATUS_FETCHING); + }; + + SECTION("tx set not received before timeout.") + { + recvTxPairEnvelope(p); + testutil::crankFor(clock, std::chrono::milliseconds{300}); + + // Receives tx set. + auto txSetMsg = createTxSetMessage(txnSetFrame); + connection->getInitiator()->recvMessage(*txSetMsg); + + testutil::crankFor(clock, std::chrono::seconds{1}); + // Check peer does not have the txn set hash. + REQUIRE(apps[0]->getHerder().getTxSet(txSetHash)); + REQUIRE(!apps[1]->getHerder().getTxSet(txSetHash)); + } + + SECTION("tx set received before timeout.") + { + recvTxPairEnvelope(p); + // Receives tx set. + auto txSetMsg = createTxSetMessage(txnSetFrame); + connection->getInitiator()->recvMessage(*txSetMsg); + + // Pending getTxSet requests are empty since node already has the tx + // set. 
+ testutil::crankFor(clock, std::chrono::seconds{1}); + // Check peer has the txn set hash. + REQUIRE(apps[0]->getHerder().getTxSet(txSetHash)); + REQUIRE(apps[1]->getHerder().getTxSet(txSetHash)); + } +} \ No newline at end of file diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 1787e7ef32..4675d44e75 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -156,6 +156,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) AUTOMATIC_MAINTENANCE_COUNT = 400; // automatic self-check happens once every 3 hours AUTOMATIC_SELF_CHECK_PERIOD = std::chrono::seconds{3 * 60 * 60}; + SEND_DONT_HAVE_DELAY = std::chrono::milliseconds{200}; ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING = false; ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING = false; ARTIFICIALLY_SET_CLOSE_TIME_FOR_TESTING = 0; @@ -224,6 +225,8 @@ Config::Config() : NODE_SEED(SecretKey::random()) FLOOD_ADVERT_PERIOD_MS = std::chrono::milliseconds(100); FLOOD_DEMAND_BACKOFF_DELAY_MS = std::chrono::milliseconds(500); + TX_SET_BACKOFF_DELAY_MS = std::chrono::milliseconds(1500); + MAX_BATCH_WRITE_COUNT = 1024; MAX_BATCH_WRITE_BYTES = 1 * 1024 * 1024; PREFERRED_PEERS_ONLY = false; @@ -1142,6 +1145,11 @@ Config::processConfig(std::shared_ptr t) AUTOMATIC_SELF_CHECK_PERIOD = std::chrono::seconds{readInt(item)}; } + else if (item.first == "SEND_DONT_HAVE_DELAY") + { + SEND_DONT_HAVE_DELAY = + std::chrono::seconds{readInt(item)}; + } else if (item.first == "MANUAL_CLOSE") { MANUAL_CLOSE = readBool(item); @@ -1272,6 +1280,11 @@ Config::processConfig(std::shared_ptr t) "bad value for FLOOD_ARB_TX_DAMPING_FACTOR"); } } + else if (item.first == "TX_SET_BACKOFF_DELAY_MS") + { + TX_SET_BACKOFF_DELAY_MS = + std::chrono::milliseconds(readInt(item, 1)); + } else if (item.first == "PREFERRED_PEERS") { PREFERRED_PEERS = readArray(item); diff --git a/src/main/Config.h b/src/main/Config.h index b651527be3..dfa4cfe0df 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -161,10 +161,14 @@ class Config : public 
std::enable_shared_from_this // Interval between automatic invocations of self-check. std::chrono::seconds AUTOMATIC_SELF_CHECK_PERIOD; + // Interval for a peer to wait before sending DONT_HAVE for a tx set + // request. + std::chrono::milliseconds SEND_DONT_HAVE_DELAY; + // A config parameter that enables synthetic load generation on demand, - // using the `generateload` runtime command (see CommandHandler.cpp). This - // option only exists for stress-testing and should not be enabled in - // production networks. + // using the `generateload` runtime command (see CommandHandler.cpp). + // This option only exists for stress-testing and should not be enabled + // in production networks. bool ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING; // A config parameter that reduces ledger close time to 1s and checkpoint @@ -501,6 +505,8 @@ class Config : public std::enable_shared_from_this static constexpr size_t const POSSIBLY_PREFERRED_EXTRA = 2; static constexpr size_t const REALLY_DEAD_NUM_FAILURES_CUTOFF = 120; + std::chrono::milliseconds TX_SET_BACKOFF_DELAY_MS; + // survey config std::set SURVEYOR_KEYS; diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h index 4278318fa3..d611cb9f2d 100644 --- a/src/overlay/OverlayManager.h +++ b/src/overlay/OverlayManager.h @@ -6,6 +6,7 @@ #include "overlay/Peer.h" #include "overlay/StellarXDR.h" +#include /** * OverlayManager maintains a virtual broadcast network, consisting of a set of @@ -60,6 +61,9 @@ class OverlayManager uint32_t mBatchSize; }; + using MapPendingGetTxSetRequestsPerSlot = + std::unordered_map>; + static int constexpr MIN_INBOUND_FACTOR = 3; static std::unique_ptr create(Application& app); @@ -168,6 +172,7 @@ class OverlayManager // Return the current in-memory set of authenticated peers. 
virtual std::map getAuthenticatedPeers() const = 0; + virtual Peer::pointer getAuthenticatedPeer(NodeID id) const = 0; // Return number of authenticated peers virtual int getAuthenticatedPeersCount() const = 0; @@ -204,6 +209,10 @@ class OverlayManager virtual size_t getMaxAdvertSize() const = 0; + virtual void purgePendingGetTxSetRequestsBelow(uint64 ledgerSeq) = 0; + virtual std::map& + getPendingGetTxSetRequests() = 0; + virtual ~OverlayManager() { } diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index b6eb30f733..86afd5f53a 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -285,6 +285,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app) , mDemandTimer(app) , mResolvingPeersWithBackoff(true) , mResolvingPeersRetryCount(0) + , mPendingGetTxSetRequests{} { mPeerSources[PeerType::INBOUND] = std::make_unique( mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND)); @@ -984,6 +985,20 @@ OverlayManagerImpl::getAuthenticatedPeers() const return result; } +Peer::pointer +OverlayManagerImpl::getAuthenticatedPeer(NodeID id) const +{ + auto const& peers = getAuthenticatedPeers(); + if (auto it = peers.find(id); it != peers.end()) + { + return it->second; + } + else + { + return nullptr; + } +} + std::shared_ptr OverlayManagerImpl::getLiveInboundPeersCounter() const { @@ -1294,6 +1309,31 @@ OverlayManagerImpl::getMaxAdvertSize() const return res; } +void +OverlayManagerImpl::purgePendingGetTxSetRequestsBelow(uint64 ledgerSeq) +{ + auto itr = mPendingGetTxSetRequests.begin(); + + while (itr != mPendingGetTxSetRequests.end()) + { + auto slotIndex = itr->first; + if (slotIndex < ledgerSeq) + { + itr = mPendingGetTxSetRequests.erase(itr); + } + else + { + break; + } + } +} + +std::map& +OverlayManagerImpl::getPendingGetTxSetRequests() +{ + return mPendingGetTxSetRequests; +} + size_t OverlayManagerImpl::getMaxDemandSize() const { diff --git a/src/overlay/OverlayManagerImpl.h 
b/src/overlay/OverlayManagerImpl.h index 058db03a11..8651cecc26 100644 --- a/src/overlay/OverlayManagerImpl.h +++ b/src/overlay/OverlayManagerImpl.h @@ -164,6 +164,7 @@ class OverlayManagerImpl : public OverlayManager std::map const& getOutboundAuthenticatedPeers() const override; std::map getAuthenticatedPeers() const override; + virtual Peer::pointer getAuthenticatedPeer(NodeID id) const override; int getAuthenticatedPeersCount() const override; // returns nullptr if the passed peer isn't found @@ -194,6 +195,10 @@ class OverlayManagerImpl : public OverlayManager std::shared_ptr peer) override; size_t getMaxAdvertSize() const override; + void purgePendingGetTxSetRequestsBelow(uint64 ledgerSeq) override; + std::map& + getPendingGetTxSetRequests() override; + private: struct ResolvedPeers { @@ -206,6 +211,8 @@ class OverlayManagerImpl : public OverlayManager std::future mResolvedPeers; bool mResolvingPeersWithBackoff; int mResolvingPeersRetryCount; + std::map + mPendingGetTxSetRequests; void triggerPeerResolution(); std::pair, bool> diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index 953cdf94f5..8ba07dc81e 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -67,6 +67,7 @@ Peer::Peer(Application& app, PeerRole role) , mEnqueueTimeOfLastWrite(app.getClock().now()) , mPeerMetrics(app.getClock().now()) , mTxAdvertQueue(app) + , mTxSetRequestTimer(app) , mAdvertTimer(app) , mAdvertHistory(ADVERT_CACHE_SIZE) { @@ -377,6 +378,7 @@ Peer::shutdown() mShuttingDown = true; mRecurringTimer.cancel(); mAdvertTimer.cancel(); + mTxSetRequestTimer.cancel(); mDelayedExecutionTimer.cancel(); } @@ -410,6 +412,73 @@ Peer::sendDontHave(MessageType type, uint256 const& itemID) sendMessage(msgPtr); } +void +Peer::sendDontHave(StellarMessage const& msg) +{ + CLOG_TRACE(Overlay, "Peer::recvGetTxSet {} sending DONT_HAVE for {}", + toString(), hexAbbrev(msg.txSetHash())); + // Technically we don't exactly know what is the kind of the tx set + // missing, however both 
TX_SET and GENERALIZED_TX_SET get the same + // treatment when missing, so it should be ok to maybe send the + // incorrect version during the upgrade. + auto messageType = protocolVersionIsBefore(mApp.getLedgerManager() + .getLastClosedLedgerHeader() + .header.ledgerVersion, + SOROBAN_PROTOCOL_VERSION) + ? TX_SET + : GENERALIZED_TX_SET; + // If peer is not aware of generalized tx sets and we don't have the + // requested hash, then it probably requests an old-style tx set we + // don't have. Another option is that the peer is in incorrect + // state, but it's also ok to say we don't have the requested + // old-style tx set. + if (messageType == GENERALIZED_TX_SET && + mRemoteOverlayVersion < + Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) + { + sendDontHave(TX_SET, msg.txSetHash()); + return; + } + sendDontHave(messageType, msg.txSetHash()); +} + +void +Peer::maybeSendDontHaveAfterDelay(StellarMessage const& msg) +{ + if (auto txSet = mApp.getHerder().getTxSet(msg.txSetHash())) + { + sendTxSet(txSet); + return; + } + + auto slotIndex = + mApp.getHerder().getLastSeenSlotIndexForTxSet(msg.txSetHash()); + if (slotIndex != 0) + { + // Remove this peer from the pending getTxSet requests. We are about + // to send DONT_HAVE, so we will not send the tx set to this peer + // even if we receive it later. 
+ auto& pendingGetTxSetRequestsForSlot = + mApp.getOverlayManager().getPendingGetTxSetRequests(); + + auto it = pendingGetTxSetRequestsForSlot.find(slotIndex); + if (it == pendingGetTxSetRequestsForSlot.end()) + { + return; + } + auto& peersWaitingForTx = it->second; + auto peersMapItr = peersWaitingForTx.find(msg.txSetHash()); + if (peersMapItr == peersWaitingForTx.end()) + { + return; + } + auto& peers = peersMapItr->second; + peers.erase(mPeerID); + } + + sendDontHave(msg); +} + void Peer::sendSCPQuorumSet(SCPQuorumSetPtr qSet) { @@ -1077,79 +1146,161 @@ Peer::recvDontHave(StellarMessage const& msg) shared_from_this()); } +void +Peer::sendTxSet(TxSetFrameConstPtr txSet) +{ + StellarMessage newMsg; + if (txSet->isGeneralizedTxSet()) + { + if (mRemoteOverlayVersion < + Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) + { + // The peer wouldn't be able to accept the generalized tx set, + // but it wouldn't be correct to say we don't have it. So we + // just let the request to timeout. + return; + } + newMsg.type(GENERALIZED_TX_SET); + txSet->toXDR(newMsg.generalizedTxSet()); + } + else + { + newMsg.type(TX_SET); + txSet->toXDR(newMsg.txSet()); + } + + auto newMsgPtr = std::make_shared(newMsg); + sendMessage(newMsgPtr); +} + +// ┌────────────────────────┐ +// │ │ +// │Adds txn set to ledger. │ +// ┏━━━━━━━━━━━━━━━━━┓ │ If there are pending │ +// ┃ ┃ │ getTxSet requests for │ +// ┃ ┃Local node │ this hash, send tx set │ +// ┃Txn set fetching ┃─receives ──▶│ to the waiting peers. │ +// ┃ ┃ new txn │ Clear pending getTxSet │ +// ┃ ┃ │ requests for the hash. │ +// ┗━━━━━━━━━━━━━━━━━┛ │ │ +// │ │ │ +// ▼ └────────────────────────┘ +// ┌────────────────┐ +// │Local node recvs│ +// │ GetTxnSet │ +// │ request. │ +// └────────────────┘ +// │ +// ▼ ┌────────────────────┐ +// Λ Y │Send txn set back to│ +// Node has ▕ ▏────────────────────────▶│ peer. │ +// txn set? 
V │ │ +// │ └────────────────────┘ +// N│ ▲ +// │ │ +// ▼ │ +// ┌────────────────┐ │ +// │Add remote peer │ │ +// │ to pending │ │ +// │ getTxSet │ │ +// │requests for tx │ │ +// └────────────────┘ │ +// │ │ +// │ │ +// Node has ▼ ┌──────────────────────┐ +// received Λ │Clear pending request │ +// tx set ▕ ▏──────Y────────────────▶│ for peer. │ +// when timer V │ │ +// fires up. │ └──────────────────────┘ +// N│ +// ▼ +// ┌───────────────────────────┐ +// │ │ +// │ Send DONT_HAVE. Clear │ +// │ pending request for peer. │ +// │ │ +// └───────────────────────────┘ +// Pending getTxSet requests are stored by their tx set hash and slotIndex. For +// each request, we store the node IDs of peers waiting for the tx set. Entries +// for stale slot indices are garbage collected as the Herder externalizes +// ledgers. void Peer::recvGetTxSet(StellarMessage const& msg) { ZoneScoped; + auto self = shared_from_this(); if (auto txSet = mApp.getHerder().getTxSet(msg.txSetHash())) { - StellarMessage newMsg; - if (txSet->isGeneralizedTxSet()) - { - if (mRemoteOverlayVersion < - Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) - { - // The peer wouldn't be able to accept the generalized tx - // set, but it wouldn't be correct to say we don't have it. - // So we just let the request to timeout. - return; - } - newMsg.type(GENERALIZED_TX_SET); - txSet->toXDR(newMsg.generalizedTxSet()); - } - else - { - newMsg.type(TX_SET); - txSet->toXDR(newMsg.txSet()); - } - - auto newMsgPtr = std::make_shared(newMsg); - self->sendMessage(newMsgPtr); + sendTxSet(txSet); } else { - // Technically we don't exactly know what is the kind of the tx set - // missing, however both TX_SET and GENERALIZED_TX_SET get the same - // treatment when missing, so it should be ok to maybe send the - // incorrect version during the upgrade. - auto messageType = - protocolVersionIsBefore(mApp.getLedgerManager() - .getLastClosedLedgerHeader() - .header.ledgerVersion, - SOROBAN_PROTOCOL_VERSION) - ? 
TX_SET - : GENERALIZED_TX_SET; - // If peer is not aware of generalized tx sets and we don't have the - // requested hash, then it probably requests an old-style tx set we - // don't have. Another option is that the peer is in incorrect - // state, but it's also ok to say we don't have the requested - // old-style tx set. - if (messageType == GENERALIZED_TX_SET && - mRemoteOverlayVersion < - Peer::FIRST_VERSION_SUPPORTING_GENERALIZED_TX_SET) + // Register pending getTxSet request. + auto slotIndex = + mApp.getHerder().getLastSeenSlotIndexForTxSet(msg.txSetHash()); + if (slotIndex != 0) { - sendDontHave(TX_SET, msg.txSetHash()); + // If we currently do not have the tx set but are tracking it. + auto& pendingGetTxSetRequestsForSlot = + mApp.getOverlayManager() + .getPendingGetTxSetRequests()[slotIndex]; + pendingGetTxSetRequestsForSlot[msg.txSetHash()].insert(mPeerID); + + mTxSetRequestTimer.expires_from_now( + mApp.getConfig().SEND_DONT_HAVE_DELAY); + mTxSetRequestTimer.async_wait( + [this, msg] { this->maybeSendDontHaveAfterDelay(msg); }, + &VirtualTimer::onFailureNoop); return; } - sendDontHave(messageType, msg.txSetHash()); + sendDontHave(msg); } } void Peer::recvTxSet(StellarMessage const& msg) { - ZoneScoped; auto frame = TxSetFrame::makeFromWire(mApp, msg.txSet()); - mApp.getHerder().recvTxSet(frame->getContentsHash(), frame); + recvTxSetHelper(msg, frame); } void Peer::recvGeneralizedTxSet(StellarMessage const& msg) { - ZoneScoped; auto frame = TxSetFrame::makeFromWire(mApp, msg.generalizedTxSet()); - mApp.getHerder().recvTxSet(frame->getContentsHash(), frame); + recvTxSetHelper(msg, frame); +} + +// Helper method for receiving a frame that contains either a tx set (legacy) or a +// generalized tx set. 
+void +Peer::recvTxSetHelper(StellarMessage const& msg, TxSetFrameConstPtr txSetFrame) +{ + ZoneScoped; + mApp.getHerder().recvTxSet(txSetFrame->getContentsHash(), txSetFrame); + auto& pendingTxSetRequests = + mApp.getOverlayManager().getPendingGetTxSetRequests(); + + for (auto& reqPair : pendingTxSetRequests) + { + auto& pendingTxSetRequestsForSlot = reqPair.second; + for (auto nodeID : + pendingTxSetRequestsForSlot[txSetFrame->getContentsHash()]) + { + mApp.postOnMainThread( + [nodeID, txSetFrame, this]() { + auto peer = + mApp.getOverlayManager().getAuthenticatedPeer(nodeID); + if (peer) + { + peer->sendTxSet(txSetFrame); + } + }, + "Sending tx set to peer"); + } + pendingTxSetRequestsForSlot.erase(txSetFrame->getContentsHash()); + } } void @@ -1158,6 +1309,7 @@ Peer::recvTransaction(StellarMessage const& msg) ZoneScoped; auto transaction = TransactionFrameBase::makeTransactionFromWire( mApp.getNetworkID(), msg.transaction()); + if (transaction) { // record that this peer sent us this transaction diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index 9d7607f363..04d799bdcb 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -27,6 +27,8 @@ static auto const MAX_MESSAGE_SIZE = 0x1000000; static const uint32_t MAX_CLASSIC_TX_SIZE_BYTES = 100 * 1024; class Application; +class TxSetFrame; +using TxSetFrameConstPtr = std::shared_ptr; class LoopbackPeer; struct OverlayMetrics; class FlowControl; @@ -211,6 +213,7 @@ class Peer : public std::enable_shared_from_this, VirtualTimer mRecurringTimer; VirtualTimer mDelayedExecutionTimer; + VirtualTimer mTxSetRequestTimer; VirtualClock::time_point mLastRead; VirtualClock::time_point mLastWrite; @@ -244,9 +247,13 @@ class Peer : public std::enable_shared_from_this, void recvSurveyResponseMessage(StellarMessage const& msg); void recvSendMore(StellarMessage const& msg); + void sendTxSet(std::shared_ptr txSet); + void recvGetTxSet(StellarMessage const& msg); void recvTxSet(StellarMessage const& msg); void 
recvGeneralizedTxSet(StellarMessage const& msg); + void recvTxSetHelper(StellarMessage const& msg, + TxSetFrameConstPtr txSetFrame); void recvTransaction(StellarMessage const& msg); void recvGetSCPQuorumSet(StellarMessage const& msg); void recvSCPQuorumSet(StellarMessage const& msg); @@ -259,6 +266,8 @@ class Peer : public std::enable_shared_from_this, void sendAuth(); void sendSCPQuorumSet(SCPQuorumSetPtr qSet); void sendDontHave(MessageType type, uint256 const& itemID); + void sendDontHave(StellarMessage const& msg); + void maybeSendDontHaveAfterDelay(StellarMessage const& msg); void sendPeers(); void sendError(ErrorCode error, std::string const& message); diff --git a/src/overlay/Tracker.cpp b/src/overlay/Tracker.cpp index 7cad0e2268..b1195bf353 100644 --- a/src/overlay/Tracker.cpp +++ b/src/overlay/Tracker.cpp @@ -21,7 +21,11 @@ namespace stellar { -std::chrono::milliseconds const Tracker::MS_TO_WAIT_FOR_FETCH_REPLY{1500}; +#ifdef BUILD_TESTS +std::optional + Tracker::mMillisecondsToWaitForFetchReplayForTesting = std::nullopt; +#endif + int const Tracker::MAX_REBUILD_FETCH_LIST = 10; Tracker::Tracker(Application& app, Hash const& hash, AskPeer& askPeer) @@ -42,6 +46,19 @@ Tracker::~Tracker() cancel(); } +std::chrono::milliseconds const +Tracker::getMillisecondsToWaitForFetchReplay() +{ +#ifdef BUILD_TESTS + if (mMillisecondsToWaitForFetchReplayForTesting) + { + return *mMillisecondsToWaitForFetchReplayForTesting; + } +#endif + + return MS_TO_WAIT_FOR_FETCH_REPLY; +} + SCPEnvelope Tracker::pop() { @@ -93,6 +110,7 @@ void Tracker::tryNextPeer() { ZoneScoped; + // will be called by some timer or when we get a // response saying they don't have it CLOG_TRACE(Overlay, "tryNextPeer {} last: {}", hexAbbrev(mItemHash), @@ -269,4 +287,14 @@ Tracker::getDuration() { return mFetchTime.checkElapsedTime(); } + +#ifdef BUILD_TESTS +void +Tracker::setMillisecondsToWaitForFetchReplayForTesting( + std::chrono::milliseconds duration) +{ + 
mMillisecondsToWaitForFetchReplayForTesting = duration; +} +#endif + } diff --git a/src/overlay/Tracker.h b/src/overlay/Tracker.h index 8464b1eb40..642784eb45 100644 --- a/src/overlay/Tracker.h +++ b/src/overlay/Tracker.h @@ -53,9 +53,15 @@ class Tracker medida::Meter& mTryNextPeer; uint64 mLastSeenSlotIndex{0}; LogSlowExecution mFetchTime; + static constexpr std::chrono::milliseconds MS_TO_WAIT_FOR_FETCH_REPLY = + std::chrono::milliseconds{1500}; + +#ifdef BUILD_TESTS + static std::optional + mMillisecondsToWaitForFetchReplayForTesting; +#endif public: - static std::chrono::milliseconds const MS_TO_WAIT_FOR_FETCH_REPLY; static int const MAX_REBUILD_FETCH_LIST; /** * Create Tracker that tracks data identified by @p hash. @p askPeer @@ -64,6 +70,9 @@ class Tracker explicit Tracker(Application& app, Hash const& hash, AskPeer& askPeer); virtual ~Tracker(); + static std::chrono::milliseconds const + getMillisecondsToWaitForFetchReplay(); + /** * Return true if does not wait for any envelope. 
*/ @@ -162,6 +171,9 @@ class Tracker { return mLastAskedPeer; } + + static void setMillisecondsToWaitForFetchReplayForTesting( + std::chrono::milliseconds duration); #endif }; } diff --git a/src/overlay/test/ItemFetcherTests.cpp b/src/overlay/test/ItemFetcherTests.cpp index 455f59c69f..d26b9999da 100644 --- a/src/overlay/test/ItemFetcherTests.cpp +++ b/src/overlay/test/ItemFetcherTests.cpp @@ -268,7 +268,7 @@ TEST_CASE("ItemFetcher fetches", "[overlay][ItemFetcher]") } }; - crankFor(Tracker::MS_TO_WAIT_FOR_FETCH_REPLY * 2); + crankFor(Tracker::getMillisecondsToWaitForFetchReplay() * 2); REQUIRE(asked.size() == 2); @@ -338,15 +338,16 @@ TEST_CASE("ItemFetcher fetches", "[overlay][ItemFetcher]") } else { - REQUIRE(delta >= - Tracker::MS_TO_WAIT_FOR_FETCH_REPLY); + REQUIRE( + delta >= + Tracker::getMillisecondsToWaitForFetchReplay()); } if (i > 0) { auto deltaGroup = refTP - prevGroup; // gap between groups depend on number of retries auto nextTry = - Tracker::MS_TO_WAIT_FOR_FETCH_REPLY * + Tracker::getMillisecondsToWaitForFetchReplay() * std::min(Tracker::MAX_REBUILD_FETCH_LIST, (static_cast(i - 1)) / 2); REQUIRE(deltaGroup >= nextTry); diff --git a/src/overlay/test/LoopbackPeer.cpp b/src/overlay/test/LoopbackPeer.cpp index 9ecd7cbd61..8dc09ac2c8 100644 --- a/src/overlay/test/LoopbackPeer.cpp +++ b/src/overlay/test/LoopbackPeer.cpp @@ -110,6 +110,18 @@ LoopbackPeer::sendMessage(xdr::msg_ptr&& msg) std::copy(bytes.begin(), bytes.end(), mRecvMacKey.key.begin()); } + if (mSuspendTxFlooding) + { + AuthenticatedMessage am; + { + xdr::xdr_from_msg(msg, am); + } + if (am.v0().message.type() == TRANSACTION) + { + return; + } + } + TimestampedMessage tsm; tsm.mMessage = std::move(msg); tsm.mEnqueuedTime = mApp.getClock().now(); diff --git a/src/overlay/test/LoopbackPeer.h b/src/overlay/test/LoopbackPeer.h index ebc7cec80a..014158f7f4 100644 --- a/src/overlay/test/LoopbackPeer.h +++ b/src/overlay/test/LoopbackPeer.h @@ -35,6 +35,15 @@ class LoopbackPeer : public Peer bool 
mDamageCert{false}; bool mDamageAuth{false}; + + // Test tx set flooding by suspending tx flooding. + // If we allow tx flooding, it is possible that + // nodes don't need to flood tx sets because + // nodes may have the exact same txns + // in their TransactionQueue. + // This is especially likely + // in test environments, which lack most real-world obstacles. + bool mSuspendTxFlooding{false}; std::bernoulli_distribution mDuplicateProb{0.0}; std::bernoulli_distribution mReorderProb{0.0}; std::bernoulli_distribution mDamageProb{0.0}; @@ -110,6 +119,12 @@ class LoopbackPeer : public Peer double getReorderProbability() const; void setReorderProbability(double d); + void + suspendTxFlooding() + { + mSuspendTxFlooding = true; + } + void clearInAndOutQueues(); size_t diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 35fe519a86..a1c5724df3 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -2956,4 +2956,97 @@ TEST_CASE("overlay pull mode with many peers", REQUIRE(getSentDemandCount(apps[0]) == maxRetry); } + +TEST_CASE("overlay tx set fetching test", "[overlay][acceptance]") +{ + auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); + auto simulation = + std::make_shared(Simulation::OVER_LOOPBACK, networkID); + + SIMULATION_CREATE_NODE(Node1); + SIMULATION_CREATE_NODE(Node2); + SIMULATION_CREATE_NODE(Node3); + + // Not at all a safe quorum set + // but this ensures that node 1 is definitely node 1's leader, + // so it'll definitely nominate the generated load right away. + SCPQuorumSet qSet; + qSet.threshold = 1; + qSet.validators.push_back(vNode1NodeID); + + auto node1 = simulation->addNode(vNode1SecretKey, qSet); + auto node2 = simulation->addNode(vNode2SecretKey, qSet); + auto node3 = simulation->addNode(vNode3SecretKey, qSet); + + // Node 1 <-> Node 2 <-> Node 3 + // When Node 3 first hears about a nonempty tx set from Node 2, + // Node 2 doesn't have the tx set yet. 
(only the hash) + simulation->addConnection(vNode1NodeID, vNode2NodeID); + simulation->addConnection(vNode2NodeID, vNode3NodeID); + auto con1 = simulation->getLoopbackConnection(vNode1NodeID, vNode2NodeID); + auto con2 = simulation->getLoopbackConnection(vNode2NodeID, vNode3NodeID); + con1->getInitiator()->suspendTxFlooding(); + con1->getAcceptor()->suspendTxFlooding(); + con2->getAcceptor()->suspendTxFlooding(); + + simulation->startAllNodes(); + + REQUIRE(node1->getLedgerManager().isSynced()); + REQUIRE(node2->getLedgerManager().isSynced()); + REQUIRE(node3->getLedgerManager().isSynced()); + + auto& loadGen1 = node1->getLoadGenerator(); + auto const numAccounts = 100; + loadGen1.generateLoad( + GeneratedLoadConfig::createAccountsLoad(/* nAccounts */ numAccounts, + /* txRate */ 1)); + auto& loadGenDone1 = + node1->getMetrics().NewMeter({"loadgen", "run", "complete"}, "run"); + auto currLoadGenCount1 = loadGenDone1.count(); + + simulation->crankUntil( + [&]() { return loadGenDone1.count() > currLoadGenCount1; }, + 10 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + + // Generating load + loadGen1.generateLoad(GeneratedLoadConfig::txLoad( + LoadGenMode::PAY, /* nAccounts */ numAccounts, + /* nTxs */ 50, /* txRate */ 1, /* offset */ 0)); + auto& loadGen2 = node2->getLoadGenerator(); + auto& loadGen3 = node3->getLoadGenerator(); + loadGen2.generateLoad(GeneratedLoadConfig::txLoad( + LoadGenMode::PAY, /* nAccounts */ numAccounts, + /* nTxs */ 50, /* txRate */ 1, /* offset */ 1 * numAccounts)); + loadGen3.generateLoad(GeneratedLoadConfig::txLoad( + LoadGenMode::PAY, /* nAccounts */ numAccounts, + /* nTxs */ 50, /* txRate */ 1, /* offset */ 2 * numAccounts)); + + auto& loadGenDone2 = + node2->getMetrics().NewMeter({"loadgen", "run", "complete"}, "run"); + auto& loadGenDone3 = + node3->getMetrics().NewMeter({"loadgen", "run", "complete"}, "run"); + + currLoadGenCount1 = loadGenDone1.count(); + auto currLoadGenCount2 = loadGenDone2.count(); + auto currLoadGenCount3 = 
loadGenDone3.count(); + + simulation->crankUntil( + [&]() { + return loadGenDone1.count() > currLoadGenCount1 && + loadGenDone2.count() > currLoadGenCount2 && + loadGenDone3.count() > currLoadGenCount3; + }, + 500 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + + // Node 1 is its own quorum. So when Node 1 generates the load, + // it immediately closes a ledger with it. + // Remember that ledger, and let the other two nodes catch up to it. + auto ledger = node1->getLedgerManager().getLastClosedLedgerNum(); + + // As long as all the other nodes can catch up to `ledger`, + // we can be certain that they were able to get all the tx sets. + simulation->crankUntil( + [&]() { return simulation->haveAllExternalized(ledger, 10); }, + ledger * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); +} } diff --git a/src/scp/BallotProtocol.cpp b/src/scp/BallotProtocol.cpp index 55a6da810f..6f37cc041e 100644 --- a/src/scp/BallotProtocol.cpp +++ b/src/scp/BallotProtocol.cpp @@ -188,7 +188,8 @@ BallotProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope, bool self) auto validationRes = validateValues(statement); // If the value is not valid, we just ignore it. 
- if (validationRes == SCPDriver::kInvalidValue) + if (validationRes == SCPDriver::kInvalidValue || + validationRes == SCPDriver::kVoteToNominate) { if (self) { diff --git a/src/scp/NominationProtocol.cpp b/src/scp/NominationProtocol.cpp index 6b44566538..493682dd9e 100644 --- a/src/scp/NominationProtocol.cpp +++ b/src/scp/NominationProtocol.cpp @@ -26,6 +26,22 @@ NominationProtocol::NominationProtocol(Slot& slot) { } +bool +NominationProtocol::processedNewerStatement(NodeID const& nodeID, + SCPNomination const& st) +{ + auto oldp = mLatestNominations.find(nodeID); + if (oldp == mLatestNominations.end()) + { + return false; + } + else + { + return isNewerStatement( + st, oldp->second->getStatement().pledges.nominate()); + } +} + bool NominationProtocol::isNewerStatement(NodeID const& nodeID, SCPNomination const& st) @@ -348,6 +364,10 @@ NominationProtocol::getNewValueFromNomination(SCPNomination const& nom) { valueToNominate = mSlot.getSCPDriver().wrapValue(value); } + else if (vl == SCPDriver::kVoteToNominate) + { + valueToNominate = mSlot.getSCPDriver().wrapValue(value); + } else { valueToNominate = extractValidValue(value); @@ -391,7 +411,10 @@ NominationProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope) auto const& st = envelope->getStatement(); auto const& nom = st.pledges.nominate(); - if (!isNewerStatement(st.nodeID, nom)) + // If we've processed the same envelope, we'll process it again + // since the validity of values might have changed + // (e.g., tx set fetch) + if (processedNewerStatement(st.nodeID, nom)) return SCP::EnvelopeState::INVALID; if (!isSane(st)) diff --git a/src/scp/NominationProtocol.h b/src/scp/NominationProtocol.h index 3fe28b59f9..50b9e943bc 100644 --- a/src/scp/NominationProtocol.h +++ b/src/scp/NominationProtocol.h @@ -41,6 +41,7 @@ class NominationProtocol Value mPreviousValue; bool isNewerStatement(NodeID const& nodeID, SCPNomination const& st); + bool processedNewerStatement(NodeID const& nodeID, SCPNomination const& st); // 
returns true if 'p' is a subset of 'v' // also sets 'notEqual' if p and v differ diff --git a/src/scp/SCPDriver.h b/src/scp/SCPDriver.h index c1f9085c63..0fbd714d6f 100644 --- a/src/scp/SCPDriver.h +++ b/src/scp/SCPDriver.h @@ -70,6 +70,11 @@ class SCPDriver { } + // Users of the SCP library should inherit from SCPDriver and implement the + // virtual methods which are called by the SCP implementation to + // abstract the transport layer used from the implementation of the SCP + // protocol. + // Envelope signature virtual void signEnvelope(SCPEnvelope& envelope) = 0; @@ -91,11 +96,6 @@ class SCPDriver // considered invalid. virtual SCPQuorumSetPtr getQSet(Hash const& qSetHash) = 0; - // Users of the SCP library should inherit from SCPDriver and implement the - // virtual methods which are called by the SCP implementation to - // abstract the transport layer used from the implementation of the SCP - // protocol. - // Delegates the emission of an SCPEnvelope to the user of SCP. Envelopes // should be flooded to the network. virtual void emitEnvelope(SCPEnvelope const& envelope) = 0; @@ -115,9 +115,10 @@ class SCPDriver // NB: validation levels are ordered enum ValidationLevel { - kInvalidValue = 0, // value is invalid for sure - kMaybeValidValue = 1, // value may be valid - kFullyValidatedValue = 2 // value is valid for sure + kInvalidValue = 0, // value is invalid for sure + kMaybeValidValue = 1, // value may be valid + kFullyValidatedValue = 2, // value is valid for sure + kVoteToNominate = 3 // value is valid enough to vote to nominate }; virtual ValidationLevel validateValue(uint64 slotIndex, Value const& value, bool nomination) @@ -177,6 +178,10 @@ class SCPDriver // `computeTimeout` computes a timeout given a round number // it should be sufficiently large such that nodes in a // quorum can exchange 4 messages + + // TODO: does 'can exchange 4 messages' consider sending getTxSet messages? + // Should we clarify this? 
+ virtual std::chrono::milliseconds computeTimeout(uint32 roundNumber); // Inform about events happening within the consensus algorithm. diff --git a/src/scp/readme.md b/src/scp/readme.md index 0c8949f11f..b279bef4a1 100644 --- a/src/scp/readme.md +++ b/src/scp/readme.md @@ -23,7 +23,7 @@ which contains all the base types used through the implementation The `stellar-core` program has a single subclass of SCPDriver called -[Herder](../herder), which gives a specific interpretation to "slot" and +[HerderSCPDriver](../herder/HerderSCPDriver.h), which gives a specific interpretation to "slot" and "value", and connects SCP up with a specific broadcast communication medium ([Overlay](../overlay)) and specific replicated state machine ([LedgerManager](../ledger)). diff --git a/src/scp/test/SCPTests.cpp b/src/scp/test/SCPTests.cpp index 401fe4e5e2..a6beea3574 100644 --- a/src/scp/test/SCPTests.cpp +++ b/src/scp/test/SCPTests.cpp @@ -56,6 +56,9 @@ class TestSCP : public SCPDriver public: SCP mSCP; + SCPDriver::ValidationLevel mValidationLevel{ + SCPDriver::kFullyValidatedValue}; + TestSCP(NodeID const& nodeID, SCPQuorumSet const& qSetLocal, bool isValidator = true) : mSCP(*this, nodeID, isValidator, qSetLocal) @@ -87,7 +90,7 @@ class TestSCP : public SCPDriver validateValue(uint64 slotIndex, Value const& value, bool nomination) override { - return SCPDriver::kFullyValidatedValue; + return mValidationLevel; } void @@ -3156,4 +3159,111 @@ TEST_CASE("nomination tests core5", "[scp][nominationprotocol]") } } } + +TEST_CASE("nomination votes to nominate optimistically", + "[scp][nominationprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + + // 2 out of 2. 
+ SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + REQUIRE(xValue < yValue); + REQUIRE(yValue < zValue); + + auto checkLeaders = [&](TestSCP& scp, std::set expectedLeaders) { + auto l = scp.getNominationLeaders(0); + REQUIRE(std::equal(l.begin(), l.end(), expectedLeaders.begin(), + expectedLeaders.end())); + }; + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + uint256 qSetHash0 = scp.mSCP.getLocalNode()->getQuorumSetHash(); + scp.storeQuorumSet(std::make_shared(qSet)); + scp.mExpectedCandidates.emplace(xValue); + scp.mCompositeValue = xValue; + + SECTION("fully validated value") + { + scp.mValidationLevel = SCPDriver::kFullyValidatedValue; + + // Node 0 (leader) votes to nominate xValue. + REQUIRE(scp.nominate(0, xValue, false)); + + checkLeaders(scp, {v0SecretKey.getPublicKey()}); + + std::vector votes, accepted; + votes.emplace_back(xValue); + + REQUIRE(scp.mEnvs.size() == 1); + verifyNominate(scp.mEnvs[0], v0SecretKey, qSetHash0, 0, votes, + accepted); + + // Node 1 has voted and accepted to nominate xValue. + accepted.emplace_back(xValue); + SCPEnvelope nom1 = + makeNominate(v1SecretKey, qSetHash, 0, votes, accepted); + + // This allows node 0 to accept and confirm to nominate xValue, + // and also start the ballot protocol. + scp.receiveEnvelope(nom1); + REQUIRE(scp.mEnvs.size() == 3); + + // We emit the accept(nominate(X)) after the prepare statement. + verifyNominate(scp.mEnvs[2], v0SecretKey, qSetHash0, 0, votes, + accepted); + + SCPBallot b(1, xValue); + verifyPrepare(scp.mEnvs[1], v0SecretKey, qSetHash0, 0, b); + } + SECTION("fully validated later") + { + scp.mValidationLevel = SCPDriver::kVoteToNominate; + + // Node 0 (leader) votes to nominate xValue. 
+ REQUIRE(scp.nominate(0, xValue, false)); + + checkLeaders(scp, {v0SecretKey.getPublicKey()}); + + std::vector votes, accepted; + votes.emplace_back(xValue); + + REQUIRE(scp.mEnvs.size() == 1); + verifyNominate(scp.mEnvs[0], v0SecretKey, qSetHash0, 0, votes, + accepted); + + // Node 1 has voted and accepted to nominate xValue. + accepted.emplace_back(xValue); + SCPEnvelope nom1 = + makeNominate(v1SecretKey, qSetHash, 0, votes, accepted); + + // This *almost* allows Node 0 to accept to nominate xValue. + // However, Node 0 won't accept to nominate as it hasn't validated + // xValue yet. + scp.receiveEnvelope(nom1); + REQUIRE(scp.mEnvs.size() == 1); // No new envelope emitted + + CLOG_INFO(SCP, "updating the validation level"); + scp.mValidationLevel = SCPDriver::kFullyValidatedValue; + + // Now this time, Node 0 can accept & confirm to nominate xValue + // and also start the ballot protocol. + scp.receiveEnvelope(nom1); + REQUIRE(scp.mEnvs.size() == 3); + + // We emit the accept(nominate(X)) after the prepare statement. + verifyNominate(scp.mEnvs[2], v0SecretKey, qSetHash0, 0, votes, + accepted); + + SCPBallot b(1, xValue); + verifyPrepare(scp.mEnvs[1], v0SecretKey, qSetHash0, 0, b); + } +} } diff --git a/src/test/TestUtils.h b/src/test/TestUtils.h index fa6692c666..49b93356f3 100644 --- a/src/test/TestUtils.h +++ b/src/test/TestUtils.h @@ -21,6 +21,7 @@ namespace testutil { void crankSome(VirtualClock& clock); void crankFor(VirtualClock& clock, VirtualClock::duration duration); + void injectSendPeersAndReschedule(VirtualClock::time_point& end, VirtualClock& clock, VirtualTimer& timer, LoopbackPeerConnection& connection);