Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomination prototype #3856

Closed
wants to merge 54 commits into from
Closed
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
83c46a8
SCP change
Jan 30, 2023
4c11fd0
move mPendingTxRequests to private variable in implementation
JuI3s Jul 26, 2023
e29bb86
fix clang format
JuI3s Jul 26, 2023
5ac20c0
Debugging tx fetching
JuI3s Aug 8, 2023
0e6cf73
Debug tx queue source account limit test
JuI3s Aug 10, 2023
3478fba
Node adds pending tx set requests for missing tx sets
JuI3s Aug 11, 2023
5883118
Remove debug info
JuI3s Aug 16, 2023
da136a7
Remove debug CLOG_INFO
JuI3s Aug 16, 2023
541f632
Make SEND_DONT_HAVE_DELAY configurable
JuI3s Aug 16, 2023
1db7bc2
Wait before sending DONT_HAVE for tx set hash request
JuI3s Aug 16, 2023
46236b5
Set default SEND_DONT_HAVE_DELAY to 2 seconds
JuI3s Aug 16, 2023
cee418e
Set SEND_DONT_HAVE_DELAY to 100 milliseconds
JuI3s Aug 16, 2023
b8222e2
Add delay sending DONT_HAVE test
JuI3s Aug 26, 2023
efee75d
Change MS_TO_WAIT_FOR_FETCH_REPLY to constexpr
JuI3s Aug 28, 2023
4d5c13d
Limit number of outstanding pending getTxnSet requests
JuI3s Aug 28, 2023
96b12f7
Add recvGetTxSet flow diagram and set max num pending requests to 1
JuI3s Aug 31, 2023
e10a556
temp
JuI3s Sep 2, 2023
c6757f4
Add flow diagram for Peer::recvGetTxSet.
JuI3s Sep 5, 2023
5ff851a
Peers receive nomination envelops in "delay sending DONT_HAVE" test b…
JuI3s Sep 6, 2023
b85bc45
Fix typos and update documentation
JuI3s Sep 12, 2023
f904642
Herder purges pending getTxSet requests when value externalized
JuI3s Sep 13, 2023
3cbb29f
Rename OverlayManager::getPendingGetTxSetRequests
JuI3s Sep 13, 2023
13c4874
Fix format
JuI3s Sep 13, 2023
1d69e90
Store pending getTxSet requests by slotIndex. Purge pending getTxSet …
JuI3s Sep 13, 2023
d8e0005
Fix delay send DONT_HAVE test
JuI3s Sep 13, 2023
b94fa8b
Change slot index in Peer::recvGetTxSet to currently tracked ledger seq
JuI3s Sep 13, 2023
63303c2
Remove max number of pending getTxSet requests to keep
JuI3s Sep 13, 2023
d8e25a6
Use slot index in recvGetTxSet from the highest slot index seen for t…
JuI3s Sep 14, 2023
aaeba33
Change require conditions in Herder test 'delay sending DONT_HAVE'
JuI3s Sep 17, 2023
92435b9
remove DONT_HAVE tests
JuI3s Sep 17, 2023
f6d58c0
Add delay sending DONT_HAVE test
JuI3s Sep 18, 2023
41308d3
Test to make sure node rejects unknown getTxSet requests
JuI3s Sep 20, 2023
4d67a45
Delete delay sending DONT_HAVE simulation test
JuI3s Sep 20, 2023
ce080d3
Do not start fetching when receive nomination msg with tx set hash fo…
JuI3s Sep 26, 2023
99a9a63
Quick fix for making delay sending DONT_HAVE test pass using legacy t…
JuI3s Sep 27, 2023
ba04c15
Add assert(!forceIsNotGeneralized) to make sure it's only used for te…
JuI3s Sep 27, 2023
9f534db
PR fix No.1
JuI3s Sep 28, 2023
2acba40
fix pr 2
JuI3s Sep 28, 2023
1651120
PR fix 3
JuI3s Sep 28, 2023
5e43b7a
PR fix 3. Use generalized tx sets instead of tx sets.
JuI3s Sep 29, 2023
1d71931
Working on generating load for test 'overlay tx set fetching test'
JuI3s Sep 29, 2023
254fc6f
Do not send tx set back to peers we've previously sent DONT_HAVE to
JuI3s Sep 30, 2023
5e5445d
fix delay sending DONT_HAVE test
JuI3s Sep 30, 2023
555b9e0
Send DONT_HAVE back immediately for slot index not currently tracking
JuI3s Sep 30, 2023
86cd20b
Update recvGetTxSet diagram
JuI3s Sep 30, 2023
da29888
Change add recvGetTxSet clearing request logic and add documentation
JuI3s Sep 30, 2023
a94237d
Reject untracked tx set message in recvGetTxSet
JuI3s Sep 30, 2023
fe80739
temp
JuI3s Sep 30, 2023
cf90561
Revert "Change add recvGetTxSet clearing request logic and add docume…
JuI3s Sep 30, 2023
5da884e
Fix ambiguities in and errors recvGetTxSet flow diagram
JuI3s Sep 30, 2023
101e0ae
Moving delay sending DONT_HAVE logic to Peer::maybeSendDontHave
JuI3s Oct 3, 2023
73dba79
Fix bugs in maybeSendDontHaveAfterDelay
JuI3s Oct 3, 2023
a7ca10e
Turn log info msg for sending DONT_HAVE to trace
JuI3s Oct 3, 2023
122363e
Remove logs
JuI3s Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/herder/Herder.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ class Herder
// sender in the pending or recent tx sets.
virtual SequenceNumber getMaxSeqInPendingTxs(AccountID const&) = 0;

