From 9e108b755c315649717ae2521cdd86222e4abf98 Mon Sep 17 00:00:00 2001 From: blade Date: Tue, 16 Apr 2024 19:56:01 -0500 Subject: [PATCH 01/14] add solana, pokt, moonbeam, and add emit prom metrics --- .env.sample | 3 +- .../config/dot_env_config_provider.go | 9 ++ .../internal/controllers/relay.go | 2 + docs/node-selection.md | 16 ++- docs/quick-onboarding-guide.md | 22 +-- internal/global_config/config_provider.go | 5 + .../checks/async_relay_handler.go | 4 +- .../checks/data_integrity_handler.go | 128 ++++++++++++++++++ .../evm_data_integrity_check.go | 122 ++--------------- .../evm_height_check/evm_height_check.go | 102 +------------- .../checks/height_check_handler.go | 109 +++++++++++++++ .../pokt_data_integrity_check.go | 71 ++++++++++ .../pokt_height_check/pokt_height_check.go | 63 +++++++++ .../solana_data_integrity_check.go | 70 ++++++++++ .../solana_height_check.go | 67 +++++++++ .../node_selector_service/models/qos_node.go | 9 +- .../node_selector_service.go | 8 ++ 17 files changed, 586 insertions(+), 224 deletions(-) create mode 100644 internal/node_selector_service/checks/data_integrity_handler.go create mode 100644 internal/node_selector_service/checks/height_check_handler.go create mode 100644 internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go create mode 100644 internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go create mode 100644 internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go create mode 100644 internal/node_selector_service/checks/solana_height_check/solana_height_check.go diff --git a/.env.sample b/.env.sample index d5394a7..6cedff1 100644 --- a/.env.sample +++ b/.env.sample @@ -6,4 +6,5 @@ POKT_APPLICATIONS_ENCRYPTION_KEY= SESSION_CACHE_TTL=75m DB_CONNECTION_URL=postgres://myuser:mypassword@postgres:5432/postgres?sslmode=disable API_KEY= -ALTRUIST_REQUEST_TIMEOUT=10s \ No newline at end of file +ALTRUIST_REQUEST_TIMEOUT=10s 
+EMIT_SERVICE_URL_PROM_METRICS=false \ No newline at end of file diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index e748dc6..dd165b9 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -15,6 +15,7 @@ const ( // Environment variable names const ( + emitServiceUrlPromMetricsEnv = "EMIT_SERVICE_URL_PROM_METRICS" poktRPCFullHostEnv = "POKT_RPC_FULL_HOST" httpServerPortEnv = "HTTP_SERVER_PORT" poktRPCTimeoutEnv = "POKT_RPC_TIMEOUT" @@ -36,6 +37,7 @@ type DotEnvGlobalConfigProvider struct { poktApplicationsEncryptionKey string databaseConnectionUrl string apiKey string + emitServiceUrlPromMetrics bool altruistRequestTimeout time.Duration } @@ -108,7 +110,14 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { altruistRequestTimeoutDuration = defaultAltruistRequestTimeout } + emitServiceUrlPromMetrics, err := strconv.ParseBool(getEnvVar(emitServiceUrlPromMetricsEnv, "false")) + + if err != nil { + emitServiceUrlPromMetrics = false + } + return &DotEnvGlobalConfigProvider{ + emitServiceUrlPromMetrics: emitServiceUrlPromMetrics, poktRPCFullHost: getEnvVar(poktRPCFullHostEnv, ""), httpServerPort: uint(httpServerPort), poktRPCRequestTimeout: poktRPCTimeout, diff --git a/cmd/gateway_server/internal/controllers/relay.go b/cmd/gateway_server/internal/controllers/relay.go index 9ce12da..27faf71 100644 --- a/cmd/gateway_server/internal/controllers/relay.go +++ b/cmd/gateway_server/internal/controllers/relay.go @@ -57,6 +57,8 @@ func (c *RelayController) HandleRelay(ctx *fasthttp.RequestCtx) { return } +// getPathSegmented: returns the chain being requested and other parts to be proxied to pokt nodes +// Example: /relay/0001/v1/client, returns 0001, /v1/client func getPathSegmented(path []byte) (chain, otherParts string) { paths := strings.Split(string(path), "/") diff --git 
a/docs/node-selection.md b/docs/node-selection.md index 94db540..6a022f6 100644 --- a/docs/node-selection.md +++ b/docs/node-selection.md @@ -33,10 +33,24 @@ type CheckJob interface { ``` Under the hood, the NodeSelectorService is responsible for asynchronously executing all the initialized `CheckJobs`. +### Existing QoS checks: +- **Height Check:** The general flow would be: + 1) Query the height of every node operator, + 2) Compare each height with the other node operators' heights within a specific threshold, + 3) Filter out node operators that exceed the configurable block height tolerance. + +- **Data Integrity Check:** The general flow would be: + 1) Retrieve a unique block identifier (e.g. block hash or total block tx count) with a configurable block offset for randomness, + 2) Query other node operators for the same block identifier, + 3) Filter out node operators that return a different identifier. + Some existing implementations of Checks can be found in: 1. [evm_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_data_integrity_check%2Fevm_data_integrity_check.go) 2. [evm_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_height_check%2Fevm_height_check.go) - +3. [pokt_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_height_check%2Fpokt_height_check.go) +4. [pokt_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_data_integrity_check%2Fpokt_data_integrity_check.go) +5. [solana_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_height_check%2Fsolana_height_check.go) +6. [solana_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_data_integrity_check%2Fsolana_data_integrity_check.go) ### Adding custom QoS checks Every custom check must conform to the `CheckJob` interface. 
The gateway server provides a base check: diff --git a/docs/quick-onboarding-guide.md b/docs/quick-onboarding-guide.md index 04fa7dd..6cfe340 100644 --- a/docs/quick-onboarding-guide.md +++ b/docs/quick-onboarding-guide.md @@ -39,17 +39,17 @@ Create an encryption password for your app stake keys. This password will be use Fill out the `.env` variables for the gateway server. This can be done by injecting environment variables directly or using a `.env` file. ### Env Variables Description -| Variable Name | Description | Example Value | -|-------------------------------------|-----------------------------------------------------|---------------------------------------------------------------------------------------------------| -| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | -| `HTTP_SERVER_PORT` | Gateway server port | `8080` | -| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | -| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | -| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | -| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | -| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | -| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | - +| Variable Name | Description | Example Value | +|------------------------------------|-------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------| +| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | +| `HTTP_SERVER_PORT` | Gateway server port | 
`8080` | +| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | +| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | +| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | +| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | +| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | +| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | +| `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics (disabled by default to prevent cardinality explosion) | `false`, `true` | See [.env.sample](..%2F.env.sample) for a sample. ## 4. Run Migration Script diff --git a/internal/global_config/config_provider.go b/internal/global_config/config_provider.go index 5037e56..cde832a 100644 --- a/internal/global_config/config_provider.go +++ b/internal/global_config/config_provider.go @@ -14,6 +14,11 @@ type GlobalConfigProvider interface { EnvironmentProvider PoktNodeConfigProvider AltruistConfigProvider + PromMetricsProvider +} + +type PromMetricsProvider interface { + ShouldEmitServiceUrl() bool } type SecretProvider interface { diff --git a/internal/node_selector_service/checks/async_relay_handler.go b/internal/node_selector_service/checks/async_relay_handler.go index a1ea26d..824da6c 100644 --- a/internal/node_selector_service/checks/async_relay_handler.go +++ b/internal/node_selector_service/checks/async_relay_handler.go @@ -13,7 +13,7 @@ type nodeRelayResponse struct { Error error } -func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse { +func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string, path string) chan *nodeRelayResponse { // Define a channel to receive relay responses relayResponses := make(chan 
*nodeRelayResponse, len(nodes)) var wg sync.WaitGroup @@ -23,7 +23,7 @@ func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, pay defer wg.Done() relay, err := relayer.SendRelay(&relayer_models.SendRelayRequest{ Signer: node.GetAppStakeSigner(), - Payload: &relayer_models.Payload{Data: payload, Method: method}, + Payload: &relayer_models.Payload{Data: payload, Method: method, Path: path}, Chain: node.GetChain(), SelectedNodePubKey: node.GetPublicKey(), Session: node.MorseSession, diff --git a/internal/node_selector_service/checks/data_integrity_handler.go b/internal/node_selector_service/checks/data_integrity_handler.go new file mode 100644 index 0000000..04e7a93 --- /dev/null +++ b/internal/node_selector_service/checks/data_integrity_handler.go @@ -0,0 +1,128 @@ +package checks + +import ( + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "github.com/pokt-network/gateway-server/pkg/common" + "github.com/valyala/fasthttp" + "go.uber.org/zap" + "time" +) + +const ( + // how often to check a node's data integrity + dataIntegrityNodeCheckInterval = time.Minute * 10 + + // penalty whenever a pocket node doesn't match other node providers responses + dataIntegrityTimePenalty = time.Minute * 15 + + // the look back we will use to determine which block number to do a data integrity against (latestBlockHeight - lookBack) + dataIntegrityHeightLookbackDefault = 25 +) + +type nodeHashRspPair struct { + node *models.QosNode + blockIdentifier string +} + +type BlockHashParser func(response string) (string, error) + +type GetBlockByNumberPayloadFmter func(blockToFind uint64) string + +// PerformDataIntegrityCheck: is the default implementation of a data integrity check by: +func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPayloadFmter, path string, retrieveBlockHash BlockHashParser, logger *zap.Logger) { + // Find a node that has been reported as healthy to use as source of truth + 
sourceOfTruth := findRandomHealthyNode(check.NodeList) + + // Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source + if sourceOfTruth == nil { + logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", check.NodeList[0].GetChain()) + return + } + + // Map to count number of nodes that return blockHash -> counter + nodeResponseCounts := make(map[string]int) + + var nodeResponsePairs []*nodeHashRspPair + + // find a random block to search that nodes should have access to + blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(GetDataIntegrityHeightLookback(check.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) + + attestationResponses := SendRelaysAsync(check.PocketRelayer, getEligibleDataIntegrityCheckNodes(check.NodeList), calculatePayload(blockNumberToSearch), "POST", path) + for rsp := range attestationResponses { + + if rsp.Error != nil { + DefaultPunishNode(rsp.Error, rsp.Node, logger) + continue + } + + hash, err := retrieveBlockHash(rsp.Relay.Response) + if err != nil { + logger.Sugar().Warnw("failed to unmarshal response", "err", err) + DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, logger) + continue + } + + rsp.Node.SetLastDataIntegrityCheckTime(time.Now()) + nodeResponsePairs = append(nodeResponsePairs, &nodeHashRspPair{ + node: rsp.Node, + blockIdentifier: hash, + }) + nodeResponseCounts[hash]++ + } + + majorityBlockIdentifier := findMajorityBlockIdentifier(nodeResponseCounts) + + // Block identifier must not be empty + if majorityBlockIdentifier == "" { + return + } + + // Penalize other node operators with a timeout if they don't attest with same block blockIdentifier. 
+ for _, nodeResp := range nodeResponsePairs { + if nodeResp.blockIdentifier != majorityBlockIdentifier { + logger.Sugar().Errorw("punishing node for failed data integrity check", "node", nodeResp.node.MorseNode.ServiceUrl, "nodeBlockHash", nodeResp.blockIdentifier, "trustedSourceBlockHash", majorityBlockIdentifier) + nodeResp.node.SetTimeoutUntil(time.Now().Add(dataIntegrityTimePenalty), models.DataIntegrityTimeout, fmt.Errorf("nodeBlockHash %s, trustedSourceBlockHash %s", nodeResp.blockIdentifier, majorityBlockIdentifier)) + } + } + +} + +// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks +func findRandomHealthyNode(nodes []*models.QosNode) *models.QosNode { + var healthyNodes []*models.QosNode + for _, node := range nodes { + if node.IsHealthy() { + healthyNodes = append(healthyNodes, node) + } + } + healthyNode, ok := common.GetRandomElement(healthyNodes) + if !ok { + return nil + } + return healthyNode +} + +func getEligibleDataIntegrityCheckNodes(nodes []*models.QosNode) []*models.QosNode { + // Filter nodes based on last checked time + var eligibleNodes []*models.QosNode + for _, node := range nodes { + if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() { + eligibleNodes = append(eligibleNodes, node) + } + } + return eligibleNodes +} + +// findMajorityBlockIdentifier finds the blockIdentifier with the highest response count +func findMajorityBlockIdentifier(responseCounts map[string]int) string { + var highestResponseIdentifier string + var highestResponseCount int + for rsp, count := range responseCounts { + if count > highestResponseCount { + highestResponseIdentifier = rsp + highestResponseCount = count + } + } + return highestResponseIdentifier +} diff --git a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go 
b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go index 10f5c83..4c6c9f5 100644 --- a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go +++ b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go @@ -5,26 +5,15 @@ import ( "fmt" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" - "github.com/pokt-network/gateway-server/pkg/common" - "github.com/valyala/fasthttp" "go.uber.org/zap" "strconv" "time" ) const ( - // penalty whenever a pocket node doesn't match other node providers responses - dataIntegrityTimePenalty = time.Minute * 15 - - // how often to check a node's data integrity - dataIntegrityNodeCheckInterval = time.Minute * 10 - // how often the job should run dataIntegrityCheckInterval = time.Second * 1 - // the look back we will use to determine which block number to do a data integrity against (latestBlockHeight - lookBack) - dataIntegrityHeightLookbackDefault = 25 - //json rpc payload to send a data integrity check blockPayloadFmt = `{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["%s", false],"id":1}` ) @@ -41,13 +30,17 @@ type EvmDataIntegrityCheck struct { logger *zap.Logger } -func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck { - return &EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +func (c *EvmDataIntegrityCheck) getBlockHashFromNodeResponse(response string) (string, error) { + var evmRsp blockByNumberResponse + err := json.Unmarshal([]byte(response), &evmRsp) + if err != nil { + return "", err + } + return evmRsp.Result.Hash, nil } -type nodeResponsePair struct { - node *models.QosNode - result blockByNumberResponse +func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck { + return 
&EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} } func (c *EvmDataIntegrityCheck) Name() string { @@ -64,109 +57,14 @@ func (c *EvmDataIntegrityCheck) Perform() { if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { return } - - // Find a node that has been reported as healthy to use as source of truth - sourceOfTruth := c.findRandomHealthyNode() - - // Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source - if sourceOfTruth == nil { - c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.NodeList[0].GetChain()) - return - } - - // Map to count number of nodes that return blockHash -> counter - nodeResponseCounts := make(map[string]int) - - var nodeResponsePairs []*nodeResponsePair - - // find a random block to search that nodes should have access too - blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(checks.GetDataIntegrityHeightLookback(c.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) - - attestationResponses := checks.SendRelaysAsync(c.PocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST") - for rsp := range attestationResponses { - - if rsp.Error != nil { - checks.DefaultPunishNode(rsp.Error, rsp.Node, c.logger) - continue - } - - var resp blockByNumberResponse - err := json.Unmarshal([]byte(rsp.Relay.Response), &resp) - if err != nil { - c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) - checks.DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger) - continue - } - - rsp.Node.SetLastDataIntegrityCheckTime(time.Now()) - nodeResponsePairs = append(nodeResponsePairs, &nodeResponsePair{ - node: rsp.Node, - result: resp, - }) - nodeResponseCounts[resp.Result.Hash]++ - } - - majorityBlockHash := findMajorityBlockHash(nodeResponseCounts) - - // Blcok hash must not be empty - if majorityBlockHash == "" { - return - } - - // Penalize 
other node operators with a timeout if they don't attest with same block hash. - for _, nodeResp := range nodeResponsePairs { - if nodeResp.result.Result.Hash != majorityBlockHash { - c.logger.Sugar().Errorw("punishing node for failed data integrity check", "node", nodeResp.node.MorseNode.ServiceUrl, "nodeBlockHash", nodeResp.result.Result, "trustedSourceBlockHash", majorityBlockHash) - nodeResp.node.SetTimeoutUntil(time.Now().Add(dataIntegrityTimePenalty), models.DataIntegrityTimeout, fmt.Errorf("evmDataIntegrityCheck: nodeBlockHash %s, trustedSourceBlockHash %s", nodeResp.result.Result, majorityBlockHash)) - } - } - + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockHashFromNodeResponse, c.logger) c.nextCheckTime = time.Now().Add(dataIntegrityCheckInterval) } -// findMajorityBlockHash finds the hash with the highest response count -func findMajorityBlockHash(responseCounts map[string]int) string { - var highestResponseHash string - var highestResponseCount int - for rsp, count := range responseCounts { - if count > highestResponseCount { - highestResponseHash = rsp - highestResponseCount = count - } - } - return highestResponseHash -} - func (c *EvmDataIntegrityCheck) ShouldRun() bool { return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) } -// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks -func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode { - var healthyNodes []*models.QosNode - for _, node := range c.NodeList { - if node.IsHealthy() { - healthyNodes = append(healthyNodes, node) - } - } - healthyNode, ok := common.GetRandomElement(healthyNodes) - if !ok { - return nil - } - return healthyNode -} - -func (c *EvmDataIntegrityCheck) getEligibleNodes() []*models.QosNode { - // Filter nodes based on last checked time - var eligibleNodes []*models.QosNode - for _, node := range c.NodeList { - if 
(node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() { - eligibleNodes = append(eligibleNodes, node) - } - } - return eligibleNodes -} - func getBlockByNumberPayload(blockNumber uint64) string { return fmt.Sprintf(blockPayloadFmt, "0x"+strconv.FormatInt(int64(blockNumber), 16)) } diff --git a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go index a74e132..59a3cb6 100644 --- a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go +++ b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go @@ -6,33 +6,19 @@ import ( "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pquerna/ffjson/ffjson" - "github.com/valyala/fasthttp" "go.uber.org/zap" - "gonum.org/v1/gonum/stat" - "math" "strconv" "strings" "time" ) const ( - // zScore to remove outliers for determining highest height - zScoreHeightThreshold = 3 - - // interval to check a node height again - checkNodeHeightInterval = time.Minute * 5 // interval to run the evm height check evmHeightCheckInterval = time.Second * 1 - // penalty for being out of sync - evmHeightCheckPenalty = time.Minute * 5 - // jsonrpc payload to retrieve evm height heightJsonPayload = `{"jsonrpc":"2.0","method":"eth_blockNumber","params": [],"id":1}` - - // default height allowance - defaultHeightTolerance int = 100 ) type evmHeightResponse struct { @@ -84,49 +70,7 @@ func (c *EvmHeightCheck) Perform() { if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { return } - - // Send request to all nodes - relayResponses := checks.SendRelaysAsync(c.PocketRelayer, c.NodeList, heightJsonPayload, "POST") - - var nodesResponded []*models.QosNode - // Process relay responses - for resp := range 
relayResponses { - - err := resp.Error - if err != nil { - checks.DefaultPunishNode(err, resp.Node, c.logger) - continue - } - - var evmHeightResp evmHeightResponse - err = json.Unmarshal([]byte(resp.Relay.Response), &evmHeightResp) - - if err != nil { - c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) - // Treat a invalid response as a timeout error - checks.DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger) - continue - } - - resp.Node.SetLastHeightCheckTime(time.Now()) - resp.Node.SetLastKnownHeight(evmHeightResp.Height) - nodesResponded = append(nodesResponded, resp.Node) - } - - highestNodeHeight := getHighestNodeHeight(nodesResponded) - // Compare each node's reported height against the highest reported height - for _, node := range nodesResponded { - heightDifference := int(highestNodeHeight - node.GetLastKnownHeight()) - // Penalize nodes whose reported height is significantly lower than the highest reported height - if heightDifference > checks.GetBlockHeightTolerance(c.ChainConfiguration, node.GetChain(), defaultHeightTolerance) { - c.logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain()) - // Punish Node specifically due to timeout. 
- node.SetSynced(false) - node.SetTimeoutUntil(time.Now().Add(evmHeightCheckPenalty), models.OutOfSyncTimeout, fmt.Errorf("evmHeightCheck: heightDifference: %d, nodeSyncedHeight: %d, highestNodeHeight: %d", heightDifference, node.GetLastKnownHeight(), highestNodeHeight)) - } else { - node.SetSynced(true) - } - } + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) c.nextCheckTime = time.Now().Add(evmHeightCheckInterval) } @@ -138,43 +82,11 @@ func (c *EvmHeightCheck) ShouldRun() bool { return time.Now().After(c.nextCheckTime) } -func (c *EvmHeightCheck) getEligibleNodes() []*models.QosNode { - // Filter nodes based on last checked time - var eligibleNodes []*models.QosNode - for _, node := range c.NodeList { - if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= checkNodeHeightInterval { - eligibleNodes = append(eligibleNodes, node) - } - } - return eligibleNodes -} - -// getHighestHeight returns the highest height reported from a slice of nodes -// by using z-score threshhold to prevent any misconfigured or malicious node -func getHighestNodeHeight(nodes []*models.QosNode) uint64 { - - var nodeHeights []float64 - for _, node := range nodes { - nodeHeights = append(nodeHeights, float64(node.GetLastKnownHeight())) - } - - // Calculate mean and standard deviation - meanValue := stat.Mean(nodeHeights, nil) - stdDevValue := stat.StdDev(nodeHeights, nil) - - var highestNodeHeight float64 - for _, nodeHeight := range nodeHeights { - - zScore := stat.StdScore(nodeHeight, meanValue, stdDevValue) - - // height is an outlier according to zScore threshold - if math.Abs(zScore) > zScoreHeightThreshold { - continue - } - // Height is higher than last recorded height - if nodeHeight > highestNodeHeight { - highestNodeHeight = nodeHeight - } +func (c *EvmHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var evmRsp evmHeightResponse + err := 
json.Unmarshal([]byte(response), &evmRsp) + if err != nil { + return 0, err } - return uint64(highestNodeHeight) + return evmRsp.Height, nil } diff --git a/internal/node_selector_service/checks/height_check_handler.go b/internal/node_selector_service/checks/height_check_handler.go new file mode 100644 index 0000000..cb5417a --- /dev/null +++ b/internal/node_selector_service/checks/height_check_handler.go @@ -0,0 +1,109 @@ +package checks + +import ( + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "github.com/valyala/fasthttp" + "go.uber.org/zap" + "gonum.org/v1/gonum/stat" + "math" + "time" +) + +const ( + defaultNodeHeightCheckInterval = time.Minute * 5 + defaultZScoreHeightThreshold = 3 + defaultHeightTolerance int = 100 + defaultCheckPenalty = time.Minute * 5 +) + +type HeightJsonParser func(response string) (uint64, error) + +// PerformDefaultHeightCheck is the default implementation of a height check by: +// 0. Filtering out nodes that have not been checked since defaultNodeHeightCheckInterval +// 1. Sending height request via payload to all the nodes +// 2. Punishing all nodes that return an error +// 3. Filtering out nodes that are returning a height out of the zScore threshold +// 4. Punishing the nodes with defaultCheckPenalty that exceed the height tolerance. 
+func PerformDefaultHeightCheck(check *Check, payload string, path string, parseHeight HeightJsonParser, logger *zap.Logger) { + var nodesResponded []*models.QosNode + // Send request to all nodes + relayResponses := SendRelaysAsync(check.PocketRelayer, getEligibleHeightCheckNodes(check.NodeList), payload, "POST", path) + + // Process relay responses + for resp := range relayResponses { + err := resp.Error + if err != nil { + DefaultPunishNode(err, resp.Node, logger) + continue + } + + height, err := parseHeight(resp.Relay.Response) + if err != nil { + logger.Sugar().Warnw("failed to unmarshal response", "err", err) + // Treat an invalid response as a timeout error + DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, logger) + continue + } + + resp.Node.SetLastHeightCheckTime(time.Now()) + resp.Node.SetLastKnownHeight(height) + nodesResponded = append(nodesResponded, resp.Node) + } + + highestNodeHeight := getHighestNodeHeight(nodesResponded, defaultZScoreHeightThreshold) + // Compare each node's reported height against the highest reported height + for _, node := range nodesResponded { + heightDifference := int(highestNodeHeight - node.GetLastKnownHeight()) + // Penalize nodes whose reported height is significantly lower than the highest reported height + if heightDifference > GetBlockHeightTolerance(check.ChainConfiguration, node.GetChain(), defaultHeightTolerance) { + logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain()) + // Punish Node specifically due to timeout. 
+ node.SetSynced(false) + node.SetTimeoutUntil(time.Now().Add(defaultCheckPenalty), models.OutOfSyncTimeout, fmt.Errorf("heightDifference: %d, nodeSyncedHeight: %d, highestNodeHeight: %d", heightDifference, node.GetLastKnownHeight(), highestNodeHeight)) + } else { + node.SetSynced(true) + } + } +} + +// getHighestNodeHeight returns the highest height reported from a slice of nodes +// by using a z-score threshold to exclude any misconfigured or malicious node +func getHighestNodeHeight(nodes []*models.QosNode, zScoreHeightThreshhold float64) uint64 { + + var nodeHeights []float64 + for _, node := range nodes { + nodeHeights = append(nodeHeights, float64(node.GetLastKnownHeight())) + } + + // Calculate mean and standard deviation + meanValue := stat.Mean(nodeHeights, nil) + stdDevValue := stat.StdDev(nodeHeights, nil) + + var highestNodeHeight float64 + for _, nodeHeight := range nodeHeights { + + zScore := stat.StdScore(nodeHeight, meanValue, stdDevValue) + + // height is an outlier according to zScore threshold + if math.Abs(zScore) > zScoreHeightThreshhold { + continue + } + // Height is higher than last recorded height + if nodeHeight > highestNodeHeight { + highestNodeHeight = nodeHeight + } + } + return uint64(highestNodeHeight) +} + +func getEligibleHeightCheckNodes(nodes []*models.QosNode) []*models.QosNode { + // Filter nodes based on last checked time + var eligibleNodes []*models.QosNode + for _, node := range nodes { + if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= defaultNodeHeightCheckInterval { + eligibleNodes = append(eligibleNodes, node) + } + } + return eligibleNodes +} diff --git a/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go new file mode 100644 index 0000000..af5111e --- /dev/null +++ 
b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go @@ -0,0 +1,71 @@ +package pokt_data_integrity_check + +import ( + "encoding/json" + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const poktBlockTxEndpoint = "/v1/query/blocktxs" + +const ( + // how often the job should run + dataIntegrityCheckInterval = time.Second * 1 + + //json rpc payload to send a data integrity check + blockPayloadFmt = `{"height": %d}` +) + +type blockTxResponse struct { + TotalTxs int `json:"total_txs"` +} + +type PoktDataIntegrityCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +// getBlockIdentifierFromNodeResponse: We use total txs as the block identifier because retrieving block hash from POKT RPC can lead up to +// 8MB+ payloads per node operator response, whereas blocktxs is only ~110kb +func (c *PoktDataIntegrityCheck) getBlockIdentifierFromNodeResponse(response string) (string, error) { + var blockTxRsp blockTxResponse + err := json.Unmarshal([]byte(response), &blockTxRsp) + if err != nil { + return "", err + } + return fmt.Sprintf("%d", blockTxRsp.TotalTxs), nil +} + +func NewPoktDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *PoktDataIntegrityCheck { + return &PoktDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *PoktDataIntegrityCheck) Name() string { + return "pokt_data_integrity_check" +} + +func (c *PoktDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *PoktDataIntegrityCheck) Perform() { + + // Session is not meant for POKT + if len(c.NodeList) == 0 || !c.NodeList[0].IsPoktChain() { + return + } + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, poktBlockTxEndpoint, c.getBlockIdentifierFromNodeResponse, c.logger) + c.nextCheckTime = 
time.Now().Add(dataIntegrityCheckInterval) +} + +func (c *PoktDataIntegrityCheck) ShouldRun() bool { + return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) +} + +func getBlockByNumberPayload(blockNumber uint64) string { + return fmt.Sprintf(blockPayloadFmt, blockNumber) +} diff --git a/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go new file mode 100644 index 0000000..bc4e83f --- /dev/null +++ b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go @@ -0,0 +1,63 @@ +package pokt_height_check + +import ( + "encoding/json" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const ( + + // interval to run the pokt height check job + poktHeightCheckInterval = time.Second * 1 + + // request body sent to the pokt height endpoint (empty) + heightJsonPayload = `` +) + +type poktHeightResponse struct { + Height uint64 `json:"height"` +} + +type PoktHeightCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func NewPoktHeightCheck(check *checks.Check, logger *zap.Logger) *PoktHeightCheck { + return &PoktHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *PoktHeightCheck) Name() string { + return "pokt_height_check" +} + +func (c *PoktHeightCheck) Perform() { + + // Session is not meant for POKT + if len(c.NodeList) == 0 || !c.NodeList[0].IsPoktChain() { + return + } + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "/v1/query/height", c.getHeightFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(poktHeightCheckInterval) +} + +func (c *PoktHeightCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *PoktHeightCheck) ShouldRun() bool { + return time.Now().After(c.nextCheckTime) +} + +func (c 
*PoktHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var poktRsp poktHeightResponse + err := json.Unmarshal([]byte(response), &poktRsp) + if err != nil { + return 0, err + } + return poktRsp.Height, nil +} diff --git a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go new file mode 100644 index 0000000..a2567ad --- /dev/null +++ b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go @@ -0,0 +1,70 @@ +package solana_data_integrity_check + +import ( + "encoding/json" + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "strconv" + "time" +) + +const ( + // how often the job should run + dataIntegrityCheckInterval = time.Second * 1 + + //json rpc payload to send a data integrity check + blockPayloadFmt = `{"jsonrpc":"2.0","method":"getBlock","params":[%s, {"encoding": "json"}],"id":1}` +) + +type blockByNumberResponse struct { + Result struct { + Hash string `json:"blockhash"` + } `json:"result"` +} + +type SolanaDataIntegrityCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func (c *SolanaDataIntegrityCheck) getBlockIdentifierFromNodeResponse(response string) (string, error) { + var blockRsp blockByNumberResponse + err := json.Unmarshal([]byte(response), &blockRsp) + if err != nil { + return "", err + } + return blockRsp.Result.Hash, nil +} + +func NewSolanaDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *SolanaDataIntegrityCheck { + return &SolanaDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *SolanaDataIntegrityCheck) Name() string { + return "solana_data_integrity_check" +} + +func (c *SolanaDataIntegrityCheck) SetNodes(nodes 
[]*models.QosNode) { + c.NodeList = nodes +} + +func (c *SolanaDataIntegrityCheck) Perform() { + + // Session is not meant for Solana + if len(c.NodeList) == 0 || !c.NodeList[0].IsSolanaChain() { + return + } + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockIdentifierFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(dataIntegrityCheckInterval) +} + +func (c *SolanaDataIntegrityCheck) ShouldRun() bool { + return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) +} + +func getBlockByNumberPayload(blockNumber uint64) string { + return fmt.Sprintf(blockPayloadFmt, "0x"+strconv.FormatInt(int64(blockNumber), 16)) +} diff --git a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go new file mode 100644 index 0000000..cbd646d --- /dev/null +++ b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go @@ -0,0 +1,67 @@ +package solana_height_check + +import ( + "encoding/json" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const ( + + // interval to run the solana height check + solanaHeightCheckInterval = time.Second * 1 + + // jsonrpc payload to retrieve solana height + heightJsonPayload = `{"jsonrpc":"2.0","method":"getSlot","params": [],"id":1}` +) + +type solanaHeightResponse struct { + Result struct { + Slot uint64 `json:"slot"` + } `json:"result"` +} + +type SolanaHeightCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func NewSolanaHeightCheck(check *checks.Check, logger *zap.Logger) *SolanaHeightCheck { + return &SolanaHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *SolanaHeightCheck) Name() string { + return "solana_height_check" +} + +func (c 
*SolanaHeightCheck) Perform() { + + // Session is not meant for Solana + if len(c.NodeList) == 0 || !c.NodeList[0].IsSolanaChain() { + return + } + + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) + + c.nextCheckTime = time.Now().Add(solanaHeightCheckInterval) +} + +func (c *SolanaHeightCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *SolanaHeightCheck) ShouldRun() bool { + return time.Now().After(c.nextCheckTime) +} + +func (c *SolanaHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var solanaRsp solanaHeightResponse + err := json.Unmarshal([]byte(response), &solanaRsp) + if err != nil { + return 0, err + } + return solanaRsp.Result.Slot, nil +} diff --git a/internal/node_selector_service/models/qos_node.go b/internal/node_selector_service/models/qos_node.go index 7f5011b..3c7c60e 100644 --- a/internal/node_selector_service/models/qos_node.go +++ b/internal/node_selector_service/models/qos_node.go @@ -18,6 +18,7 @@ const ( const ( chainSolanaCustom = "C006" + chainPokt = "0001" chainSolana = "0006" ) const ( @@ -69,7 +70,7 @@ type QosNode struct { } func NewQosNode(morseNode *models.Node, pocketSession *models.Session, appSigner *models.Ed25519Account) *QosNode { - return &QosNode{MorseNode: morseNode, MorseSession: pocketSession, MorseSigner: appSigner, LatencyTracker: &LatencyTracker{tDigest: tdigest.NewWithCompression(1000)}} + return &QosNode{MorseNode: morseNode, MorseSession: pocketSession, MorseSigner: appSigner, LatencyTracker: &LatencyTracker{tDigest: tdigest.NewWithCompression(latencyCompression)}} } func (n *QosNode) IsHealthy() bool { @@ -134,8 +135,12 @@ func (n *QosNode) IsSolanaChain() bool { return chainId == chainSolana || chainId == chainSolanaCustom } +func (n *QosNode) IsPoktChain() bool { + return n.GetChain() == chainPokt +} + func (n *QosNode) IsEvmChain() bool { - return !n.IsSolanaChain() + return !n.IsSolanaChain() && 
!n.IsPoktChain() } func (n *QosNode) GetTimeoutReason() TimeoutReason { diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index 99c7d46..a70df79 100644 --- a/internal/node_selector_service/node_selector_service.go +++ b/internal/node_selector_service/node_selector_service.go @@ -5,6 +5,10 @@ import ( "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_height_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pokt-network/gateway-server/internal/session_registry" "github.com/pokt-network/gateway-server/pkg/common" @@ -38,6 +42,10 @@ func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryServ enabledChecks := []checks.CheckJob{ evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), + solana_height_check.NewSolanaHeightCheck(baseCheck, logger.Named("solana_height_check")), + solana_data_integrity_check.NewSolanaDataIntegrityCheck(baseCheck, logger.Named("solana_data_integrity_check")), + pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")), + pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), 
} selectorService := &NodeSelectorClient{ sessionRegistry: sessionRegistry, From ed1337c491d718e606e5164d1520eae69fcc32af Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 12:14:12 -0500 Subject: [PATCH 02/14] Fix build --- .../internal/config/dot_env_config_provider.go | 5 +++++ internal/global_config/config_provider.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index dd165b9..edef2e0 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -85,6 +85,11 @@ func (c DotEnvGlobalConfigProvider) GetAltruistRequestTimeout() time.Duration { return c.altruistRequestTimeout } +// ShouldEmitServiceUrlPromMetrics returns whether to emit service url tags as part of relay metrics. +func (c DotEnvGlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool { + return c.emitServiceUrlPromMetrics +} + // NewDotEnvConfigProvider creates a new instance of DotEnvGlobalConfigProvider. 
func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { _ = godotenv.Load() diff --git a/internal/global_config/config_provider.go b/internal/global_config/config_provider.go index cde832a..9e3c829 100644 --- a/internal/global_config/config_provider.go +++ b/internal/global_config/config_provider.go @@ -18,7 +18,7 @@ type GlobalConfigProvider interface { } type PromMetricsProvider interface { - ShouldEmitServiceUrl() bool + ShouldEmitServiceUrlPromMetrics() bool } type SecretProvider interface { From 6a9933e824f0f495b768f201c8ce03c0f33d8aae Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 12:15:59 -0500 Subject: [PATCH 03/14] Regenraitng mocks --- mocks/apps_registry/app_registry_mock.go | 1 - .../chain_configurations_registry_mock.go | 1 - mocks/global_config/config_provider.go | 46 ++++++++++++++++++- mocks/node_selector/node_selector_mock.go | 1 - mocks/pocket_service/pocket_service_mock.go | 1 - .../session_registry/session_registry_mock.go | 1 - .../ttl_cache_service_mock.go | 32 +++++++++++++ 7 files changed, 77 insertions(+), 6 deletions(-) diff --git a/mocks/apps_registry/app_registry_mock.go b/mocks/apps_registry/app_registry_mock.go index b0aa744..023d53c 100644 --- a/mocks/apps_registry/app_registry_mock.go +++ b/mocks/apps_registry/app_registry_mock.go @@ -4,7 +4,6 @@ package apps_registry_mock import ( models "github.com/pokt-network/gateway-server/internal/apps_registry/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/chain_configurations_registry/chain_configurations_registry_mock.go b/mocks/chain_configurations_registry/chain_configurations_registry_mock.go index aba5a8b..4a783d4 100644 --- a/mocks/chain_configurations_registry/chain_configurations_registry_mock.go +++ b/mocks/chain_configurations_registry/chain_configurations_registry_mock.go @@ -4,7 +4,6 @@ package chain_configurations_registry_mock import ( db_query "github.com/pokt-network/gateway-server/internal/db_query" - mock 
"github.com/stretchr/testify/mock" ) diff --git a/mocks/global_config/config_provider.go b/mocks/global_config/config_provider.go index b353e51..72d3866 100644 --- a/mocks/global_config/config_provider.go +++ b/mocks/global_config/config_provider.go @@ -4,7 +4,6 @@ package global_config_mock import ( global_config "github.com/pokt-network/gateway-server/internal/global_config" - mock "github.com/stretchr/testify/mock" time "time" @@ -338,6 +337,51 @@ func (_c *GlobalConfigProvider_GetPoktRPCRequestTimeout_Call) RunAndReturn(run f return _c } +// ShouldEmitServiceUrlPromMetrics provides a mock function with given fields: +func (_m *GlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ShouldEmitServiceUrlPromMetrics") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldEmitServiceUrlPromMetrics' +type GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call struct { + *mock.Call +} + +// ShouldEmitServiceUrlPromMetrics is a helper method to define mock.On call +func (_e *GlobalConfigProvider_Expecter) ShouldEmitServiceUrlPromMetrics() *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + return &GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call{Call: _e.mock.On("ShouldEmitServiceUrlPromMetrics")} +} + +func (_c *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) Run(run func()) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) Return(_a0 bool) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c 
*GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) RunAndReturn(run func() bool) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Return(run) + return _c +} + // NewGlobalConfigProvider creates a new instance of GlobalConfigProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewGlobalConfigProvider(t interface { diff --git a/mocks/node_selector/node_selector_mock.go b/mocks/node_selector/node_selector_mock.go index 155dacd..041d16a 100644 --- a/mocks/node_selector/node_selector_mock.go +++ b/mocks/node_selector/node_selector_mock.go @@ -4,7 +4,6 @@ package node_selector_mock import ( models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/pocket_service/pocket_service_mock.go b/mocks/pocket_service/pocket_service_mock.go index 589dba9..c906a03 100644 --- a/mocks/pocket_service/pocket_service_mock.go +++ b/mocks/pocket_service/pocket_service_mock.go @@ -4,7 +4,6 @@ package pocket_service_mock import ( models "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/session_registry/session_registry_mock.go b/mocks/session_registry/session_registry_mock.go index 1bff02e..60ca1f2 100644 --- a/mocks/session_registry/session_registry_mock.go +++ b/mocks/session_registry/session_registry_mock.go @@ -5,7 +5,6 @@ package session_registry_mock import ( models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" pokt_v0models "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models" - mock "github.com/stretchr/testify/mock" session_registry "github.com/pokt-network/gateway-server/internal/session_registry" diff --git a/mocks/ttl_cache_service/ttl_cache_service_mock.go b/mocks/ttl_cache_service/ttl_cache_service_mock.go index 
9d0a441..da85834 100644 --- a/mocks/ttl_cache_service/ttl_cache_service_mock.go +++ b/mocks/ttl_cache_service/ttl_cache_service_mock.go @@ -23,6 +23,38 @@ func (_m *TTLCacheService[K, V]) EXPECT() *TTLCacheService_Expecter[K, V] { return &TTLCacheService_Expecter[K, V]{mock: &_m.Mock} } +// DeleteExpired provides a mock function with given fields: +func (_m *TTLCacheService[K, V]) DeleteExpired() { + _m.Called() +} + +// TTLCacheService_DeleteExpired_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteExpired' +type TTLCacheService_DeleteExpired_Call[K comparable, V interface{}] struct { + *mock.Call +} + +// DeleteExpired is a helper method to define mock.On call +func (_e *TTLCacheService_Expecter[K, V]) DeleteExpired() *TTLCacheService_DeleteExpired_Call[K, V] { + return &TTLCacheService_DeleteExpired_Call[K, V]{Call: _e.mock.On("DeleteExpired")} +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) Run(run func()) *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) Return() *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Return() + return _c +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) RunAndReturn(run func()) *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Return(run) + return _c +} + // Get provides a mock function with given fields: key, opts func (_m *TTLCacheService[K, V]) Get(key K, opts ...ttlcache.Option[K, V]) *ttlcache.Item[K, V] { _va := make([]interface{}, len(opts)) From 2e8d8f149ff1f533b5f192e0e78d8ded2451642d Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 21:50:07 -0500 Subject: [PATCH 04/14] Fix solana qos check and data integrity check --- .../checks/data_integrity_handler.go | 6 ++++-- .../checks/height_check_handler.go | 3 +++ .../solana_data_integrity_check.go | 6 +++--- .../solana_height_check/solana_height_check.go | 8 ++------ 
.../node_selector_service.go | 15 +++++---------- 5 files changed, 17 insertions(+), 21 deletions(-) diff --git a/internal/node_selector_service/checks/data_integrity_handler.go b/internal/node_selector_service/checks/data_integrity_handler.go index 04e7a93..45331dd 100644 --- a/internal/node_selector_service/checks/data_integrity_handler.go +++ b/internal/node_selector_service/checks/data_integrity_handler.go @@ -30,7 +30,7 @@ type BlockHashParser func(response string) (string, error) type GetBlockByNumberPayloadFmter func(blockToFind uint64) string // PerformDataIntegrityCheck: is the default implementation of a data integrity check by: -func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPayloadFmter, path string, retrieveBlockHash BlockHashParser, logger *zap.Logger) { +func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPayloadFmter, path string, retrieveBlockIdentifier BlockHashParser, logger *zap.Logger) { // Find a node that has been reported as healthy to use as source of truth sourceOfTruth := findRandomHealthyNode(check.NodeList) @@ -40,6 +40,8 @@ func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPa return } + logger.Sugar().Infow("running default data integrity check", "chain", check.NodeList[0].GetChain()) + // Map to count number of nodes that return blockHash -> counter nodeResponseCounts := make(map[string]int) @@ -56,7 +58,7 @@ func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPa continue } - hash, err := retrieveBlockHash(rsp.Relay.Response) + hash, err := retrieveBlockIdentifier(rsp.Relay.Response) if err != nil { logger.Sugar().Warnw("failed to unmarshal response", "err", err) DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, logger) diff --git a/internal/node_selector_service/checks/height_check_handler.go b/internal/node_selector_service/checks/height_check_handler.go index cb5417a..b61fc69 100644 --- 
a/internal/node_selector_service/checks/height_check_handler.go +++ b/internal/node_selector_service/checks/height_check_handler.go @@ -26,6 +26,9 @@ type HeightJsonParser func(response string) (uint64, error) // 3. Filtering out nodes that are returning a height out of the zScore threshold // 4. Punishing the nodes with defaultCheckPenalty that exceed the height tolerance. func PerformDefaultHeightCheck(check *Check, payload string, path string, parseHeight HeightJsonParser, logger *zap.Logger) { + + logger.Sugar().Infow("running default height check", "chain", check.NodeList[0].GetChain()) + var nodesResponded []*models.QosNode // Send request to all nodes relayResponses := SendRelaysAsync(check.PocketRelayer, getEligibleHeightCheckNodes(check.NodeList), payload, "POST", path) diff --git a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go index a2567ad..1709647 100644 --- a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go +++ b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go @@ -6,7 +6,6 @@ import ( "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "go.uber.org/zap" - "strconv" "time" ) @@ -15,7 +14,8 @@ const ( dataIntegrityCheckInterval = time.Second * 1 //json rpc payload to send a data integrity check - blockPayloadFmt = `{"jsonrpc":"2.0","method":"getBlock","params":[%s, {"encoding": "json"}],"id":1}` + // we use signatures for transaction detail to prevent large payloads and we don't need anything but block hash + blockPayloadFmt = `{"jsonrpc":"2.0","method":"getBlock","params":[%d, {"encoding": "jsonParsed", "maxSupportedTransactionVersion":0, "transactionDetails":"signatures"}],"id":1}` ) type 
blockByNumberResponse struct { @@ -66,5 +66,5 @@ func (c *SolanaDataIntegrityCheck) ShouldRun() bool { } func getBlockByNumberPayload(blockNumber uint64) string { - return fmt.Sprintf(blockPayloadFmt, "0x"+strconv.FormatInt(int64(blockNumber), 16)) + return fmt.Sprintf(blockPayloadFmt, blockNumber) } diff --git a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go index cbd646d..cc22bdc 100644 --- a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go +++ b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go @@ -18,9 +18,7 @@ const ( ) type solanaHeightResponse struct { - Result struct { - Slot uint64 `json:"slot"` - } `json:"result"` + Result uint64 `json:"result"` } type SolanaHeightCheck struct { @@ -43,9 +41,7 @@ func (c *SolanaHeightCheck) Perform() { if len(c.NodeList) == 0 || !c.NodeList[0].IsSolanaChain() { return } - checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) - c.nextCheckTime = time.Now().Add(solanaHeightCheckInterval) } @@ -63,5 +59,5 @@ func (c *SolanaHeightCheck) getHeightFromNodeResponse(response string) (uint64, if err != nil { return 0, err } - return solanaRsp.Result.Slot, nil + return solanaRsp.Result, nil } diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index a70df79..8430eb4 100644 --- a/internal/node_selector_service/node_selector_service.go +++ b/internal/node_selector_service/node_selector_service.go @@ -3,10 +3,6 @@ package node_selector_service import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" - "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check" - 
"github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check" - "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_data_integrity_check" - "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_data_integrity_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" @@ -40,12 +36,12 @@ func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryServ // enabled checks enabledChecks := []checks.CheckJob{ - evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), - evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), + //evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), + //evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), solana_height_check.NewSolanaHeightCheck(baseCheck, logger.Named("solana_height_check")), solana_data_integrity_check.NewSolanaDataIntegrityCheck(baseCheck, logger.Named("solana_data_integrity_check")), - pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")), - pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), + //pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")), + //pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), } selectorService := &NodeSelectorClient{ sessionRegistry: sessionRegistry, @@ -121,8 +117,7 @@ func (q NodeSelectorClient) startJobChecker() { case <-ticker: for _, job := range q.checkJobs { if job.ShouldRun() { - for sessionChainKey, nodes := range 
q.sessionRegistry.GetNodesMap() { - q.logger.Sugar().Infow("running job", "job", job.Name(), "sessionChainKey", sessionChainKey) + for _, nodes := range q.sessionRegistry.GetNodesMap() { job.SetNodes(nodes.Value()) job.Perform() } From 8367ad9b1da9690d3813d8ab1389de7e29f693e6 Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 21:56:11 -0500 Subject: [PATCH 05/14] finish implemenation and add back all checks --- .../checks/data_integrity_handler.go | 6 +++--- .../node_selector_service/node_selector_service.go | 12 ++++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/node_selector_service/checks/data_integrity_handler.go b/internal/node_selector_service/checks/data_integrity_handler.go index 45331dd..e5ce599 100644 --- a/internal/node_selector_service/checks/data_integrity_handler.go +++ b/internal/node_selector_service/checks/data_integrity_handler.go @@ -58,7 +58,7 @@ func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPa continue } - hash, err := retrieveBlockIdentifier(rsp.Relay.Response) + blockIdentifier, err := retrieveBlockIdentifier(rsp.Relay.Response) if err != nil { logger.Sugar().Warnw("failed to unmarshal response", "err", err) DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, logger) @@ -68,9 +68,9 @@ func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPa rsp.Node.SetLastDataIntegrityCheckTime(time.Now()) nodeResponsePairs = append(nodeResponsePairs, &nodeHashRspPair{ node: rsp.Node, - blockIdentifier: hash, + blockIdentifier: blockIdentifier, }) - nodeResponseCounts[hash]++ + nodeResponseCounts[blockIdentifier]++ } majorityBlockIdentifier := findMajorityBlockIdentifier(nodeResponseCounts) diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index 8430eb4..50b726b 100644 --- a/internal/node_selector_service/node_selector_service.go +++ 
b/internal/node_selector_service/node_selector_service.go @@ -3,6 +3,10 @@ package node_selector_service import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_data_integrity_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" @@ -36,12 +40,12 @@ func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryServ // enabled checks enabledChecks := []checks.CheckJob{ - //evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), - //evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), + evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), + evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), solana_height_check.NewSolanaHeightCheck(baseCheck, logger.Named("solana_height_check")), solana_data_integrity_check.NewSolanaDataIntegrityCheck(baseCheck, logger.Named("solana_data_integrity_check")), - //pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")), - //pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), + pokt_height_check.NewPoktHeightCheck(baseCheck, 
logger.Named("pokt_height_check")), + pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), } selectorService := &NodeSelectorClient{ sessionRegistry: sessionRegistry, From 74827abe01b97f56853586f73eca2b2915a1197f Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 22:05:04 -0500 Subject: [PATCH 06/14] add chain network --- .env.sample | 3 +- .../config/dot_env_config_provider.go | 4 ++ docs/quick-onboarding-guide.md | 1 + internal/chain_network/chain_network.go | 8 ++++ internal/global_config/config_provider.go | 5 ++ mocks/global_config/config_provider.go | 47 +++++++++++++++++++ 6 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 internal/chain_network/chain_network.go diff --git a/.env.sample b/.env.sample index 6cedff1..0f43a43 100644 --- a/.env.sample +++ b/.env.sample @@ -7,4 +7,5 @@ SESSION_CACHE_TTL=75m DB_CONNECTION_URL=postgres://myuser:mypassword@postgres:5432/postgres?sslmode=disable API_KEY= ALTRUIST_REQUEST_TIMEOUT=10s -EMIT_SERVICE_URL_PROM_METRICS=false \ No newline at end of file +EMIT_SERVICE_URL_PROM_METRICS=false +CHAIN_NETWORK=morse_mainnet \ No newline at end of file diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index edef2e0..382972e 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -3,6 +3,7 @@ package config import ( "fmt" "github.com/joho/godotenv" + "github.com/pokt-network/gateway-server/internal/chain_network" "github.com/pokt-network/gateway-server/internal/global_config" "os" "strconv" @@ -15,6 +16,7 @@ const ( // Environment variable names const ( + chainNetworkEnv = "CHAIN_NETWORK" emitServiceUrlPromMetricsEnv = "EMIT_SERVICE_URL_PROM_METRICS" poktRPCFullHostEnv = "POKT_RPC_FULL_HOST" httpServerPortEnv = "HTTP_SERVER_PORT" @@ -30,6 +32,7 @@ const ( // 
DotEnvGlobalConfigProvider implements the GatewayServerProvider interface. type DotEnvGlobalConfigProvider struct { poktRPCFullHost string + chainNetwork chain_network.ChainNetwork httpServerPort uint poktRPCRequestTimeout time.Duration sessionCacheTTL time.Duration @@ -131,6 +134,7 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { environmentStage: global_config.EnvironmentStage(getEnvVar(environmentStageEnv, "")), poktApplicationsEncryptionKey: getEnvVar(poktApplicationsEncryptionKeyEnv, ""), apiKey: getEnvVar(apiKey, ""), + chainNetwork: chain_network.ChainNetwork(getEnvVar(chainNetworkEnv, chain_network.MorseMainnet)), altruistRequestTimeout: altruistRequestTimeoutDuration, } } diff --git a/docs/quick-onboarding-guide.md b/docs/quick-onboarding-guide.md index 6cfe340..dbab63e 100644 --- a/docs/quick-onboarding-guide.md +++ b/docs/quick-onboarding-guide.md @@ -50,6 +50,7 @@ Fill out the `.env` variables for the gateway server. This can be done by inject | `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | | `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | | `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics (disabled by default to prevent cardinality explosion) | `false`, `true` | +| `CHAIN_NETWORK` | Identifies which network the gateway server is running on. | `morse_mainnnet`, `morse_testnet` | See [.env.sample](..%2F.env.sample) for a sample. ## 4. 
Run Migration Script diff --git a/internal/chain_network/chain_network.go b/internal/chain_network/chain_network.go new file mode 100644 index 0000000..aeba175 --- /dev/null +++ b/internal/chain_network/chain_network.go @@ -0,0 +1,8 @@ +package chain_network + +type ChainNetwork string + +const ( + MorseMainnet = "morse_mainnet" + MorseTestnet = "morse_testnet" +) diff --git a/internal/global_config/config_provider.go b/internal/global_config/config_provider.go index 9e3c829..4546f2a 100644 --- a/internal/global_config/config_provider.go +++ b/internal/global_config/config_provider.go @@ -15,6 +15,7 @@ type GlobalConfigProvider interface { PoktNodeConfigProvider AltruistConfigProvider PromMetricsProvider + ChainNetworkProvider } type PromMetricsProvider interface { @@ -42,3 +43,7 @@ type PoktNodeConfigProvider interface { type AltruistConfigProvider interface { GetAltruistRequestTimeout() time.Duration } + +type ChainNetworkProvider interface { + GetChainNetwork() ChainNetworkProvider +} diff --git a/mocks/global_config/config_provider.go b/mocks/global_config/config_provider.go index 72d3866..cb1afd1 100644 --- a/mocks/global_config/config_provider.go +++ b/mocks/global_config/config_provider.go @@ -112,6 +112,53 @@ func (_c *GlobalConfigProvider_GetAltruistRequestTimeout_Call) RunAndReturn(run return _c } +// GetChainNetwork provides a mock function with given fields: +func (_m *GlobalConfigProvider) GetChainNetwork() global_config.ChainNetworkProvider { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetChainNetwork") + } + + var r0 global_config.ChainNetworkProvider + if rf, ok := ret.Get(0).(func() global_config.ChainNetworkProvider); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(global_config.ChainNetworkProvider) + } + } + + return r0 +} + +// GlobalConfigProvider_GetChainNetwork_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChainNetwork' +type 
GlobalConfigProvider_GetChainNetwork_Call struct { + *mock.Call +} + +// GetChainNetwork is a helper method to define mock.On call +func (_e *GlobalConfigProvider_Expecter) GetChainNetwork() *GlobalConfigProvider_GetChainNetwork_Call { + return &GlobalConfigProvider_GetChainNetwork_Call{Call: _e.mock.On("GetChainNetwork")} +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) Run(run func()) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) Return(_a0 global_config.ChainNetworkProvider) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) RunAndReturn(run func() global_config.ChainNetworkProvider) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Return(run) + return _c +} + // GetDatabaseConnectionUrl provides a mock function with given fields: func (_m *GlobalConfigProvider) GetDatabaseConnectionUrl() string { ret := _m.Called() From fd270a39a702f39617cd9f0b2bf0fb2797be7241 Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 22:20:22 -0500 Subject: [PATCH 07/14] fix typos and build --- .../config/dot_env_config_provider.go | 5 ++++ docs/quick-onboarding-guide.md | 24 +++++++++---------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index 382972e..7769ec1 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -93,6 +93,11 @@ func (c DotEnvGlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool { return c.emitServiceUrlPromMetrics } +// GetChainNetwork returns the current network, this can be useful for identifying the correct chain ids dependent on testnet or mainnet. 
+func (c DotEnvGlobalConfigProvider) GetChainNetwork() chain_network.ChainNetwork { + return c.chainNetwork +} + // NewDotEnvConfigProvider creates a new instance of DotEnvGlobalConfigProvider. func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { _ = godotenv.Load() diff --git a/docs/quick-onboarding-guide.md b/docs/quick-onboarding-guide.md index dbab63e..0435568 100644 --- a/docs/quick-onboarding-guide.md +++ b/docs/quick-onboarding-guide.md @@ -39,18 +39,18 @@ Create an encryption password for your app stake keys. This password will be use Fill out the `.env` variables for the gateway server. This can be done by injecting environment variables directly or using a `.env` file. ### Env Variables Description -| Variable Name | Description | Example Value | -|------------------------------------|-------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------| -| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | -| `HTTP_SERVER_PORT` | Gateway server port | `8080` | -| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | -| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | -| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | -| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | -| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | -| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | -| `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics (disabled by default to prevent cardinality explosion) | `false`, `true` | -| `CHAIN_NETWORK` | Identifies which network the 
gateway server is running on. | `morse_mainnnet`, `morse_testnet` | +| Variable Name | Description | Example Value | +|------------------------------------|------------------------------------------------------------|---------------------------------------------------------------------------------------------------| +| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | +| `HTTP_SERVER_PORT` | Gateway server port | `8080` | +| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | +| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | +| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | +| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | +| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | +| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | +| `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics | `false`, `true` | +| `CHAIN_NETWORK` | Identifies which network the gateway server is running on. | `morse_mainnet`, `morse_testnet` | See [.env.sample](..%2F.env.sample) for a sample. ## 4. 
Run Migration Script From c163a4593a0f3d0c9200bcf148774cff80e0116a Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 22:32:31 -0500 Subject: [PATCH 08/14] add checks for mainnet/testnet, and move chain checks to check package --- .../config/dot_env_config_provider.go | 2 +- cmd/gateway_server/main.go | 2 +- internal/chain_network/chain_network.go | 4 +- internal/global_config/config_provider.go | 7 ++- .../evm_data_integrity_check.go | 2 +- .../evm_height_check/evm_height_check.go | 2 +- .../pokt_data_integrity_check.go | 2 +- .../pokt_height_check/pokt_height_check.go | 4 +- .../node_selector_service/checks/qos_check.go | 44 ++++++++++++++++--- .../solana_data_integrity_check.go | 2 +- .../solana_height_check.go | 2 +- .../node_selector_service/models/qos_node.go | 18 -------- .../node_selector_service.go | 5 ++- mocks/global_config/config_provider.go | 16 +++---- 14 files changed, 66 insertions(+), 46 deletions(-) diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index 7769ec1..63e2411 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -139,7 +139,7 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { environmentStage: global_config.EnvironmentStage(getEnvVar(environmentStageEnv, "")), poktApplicationsEncryptionKey: getEnvVar(poktApplicationsEncryptionKeyEnv, ""), apiKey: getEnvVar(apiKey, ""), - chainNetwork: chain_network.ChainNetwork(getEnvVar(chainNetworkEnv, chain_network.MorseMainnet)), + chainNetwork: chain_network.ChainNetwork(getEnvVar(chainNetworkEnv, string(chain_network.MorseMainnet))), altruistRequestTimeout: altruistRequestTimeoutDuration, } } diff --git a/cmd/gateway_server/main.go b/cmd/gateway_server/main.go index 9e2f0ae..b8dd1a5 100644 --- a/cmd/gateway_server/main.go +++ b/cmd/gateway_server/main.go @@ -67,7 +67,7 @@ func main() { 
poktApplicationRegistry := apps_registry.NewCachedAppsRegistry(client, querier, gatewayConfigProvider, logger.Named("pokt_application_registry")) chainConfigurationRegistry := chain_configurations_registry.NewCachedChainConfigurationRegistry(querier, logger.Named("chain_configurations_registry")) sessionRegistry := session_registry.NewCachedSessionRegistryService(client, poktApplicationRegistry, sessionCache, nodeCache, logger.Named("session_registry")) - nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, logger.Named("node_selector")) + nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, gatewayConfigProvider, logger.Named("node_selector")) relayer := relayer.NewRelayer(client, sessionRegistry, poktApplicationRegistry, nodeSelectorService, chainConfigurationRegistry, userAgent, gatewayConfigProvider, logger.Named("relayer")) diff --git a/internal/chain_network/chain_network.go b/internal/chain_network/chain_network.go index aeba175..361ab92 100644 --- a/internal/chain_network/chain_network.go +++ b/internal/chain_network/chain_network.go @@ -3,6 +3,6 @@ package chain_network type ChainNetwork string const ( - MorseMainnet = "morse_mainnet" - MorseTestnet = "morse_testnet" + MorseMainnet ChainNetwork = "morse_mainnet" + MorseTestnet ChainNetwork = "morse_testnet" ) diff --git a/internal/global_config/config_provider.go b/internal/global_config/config_provider.go index 4546f2a..4183272 100644 --- a/internal/global_config/config_provider.go +++ b/internal/global_config/config_provider.go @@ -1,6 +1,9 @@ package global_config -import "time" +import ( + "github.com/pokt-network/gateway-server/internal/chain_network" + "time" +) type EnvironmentStage string @@ -45,5 +48,5 @@ type AltruistConfigProvider interface { } type ChainNetworkProvider interface { - GetChainNetwork() ChainNetworkProvider + GetChainNetwork() 
chain_network.ChainNetwork } diff --git a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go index 4c6c9f5..e556773 100644 --- a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go +++ b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go @@ -54,7 +54,7 @@ func (c *EvmDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { func (c *EvmDataIntegrityCheck) Perform() { // Session is not meant for EVM - if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { + if len(c.NodeList) == 0 || !c.IsEvmChain(c.NodeList[0]) { return } checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockHashFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go index 59a3cb6..41697ae 100644 --- a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go +++ b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go @@ -67,7 +67,7 @@ func (c *EvmHeightCheck) Name() string { func (c *EvmHeightCheck) Perform() { // Session is not meant for EVM - if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { + if len(c.NodeList) == 0 || !c.IsEvmChain(c.NodeList[0]) { return } checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go index af5111e..1a06d5a 100644 --- a/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go +++ b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go 
@@ -55,7 +55,7 @@ func (c *PoktDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { func (c *PoktDataIntegrityCheck) Perform() { // Session is not meant for POKT - if len(c.NodeList) == 0 || !c.NodeList[0].IsPoktChain() { + if len(c.NodeList) == 0 || !c.IsPoktChain(c.NodeList[0]) { return } checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, poktBlockTxEndpoint, c.getBlockIdentifierFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go index bc4e83f..dcf86f6 100644 --- a/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go +++ b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go @@ -37,8 +37,8 @@ func (c *PoktHeightCheck) Name() string { func (c *PoktHeightCheck) Perform() { - // Session is not meant for EVM - if len(c.NodeList) == 0 || !c.NodeList[0].IsPoktChain() { + // Session is not meant for POKT + if len(c.NodeList) == 0 || !c.IsPoktChain(c.NodeList[0]) { return } checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "/v1/query/height", c.getHeightFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/checks/qos_check.go b/internal/node_selector_service/checks/qos_check.go index 434556b..913e37e 100644 --- a/internal/node_selector_service/checks/qos_check.go +++ b/internal/node_selector_service/checks/qos_check.go @@ -2,10 +2,23 @@ package checks import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" + "github.com/pokt-network/gateway-server/internal/chain_network" + config2 "github.com/pokt-network/gateway-server/internal/global_config" qos_models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0" ) +const ( + chainMorseMainnetSolanaCustom = "C006" + chainMorseMainnetPokt = "0001" + chainMorseMainnetSolana = 
"0006" +) + +const ( + chainMorseTestnetPokt = "0013" + chainMorseTestnetSolana = "0008" +) + type CheckJob interface { Perform() Name() string @@ -14,11 +27,32 @@ type CheckJob interface { } type Check struct { - NodeList []*qos_models.QosNode - PocketRelayer pokt_v0.PocketRelayer - ChainConfiguration chain_configurations_registry.ChainConfigurationsService + NodeList []*qos_models.QosNode + PocketRelayer pokt_v0.PocketRelayer + ChainConfiguration chain_configurations_registry.ChainConfigurationsService + ChainNetworkProvider config2.ChainNetworkProvider +} + +func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainNetworkProvider config2.ChainNetworkProvider) *Check { + return &Check{PocketRelayer: pocketRelayer, ChainConfiguration: chainConfiguration, ChainNetworkProvider: chainNetworkProvider} +} + +func (c *Check) IsSolanaChain(node *qos_models.QosNode) bool { + chainId := node.GetChain() + if c.ChainNetworkProvider.GetChainNetwork() == chain_network.MorseTestnet { + return chainId == chainMorseTestnetSolana + } + return chainId == chainMorseMainnetSolana || chainId == chainMorseMainnetSolanaCustom +} + +func (c *Check) IsPoktChain(node *qos_models.QosNode) bool { + chainId := node.GetChain() + if c.ChainNetworkProvider.GetChainNetwork() == chain_network.MorseTestnet { + return chainId == chainMorseTestnetPokt + } + return chainId == chainMorseMainnetPokt } -func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService) *Check { - return &Check{PocketRelayer: pocketRelayer, ChainConfiguration: chainConfiguration} +func (c *Check) IsEvmChain(node *qos_models.QosNode) bool { + return !c.IsPoktChain(node) && !c.IsSolanaChain(node) } diff --git a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go 
b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go index 1709647..770a774 100644 --- a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go +++ b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go @@ -54,7 +54,7 @@ func (c *SolanaDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { func (c *SolanaDataIntegrityCheck) Perform() { // Session is not meant for Solana - if len(c.NodeList) == 0 || !c.NodeList[0].IsSolanaChain() { + if len(c.NodeList) == 0 || !c.IsSolanaChain(c.NodeList[0]) { return } checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockIdentifierFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go index cc22bdc..4adc7ac 100644 --- a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go +++ b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go @@ -38,7 +38,7 @@ func (c *SolanaHeightCheck) Name() string { func (c *SolanaHeightCheck) Perform() { // Session is not meant for Solana - if len(c.NodeList) == 0 || !c.NodeList[0].IsSolanaChain() { + if len(c.NodeList) == 0 || !c.IsSolanaChain(c.NodeList[0]) { return } checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) diff --git a/internal/node_selector_service/models/qos_node.go b/internal/node_selector_service/models/qos_node.go index 3c7c60e..29f2786 100644 --- a/internal/node_selector_service/models/qos_node.go +++ b/internal/node_selector_service/models/qos_node.go @@ -16,11 +16,6 @@ const ( latencyCompression = 1000 ) -const ( - chainSolanaCustom = "C006" - chainPokt = "0001" - chainSolana = "0006" -) const ( OutOfSyncTimeout TimeoutReason = "out_of_sync_timeout" DataIntegrityTimeout 
TimeoutReason = "invalid_data_timeout" @@ -130,19 +125,6 @@ func (n *QosNode) SetLastDataIntegrityCheckTime(lastDataIntegrityCheckTime time. n.lastDataIntegrityCheckTime = lastDataIntegrityCheckTime } -func (n *QosNode) IsSolanaChain() bool { - chainId := n.GetChain() - return chainId == chainSolana || chainId == chainSolanaCustom -} - -func (n *QosNode) IsPoktChain() bool { - return n.GetChain() == chainPokt -} - -func (n *QosNode) IsEvmChain() bool { - return !n.IsSolanaChain() && !n.IsPoktChain() -} - func (n *QosNode) GetTimeoutReason() TimeoutReason { return n.timeoutReason } diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index 50b726b..eaf1f5b 100644 --- a/internal/node_selector_service/node_selector_service.go +++ b/internal/node_selector_service/node_selector_service.go @@ -2,6 +2,7 @@ package node_selector_service import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" + "github.com/pokt-network/gateway-server/internal/global_config" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check" @@ -33,10 +34,10 @@ type NodeSelectorClient struct { checkJobs []checks.CheckJob } -func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryService, pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, logger *zap.Logger) *NodeSelectorClient { +func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryService, pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, networkProvider global_config.ChainNetworkProvider, logger *zap.Logger) *NodeSelectorClient { // base checks will share same 
node list and pocket relayer - baseCheck := checks.NewCheck(pocketRelayer, chainConfiguration) + baseCheck := checks.NewCheck(pocketRelayer, chainConfiguration, networkProvider) // enabled checks enabledChecks := []checks.CheckJob{ diff --git a/mocks/global_config/config_provider.go b/mocks/global_config/config_provider.go index cb1afd1..6bcc083 100644 --- a/mocks/global_config/config_provider.go +++ b/mocks/global_config/config_provider.go @@ -3,7 +3,9 @@ package global_config_mock import ( + chain_network "github.com/pokt-network/gateway-server/internal/chain_network" global_config "github.com/pokt-network/gateway-server/internal/global_config" + mock "github.com/stretchr/testify/mock" time "time" @@ -113,20 +115,18 @@ func (_c *GlobalConfigProvider_GetAltruistRequestTimeout_Call) RunAndReturn(run } // GetChainNetwork provides a mock function with given fields: -func (_m *GlobalConfigProvider) GetChainNetwork() global_config.ChainNetworkProvider { +func (_m *GlobalConfigProvider) GetChainNetwork() chain_network.ChainNetwork { ret := _m.Called() if len(ret) == 0 { panic("no return value specified for GetChainNetwork") } - var r0 global_config.ChainNetworkProvider - if rf, ok := ret.Get(0).(func() global_config.ChainNetworkProvider); ok { + var r0 chain_network.ChainNetwork + if rf, ok := ret.Get(0).(func() chain_network.ChainNetwork); ok { r0 = rf() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(global_config.ChainNetworkProvider) - } + r0 = ret.Get(0).(chain_network.ChainNetwork) } return r0 @@ -149,12 +149,12 @@ func (_c *GlobalConfigProvider_GetChainNetwork_Call) Run(run func()) *GlobalConf return _c } -func (_c *GlobalConfigProvider_GetChainNetwork_Call) Return(_a0 global_config.ChainNetworkProvider) *GlobalConfigProvider_GetChainNetwork_Call { +func (_c *GlobalConfigProvider_GetChainNetwork_Call) Return(_a0 chain_network.ChainNetwork) *GlobalConfigProvider_GetChainNetwork_Call { _c.Call.Return(_a0) return _c } -func (_c 
*GlobalConfigProvider_GetChainNetwork_Call) RunAndReturn(run func() global_config.ChainNetworkProvider) *GlobalConfigProvider_GetChainNetwork_Call { +func (_c *GlobalConfigProvider_GetChainNetwork_Call) RunAndReturn(run func() chain_network.ChainNetwork) *GlobalConfigProvider_GetChainNetwork_Call { _c.Call.Return(run) return _c } From 1aee44ef4d6def592f7fa96878626c102dc2437f Mon Sep 17 00:00:00 2001 From: blade Date: Thu, 11 Apr 2024 11:17:29 -0500 Subject: [PATCH 09/14] add service domain --- internal/relayer/relayer.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/internal/relayer/relayer.go b/internal/relayer/relayer.go index f72bc99..fbc4599 100644 --- a/internal/relayer/relayer.go +++ b/internal/relayer/relayer.go @@ -15,7 +15,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/valyala/fasthttp" "go.uber.org/zap" + "net/url" "strconv" + "strings" "time" ) @@ -140,6 +142,7 @@ func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.S // Record latency to prom and latency tracker latency := time.Now().Sub(startRequestTime) + pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), "false", req.Chain).Observe(latency.Seconds()) node.GetLatencyTracker().RecordMeasurement(float64(latency.Milliseconds())) // Node returned an error, potentially penalize the node operator dependent on error @@ -253,3 +256,31 @@ func (r *Relayer) getPocketRequestTimeout(chainId string) time.Duration { } return configTime } + +func extractHostFromServiceUrl(urlStr string) string { + parsedURL, err := url.Parse(urlStr) + if err != nil { + return "" // return empty string or handle error + } + + // Get the hostname from the parsed URL + hostname := parsedURL.Hostname() + + // Find the last occurrence of "." in the hostname + index := strings.LastIndex(hostname, ".") + + // If there is no "." 
or it's the first character, return the hostname itself + if index == -1 || index == 0 { + return hostname + } + + // Find the index of the second-to-last occurrence of "." (root domain separator) + index = strings.LastIndex(hostname[:index-1], ".") + if index == -1 { + // If there is only one ".", return the hostname itself + return hostname + } + + // Extract and return the root domain + return hostname[index+1:] +} From 5c28d4eb1393893da5c64dbd9d1108ae67637c51 Mon Sep 17 00:00:00 2001 From: blade Date: Thu, 11 Apr 2024 12:20:23 -0500 Subject: [PATCH 10/14] add node host --- internal/relayer/relayer.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/internal/relayer/relayer.go b/internal/relayer/relayer.go index fbc4599..f52b3e7 100644 --- a/internal/relayer/relayer.go +++ b/internal/relayer/relayer.go @@ -43,7 +43,7 @@ func init() { Name: "relay_counter", Help: "Request to send an actual relay and if it succeeded", }, - []string{"success", "altruist", "reason", "chain_id"}, + []string{"success", "altruist", "reason", "chain_id", "service_host"}, ) histogramRelayRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -51,7 +51,7 @@ func init() { Name: "relay_latency", Help: "percentile on the request on latency to select a node, sign a request and send it to the network", }, - []string{"success", "altruist", "chain_id"}, + []string{"success", "altruist", "chain_id", "service_host"}, ) pocketClientHistogramRelayRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -59,7 +59,7 @@ func init() { Name: "pocket_client_relay_latency", Help: "percentile on the request on latency to sign a request and send it to the network", }, - []string{"success", "altruist", "chain_id"}, + []string{"success", "chain_id", "service_host"}, ) prometheus.MustRegister(counterRelayRequest, histogramRelayRequestLatency, pocketClientHistogramRelayRequestLatency) } @@ -95,23 +95,26 @@ func (r *Relayer) 
SendRelay(req *models.SendRelayRequest) (*models.SendRelayResp success := false altruist := false + var nodeHost string startTime := time.Now() // Measure end to end latency for send relay defer func() { - histogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(success), strconv.FormatBool(altruist), req.Chain).Observe(time.Since(startTime).Seconds()) + histogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(success), strconv.FormatBool(altruist), req.Chain, nodeHost).Observe(time.Since(startTime).Seconds()) }() - rsp, err := r.sendNodeSelectorRelay(req) + rsp, host, err := r.sendNodeSelectorRelay(req) + // Set the host to record service domain + nodeHost = host // Node selector relay was successful if err == nil { success = true - counterRelayRequest.WithLabelValues("true", "false", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues("true", "false", "", req.Chain, nodeHost).Inc() return rsp, nil } altruist = true - counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedPocketErr, req.Chain).Inc() + counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedPocketErr, req.Chain, "").Inc() r.logger.Sugar().Errorw("failed to send to pokt", "poktErr", err) altruistRsp, altruistErr := r.altruistRelay(req) @@ -123,17 +126,17 @@ func (r *Relayer) SendRelay(req *models.SendRelayRequest) (*models.SendRelayResp return altruistRsp, nil } -func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, error) { +func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, string, error) { // find a node to send too first. 
node, ok := r.nodeSelector.FindNode(req.Chain) if !ok { - return nil, errSelectNodeFail + return nil, "", errSelectNodeFail } req.Signer = node.MorseSigner req.Session = node.MorseSession req.SelectedNodePubKey = node.GetPublicKey() if err := req.Validate(); err != nil { - return nil, err + return nil, "", err } startRequestTime := time.Now() @@ -143,14 +146,15 @@ func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.S // Record latency to prom and latency tracker latency := time.Now().Sub(startRequestTime) - pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), "false", req.Chain).Observe(latency.Seconds()) + nodeHost := extractHostFromServiceUrl(node.MorseNode.ServiceUrl) + pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), req.Chain, nodeHost).Observe(latency.Seconds()) node.GetLatencyTracker().RecordMeasurement(float64(latency.Milliseconds())) // Node returned an error, potentially penalize the node operator dependent on error if err != nil { checks.DefaultPunishNode(err, node, r.logger) } - return rsp, err + return rsp, nodeHost, err } func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, error) { @@ -172,7 +176,7 @@ func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.Sen }) if err != nil { - counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedSessionErr, req.Chain).Inc() + counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedSessionErr, req.Chain, "").Inc() return nil, err } @@ -190,7 +194,7 @@ func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.Sen rsp, err := r.pocketClient.SendRelay(req) // record if relay was successful - counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain, 
extractHostFromServiceUrl(randomNode.MorseNode.ServiceUrl)).Inc() return rsp, err } @@ -223,7 +227,7 @@ func (r *Relayer) altruistRelay(req *models.SendRelayRequest) (*models.SendRelay err := r.httpRequester.DoTimeout(request, response, requestTimeout) success := err == nil - counterRelayRequest.WithLabelValues(strconv.FormatBool(success), "true", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues(strconv.FormatBool(success), "true", "", req.Chain, "").Inc() if !success { return nil, err From cf41a587154f9b8709520210da1db16206ecfd23 Mon Sep 17 00:00:00 2001 From: blade Date: Thu, 11 Apr 2024 12:22:51 -0500 Subject: [PATCH 11/14] update tests --- internal/relayer/relayer_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/relayer/relayer_test.go b/internal/relayer/relayer_test.go index 641c82b..edfc529 100644 --- a/internal/relayer/relayer_test.go +++ b/internal/relayer/relayer_test.go @@ -49,6 +49,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { request *models.SendRelayRequest setupMocks func(*models.SendRelayRequest) expectedResponse *models.SendRelayResponse + expectedNodeHost string expectedError error }{ { @@ -61,6 +62,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { suite.mockNodeSelectorService.EXPECT().FindNode("1234").Return(nil, false) }, expectedResponse: nil, + expectedNodeHost: "", expectedError: errSelectNodeFail, }, { @@ -72,7 +74,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { setupMocks: func(request *models.SendRelayRequest) { signer := &models.Ed25519Account{} - node := &models.Node{PublicKey: "123"} + node := &models.Node{PublicKey: "123", ServiceUrl: "http://complex.subdomain.root.com/test/123"} session := &models.Session{} suite.mockNodeSelectorService.EXPECT().FindNode("1234").Return(qos_models.NewQosNode(node, session, signer), true) // expect sendRelay to have same parameters as find node, otherwise validation will fail @@ -84,6 +86,7 @@ func (suite 
*RelayerTestSuite) TestNodeSelectorRelay() { Session: session, }).Return(expectedResponse, nil) }, + expectedNodeHost: "root.com", expectedResponse: expectedResponse, expectedError: nil, }, @@ -97,12 +100,12 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { tc.setupMocks(tc.request) // setup mocks - rsp, err := suite.relayer.sendNodeSelectorRelay(tc.request) + rsp, host, err := suite.relayer.sendNodeSelectorRelay(tc.request) // assert results suite.Equal(tc.expectedResponse, rsp) + suite.Equal(tc.expectedNodeHost, host) suite.Equal(tc.expectedError, err) - }) } From 976b23fbac01e046a50af2df8664b0876a160cc0 Mon Sep 17 00:00:00 2001 From: blade Date: Fri, 19 Apr 2024 22:50:23 -0500 Subject: [PATCH 12/14] add emit metrics --- internal/relayer/relayer.go | 9 ++++++--- internal/relayer/relayer_test.go | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/relayer/relayer.go b/internal/relayer/relayer.go index f52b3e7..1cc0aa2 100644 --- a/internal/relayer/relayer.go +++ b/internal/relayer/relayer.go @@ -146,7 +146,7 @@ func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.S // Record latency to prom and latency tracker latency := time.Now().Sub(startRequestTime) - nodeHost := extractHostFromServiceUrl(node.MorseNode.ServiceUrl) + nodeHost := r.extractHostFromServiceUrl(node.MorseNode.ServiceUrl) pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), req.Chain, nodeHost).Observe(latency.Seconds()) node.GetLatencyTracker().RecordMeasurement(float64(latency.Milliseconds())) // Node returned an error, potentially penalize the node operator dependent on error @@ -194,7 +194,7 @@ func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.Sen rsp, err := r.pocketClient.SendRelay(req) // record if relay was successful - counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain, 
extractHostFromServiceUrl(randomNode.MorseNode.ServiceUrl)).Inc() + counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain, r.extractHostFromServiceUrl(randomNode.MorseNode.ServiceUrl)).Inc() return rsp, err } @@ -261,7 +261,10 @@ func (r *Relayer) getPocketRequestTimeout(chainId string) time.Duration { return configTime } -func extractHostFromServiceUrl(urlStr string) string { +func (r *Relayer) extractHostFromServiceUrl(urlStr string) string { + if !r.globalConfigProvider.ShouldEmitServiceUrlPromMetrics() { + return "" + } parsedURL, err := url.Parse(urlStr) if err != nil { return "" // return empty string or handle error diff --git a/internal/relayer/relayer_test.go b/internal/relayer/relayer_test.go index edfc529..8ba82b9 100644 --- a/internal/relayer/relayer_test.go +++ b/internal/relayer/relayer_test.go @@ -76,6 +76,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { signer := &models.Ed25519Account{} node := &models.Node{PublicKey: "123", ServiceUrl: "http://complex.subdomain.root.com/test/123"} session := &models.Session{} + suite.mockConfigProvider.EXPECT().ShouldEmitServiceUrlPromMetrics().Return(true) suite.mockNodeSelectorService.EXPECT().FindNode("1234").Return(qos_models.NewQosNode(node, session, signer), true) // expect sendRelay to have same parameters as find node, otherwise validation will fail suite.mockPocketService.EXPECT().SendRelay(&models.SendRelayRequest{ From 55ba283de6821887d1185da0c47220566e8b3e93 Mon Sep 17 00:00:00 2001 From: blade Date: Sat, 20 Apr 2024 15:28:08 -0500 Subject: [PATCH 13/14] update comments --- docs/node-selection.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/node-selection.md b/docs/node-selection.md index 6a022f6..eb51c1e 100644 --- a/docs/node-selection.md +++ b/docs/node-selection.md @@ -63,7 +63,7 @@ ChainConfiguration chain_configurations_registry.ChainConfigurationsService ``` that developers should inherit. 
This base check provides a list of nodes to check and a `PocketRelayer` that allows the developer to send requests to the nodes in the network, and `ChainConfiguration` service that allows for per-chain specific check configurations. -Checks are designed to be opinionated and there are numerous ways to implement whether a node is healthy or not by definition. Therefore, implementing custom QoS checks will be dependent on the chain or data source the developer is looking to support. For example, the developer may want to send a request to a Solana node with a custom JSON-RPC method to see if the node is synced by using the provided `PocketRelayer` to send a request to the node through Pocket network. +Checks are designed to be opinionated, and there are numerous ways to define whether a node is healthy. Therefore, implementing custom QoS checks will be dependent on the chain or data source the developer is looking to support. For example, the developer may want to send a request to a custom blockchain node with a custom JSON-RPC method to see if the node is synced by using the provided `PocketRelayer` to send a request to the node through the Pocket network. If the node is not synced, the developer can set a custom punishment through the various functions exposed in [qos_node.go](..%2Finternal%2Fnode_selector_service%2Fmodels%2Fqos_node.go), such as `SetTimeoutUntil` to punish the node. Once the developer is finished implementing the CheckJob, they can enable the QoS check by initializing the newly created check into the `enabledChecks` variable inside [node_selector_service.go](..%2Finternal%2Fnode_selector_service%2Fnode_selector_service.go) and are encouraged to open up a PR for inclusion in the official repository. 
@@ -72,7 +72,7 @@ Once the developer is finished implementing the CheckJob, they can enable the Qo - Long term persistent results - Pros: More data to work with on determining if a node is healthy - - Cons: Expensive, more complex logic, and can be punishing to new node operators + - Cons: Expensive, more complex logic (due to geographic regions) and can be punishing to new node operators - Rolling up the results for long term storage & historical look back From f5c9fc1e391354057bd510342764e602e15a20b9 Mon Sep 17 00:00:00 2001 From: blade Date: Sat, 20 Apr 2024 15:30:16 -0500 Subject: [PATCH 14/14] clarify FindNode function --- docs/node-selection.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/node-selection.md b/docs/node-selection.md index eb51c1e..13afe27 100644 --- a/docs/node-selection.md +++ b/docs/node-selection.md @@ -20,7 +20,7 @@ heuristics: ## Node Selector After the sessions are primed, the nodes are fed to the `NodeSelectorService` which is responsible for: 1. Running various QoS checks (Height and Data Integrity Checks) -2. Exposing functions for the main process to select a healthy node `findNode(chainId) string` +2. Exposing functions for the main process to select a healthy node `FindNode(chainId string) (*QosNode, bool)` ### Checks Framework The gateway server provides a simple interface called a `CheckJob`. This interface consists of three simple functions