Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Add solana, pokt, moonbeam, and add emit prom metrics #34

Merged
merged 14 commits into from
Apr 21, 2024
4 changes: 3 additions & 1 deletion .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ POKT_APPLICATIONS_ENCRYPTION_KEY=
SESSION_CACHE_TTL=75m
DB_CONNECTION_URL=postgres://myuser:mypassword@postgres:5432/postgres?sslmode=disable
API_KEY=
ALTRUIST_REQUEST_TIMEOUT=10s
ALTRUIST_REQUEST_TIMEOUT=10s
EMIT_SERVICE_URL_PROM_METRICS=false
CHAIN_NETWORK=morse_mainnet
23 changes: 23 additions & 0 deletions cmd/gateway_server/internal/config/dot_env_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"github.com/joho/godotenv"
"github.com/pokt-network/gateway-server/internal/chain_network"
"github.com/pokt-network/gateway-server/internal/global_config"
"os"
"strconv"
Expand All @@ -15,6 +16,8 @@ const (

// Environment variable names
const (
chainNetworkEnv = "CHAIN_NETWORK"
emitServiceUrlPromMetricsEnv = "EMIT_SERVICE_URL_PROM_METRICS"
poktRPCFullHostEnv = "POKT_RPC_FULL_HOST"
httpServerPortEnv = "HTTP_SERVER_PORT"
poktRPCTimeoutEnv = "POKT_RPC_TIMEOUT"
Expand All @@ -29,13 +32,15 @@ const (
// DotEnvGlobalConfigProvider implements the GatewayServerProvider interface.
type DotEnvGlobalConfigProvider struct {
poktRPCFullHost string
chainNetwork chain_network.ChainNetwork
httpServerPort uint
poktRPCRequestTimeout time.Duration
sessionCacheTTL time.Duration
environmentStage global_config.EnvironmentStage
poktApplicationsEncryptionKey string
databaseConnectionUrl string
apiKey string
emitServiceUrlPromMetrics bool
altruistRequestTimeout time.Duration
}

Expand Down Expand Up @@ -83,6 +88,16 @@ func (c DotEnvGlobalConfigProvider) GetAltruistRequestTimeout() time.Duration {
return c.altruistRequestTimeout
}

// ShouldEmitServiceUrl returns whether to emit service url tags as part of relay metrics.
func (c DotEnvGlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool {
return c.emitServiceUrlPromMetrics
}

// GetChainNetwork returns the current network, this can be useful for identifying the correct chain ids dependent on testnet or mainnet.
func (c DotEnvGlobalConfigProvider) GetChainNetwork() chain_network.ChainNetwork {
return c.chainNetwork
}

// NewDotEnvConfigProvider creates a new instance of DotEnvGlobalConfigProvider.
func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider {
_ = godotenv.Load()
Expand All @@ -108,7 +123,14 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider {
altruistRequestTimeoutDuration = defaultAltruistRequestTimeout
}

emitServiceUrlPromMetrics, err := strconv.ParseBool(getEnvVar(emitServiceUrlPromMetricsEnv, "false"))

if err != nil {
emitServiceUrlPromMetrics = false
}

return &DotEnvGlobalConfigProvider{
emitServiceUrlPromMetrics: emitServiceUrlPromMetrics,
poktRPCFullHost: getEnvVar(poktRPCFullHostEnv, ""),
httpServerPort: uint(httpServerPort),
poktRPCRequestTimeout: poktRPCTimeout,
Expand All @@ -117,6 +139,7 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider {
environmentStage: global_config.EnvironmentStage(getEnvVar(environmentStageEnv, "")),
poktApplicationsEncryptionKey: getEnvVar(poktApplicationsEncryptionKeyEnv, ""),
apiKey: getEnvVar(apiKey, ""),
chainNetwork: chain_network.ChainNetwork(getEnvVar(chainNetworkEnv, string(chain_network.MorseMainnet))),
altruistRequestTimeout: altruistRequestTimeoutDuration,
}
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/gateway_server/internal/controllers/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (c *RelayController) HandleRelay(ctx *fasthttp.RequestCtx) {
return
}

// getPathSegmented: returns the chain being requested and other parts to be proxied to pokt nodes
// Example: /relay/0001/v1/client, returns 0001, /v1/client
func getPathSegmented(path []byte) (chain, otherParts string) {
paths := strings.Split(string(path), "/")

Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
poktApplicationRegistry := apps_registry.NewCachedAppsRegistry(client, querier, gatewayConfigProvider, logger.Named("pokt_application_registry"))
chainConfigurationRegistry := chain_configurations_registry.NewCachedChainConfigurationRegistry(querier, logger.Named("chain_configurations_registry"))
sessionRegistry := session_registry.NewCachedSessionRegistryService(client, poktApplicationRegistry, sessionCache, nodeCache, logger.Named("session_registry"))
nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, logger.Named("node_selector"))
nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, gatewayConfigProvider, logger.Named("node_selector"))

relayer := relayer.NewRelayer(client, sessionRegistry, poktApplicationRegistry, nodeSelectorService, chainConfigurationRegistry, userAgent, gatewayConfigProvider, logger.Named("relayer"))

Expand Down
16 changes: 15 additions & 1 deletion docs/node-selection.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,24 @@ type CheckJob interface {
```
Under the hood, the NodeSelectorService is responsible for asynchronously executing all the initialized `CheckJobs`.

### Existing QoS checks:
- **Height Check:** The general flow would be:
1) Query all node operators height,
2) compares heights with other node operators within a specific threshold
3) filters out node operators that exceed the configurable block height tolerance.

- **Data Integrity Check:** The general flow would be:
1) Retrieve a unique block identifier (i.e block hash or total block tx count, etc) with a configurable block offset for randomness,
2) Query other node operators for the same block identifier
3) filter out other node operators that return a different identifier.

Some existing implementations of Checks can be found in:
1. [evm_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_data_integrity_check%2Fevm_data_integrity_check.go)
2. [evm_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_height_check%2Fevm_height_check.go)

3. [pokt_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_height_check%2Fpokt_height_check.go)
4. [pokt_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_data_integrity_check%2Fpokt_data_integrity_check.go)
5. [solana_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_height_check%2Fsolana_height_check.go)
6. [solana_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_data_integrity_check%2Fsolana_data_integrity_check.go)
### Adding custom QoS checks

Every custom check must conform to the `CheckJob` interface. The gateway server provides a base check:
Expand Down
23 changes: 12 additions & 11 deletions docs/quick-onboarding-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ Create an encryption password for your app stake keys. This password will be use
Fill out the `.env` variables for the gateway server. This can be done by injecting environment variables directly or using a `.env` file.

### Env Variables Description
| Variable Name | Description | Example Value |
|-------------------------------------|-----------------------------------------------------|---------------------------------------------------------------------------------------------------|
| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) |
| `HTTP_SERVER_PORT` | Gateway server port | `8080` |
| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` |
| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` |
| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` |
| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` |
| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` |
| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` |

| Variable Name | Description | Example Value |
|------------------------------------|------------------------------------------------------------|---------------------------------------------------------------------------------------------------|
| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) |
| `HTTP_SERVER_PORT` | Gateway server port | `8080` |
| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` |
| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` |
| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` |
| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` |
| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` |
| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` |
| `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics | `false`, `true` |
| `CHAIN_NETWORK` | Identifies which network the gateway server is running on. | `morse_mainnet`, `morse_testnet` |
See [.env.sample](..%2F.env.sample) for a sample.

## 4. Run Migration Script
Expand Down
8 changes: 8 additions & 0 deletions internal/chain_network/chain_network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package chain_network

type ChainNetwork string

const (
MorseMainnet ChainNetwork = "morse_mainnet"
MorseTestnet ChainNetwork = "morse_testnet"
)
15 changes: 14 additions & 1 deletion internal/global_config/config_provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package global_config

import "time"
import (
"github.com/pokt-network/gateway-server/internal/chain_network"
"time"
)

type EnvironmentStage string

Expand All @@ -14,6 +17,12 @@ type GlobalConfigProvider interface {
EnvironmentProvider
PoktNodeConfigProvider
AltruistConfigProvider
PromMetricsProvider
ChainNetworkProvider
}

type PromMetricsProvider interface {
ShouldEmitServiceUrlPromMetrics() bool
}

type SecretProvider interface {
Expand All @@ -37,3 +46,7 @@ type PoktNodeConfigProvider interface {
type AltruistConfigProvider interface {
GetAltruistRequestTimeout() time.Duration
}

type ChainNetworkProvider interface {
GetChainNetwork() chain_network.ChainNetwork
}
4 changes: 2 additions & 2 deletions internal/node_selector_service/checks/async_relay_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type nodeRelayResponse struct {
Error error
}

func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse {
func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string, path string) chan *nodeRelayResponse {
// Define a channel to receive relay responses
relayResponses := make(chan *nodeRelayResponse, len(nodes))
var wg sync.WaitGroup
Expand All @@ -23,7 +23,7 @@ func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, pay
defer wg.Done()
relay, err := relayer.SendRelay(&relayer_models.SendRelayRequest{
Signer: node.GetAppStakeSigner(),
Payload: &relayer_models.Payload{Data: payload, Method: method},
Payload: &relayer_models.Payload{Data: payload, Method: method, Path: path},
Chain: node.GetChain(),
SelectedNodePubKey: node.GetPublicKey(),
Session: node.MorseSession,
Expand Down
130 changes: 130 additions & 0 deletions internal/node_selector_service/checks/data_integrity_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package checks

import (
"fmt"
"github.com/pokt-network/gateway-server/internal/node_selector_service/models"
"github.com/pokt-network/gateway-server/pkg/common"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"time"
)

const (
// how often to check a node's data integrity
dataIntegrityNodeCheckInterval = time.Minute * 10

// penalty whenever a pocket node doesn't match other node providers responses
dataIntegrityTimePenalty = time.Minute * 15

// the look back we will use to determine which block number to do a data integrity against (latestBlockHeight - lookBack)
dataIntegrityHeightLookbackDefault = 25
)

type nodeHashRspPair struct {
node *models.QosNode
blockIdentifier string
}

type BlockHashParser func(response string) (string, error)

type GetBlockByNumberPayloadFmter func(blockToFind uint64) string

// PerformDataIntegrityCheck: is the default implementation of a data integrity check by:
func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPayloadFmter, path string, retrieveBlockIdentifier BlockHashParser, logger *zap.Logger) {
// Find a node that has been reported as healthy to use as source of truth
sourceOfTruth := findRandomHealthyNode(check.NodeList)

// Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source
if sourceOfTruth == nil {
logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", check.NodeList[0].GetChain())
return
}

logger.Sugar().Infow("running default data integrity check", "chain", check.NodeList[0].GetChain())

// Map to count number of nodes that return blockHash -> counter
nodeResponseCounts := make(map[string]int)

var nodeResponsePairs []*nodeHashRspPair

// find a random block to search that nodes should have access too
blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(GetDataIntegrityHeightLookback(check.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault))

attestationResponses := SendRelaysAsync(check.PocketRelayer, getEligibleDataIntegrityCheckNodes(check.NodeList), calculatePayload(blockNumberToSearch), "POST", path)
for rsp := range attestationResponses {

if rsp.Error != nil {
DefaultPunishNode(rsp.Error, rsp.Node, logger)
continue
}

blockIdentifier, err := retrieveBlockIdentifier(rsp.Relay.Response)
if err != nil {
logger.Sugar().Warnw("failed to unmarshal response", "err", err)
DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, logger)
continue
}

rsp.Node.SetLastDataIntegrityCheckTime(time.Now())
nodeResponsePairs = append(nodeResponsePairs, &nodeHashRspPair{
node: rsp.Node,
blockIdentifier: blockIdentifier,
})
nodeResponseCounts[blockIdentifier]++
}

majorityBlockIdentifier := findMajorityBlockIdentifier(nodeResponseCounts)

// Blcok blockIdentifier must not be empty
if majorityBlockIdentifier == "" {
return
}

// Penalize other node operators with a timeout if they don't attest with same block blockIdentifier.
for _, nodeResp := range nodeResponsePairs {
if nodeResp.blockIdentifier != majorityBlockIdentifier {
logger.Sugar().Errorw("punishing node for failed data integrity check", "node", nodeResp.node.MorseNode.ServiceUrl, "nodeBlockHash", nodeResp.blockIdentifier, "trustedSourceBlockHash", majorityBlockIdentifier)
nodeResp.node.SetTimeoutUntil(time.Now().Add(dataIntegrityTimePenalty), models.DataIntegrityTimeout, fmt.Errorf("nodeBlockHash %s, trustedSourceBlockHash %s", nodeResp.blockIdentifier, majorityBlockIdentifier))
}
}

}

// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks
func findRandomHealthyNode(nodes []*models.QosNode) *models.QosNode {
var healthyNodes []*models.QosNode
for _, node := range nodes {
if node.IsHealthy() {
healthyNodes = append(healthyNodes, node)
}
}
healthyNode, ok := common.GetRandomElement(healthyNodes)
if !ok {
return nil
}
return healthyNode
}

func getEligibleDataIntegrityCheckNodes(nodes []*models.QosNode) []*models.QosNode {
// Filter nodes based on last checked time
var eligibleNodes []*models.QosNode
for _, node := range nodes {
if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() {
eligibleNodes = append(eligibleNodes, node)
}
}
return eligibleNodes
}

// findMajorityBlockIdentifier finds the blockIdentifier with the highest response count
func findMajorityBlockIdentifier(responseCounts map[string]int) string {
var highestResponseIdentifier string
var highestResponseCount int
for rsp, count := range responseCounts {
if count > highestResponseCount {
highestResponseIdentifier = rsp
highestResponseCount = count
}
}
return highestResponseIdentifier
}
Loading
Loading