// If the node has asked about a tx set with the given hash, return the
// highest slot index asked about for that tx set, otherwise return 0. Used
// for deciding which getTxSet requests to accept.
virtual uint64 getLastSeenSlotIndexForTxSet(Hash const& hash) = 0;

// Returns sequence number for most recent completed checkpoint that the
// node knows about, as derived from
// trackingConsensusLedgerIndex
Expand Down
17 changes: 17 additions & 0 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,17 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope)
{
std::string txt("FETCHING");
ZoneText(txt.c_str(), txt.size());
// If nomination, we should still process this.
if (envelope.statement.pledges.type() == SCP_ST_NOMINATE)
{
auto qSetHash = Slot::getCompanionQuorumSetHashFromStatement(
envelope.statement);
auto qSet = mApp.getHerder().getQSet(qSetHash);
if (qSet != nullptr)
{
processSCPQueue();
}
}
}
else if (status == Herder::ENVELOPE_STATUS_PROCESSED)
{
Expand Down Expand Up @@ -1220,6 +1231,12 @@ HerderImpl::getMaxSeqInPendingTxs(AccountID const& acc)
return mTransactionQueue.getAccountTransactionQueueInfo(acc).mMaxSeq;
}

uint64
HerderImpl::getLastSeenSlotIndexForTxSet(Hash const& hash)
{
    // Delegate to the pending-envelope tracker, which records the highest
    // slot index that has referenced each tx set hash (0 if the tx set was
    // never asked about).
    auto const lastSeenSlot =
        mPendingEnvelopes.getLastSeenSlotIndexForTxSet(hash);
    return lastSeenSlot;
}

uint32_t
HerderImpl::getMostRecentCheckpointSeq()
{
Expand Down
1 change: 1 addition & 0 deletions src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class HerderImpl : public Herder
SCPStatement const& newSt) override;

SequenceNumber getMaxSeqInPendingTxs(AccountID const&) override;
uint64 getLastSeenSlotIndexForTxSet(Hash const& hash) override;

uint32_t getMostRecentCheckpointSeq() override;

Expand Down
21 changes: 9 additions & 12 deletions src/herder/HerderSCPDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,9 @@ class SCPHerderEnvelopeWrapper : public SCPEnvelopeWrapper
}
else
{
throw std::runtime_error(fmt::format(
FMT_STRING("SCPHerderEnvelopeWrapper: Wrapping an unknown "
"tx set {} from envelope"),
hexAbbrev(txSetH)));
CLOG_TRACE(Herder,
"unknown tx set {}, we might still able to vote",
hexAbbrev(txSetH));
}
}
}
Expand Down Expand Up @@ -314,10 +313,10 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b,

