Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(txpool): calculate consumption when committing a new block #1554

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config,
eventCh = nil
}

txPool := txpool.NewTxPool(conf.TxPool, messageCh)

str, err := store.NewStore(conf.Store)
if err != nil {
return nil, err
}

txPool := txpool.NewTxPool(conf.TxPool, messageCh, str)

st, err := state.LoadOrNewState(genDoc, valKeys, str, txPool, eventCh)
if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@
// Only one subsidy transaction per blk
if txs[i].IsSubsidyTx() {
st.logger.Error("found duplicated subsidy transaction", "tx", txs[i])
st.txPool.RemoveTx(txs[i].ID())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b00f Please double check for remove this line.

txs.Remove(i)
i--

Expand Down Expand Up @@ -434,9 +433,9 @@

st.store.SaveBlock(blk, cert)

// Remove transactions from pool
for _, trx := range blk.Transactions() {
st.txPool.RemoveTx(trx.ID())
// Remove transactions from pool and update consumption
if err = st.txPool.HandleCommittedBlock(blk); err != nil {

Check failure on line 437 in state/state.go

View workflow job for this annotation

GitHub Actions / linting

sloppyReassign: re-assignment to `err` can be replaced with `err := st.txPool.HandleCommittedBlock(blk)` (gocritic)

Check failure on line 437 in state/state.go

View workflow job for this annotation

GitHub Actions / build-linux

sloppyReassign: re-assignment to `err` can be replaced with `err := st.txPool.HandleCommittedBlock(blk)` (gocritic)
return err
}

if err := st.store.WriteBatch(); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions txpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
)

type Config struct {
MaxSize int `toml:"max_size"`
Fee *FeeConfig `toml:"fee"`
MaxSize int `toml:"max_size"`
ConsumptionBlocks uint32 `toml:"-"`
Fee *FeeConfig `toml:"fee"`
}

type FeeConfig struct {
Expand All @@ -17,8 +18,9 @@ type FeeConfig struct {

func DefaultConfig() *Config {
return &Config{
MaxSize: 1000,
Fee: DefaultFeeConfig(),
MaxSize: 1000,
ConsumptionBlocks: 8640,
Fee: DefaultFeeConfig(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion txpool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
SetNewSandboxAndRecheck(sb sandbox.Sandbox)
AppendTxAndBroadcast(trx *tx.Tx) error
AppendTx(trx *tx.Tx) error
RemoveTx(id tx.ID)
HandleCommittedBlock(block *block.Block) error

Check failure on line 26 in txpool/interface.go

View workflow job for this annotation

GitHub Actions / linting

import-shadowing: The name 'block' shadows an import name (revive)

Check failure on line 26 in txpool/interface.go

View workflow job for this annotation

GitHub Actions / build-linux

import-shadowing: The name 'block' shadows an import name (revive)
}
4 changes: 4 additions & 0 deletions txpool/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
}
}

func (m *MockTxPool) HandleCommittedBlock(_ *block.Block) error {

Check failure on line 82 in txpool/mock.go

View workflow job for this annotation

GitHub Actions / linting

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)

Check failure on line 82 in txpool/mock.go

View workflow job for this annotation

GitHub Actions / build-linux

unused-receiver: method receiver 'm' is not referenced in method's body, consider removing or renaming it as _ (revive)
return nil
}

func (m *MockTxPool) PrepareBlockTransactions() block.Txs {
txs := make([]*tx.Tx, m.Size())
copy(txs, m.Txs)
Expand Down
74 changes: 64 additions & 10 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"fmt"
"sync"

"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/store"

"github.com/pactus-project/pactus/execution"
"github.com/pactus-project/pactus/sandbox"
"github.com/pactus-project/pactus/sync/bundle/message"
Expand All @@ -19,14 +22,16 @@
type txPool struct {
lk sync.RWMutex

config *Config
sandbox sandbox.Sandbox
pools map[payload.Type]pool
broadcastCh chan message.Message
logger *logger.SubLogger
config *Config
sandbox sandbox.Sandbox
pools map[payload.Type]pool
consumptionMap map[crypto.Address]uint32
broadcastCh chan message.Message
strReader store.Reader
logger *logger.SubLogger
}

func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool {
func NewTxPool(conf *Config, broadcastCh chan message.Message, storeReader store.Reader) TxPool {
pools := make(map[payload.Type]pool)
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee())
Expand All @@ -35,9 +40,11 @@
pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0)

pool := &txPool{
config: conf,
pools: pools,
broadcastCh: broadcastCh,
config: conf,
pools: pools,
consumptionMap: make(map[crypto.Address]uint32),
strReader: storeReader,
broadcastCh: broadcastCh,
}

pool.logger = logger.NewSubLogger("_pool", pool)
Expand Down Expand Up @@ -133,10 +140,57 @@
return nil
}

func (p *txPool) RemoveTx(id tx.ID) {
func (p *txPool) HandleCommittedBlock(blk *block.Block) error {
p.lk.Lock()
defer p.lk.Unlock()

for _, trx := range blk.Transactions() {
p.removeTx(trx.ID())

p.handleIncreaseConsumption(trx)
}

return p.handleDecreaseConsumption(blk.Height())
}

func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
signer := trx.Payload().Signer()

// retrieve existing consumption or start with 0
p.consumptionMap[signer] = p.consumptionMap[signer] + uint32(trx.SerializeSize())

Check failure on line 161 in txpool/txpool.go

View workflow job for this annotation

GitHub Actions / linting

assignOp: replace `p.consumptionMap[signer] = p.consumptionMap[signer] + uint32(trx.SerializeSize())` with `p.consumptionMap[signer] += uint32(trx.SerializeSize())` (gocritic)

Check failure on line 161 in txpool/txpool.go

View workflow job for this annotation

GitHub Actions / build-linux

assignOp: replace `p.consumptionMap[signer] = p.consumptionMap[signer] + uint32(trx.SerializeSize())` with `p.consumptionMap[signer] += uint32(trx.SerializeSize())` (gocritic)
}
}

func (p *txPool) handleDecreaseConsumption(height uint32) error {
if height <= p.config.ConsumptionBlocks {
return nil
}

oldConsumptionHeight := height - p.config.ConsumptionBlocks
committedBlock, err := p.strReader.Block(oldConsumptionHeight)
if err != nil {
return err
}

blk, err := committedBlock.ToBlock()
if err != nil {
return err
}

for _, trx := range blk.Transactions() {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
signer := trx.Payload().Signer()
if v, ok := p.consumptionMap[signer]; ok {
p.consumptionMap[signer] = v - uint32(trx.SerializeSize())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b00f I don't know on test TestCalculatingConsumption why value p.consumptionMap for specific signer don't update.

}
}
}

return nil
}

