Skip to content

Commit

Permalink
Merge branch 'main' into nodiesBlade-patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
nodiesBlade authored May 27, 2024
2 parents 630ab30 + 5f1aad1 commit e0951ea
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 7 deletions.
30 changes: 26 additions & 4 deletions internal/apps_registry/cached_app_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0"
pokt "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models"
"go.uber.org/zap"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -171,14 +172,35 @@ func arePoktApplicationSignersEqual(slice1, slice2 []*models.PoktApplicationSign
return sortedSlice2[i].NetworkApp.Address < sortedSlice2[j].NetworkApp.Address
})

// Now that slices are sorted, check if address keys are same.
// Now that slices are sorted, check if address keys and chains are same.
for i := range slice1 {
// Check if any field is different
if !strings.EqualFold(sortedSlice1[i].NetworkApp.Address, sortedSlice2[i].NetworkApp.Address) {

networkApp1 := sortedSlice1[i].NetworkApp
networkApp2 := sortedSlice2[i].NetworkApp

// If address are not same, gateway operator added or remove some application
if !strings.EqualFold(networkApp1.Address, networkApp2.Address) {
return false
}

// Copy network chains and sort them.
// Applications are equal if they have same chains but different ordering
chains1Copy := append([]string{}, networkApp1.Chains...)
chains2Copy := append([]string{}, networkApp2.Chains...)
sort.Strings(chains1Copy)
sort.Strings(chains2Copy)

if len(chains1Copy) != len(chains2Copy) {
return false
}

// Gateway operator may have updated chains staked
if !reflect.DeepEqual(chains1Copy, chains2Copy) {
return false
}

}

// Slices are equal
// Applications are equal
return true
}
121 changes: 121 additions & 0 deletions internal/apps_registry/cached_app_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package apps_registry

import (
"github.com/pokt-network/gateway-server/internal/apps_registry/models"
pokt_models "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models"
"testing"
)

func Test_arePoktApplicationSignersEqual(t *testing.T) {
type args struct {
slice1 []*models.PoktApplicationSigner
slice2 []*models.PoktApplicationSigner
}
tests := []struct {
name string
args args
want bool
}{
{
name: "different length",
args: args{
slice1: []*models.PoktApplicationSigner{},
slice2: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
},
want: false,
},
{
name: "different address",
args: args{
slice1: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "1234",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
slice2: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
},
want: false,
},
{
name: "different chains",
args: args{
slice1: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "1234",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
slice2: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
},
want: false,
},
{
name: "same apps with exact chains",
args: args{
slice1: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
slice2: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "123"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
},
want: true,
},
{
name: "same apps with same chains different ordering",
args: args{
slice1: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "1234", "1235"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
slice2: []*models.PoktApplicationSigner{{NetworkApp: &pokt_models.PoktApplication{
Address: "123",
Chains: []string{"123", "1235", "1234"},
PublicKey: "",
Status: 0,
MaxRelays: 0,
}}},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := arePoktApplicationSignersEqual(tt.args.slice1, tt.args.slice2); got != tt.want {
t.Errorf("arePoktApplicationSignersEqual() = %v, want %v", got, tt.want)
}
})
}
}
4 changes: 2 additions & 2 deletions internal/node_selector_service/models/qos_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewQosNode(morseNode *models.Node, pocketSession *models.Session, appSigner
}

func (n *QosNode) IsHealthy() bool {
return !n.isInTimeout() && n.IsSynced()
return !n.IsInTimeout() && n.IsSynced()
}

func (n *QosNode) IsSynced() bool {
Expand All @@ -80,7 +80,7 @@ func (n *QosNode) SetSynced(synced bool) {
n.synced = synced
}

func (n *QosNode) isInTimeout() bool {
func (n *QosNode) IsInTimeout() bool {
return !n.timeoutUntil.IsZero() && time.Now().Before(n.timeoutUntil)
}

Expand Down
64 changes: 63 additions & 1 deletion internal/session_registry/cached_session_registry_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ import (
var (
counterSessionRequest *prometheus.CounterVec
histogramSessionRequestLatency *prometheus.HistogramVec
healthyNodesPerChainGauge *prometheus.GaugeVec
syncedNodesPerChainGauge *prometheus.GaugeVec
timeoutNodesPerChainGauge *prometheus.GaugeVec
ErrRecentlyFailed = errors.New("dispatch recently failed, returning early")
)

const (
blocksPerSession = 4
sessionPrimerInterval = time.Second * 5
ttlCacheCleanerInterval = time.Second * 15
nodeMetricsExporterInterval = time.Second * 20
reasonSessionSuccessCached = "session_cached"
reasonSessionSuccessColdHit = "session_cold_hit"
reasonSessionFailedBackoff = "session_failed_backoff"
Expand All @@ -53,7 +57,28 @@ func init() {
[]string{"cached"},
)

prometheus.MustRegister(counterSessionRequest, histogramSessionRequestLatency)
healthyNodesPerChainGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cached_client_session_healthy_nodes",
Help: "Number of healthy (synced + not in timeout) nodes per chain",
},
[]string{"chain_id"},
)
syncedNodesPerChainGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cached_client_session_synced_nodes",
Help: "Number of synced nodes per chain",
},
[]string{"chain_id"},
)
timeoutNodesPerChainGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cached_client_session_timeout_nodes",
Help: "Number of nodes in timeout per chain",
},
[]string{"chain_id"},
)
prometheus.MustRegister(counterSessionRequest, histogramSessionRequestLatency, healthyNodesPerChainGauge, syncedNodesPerChainGauge, timeoutNodesPerChainGauge)
}

type CachedSessionRegistryService struct {
Expand All @@ -77,6 +102,7 @@ func NewCachedSessionRegistryService(poktClient pokt_v0.PocketService, appRegist
go nodeCache.Start()
cachedRegistry.startTTLCacheCleaner()
cachedRegistry.startSessionUpdater()
cachedRegistry.startNodeMetricsExporter()
return cachedRegistry
}

Expand Down Expand Up @@ -282,3 +308,39 @@ func (c *CachedSessionRegistryService) shouldBackoffDispatchFailure() bool {
func getSessionCacheKey(req *models.GetSessionRequest) string {
return fmt.Sprintf("%s-%s-%d", req.AppPubKey, req.Chain, req.SessionHeight)
}

func (c *CachedSessionRegistryService) exportNodeMetrics() {
nodesMap := c.GetNodesMap()
for sessionKey, sessionItem := range nodesMap {
chainId := sessionKey.Chain
var healthyNodesCount, syncedNodesCount, timeoutNodesCount int

for _, node := range sessionItem.Value() {
if node.IsHealthy() {
healthyNodesCount++
}
if node.IsSynced() {
syncedNodesCount++
}
if node.IsInTimeout() {
timeoutNodesCount++
}
}

healthyNodesPerChainGauge.WithLabelValues(chainId).Set(float64(healthyNodesCount))
syncedNodesPerChainGauge.WithLabelValues(chainId).Set(float64(syncedNodesCount))
timeoutNodesPerChainGauge.WithLabelValues(chainId).Set(float64(timeoutNodesCount))
}
}

func (c *CachedSessionRegistryService) startNodeMetricsExporter() {
ticker := time.Tick(nodeMetricsExporterInterval)
go func() {
for {
select {
case <-ticker:
c.exportNodeMetrics()
}
}
}()
}

0 comments on commit e0951ea

Please sign in to comment.