if (!txSet)
{
CLOG_ERROR(Herder, "validateValue i:{} unknown txSet {}", slotIndex,
CLOG_TRACE(Herder, "validateValue i:{} unknown txSet {}", slotIndex,
hexAbbrev(txSetHash));

res = SCPDriver::kInvalidValue;
res =
nomination ? SCPDriver::kVoteToNominate : SCPDriver::kInvalidValue;
}
else if (!checkAndCacheTxSetValid(txSet, closeTimeOffset))
{
Expand Down Expand Up @@ -439,7 +438,7 @@ HerderSCPDriver::extractValidValue(uint64_t slotIndex, Value const& value)
return res;
}

// value marshaling
// value marshalling

std::string
HerderSCPDriver::toShortString(NodeID const& pk) const
Expand Down Expand Up @@ -1178,10 +1177,8 @@ class SCPHerderValueWrapper : public ValueWrapper
mTxSet = mHerder.getTxSet(sv.txSetHash);
if (!mTxSet)
{
throw std::runtime_error(fmt::format(
FMT_STRING(
"SCPHerderValueWrapper tried to bind an unknown tx set {}"),
hexAbbrev(sv.txSetHash)));
CLOG_TRACE(Herder, "unknown tx set {}, we might vote to nominate",
hexAbbrev(sv.txSetHash));
}
}
};
Expand Down
33 changes: 29 additions & 4 deletions src/herder/PendingEnvelopes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,21 @@ PendingEnvelopes::addTxSet(Hash const& hash, uint64 lastSeenSlotIndex,
mTxSetFetcher.recv(hash, mFetchTxSetTimer);
}

// Returns the highest slot index that has referenced the tx set identified
// by `hash`, as recorded by the tx set fetcher; returns 0 if the tx set was
// never requested.
uint64
PendingEnvelopes::getLastSeenSlotIndexForTxSet(Hash const& hash)
{
    // Fixed: removed the stray blank line between the return type and the
    // function name (inconsistent with every other definition in this file)
    // and the redundant local temporary.
    return mTxSetFetcher.getLastSeenSlotIndex(hash);
}

bool
PendingEnvelopes::recvTxSet(Hash const& hash, TxSetFrameConstPtr txset)
{
ZoneScoped;
CLOG_TRACE(Herder, "Got TxSet {}", hexAbbrev(hash));

auto lastSeenSlotIndex = mTxSetFetcher.getLastSeenSlotIndex(hash);
auto lastSeenSlotIndex = getLastSeenSlotIndexForTxSet(hash);
if (lastSeenSlotIndex == 0)
{
return false;
Expand Down Expand Up @@ -361,9 +369,26 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope)
}
else
{
// else just keep waiting for it to come in
// and refresh fetchers as needed
startFetch(envelope);
if (envelope.statement.pledges.type() == SCP_ST_NOMINATE)
{
auto qSetH = Slot::getCompanionQuorumSetHashFromStatement(
envelope.statement);
auto mQSet = mApp.getHerder().getQSet(qSetH);
if (mQSet != nullptr)
{
// The qset has been fetched.
// Not all tx sets have been fetched,
// but we should process this nomination statement
// in case we can vote for some/all of them.
envelopeReady(envelope);
}
}
else
{
// else just keep waiting for it to come in
// and refresh fetchers as needed
startFetch(envelope);
}
}

return Herder::ENVELOPE_STATUS_FETCHING;
Expand Down
2 changes: 2 additions & 0 deletions src/herder/PendingEnvelopes.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class PendingEnvelopes
TxSetFrameConstPtr putTxSet(Hash const& hash, uint64 slot,
TxSetFrameConstPtr txset);

uint64 getLastSeenSlotIndexForTxSet(Hash const& hash);