func (p *txPool) removeTx(id tx.ID) {
for _, pool := range p.pools {
if pool.list.Remove(id) {
break
Expand Down
37 changes: 34 additions & 3 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"testing"
"time"

"github.com/pactus-project/pactus/store"

"github.com/pactus-project/pactus/execution"
"github.com/pactus-project/pactus/sandbox"
"github.com/pactus-project/pactus/sync/bundle/message"
Expand All @@ -22,12 +24,14 @@

pool *txPool
sandbox *sandbox.MockSandbox
str *store.MockStore
ch chan message.Message
}

func testConfig() *Config {
return &Config{
MaxSize: 10,
MaxSize: 10,
ConsumptionBlocks: 3,
Fee: &FeeConfig{
FixedFee: 0.000001,
DailyLimit: 280,
Expand All @@ -44,7 +48,8 @@
ch := make(chan message.Message, 10)
sb := sandbox.MockingSandbox(ts)
config := testConfig()
p := NewTxPool(config, ch)
mockStore := store.MockingStore(ts)
p := NewTxPool(config, ch, mockStore)
p.SetNewSandboxAndRecheck(sb)
pool := p.(*txPool)
assert.NotNil(t, pool)
Expand All @@ -53,6 +58,7 @@
TestSuite: ts,
pool: pool,
sandbox: sb,
str: mockStore,
ch: ch,
}
}
Expand Down Expand Up @@ -96,11 +102,36 @@
// Appending the same transaction again, should not return any error
assert.NoError(t, td.pool.AppendTx(testTrx))

td.pool.RemoveTx(testTrx.ID())
td.pool.removeTx(testTrx.ID())
assert.False(t, td.pool.HasTx(testTrx.ID()), "Transaction should be removed")
assert.Nil(t, td.pool.PendingTx(testTrx.ID()))
}

func TestCalculatingConsumption(t *testing.T) {

Check failure on line 110 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / linting

empty-lines: extra empty line at the end of a block (revive)

Check failure on line 110 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / build-linux

empty-lines: extra empty line at the end of a block (revive)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b00f What case need for check consumption map? I need to more details for test cases.

td := setup(t)

_, prv := td.TestSuite.RandEd25519KeyPair()
signer := prv

for i := uint32(1); i < 10; i++ {
txr := td.GenerateTestTransferTx(func(tm *testsuite.TransactionMaker) {
tm.Signer = signer
})

blk, cert := td.TestSuite.GenerateTestBlock(i, func(bm *testsuite.BlockMaker) {
bm.Txs = []*tx.Tx{txr}
})

td.str.SaveBlock(blk, cert)

err := td.pool.HandleCommittedBlock(blk)
require.NoError(t, err)

//t.Log(td.pool.consumptionMap[txr.Payload().Signer()])

Check failure on line 130 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / linting

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 130 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / build-linux

commentFormatting: put a space between `//` and comment text (gocritic)
}

}

Check failure on line 133 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / linting

unnecessary trailing newline (whitespace)

Check failure on line 133 in txpool/txpool_test.go

View workflow job for this annotation

GitHub Actions / build-linux

unnecessary trailing newline (whitespace)

func TestAppendInvalidTransaction(t *testing.T) {
td := setup(t)

Expand Down
Loading