Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add f3 participant #246

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ executors:
golang:
docker:
# Must match GO_VERSION_MIN in project root
- image: cimg/go:1.21.7
- image: cimg/go:1.22.8
ubuntu:
docker:
- image: ubuntu:20.04
Expand Down Expand Up @@ -342,7 +342,7 @@ jobs:
- checkout

- setup_remote_docker:
version: 19.03.13
version: default
docker_layer_caching: false

- run:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ scratchpad
.vscode
*.out
dockerfile
sophon-miner.log
152 changes: 152 additions & 0 deletions f3participant/multiparticipation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package f3participant

import (
"context"
"fmt"
"time"

"github.com/filecoin-project/go-address"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/ipfs-force-community/sophon-miner/node/modules/helpers"
miner_manager "github.com/ipfs-force-community/sophon-miner/node/modules/miner-manager"
"github.com/jpillora/backoff"
)

const (
// maxCheckProgressAttempts defines the maximum number of failed attempts
// before we abandon the current lease and restart the participation process.
//
// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
// Allowing for 13 failures results in approximately 2 minutes of backoff since
// the lease was granted. Given a lease validity of up to 5 instances, this means
// we would give up on checking the lease during its mid-validity period;
// typically when we would try to renew the participation ticket. Hence, the value
// to 13.
checkProgressMaxAttempts = 13

// F3LeaseTerm The number of instances the miner will attempt to lease from nodes.
F3LeaseTerm = 5
)

type MultiParticipant struct {
participants map[address.Address]*Participant
minerManager miner_manager.MinerManageAPI

newParticipant func(context.Context, address.Address) *Participant
}

func NewMultiParticipant(ctx helpers.MetricsCtx,
node v1api.FullNode,
minerManager miner_manager.MinerManageAPI,
) (*MultiParticipant, error) {
newParticipant := func(ctx context.Context, participant address.Address) *Participant {
return NewParticipant(
ctx,
node,
participant,
&backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
},
checkProgressMaxAttempts,
F3LeaseTerm,
)
}

miners, err := minerManager.List(ctx)
if err != nil {
return nil, err
}

mp := &MultiParticipant{
participants: make(map[address.Address]*Participant),
minerManager: minerManager,
newParticipant: newParticipant,
}

for _, minerInfo := range miners {
if !minerManager.IsOpenMining(ctx, minerInfo.Addr) {
continue
}
p := newParticipant(ctx, minerInfo.Addr)
mp.participants[minerInfo.Addr] = p
}

return mp, nil
}

func (mp *MultiParticipant) Start(ctx context.Context) error {
for _, p := range mp.participants {
if err := p.Start(ctx); err != nil {
return err
}
}
go mp.MonitorMiner(ctx)

return nil
}

func (mp *MultiParticipant) Stop(ctx context.Context) error {
for _, p := range mp.participants {
if err := p.Stop(ctx); err != nil {
return fmt.Errorf("failed to stop participant %v, err: %v", p, err)
}
}
return nil
}

func (mp *MultiParticipant) addParticipant(ctx helpers.MetricsCtx, participant address.Address) error {
p, ok := mp.participants[participant]
if ok {
return nil
}
mp.participants[participant] = mp.newParticipant(ctx, participant)
log.Infof("add participate %s", participant)

return p.Start(ctx)
}

func (mp *MultiParticipant) removeParticipant(ctx context.Context, participant address.Address) error {
p, ok := mp.participants[participant]
if !ok {
return nil
}
delete(mp.participants, participant)
log.Infof("remove participate %s", participant)

return p.Stop(ctx)
}

func (mp *MultiParticipant) MonitorMiner(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
miners, err := mp.minerManager.List(ctx)
if err != nil {
log.Errorf("failed to list miners: %v", err)
continue
}

for _, minerInfo := range miners {
if !mp.minerManager.IsOpenMining(ctx, minerInfo.Addr) {
if err := mp.removeParticipant(ctx, minerInfo.Addr); err != nil {
log.Errorf("failed to remove participate %s: %v", minerInfo.Addr, err)
}
continue
}

if _, ok := mp.participants[minerInfo.Addr]; !ok {
if err := mp.addParticipant(ctx, minerInfo.Addr); err != nil {
log.Errorf("failed to add participate %s: %v", minerInfo.Addr, err)
}
}
}
}
}
}
Loading
Loading