/**
* Check if @p txset identified by @p hash was requested before from peers.
* If not, ignores that @p txset. If it was requested, calls
Expand Down
1 change: 1 addition & 0 deletions src/herder/TxSetFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class TxSetFrame : public NonMovableOrCopyable
// Test helper that only checks the XDR structure validity without
// validating internal transactions.
virtual bool checkValidStructure() const;
void computeContentsHashForTesting();
static TxSetFrameConstPtr
makeFromTransactions(Transactions txs, Application& app,
uint64_t lowerBoundCloseTimeOffset,
Expand Down
157 changes: 157 additions & 0 deletions src/herder/test/HerderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4346,6 +4346,7 @@ externalize(SecretKey const& sk, LedgerManager& lm, HerderImpl& herder,
txsPhases.emplace_back(sorobanTxs);

auto txSet = TxSetFrame::makeFromTransactions(txsPhases, app, 0, 0);

herder.getPendingEnvelopes().putTxSet(txSet->getContentsHash(), ledgerSeq,
txSet);

Expand Down Expand Up @@ -5339,3 +5340,159 @@ TEST_CASE("exclude transactions by operation type", "[herder]")
TransactionQueue::AddResult::ADD_STATUS_PENDING);
}
}

// Exercises the SEND_DONT_HAVE_DELAY behavior: when a peer receives a
// GET_TX_SET for a tx set it does not (yet) have, it waits up to
// SEND_DONT_HAVE_DELAY before answering DONT_HAVE, giving the tx set a
// chance to arrive in the meantime. Two loopback-connected test nodes are
// used; node 0 is fed the tx set, and the sections check whether node 1
// ends up with it depending on whether the tx set arrived before or after
// the delay elapsed.
TEST_CASE("delay sending DONT_HAVE", "[herder]")
{
    VirtualClock clock;
    auto const numNodes = 2;
    std::vector<std::shared_ptr<Application>> apps;
    // Smallest non-zero crank interval; used to advance the clock without
    // letting any of the configured delays fire.
    std::chrono::milliseconds const epsilon{1};
    // Key of the (fake) validator that signs the nomination envelopes below.
    auto s = SecretKey::pseudoRandomForTesting();

    for (auto i = 0; i < numNodes; i++)
    {
        Config cfg = getTestConfig(i);
        cfg.FLOOD_DEMAND_BACKOFF_DELAY_MS = std::chrono::milliseconds(200);
        cfg.FLOOD_DEMAND_PERIOD_MS = std::chrono::milliseconds(200);
        // Using a small tq set size such as 50 may lead to an unexpectedly
        // small advert/demand size limit.
        cfg.MANUAL_CLOSE = false;
        cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 1000;
        // The delay under test: how long a node waits before replying
        // DONT_HAVE to a GET_TX_SET it cannot satisfy.
        cfg.SEND_DONT_HAVE_DELAY = std::chrono::milliseconds{300};
        // Setting validators: both nodes trust `s` so its signed nomination
        // envelopes are accepted.
        cfg.QUORUM_SET.validators.emplace_back(s.getPublicKey());
        // cfg.LEDGER_PROTOCOL_VERSION
        apps.push_back(createTestApplication(clock, cfg));
    }

    auto connection =
        std::make_shared<LoopbackPeerConnection>(*apps[0], *apps[1]);
    // Crank long enough for the handshake to complete on both ends.
    testutil::crankFor(clock, std::chrono::seconds(5));
    REQUIRE(connection->getInitiator()->isAuthenticated());
    REQUIRE(connection->getAcceptor()->isAuthenticated());

    // TODO: maybe replace these lambdas with static definitions?
    // Builds a minimal v0 TRANSACTION message whose memo text encodes `n`,
    // so each generated tx (and hence each tx set hash) is distinct.
    auto createTxn = [](auto n) {
        StellarMessage txn;
        txn.type(TRANSACTION);
        Memo memo(MEMO_TEXT);
        memo.text() = "tx" + std::to_string(n);
        txn.transaction().v0().tx.memo = memo;

        return std::make_shared<StellarMessage>(txn);
    };

    // Builds a GET_TX_SET request for the given tx set hash.
    auto createGetTxSetMessage = [](uint256 const& setID) {
        StellarMessage getTxSet;
        getTxSet.type(GET_TX_SET);
        getTxSet.txSetHash() = setID;

        return std::make_shared<StellarMessage>(getTxSet);
    };

    // Serializes a tx set frame into a GENERALIZED_TX_SET message.
    auto createTxSetMessage = [](TxSetFrameConstPtr txSet) {
        StellarMessage msg;
        msg.type(GENERALIZED_TX_SET);
        txSet->toXDR(msg.generalizedTxSet());
        return std::make_shared<StellarMessage const>(msg);
    };

    using TxPair = std::pair<Value, TxSetFrameConstPtr>;
    using SVUpgrades = decltype(StellarValue::upgrades);
    auto root = TestAccount::createRoot(*apps[0]);

    // Wraps a tx set into a signed StellarValue (serialized as an opaque
    // Value) paired with the frame it refers to.
    auto makeTxUpgradePair = [&](TxSetFrameConstPtr txSet, uint64_t closeTime,
                                 SVUpgrades const& upgrades) {
        StellarValue sv = apps[0]->getHerder().makeStellarValue(
            txSet->getContentsHash(), closeTime, upgrades, root.getSecretKey());
        auto v = xdr::xdr_to_opaque(sv);
        return TxPair{v, txSet};
    };

    // Same as above with no upgrade steps.
    auto makeTxPair = [&](TxSetFrameConstPtr txSet, uint64_t closeTime) {
        return makeTxUpgradePair(txSet, closeTime, emptyUpgradeSteps);
    };

    auto makeNominationEnvelope = [&s, &apps](TxPair const& p, Hash qSetHash,
                                              uint64_t slotIndex) {
        // herder must want the TxSet before receiving it, so we are sending it
        // a fake envelope
        auto envelope = SCPEnvelope{};
        envelope.statement.slotIndex = slotIndex;
        envelope.statement.pledges.type(SCP_ST_NOMINATE);
        envelope.statement.pledges.nominate().votes.push_back(p.first);
        envelope.statement.pledges.nominate().quorumSetHash = qSetHash;
        envelope.statement.nodeID = s.getPublicKey();
        static_cast<HerderImpl&>(apps[0]->getHerder())
            .signEnvelope(s, envelope);
        return envelope;
    };

    // Create txn set.
    auto tx = createTxn(1);
    std::vector<TransactionFrameBasePtr> txs = {
        TransactionFrameBase::makeTransactionFromWire(apps[0]->getNetworkID(),
                                                      tx->transaction()),
    };
    auto txnSetFrame = TxSetFrame::makeFromTransactions(txs, *apps[0], 0, 0);

    // Sending get tx set message.
    // Node 1 (acceptor) asks node 0 for a tx set neither of them has yet.
    auto txSetHash = txnSetFrame->getContentsHash();
    auto getTxSet = createGetTxSetMessage(txSetHash);
    connection->getAcceptor()->sendMessage(getTxSet, false);

    REQUIRE(!apps[0]->getHerder().getTxSet(txSetHash));
    REQUIRE(!apps[1]->getHerder().getTxSet(txSetHash));

    // Should not add to pending getTxSet requests while we wait before
    // retrying.
    testutil::crankFor(clock, epsilon);

    // Close time one past the last closed ledger, so the nominated value is
    // valid.
    auto closedTime = apps[0]
                          ->getLedgerManager()
                          .getLastClosedLedgerHeader()
                          .header.scpValue.closeTime +
                      1;

    auto p = makeTxPair(txnSetFrame, closedTime);

    // Delivers the nomination envelope to both nodes; both must start
    // FETCHING since neither has the tx set yet.
    auto recvTxPairEnvelope = [&apps, makeNominationEnvelope](TxPair p) {
        auto envelope = makeNominationEnvelope(
            p, {}, apps[0]->getHerder().trackingConsensusLedgerIndex() + 1);

        REQUIRE(apps[0]->getHerder().recvSCPEnvelope(envelope) ==
                Herder::ENVELOPE_STATUS_FETCHING);
        REQUIRE(apps[1]->getHerder().recvSCPEnvelope(envelope) ==
                Herder::ENVELOPE_STATUS_FETCHING);
    };

    SECTION("tx set not received before timeout.")
    {
        recvTxPairEnvelope(p);
        // Let the full SEND_DONT_HAVE_DELAY (300ms) elapse before node 0
        // receives the tx set, so node 0 answers DONT_HAVE first.
        testutil::crankFor(clock, std::chrono::milliseconds{300});

        // Receives tx set.
        auto txSetMsg = createTxSetMessage(txnSetFrame);
        connection->getInitiator()->recvMessage(*txSetMsg);

        testutil::crankFor(clock, std::chrono::seconds{1});
        // Check peer does not have the txn set hash.
        // Node 0 got the tx set, but node 1 was already told DONT_HAVE and
        // never receives it over this connection.
        REQUIRE(apps[0]->getHerder().getTxSet(txSetHash));
        REQUIRE(!apps[1]->getHerder().getTxSet(txSetHash));
    }

    SECTION("tx set received before timeout.")
    {
        recvTxPairEnvelope(p);
        // Receives tx set.
        // Node 0 gets the tx set before SEND_DONT_HAVE_DELAY fires, so it
        // can answer the pending GET_TX_SET instead of sending DONT_HAVE.
        auto txSetMsg = createTxSetMessage(txnSetFrame);
        connection->getInitiator()->recvMessage(*txSetMsg);

        // Pending getTxSet requests are empty since node already has the tx
        // set.
        testutil::crankFor(clock, std::chrono::seconds{1});
        // Check peer has the txn set hash.
        REQUIRE(apps[0]->getHerder().getTxSet(txSetHash));
        REQUIRE(apps[1]->getHerder().getTxSet(txSetHash));
    }
}
13 changes: 13 additions & 0 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
AUTOMATIC_MAINTENANCE_COUNT = 400;
// automatic self-check happens once every 3 hours
AUTOMATIC_SELF_CHECK_PERIOD = std::chrono::seconds{3 * 60 * 60};
SEND_DONT_HAVE_DELAY = std::chrono::milliseconds{200};
ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING = false;
ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING = false;
ARTIFICIALLY_SET_CLOSE_TIME_FOR_TESTING = 0;
Expand Down Expand Up @@ -224,6 +225,8 @@ Config::Config() : NODE_SEED(SecretKey::random())
FLOOD_ADVERT_PERIOD_MS = std::chrono::milliseconds(100);
FLOOD_DEMAND_BACKOFF_DELAY_MS = std::chrono::milliseconds(500);

TX_SET_BACKOFF_DELAY_MS = std::chrono::milliseconds(1500);

MAX_BATCH_WRITE_COUNT = 1024;
MAX_BATCH_WRITE_BYTES = 1 * 1024 * 1024;
PREFERRED_PEERS_ONLY = false;
Expand Down Expand Up @@ -1142,6 +1145,11 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
AUTOMATIC_SELF_CHECK_PERIOD =
std::chrono::seconds{readInt<uint32_t>(item)};
}
else if (item.first == "SEND_DONT_HAVE_DELAY")
{
SEND_DONT_HAVE_DELAY =
std::chrono::seconds{readInt<uint32_t>(item)};
}
else if (item.first == "MANUAL_CLOSE")
{
MANUAL_CLOSE = readBool(item);
Expand Down Expand Up @@ -1272,6 +1280,11 @@ Config::processConfig(std::shared_ptr<cpptoml::table> t)
"bad value for FLOOD_ARB_TX_DAMPING_FACTOR");
}
}
else if (item.first == "TX_SET_BACKOFF_DELAY_MS")
{
TX_SET_BACKOFF_DELAY_MS =
std::chrono::milliseconds(readInt<int>(item, 1));
}
else if (item.first == "PREFERRED_PEERS")
{
PREFERRED_PEERS = readArray<std::string>(item);
Expand Down
Loading