diff --git a/.env.sample b/.env.sample index d5394a7..0f43a43 100644 --- a/.env.sample +++ b/.env.sample @@ -6,4 +6,6 @@ POKT_APPLICATIONS_ENCRYPTION_KEY= SESSION_CACHE_TTL=75m DB_CONNECTION_URL=postgres://myuser:mypassword@postgres:5432/postgres?sslmode=disable API_KEY= -ALTRUIST_REQUEST_TIMEOUT=10s \ No newline at end of file +ALTRUIST_REQUEST_TIMEOUT=10s +EMIT_SERVICE_URL_PROM_METRICS=false +CHAIN_NETWORK=morse_mainnet \ No newline at end of file diff --git a/cmd/gateway_server/internal/config/dot_env_config_provider.go b/cmd/gateway_server/internal/config/dot_env_config_provider.go index e748dc6..63e2411 100644 --- a/cmd/gateway_server/internal/config/dot_env_config_provider.go +++ b/cmd/gateway_server/internal/config/dot_env_config_provider.go @@ -3,6 +3,7 @@ package config import ( "fmt" "github.com/joho/godotenv" + "github.com/pokt-network/gateway-server/internal/chain_network" "github.com/pokt-network/gateway-server/internal/global_config" "os" "strconv" @@ -15,6 +16,8 @@ const ( // Environment variable names const ( + chainNetworkEnv = "CHAIN_NETWORK" + emitServiceUrlPromMetricsEnv = "EMIT_SERVICE_URL_PROM_METRICS" poktRPCFullHostEnv = "POKT_RPC_FULL_HOST" httpServerPortEnv = "HTTP_SERVER_PORT" poktRPCTimeoutEnv = "POKT_RPC_TIMEOUT" @@ -29,6 +32,7 @@ const ( // DotEnvGlobalConfigProvider implements the GatewayServerProvider interface. type DotEnvGlobalConfigProvider struct { poktRPCFullHost string + chainNetwork chain_network.ChainNetwork httpServerPort uint poktRPCRequestTimeout time.Duration sessionCacheTTL time.Duration @@ -36,6 +40,7 @@ type DotEnvGlobalConfigProvider struct { poktApplicationsEncryptionKey string databaseConnectionUrl string apiKey string + emitServiceUrlPromMetrics bool altruistRequestTimeout time.Duration } @@ -83,6 +88,16 @@ func (c DotEnvGlobalConfigProvider) GetAltruistRequestTimeout() time.Duration { return c.altruistRequestTimeout } +// ShouldEmitServiceUrl returns whether to emit service url tags as part of relay metrics. +func (c DotEnvGlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool { + return c.emitServiceUrlPromMetrics +} + +// GetChainNetwork returns the current network, this can be useful for identifying the correct chain ids dependent on testnet or mainnet. +func (c DotEnvGlobalConfigProvider) GetChainNetwork() chain_network.ChainNetwork { + return c.chainNetwork +} + // NewDotEnvConfigProvider creates a new instance of DotEnvGlobalConfigProvider. func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { _ = godotenv.Load() @@ -108,7 +123,14 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { altruistRequestTimeoutDuration = defaultAltruistRequestTimeout } + emitServiceUrlPromMetrics, err := strconv.ParseBool(getEnvVar(emitServiceUrlPromMetricsEnv, "false")) + + if err != nil { + emitServiceUrlPromMetrics = false + } + return &DotEnvGlobalConfigProvider{ + emitServiceUrlPromMetrics: emitServiceUrlPromMetrics, poktRPCFullHost: getEnvVar(poktRPCFullHostEnv, ""), httpServerPort: uint(httpServerPort), poktRPCRequestTimeout: poktRPCTimeout, @@ -117,6 +139,7 @@ func NewDotEnvConfigProvider() *DotEnvGlobalConfigProvider { environmentStage: global_config.EnvironmentStage(getEnvVar(environmentStageEnv, "")), poktApplicationsEncryptionKey: getEnvVar(poktApplicationsEncryptionKeyEnv, ""), apiKey: getEnvVar(apiKey, ""), + chainNetwork: chain_network.ChainNetwork(getEnvVar(chainNetworkEnv, string(chain_network.MorseMainnet))), altruistRequestTimeout: altruistRequestTimeoutDuration, } } diff --git a/cmd/gateway_server/internal/controllers/relay.go b/cmd/gateway_server/internal/controllers/relay.go index 9ce12da..27faf71 100644 --- a/cmd/gateway_server/internal/controllers/relay.go +++ b/cmd/gateway_server/internal/controllers/relay.go @@ -57,6 +57,8 @@ func (c *RelayController) HandleRelay(ctx *fasthttp.RequestCtx) { return } +// getPathSegmented: returns the chain being requested and other parts to be proxied to pokt nodes +// Example: /relay/0001/v1/client, returns 0001, /v1/client func getPathSegmented(path []byte) (chain, otherParts string) { paths := strings.Split(string(path), "/") diff --git a/cmd/gateway_server/main.go b/cmd/gateway_server/main.go index 9e2f0ae..b8dd1a5 100644 --- a/cmd/gateway_server/main.go +++ b/cmd/gateway_server/main.go @@ -67,7 +67,7 @@ func main() { poktApplicationRegistry := apps_registry.NewCachedAppsRegistry(client, querier, gatewayConfigProvider, logger.Named("pokt_application_registry")) chainConfigurationRegistry := chain_configurations_registry.NewCachedChainConfigurationRegistry(querier, logger.Named("chain_configurations_registry")) sessionRegistry := session_registry.NewCachedSessionRegistryService(client, poktApplicationRegistry, sessionCache, nodeCache, logger.Named("session_registry")) - nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, logger.Named("node_selector")) + nodeSelectorService := node_selector_service.NewNodeSelectorService(sessionRegistry, client, chainConfigurationRegistry, gatewayConfigProvider, logger.Named("node_selector")) relayer := relayer.NewRelayer(client, sessionRegistry, poktApplicationRegistry, nodeSelectorService, chainConfigurationRegistry, userAgent, gatewayConfigProvider, logger.Named("relayer")) diff --git a/docs/node-selection.md b/docs/node-selection.md index 94db540..13afe27 100644 --- a/docs/node-selection.md +++ b/docs/node-selection.md @@ -20,7 +20,7 @@ heuristics: ## Node Selector After the sessions are primed, the nodes are fed to the `NodeSelectorService` which is responsible for: 1. Running various QoS checks (Height and Data Integrity Checks) -2. Exposing functions for the main process to select a healthy node `findNode(chainId) string` +2. Exposing functions for the main process to select a healthy node `FindNode(chainId string) string` ### Checks Framework The gateway server provides a simple interface called a `CheckJob`. This interface consists of three simple functions @@ -33,10 +33,24 @@ type CheckJob interface { ``` Under the hood, the NodeSelectorService is responsible for asynchronously executing all the initialized `CheckJobs`. +### Existing QoS checks: +- **Height Check:** The general flow would be: + 1) Query all node operators height, + 2) compares heights with other node operators within a specific threshold + 3) filters out node operators that exceed the configurable block height tolerance. + +- **Data Integrity Check:** The general flow would be: + 1) Retrieve a unique block identifier (i.e block hash or total block tx count, etc) with a configurable block offset for randomness, + 2) Query other node operators for the same block identifier + 3) filter out other node operators that return a different identifier. + Some existing implementations of Checks can be found in: 1. [evm_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_data_integrity_check%2Fevm_data_integrity_check.go) 2. [evm_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fevm_height_check%2Fevm_height_check.go) - +3. [pokt_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_height_check%2Fpokt_height_check.go) +4. [pokt_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fpokt_data_integrity_check%2Fpokt_data_integrity_check.go) +5. [solana_height_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_height_check%2Fsolana_height_check.go) +6. [solana_data_integrity_check.go](..%2Finternal%2Fnode_selector_service%2Fchecks%2Fsolana_data_integrity_check%2Fsolana_data_integrity_check.go) ### Adding custom QoS checks Every custom check must conform to the `CheckJob` interface. The gateway server provides a base check: @@ -49,7 +63,7 @@ ChainConfiguration chain_configurations_registry.ChainConfigurationsService ``` that developers should inherit. This base check provides a list of nodes to check and a `PocketRelayer` that allows the developer to send requests to the nodes in the network, and `ChainConfiguration` service that allows for per-chain specific check configurations. -Checks are designed to be opinionated and there are numerous ways to implement whether a node is healthy or not by definition. Therefore, implementing custom QoS checks will be dependent on the chain or data source the developer is looking to support. For example, the developer may want to send a request to a Solana node with a custom JSON-RPC method to see if the node is synced by using the provided `PocketRelayer` to send a request to the node through Pocket network. +Checks are designed to be opinionated and there are numerous ways to implement whether a node is healthy or not by definition. Therefore, implementing custom QoS checks will be dependent on the chain or data source the developer is looking to support. For example, the developer may want to send a request to a custom blockchain node with a custom JSON-RPC method to see if the node is synced by using the provided `PocketRelayer` to send a request to the node through Pocket network. If the node is not synced, the developer can set a custom punishment through the various functions exposed in [qos_node.go](..%2Finternal%2Fnode_selector_service%2Fmodels%2Fqos_node.go), such as `SetTimeoutUntil` to punish the node. Once the developer is finished implementing the CheckJob, they can enable the QoS check by initializing the newly created check into the `enabledChecks` variable inside [node_selector_service.go](..%2Finternal%2Fnode_selector_service%2Fnode_selector_service.go) and are encouraged to open up a PR for inclusion in the official repository. @@ -58,7 +72,7 @@ Once the developer is finished implementing the CheckJob, they can enable the Qo - Long term persistent results - Pros: More data to work with on determining if a node is healthy - - Cons: Expensive, more complex logic, and can be punishing to new node operators + - Cons: Expensive, more complex logic (due to geographic regions) and can be punishing to new node operators - Rolling up the results for long term storage & historical look back diff --git a/docs/quick-onboarding-guide.md b/docs/quick-onboarding-guide.md index 04fa7dd..0435568 100644 --- a/docs/quick-onboarding-guide.md +++ b/docs/quick-onboarding-guide.md @@ -39,17 +39,18 @@ Create an encryption password for your app stake keys. This password will be use Fill out the `.env` variables for the gateway server. This can be done by injecting environment variables directly or using a `.env` file. ### Env Variables Description -| Variable Name | Description | Example Value | -|-------------------------------------|-----------------------------------------------------|---------------------------------------------------------------------------------------------------| -| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | -| `HTTP_SERVER_PORT` | Gateway server port | `8080` | -| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | -| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | -| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | -| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | -| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | -| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | - +| Variable Name | Description | Example Value | +|------------------------------------|------------------------------------------------------------|---------------------------------------------------------------------------------------------------| +| `POKT_RPC_FULL_HOST` | Used for dispatching sessions | `https://pokt-testnet-rpc.nodies.org` (a complimentary testnet dispatcher URL provided by Nodies) | +| `HTTP_SERVER_PORT` | Gateway server port | `8080` | +| `POKT_RPC_TIMEOUT` | Max response time for a POKT node to respond | `10s` | +| `ALTRUIST_REQUEST_TIMEOUT` | Max response time for an altruist backup to respond | `10s` | +| `ENVIRONMENT_STAGE` | Log verbosity | `development`, `production` | +| `SESSION_CACHE_TTL` | Duration for sessions to stay in cache | `75m` | +| `POKT_APPLICATIONS_ENCRYPTION_KEY` | User-generated encryption key | `a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6` | +| `DB_CONNECTION_URL` | PostgreSQL Database connection URL | `postgres://user:password@localhost:5432/postgres` | +| `EMIT_SERVICE_URL_PROM_METRICS` | Boolean flag to enable service url for relay metrics | `false`, `true` | +| `CHAIN_NETWORK` | Identifies which network the gateway server is running on. | `morse_mainnet`, `morse_testnet` | See [.env.sample](..%2F.env.sample) for a sample. ## 4. Run Migration Script diff --git a/internal/chain_network/chain_network.go b/internal/chain_network/chain_network.go new file mode 100644 index 0000000..361ab92 --- /dev/null +++ b/internal/chain_network/chain_network.go @@ -0,0 +1,8 @@ +package chain_network + +type ChainNetwork string + +const ( + MorseMainnet ChainNetwork = "morse_mainnet" + MorseTestnet ChainNetwork = "morse_testnet" +) diff --git a/internal/global_config/config_provider.go b/internal/global_config/config_provider.go index 5037e56..4183272 100644 --- a/internal/global_config/config_provider.go +++ b/internal/global_config/config_provider.go @@ -1,6 +1,9 @@ package global_config -import "time" +import ( + "github.com/pokt-network/gateway-server/internal/chain_network" + "time" +) type EnvironmentStage string @@ -14,6 +17,12 @@ type GlobalConfigProvider interface { EnvironmentProvider PoktNodeConfigProvider AltruistConfigProvider + PromMetricsProvider + ChainNetworkProvider +} + +type PromMetricsProvider interface { + ShouldEmitServiceUrlPromMetrics() bool } type SecretProvider interface { @@ -37,3 +46,7 @@ type PoktNodeConfigProvider interface { type AltruistConfigProvider interface { GetAltruistRequestTimeout() time.Duration } + +type ChainNetworkProvider interface { + GetChainNetwork() chain_network.ChainNetwork +} diff --git a/internal/node_selector_service/checks/async_relay_handler.go b/internal/node_selector_service/checks/async_relay_handler.go index a1ea26d..824da6c 100644 --- a/internal/node_selector_service/checks/async_relay_handler.go +++ b/internal/node_selector_service/checks/async_relay_handler.go @@ -13,7 +13,7 @@ type nodeRelayResponse struct { Error error } -func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string) chan *nodeRelayResponse { +func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string, path string) chan *nodeRelayResponse { // Define a channel to receive relay responses relayResponses := make(chan *nodeRelayResponse, len(nodes)) var wg sync.WaitGroup @@ -23,7 +23,7 @@ func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, pay defer wg.Done() relay, err := relayer.SendRelay(&relayer_models.SendRelayRequest{ Signer: node.GetAppStakeSigner(), - Payload: &relayer_models.Payload{Data: payload, Method: method}, + Payload: &relayer_models.Payload{Data: payload, Method: method, Path: path}, Chain: node.GetChain(), SelectedNodePubKey: node.GetPublicKey(), Session: node.MorseSession, diff --git a/internal/node_selector_service/checks/data_integrity_handler.go b/internal/node_selector_service/checks/data_integrity_handler.go new file mode 100644 index 0000000..e5ce599 --- /dev/null +++ b/internal/node_selector_service/checks/data_integrity_handler.go @@ -0,0 +1,130 @@ +package checks + +import ( + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "github.com/pokt-network/gateway-server/pkg/common" + "github.com/valyala/fasthttp" + "go.uber.org/zap" + "time" +) + +const ( + // how often to check a node's data integrity + dataIntegrityNodeCheckInterval = time.Minute * 10 + + // penalty whenever a pocket node doesn't match other node providers responses + dataIntegrityTimePenalty = time.Minute * 15 + + // the look back we will use to determine which block number to do a data integrity against (latestBlockHeight - lookBack) + dataIntegrityHeightLookbackDefault = 25 +) + +type nodeHashRspPair struct { + node *models.QosNode + blockIdentifier string +} + +type BlockHashParser func(response string) (string, error) + +type GetBlockByNumberPayloadFmter func(blockToFind uint64) string + +// PerformDataIntegrityCheck: is the default implementation of a data integrity check by: +func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPayloadFmter, path string, retrieveBlockIdentifier BlockHashParser, logger *zap.Logger) { + // Find a node that has been reported as healthy to use as source of truth + sourceOfTruth := findRandomHealthyNode(check.NodeList) + + // Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source + if sourceOfTruth == nil { + logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", check.NodeList[0].GetChain()) + return + } + + logger.Sugar().Infow("running default data integrity check", "chain", check.NodeList[0].GetChain()) + + // Map to count number of nodes that return blockHash -> counter + nodeResponseCounts := make(map[string]int) + + var nodeResponsePairs []*nodeHashRspPair + + // find a random block to search that nodes should have access too + blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(GetDataIntegrityHeightLookback(check.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) + + attestationResponses := SendRelaysAsync(check.PocketRelayer, getEligibleDataIntegrityCheckNodes(check.NodeList), calculatePayload(blockNumberToSearch), "POST", path) + for rsp := range attestationResponses { + + if rsp.Error != nil { + DefaultPunishNode(rsp.Error, rsp.Node, logger) + continue + } + + blockIdentifier, err := retrieveBlockIdentifier(rsp.Relay.Response) + if err != nil { + logger.Sugar().Warnw("failed to unmarshal response", "err", err) + DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, logger) + continue + } + + rsp.Node.SetLastDataIntegrityCheckTime(time.Now()) + nodeResponsePairs = append(nodeResponsePairs, &nodeHashRspPair{ + node: rsp.Node, + blockIdentifier: blockIdentifier, + }) + nodeResponseCounts[blockIdentifier]++ + } + + majorityBlockIdentifier := findMajorityBlockIdentifier(nodeResponseCounts) + + // Blcok blockIdentifier must not be empty + if majorityBlockIdentifier == "" { + return + } + + // Penalize other node operators with a timeout if they don't attest with same block blockIdentifier. + for _, nodeResp := range nodeResponsePairs { + if nodeResp.blockIdentifier != majorityBlockIdentifier { + logger.Sugar().Errorw("punishing node for failed data integrity check", "node", nodeResp.node.MorseNode.ServiceUrl, "nodeBlockHash", nodeResp.blockIdentifier, "trustedSourceBlockHash", majorityBlockIdentifier) + nodeResp.node.SetTimeoutUntil(time.Now().Add(dataIntegrityTimePenalty), models.DataIntegrityTimeout, fmt.Errorf("nodeBlockHash %s, trustedSourceBlockHash %s", nodeResp.blockIdentifier, majorityBlockIdentifier)) + } + } + +} + +// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks +func findRandomHealthyNode(nodes []*models.QosNode) *models.QosNode { + var healthyNodes []*models.QosNode + for _, node := range nodes { + if node.IsHealthy() { + healthyNodes = append(healthyNodes, node) + } + } + healthyNode, ok := common.GetRandomElement(healthyNodes) + if !ok { + return nil + } + return healthyNode +} + +func getEligibleDataIntegrityCheckNodes(nodes []*models.QosNode) []*models.QosNode { + // Filter nodes based on last checked time + var eligibleNodes []*models.QosNode + for _, node := range nodes { + if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() { + eligibleNodes = append(eligibleNodes, node) + } + } + return eligibleNodes +} + +// findMajorityBlockIdentifier finds the blockIdentifier with the highest response count +func findMajorityBlockIdentifier(responseCounts map[string]int) string { + var highestResponseIdentifier string + var highestResponseCount int + for rsp, count := range responseCounts { + if count > highestResponseCount { + highestResponseIdentifier = rsp + highestResponseCount = count + } + } + return highestResponseIdentifier +} diff --git a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go index 10f5c83..e556773 100644 --- a/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go +++ b/internal/node_selector_service/checks/evm_data_integrity_check/evm_data_integrity_check.go @@ -5,26 +5,15 @@ import ( "fmt" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" - "github.com/pokt-network/gateway-server/pkg/common" - "github.com/valyala/fasthttp" "go.uber.org/zap" "strconv" "time" ) const ( - // penalty whenever a pocket node doesn't match other node providers responses - dataIntegrityTimePenalty = time.Minute * 15 - - // how often to check a node's data integrity - dataIntegrityNodeCheckInterval = time.Minute * 10 - // how often the job should run dataIntegrityCheckInterval = time.Second * 1 - // the look back we will use to determine which block number to do a data integrity against (latestBlockHeight - lookBack) - dataIntegrityHeightLookbackDefault = 25 - //json rpc payload to send a data integrity check blockPayloadFmt = `{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["%s", false],"id":1}` ) @@ -41,13 +30,17 @@ type EvmDataIntegrityCheck struct { logger *zap.Logger } -func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck { - return &EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +func (c *EvmDataIntegrityCheck) getBlockHashFromNodeResponse(response string) (string, error) { + var evmRsp blockByNumberResponse + err := json.Unmarshal([]byte(response), &evmRsp) + if err != nil { + return "", err + } + return evmRsp.Result.Hash, nil } -type nodeResponsePair struct { - node *models.QosNode - result blockByNumberResponse +func NewEvmDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *EvmDataIntegrityCheck { + return &EvmDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} } func (c *EvmDataIntegrityCheck) Name() string { @@ -61,112 +54,17 @@ func (c *EvmDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { func (c *EvmDataIntegrityCheck) Perform() { // Session is not meant for EVM - if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { + if len(c.NodeList) == 0 || !c.IsEvmChain(c.NodeList[0]) { return } - - // Find a node that has been reported as healthy to use as source of truth - sourceOfTruth := c.findRandomHealthyNode() - - // Node that is synced cannot be found, so we cannot run data integrity checks since we need a trusted source - if sourceOfTruth == nil { - c.logger.Sugar().Warnw("cannot find source of truth for data integrity check", "chain", c.NodeList[0].GetChain()) - return - } - - // Map to count number of nodes that return blockHash -> counter - nodeResponseCounts := make(map[string]int) - - var nodeResponsePairs []*nodeResponsePair - - // find a random block to search that nodes should have access too - blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(checks.GetDataIntegrityHeightLookback(c.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) - - attestationResponses := checks.SendRelaysAsync(c.PocketRelayer, c.getEligibleNodes(), getBlockByNumberPayload(blockNumberToSearch), "POST") - for rsp := range attestationResponses { - - if rsp.Error != nil { - checks.DefaultPunishNode(rsp.Error, rsp.Node, c.logger) - continue - } - - var resp blockByNumberResponse - err := json.Unmarshal([]byte(rsp.Relay.Response), &resp) - if err != nil { - c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) - checks.DefaultPunishNode(fasthttp.ErrTimeout, rsp.Node, c.logger) - continue - } - - rsp.Node.SetLastDataIntegrityCheckTime(time.Now()) - nodeResponsePairs = append(nodeResponsePairs, &nodeResponsePair{ - node: rsp.Node, - result: resp, - }) - nodeResponseCounts[resp.Result.Hash]++ - } - - majorityBlockHash := findMajorityBlockHash(nodeResponseCounts) - - // Blcok hash must not be empty - if majorityBlockHash == "" { - return - } - - // Penalize other node operators with a timeout if they don't attest with same block hash. - for _, nodeResp := range nodeResponsePairs { - if nodeResp.result.Result.Hash != majorityBlockHash { - c.logger.Sugar().Errorw("punishing node for failed data integrity check", "node", nodeResp.node.MorseNode.ServiceUrl, "nodeBlockHash", nodeResp.result.Result, "trustedSourceBlockHash", majorityBlockHash) - nodeResp.node.SetTimeoutUntil(time.Now().Add(dataIntegrityTimePenalty), models.DataIntegrityTimeout, fmt.Errorf("evmDataIntegrityCheck: nodeBlockHash %s, trustedSourceBlockHash %s", nodeResp.result.Result, majorityBlockHash)) - } - } - + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockHashFromNodeResponse, c.logger) c.nextCheckTime = time.Now().Add(dataIntegrityCheckInterval) } -// findMajorityBlockHash finds the hash with the highest response count -func findMajorityBlockHash(responseCounts map[string]int) string { - var highestResponseHash string - var highestResponseCount int - for rsp, count := range responseCounts { - if count > highestResponseCount { - highestResponseHash = rsp - highestResponseCount = count - } - } - return highestResponseHash -} - func (c *EvmDataIntegrityCheck) ShouldRun() bool { return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) } -// findRandomHealthyNode - returns a healthy node that is synced so we can use it as a source of truth for data integrity checks -func (c *EvmDataIntegrityCheck) findRandomHealthyNode() *models.QosNode { - var healthyNodes []*models.QosNode - for _, node := range c.NodeList { - if node.IsHealthy() { - healthyNodes = append(healthyNodes, node) - } - } - healthyNode, ok := common.GetRandomElement(healthyNodes) - if !ok { - return nil - } - return healthyNode -} - -func (c *EvmDataIntegrityCheck) getEligibleNodes() []*models.QosNode { - // Filter nodes based on last checked time - var eligibleNodes []*models.QosNode - for _, node := range c.NodeList { - if (node.GetLastDataIntegrityCheckTime().IsZero() || time.Since(node.GetLastDataIntegrityCheckTime()) >= dataIntegrityNodeCheckInterval) && node.IsHealthy() { - eligibleNodes = append(eligibleNodes, node) - } - } - return eligibleNodes -} - func getBlockByNumberPayload(blockNumber uint64) string { return fmt.Sprintf(blockPayloadFmt, "0x"+strconv.FormatInt(int64(blockNumber), 16)) } diff --git a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go index a74e132..41697ae 100644 --- a/internal/node_selector_service/checks/evm_height_check/evm_height_check.go +++ b/internal/node_selector_service/checks/evm_height_check/evm_height_check.go @@ -6,33 +6,19 @@ import ( "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pquerna/ffjson/ffjson" - "github.com/valyala/fasthttp" "go.uber.org/zap" - "gonum.org/v1/gonum/stat" - "math" "strconv" "strings" "time" ) const ( - // zScore to remove outliers for determining highest height - zScoreHeightThreshold = 3 - - // interval to check a node height again - checkNodeHeightInterval = time.Minute * 5 // interval to run the evm height check evmHeightCheckInterval = time.Second * 1 - // penalty for being out of sync - evmHeightCheckPenalty = time.Minute * 5 - // jsonrpc payload to retrieve evm height heightJsonPayload = `{"jsonrpc":"2.0","method":"eth_blockNumber","params": [],"id":1}` - - // default height allowance - defaultHeightTolerance int = 100 ) type evmHeightResponse struct { @@ -81,52 +67,10 @@ func (c *EvmHeightCheck) Name() string { func (c *EvmHeightCheck) Perform() { // Session is not meant for EVM - if len(c.NodeList) == 0 || !c.NodeList[0].IsEvmChain() { + if len(c.NodeList) == 0 || !c.IsEvmChain(c.NodeList[0]) { return } - - // Send request to all nodes - relayResponses := checks.SendRelaysAsync(c.PocketRelayer, c.NodeList, heightJsonPayload, "POST") - - var nodesResponded []*models.QosNode - // Process relay responses - for resp := range relayResponses { - - err := resp.Error - if err != nil { - checks.DefaultPunishNode(err, resp.Node, c.logger) - continue - } - - var evmHeightResp evmHeightResponse - err = json.Unmarshal([]byte(resp.Relay.Response), &evmHeightResp) - - if err != nil { - c.logger.Sugar().Warnw("failed to unmarshal response", "err", err) - // Treat a invalid response as a timeout error - checks.DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, c.logger) - continue - } - - resp.Node.SetLastHeightCheckTime(time.Now()) - resp.Node.SetLastKnownHeight(evmHeightResp.Height) - nodesResponded = append(nodesResponded, resp.Node) - } - - highestNodeHeight := getHighestNodeHeight(nodesResponded) - // Compare each node's reported height against the highest reported height - for _, node := range nodesResponded { - heightDifference := int(highestNodeHeight - node.GetLastKnownHeight()) - // Penalize nodes whose reported height is significantly lower than the highest reported height - if heightDifference > checks.GetBlockHeightTolerance(c.ChainConfiguration, node.GetChain(), defaultHeightTolerance) { - c.logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain()) - // Punish Node specifically due to timeout. - node.SetSynced(false) - node.SetTimeoutUntil(time.Now().Add(evmHeightCheckPenalty), models.OutOfSyncTimeout, fmt.Errorf("evmHeightCheck: heightDifference: %d, nodeSyncedHeight: %d, highestNodeHeight: %d", heightDifference, node.GetLastKnownHeight(), highestNodeHeight)) - } else { - node.SetSynced(true) - } - } + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) c.nextCheckTime = time.Now().Add(evmHeightCheckInterval) } @@ -138,43 +82,11 @@ func (c *EvmHeightCheck) ShouldRun() bool { return time.Now().After(c.nextCheckTime) } -func (c *EvmHeightCheck) getEligibleNodes() []*models.QosNode { - // Filter nodes based on last checked time - var eligibleNodes []*models.QosNode - for _, node := range c.NodeList { - if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= checkNodeHeightInterval { - eligibleNodes = append(eligibleNodes, node) - } - } - return eligibleNodes -} - -// getHighestHeight returns the highest height reported from a slice of nodes -// by using z-score threshhold to prevent any misconfigured or malicious node -func getHighestNodeHeight(nodes []*models.QosNode) uint64 { - - var nodeHeights []float64 - for _, node := range nodes { - nodeHeights = append(nodeHeights, float64(node.GetLastKnownHeight())) - } - - // Calculate mean and standard deviation - meanValue := stat.Mean(nodeHeights, nil) - stdDevValue := stat.StdDev(nodeHeights, nil) - - var highestNodeHeight float64 - for _, nodeHeight := range nodeHeights { - - zScore := stat.StdScore(nodeHeight, meanValue, stdDevValue) - - // height is an outlier according to zScore threshold - if math.Abs(zScore) > zScoreHeightThreshold { - continue - } - // Height is higher than last recorded height - if nodeHeight > highestNodeHeight { - highestNodeHeight = nodeHeight - } +func (c *EvmHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var evmRsp evmHeightResponse + err := json.Unmarshal([]byte(response), &evmRsp) + if err != nil { + return 0, err } - return uint64(highestNodeHeight) + return evmRsp.Height, nil } diff --git a/internal/node_selector_service/checks/height_check_handler.go b/internal/node_selector_service/checks/height_check_handler.go new file mode 100644 index 0000000..b61fc69 --- /dev/null +++ b/internal/node_selector_service/checks/height_check_handler.go @@ -0,0 +1,112 @@ +package checks + +import ( + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "github.com/valyala/fasthttp" + "go.uber.org/zap" + "gonum.org/v1/gonum/stat" + "math" + "time" +) + +const ( + defaultNodeHeightCheckInterval = time.Minute * 5 + defaultZScoreHeightThreshold = 3 + defaultHeightTolerance int = 100 + defaultCheckPenalty = time.Minute * 5 +) + +type HeightJsonParser func(response string) (uint64, error) + +// PerformDefaultHeightCheck is the default implementation of a height check by: +// 0. Filtering out nodes that have not been checked since defaultNodeHeightCheckInterval +// 1. Sending height request via payload to all the nodes +// 2. Punishing all nodes that return an error +// 3. Filtering out nodes that are returning a height out of the zScore threshold +// 4. Punishing the nodes with defaultCheckPenalty that exceed the height tolerance. +func PerformDefaultHeightCheck(check *Check, payload string, path string, parseHeight HeightJsonParser, logger *zap.Logger) { + + logger.Sugar().Infow("running default height check", "chain", check.NodeList[0].GetChain()) + + var nodesResponded []*models.QosNode + // Send request to all nodes + relayResponses := SendRelaysAsync(check.PocketRelayer, getEligibleHeightCheckNodes(check.NodeList), payload, "POST", path) + + // Process relay responses + for resp := range relayResponses { + err := resp.Error + if err != nil { + DefaultPunishNode(err, resp.Node, logger) + continue + } + + height, err := parseHeight(resp.Relay.Response) + if err != nil { + logger.Sugar().Warnw("failed to unmarshal response", "err", err) + // Treat an invalid response as a timeout error + DefaultPunishNode(fasthttp.ErrTimeout, resp.Node, logger) + continue + } + + resp.Node.SetLastHeightCheckTime(time.Now()) + resp.Node.SetLastKnownHeight(height) + nodesResponded = append(nodesResponded, resp.Node) + } + + highestNodeHeight := getHighestNodeHeight(nodesResponded, defaultZScoreHeightThreshold) + // Compare each node's reported height against the highest reported height + for _, node := range nodesResponded { + heightDifference := int(highestNodeHeight - node.GetLastKnownHeight()) + // Penalize nodes whose reported height is significantly lower than the highest reported height + if heightDifference > GetBlockHeightTolerance(check.ChainConfiguration, node.GetChain(), defaultHeightTolerance) { + logger.Sugar().Infow("node is out of sync", "node", node.MorseNode.ServiceUrl, "heightDifference", heightDifference, "nodeSyncedHeight", node.GetLastKnownHeight(), "highestNodeHeight", highestNodeHeight, "chain", node.GetChain()) + // Punish Node specifically due to timeout. + node.SetSynced(false) + node.SetTimeoutUntil(time.Now().Add(defaultCheckPenalty), models.OutOfSyncTimeout, fmt.Errorf("heightDifference: %d, nodeSyncedHeight: %d, highestNodeHeight: %d", heightDifference, node.GetLastKnownHeight(), highestNodeHeight)) + } else { + node.SetSynced(true) + } + } +} + +// getHighestHeight returns the highest height reported from a slice of nodes +// by using z-score threshhold to prevent any misconfigured or malicious node +func getHighestNodeHeight(nodes []*models.QosNode, zScoreHeightThreshhold float64) uint64 { + + var nodeHeights []float64 + for _, node := range nodes { + nodeHeights = append(nodeHeights, float64(node.GetLastKnownHeight())) + } + + // Calculate mean and standard deviation + meanValue := stat.Mean(nodeHeights, nil) + stdDevValue := stat.StdDev(nodeHeights, nil) + + var highestNodeHeight float64 + for _, nodeHeight := range nodeHeights { + + zScore := stat.StdScore(nodeHeight, meanValue, stdDevValue) + + // height is an outlier according to zScore threshold + if math.Abs(zScore) > zScoreHeightThreshhold { + continue + } + // Height is higher than last recorded height + if nodeHeight > highestNodeHeight { + highestNodeHeight = nodeHeight + } + } + return uint64(highestNodeHeight) +} + +func getEligibleHeightCheckNodes(nodes []*models.QosNode) []*models.QosNode { + // Filter nodes based on last checked time + var eligibleNodes []*models.QosNode + for _, node := range nodes { + if node.GetLastHeightCheckTime().IsZero() || time.Since(node.GetLastHeightCheckTime()) >= defaultNodeHeightCheckInterval { + eligibleNodes = append(eligibleNodes, node) + } + } + return eligibleNodes +} diff --git a/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go new file mode 100644 index 0000000..1a06d5a --- /dev/null +++ b/internal/node_selector_service/checks/pokt_data_integrity_check/pokt_data_integrity_check.go @@ -0,0 +1,71 @@ +package pokt_data_integrity_check + +import ( + "encoding/json" + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const poktBlockTxEndpoint = "/v1/query/blocktxs" + +const ( + // how often the job should run + dataIntegrityCheckInterval = time.Second * 1 + + //json rpc payload to send a data integrity check + blockPayloadFmt = `{"height": %d}` +) + +type blockTxResponse struct { + TotalTxs int `json:"total_txs"` +} + +type PoktDataIntegrityCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +// getBlockIdentifierFromNodeResponse: We use total txs as the block identifier because retrieving block hash from POKT RPC can lead up to +// 8MB+ payloads per node operator response, whereas blocktxs is only ~110kb +func (c *PoktDataIntegrityCheck) getBlockIdentifierFromNodeResponse(response string) (string, error) { + var blockTxRsp blockTxResponse + err := json.Unmarshal([]byte(response), &blockTxRsp) + if err != nil { + return "", err + } + return fmt.Sprintf("%d", blockTxRsp.TotalTxs), nil +} + +func NewPoktDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *PoktDataIntegrityCheck { + return &PoktDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *PoktDataIntegrityCheck) Name() string { + return "pokt_data_integrity_check" +} + +func (c *PoktDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *PoktDataIntegrityCheck) Perform() { + + // Session is not meant for POKT + if len(c.NodeList) == 0 || !c.IsPoktChain(c.NodeList[0]) { + return + } + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, poktBlockTxEndpoint, c.getBlockIdentifierFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(dataIntegrityCheckInterval) +} + +func (c *PoktDataIntegrityCheck) ShouldRun() bool { + return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) +} + +func getBlockByNumberPayload(blockNumber uint64) string { + return fmt.Sprintf(blockPayloadFmt, blockNumber) +} diff --git a/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go new file mode 100644 index 0000000..dcf86f6 --- /dev/null +++ b/internal/node_selector_service/checks/pokt_height_check/pokt_height_check.go @@ -0,0 +1,63 @@ +package pokt_height_check + +import ( + "encoding/json" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const ( + + // interval to run the pokt height check job + poktHeightCheckInterval = time.Second * 1 + + // jsonrpc payload to pokt evm height + heightJsonPayload = `` +) + +type poktHeightResponse struct { + Height uint64 `json:"height"` +} + +type PoktHeightCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func NewPoktHeightCheck(check *checks.Check, logger *zap.Logger) *PoktHeightCheck { + return &PoktHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *PoktHeightCheck) Name() string { + return "pokt_height_check" +} + +func (c *PoktHeightCheck) Perform() { + + // Session is not meant for POKT + if len(c.NodeList) == 0 || !c.IsPoktChain(c.NodeList[0]) { + return + } + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "/v1/query/height", c.getHeightFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(poktHeightCheckInterval) +} + +func (c *PoktHeightCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *PoktHeightCheck) ShouldRun() bool { + return time.Now().After(c.nextCheckTime) +} + +func (c *PoktHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var poktRsp poktHeightResponse + err := json.Unmarshal([]byte(response), &poktRsp) + if err != nil { + return 0, err + } + return poktRsp.Height, nil +} diff --git a/internal/node_selector_service/checks/qos_check.go b/internal/node_selector_service/checks/qos_check.go index 434556b..913e37e 100644 --- a/internal/node_selector_service/checks/qos_check.go +++ b/internal/node_selector_service/checks/qos_check.go @@ -2,10 +2,23 @@ package checks import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" + "github.com/pokt-network/gateway-server/internal/chain_network" + config2 "github.com/pokt-network/gateway-server/internal/global_config" qos_models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0" ) +const ( + chainMorseMainnetSolanaCustom = "C006" + chainMorseMainnetPokt = "0001" + chainMorseMainnetSolana = "0006" +) + +const ( + chainMorseTestnetPokt = "0013" + chainMorseTestnetSolana = "0008" +) + type CheckJob interface { Perform() Name() string @@ -14,11 +27,32 @@ type CheckJob interface { } type Check struct { - NodeList []*qos_models.QosNode - PocketRelayer pokt_v0.PocketRelayer - ChainConfiguration chain_configurations_registry.ChainConfigurationsService + NodeList []*qos_models.QosNode + PocketRelayer pokt_v0.PocketRelayer + ChainConfiguration chain_configurations_registry.ChainConfigurationsService + ChainNetworkProvider config2.ChainNetworkProvider +} + +func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainNetworkProvider config2.ChainNetworkProvider) *Check { + return &Check{PocketRelayer: pocketRelayer, ChainConfiguration: chainConfiguration, ChainNetworkProvider: chainNetworkProvider} +} + +func (c *Check) IsSolanaChain(node *qos_models.QosNode) bool { + chainId := node.GetChain() + if c.ChainNetworkProvider.GetChainNetwork() == chain_network.MorseTestnet { + return chainId == chainMorseTestnetSolana + } + return chainId == chainMorseMainnetSolana || chainId == chainMorseMainnetSolanaCustom +} + +func (c *Check) IsPoktChain(node *qos_models.QosNode) bool { + chainId := node.GetChain() + if c.ChainNetworkProvider.GetChainNetwork() == chain_network.MorseTestnet { + return chainId == chainMorseTestnetPokt + } + return chainId == chainMorseMainnetPokt } -func NewCheck(pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService) *Check { - return &Check{PocketRelayer: pocketRelayer, ChainConfiguration: chainConfiguration} +func (c *Check) IsEvmChain(node *qos_models.QosNode) bool { + return !c.IsPoktChain(node) && !c.IsSolanaChain(node) } diff --git a/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go new file mode 100644 index 0000000..770a774 --- /dev/null +++ b/internal/node_selector_service/checks/solana_data_integrity_check/solana_data_integrity_check.go @@ -0,0 +1,70 @@ +package solana_data_integrity_check + +import ( + "encoding/json" + "fmt" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const ( + // how often the job should run + dataIntegrityCheckInterval = time.Second * 1 + + //json rpc payload to send a data integrity check + // we use signatures for transaction detail to prevent large payloads and we don't need anything but block hash + blockPayloadFmt = `{"jsonrpc":"2.0","method":"getBlock","params":[%d, {"encoding": "jsonParsed", "maxSupportedTransactionVersion":0, "transactionDetails":"signatures"}],"id":1}` +) + +type blockByNumberResponse struct { + Result struct { + Hash string `json:"blockhash"` + } `json:"result"` +} + +type SolanaDataIntegrityCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func (c *SolanaDataIntegrityCheck) getBlockIdentifierFromNodeResponse(response string) (string, error) { + var blockRsp blockByNumberResponse + err := json.Unmarshal([]byte(response), &blockRsp) + if err != nil { + return "", err + } + return blockRsp.Result.Hash, nil +} + +func NewSolanaDataIntegrityCheck(check *checks.Check, logger *zap.Logger) *SolanaDataIntegrityCheck { + return &SolanaDataIntegrityCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *SolanaDataIntegrityCheck) Name() string { + return "solana_data_integrity_check" +} + +func (c *SolanaDataIntegrityCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *SolanaDataIntegrityCheck) Perform() { + + // Session is not meant for Solana + if len(c.NodeList) == 0 || !c.IsSolanaChain(c.NodeList[0]) { + return + } + checks.PerformDataIntegrityCheck(c.Check, getBlockByNumberPayload, "", c.getBlockIdentifierFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(dataIntegrityCheckInterval) +} + +func (c *SolanaDataIntegrityCheck) ShouldRun() bool { + return c.nextCheckTime.IsZero() || time.Now().After(c.nextCheckTime) +} + +func getBlockByNumberPayload(blockNumber uint64) string { + return fmt.Sprintf(blockPayloadFmt, blockNumber) +} diff --git a/internal/node_selector_service/checks/solana_height_check/solana_height_check.go b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go new file mode 100644 index 0000000..4adc7ac --- /dev/null +++ b/internal/node_selector_service/checks/solana_height_check/solana_height_check.go @@ -0,0 +1,63 @@ +package solana_height_check + +import ( + "encoding/json" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" + "github.com/pokt-network/gateway-server/internal/node_selector_service/models" + "go.uber.org/zap" + "time" +) + +const ( + + // interval to run the solana height check + solanaHeightCheckInterval = time.Second * 1 + + // jsonrpc payload to retrieve solana height + heightJsonPayload = `{"jsonrpc":"2.0","method":"getSlot","params": [],"id":1}` +) + +type solanaHeightResponse struct { + Result uint64 `json:"result"` +} + +type SolanaHeightCheck struct { + *checks.Check + nextCheckTime time.Time + logger *zap.Logger +} + +func NewSolanaHeightCheck(check *checks.Check, logger *zap.Logger) *SolanaHeightCheck { + return &SolanaHeightCheck{Check: check, nextCheckTime: time.Time{}, logger: logger} +} + +func (c *SolanaHeightCheck) Name() string { + return "solana_height_check" +} + +func (c *SolanaHeightCheck) Perform() { + + // Session is not meant for Solana + if len(c.NodeList) == 0 || !c.IsSolanaChain(c.NodeList[0]) { + return + } + checks.PerformDefaultHeightCheck(c.Check, heightJsonPayload, "", c.getHeightFromNodeResponse, c.logger) + c.nextCheckTime = time.Now().Add(solanaHeightCheckInterval) +} + +func (c *SolanaHeightCheck) SetNodes(nodes []*models.QosNode) { + c.NodeList = nodes +} + +func (c *SolanaHeightCheck) ShouldRun() bool { + return time.Now().After(c.nextCheckTime) +} + +func (c *SolanaHeightCheck) getHeightFromNodeResponse(response string) (uint64, error) { + var solanaRsp solanaHeightResponse + err := json.Unmarshal([]byte(response), &solanaRsp) + if err != nil { + return 0, err + } + return solanaRsp.Result, nil +} diff --git a/internal/node_selector_service/models/qos_node.go b/internal/node_selector_service/models/qos_node.go index 7f5011b..29f2786 100644 --- a/internal/node_selector_service/models/qos_node.go +++ b/internal/node_selector_service/models/qos_node.go @@ -16,10 +16,6 @@ const ( latencyCompression = 1000 ) -const ( - chainSolanaCustom = "C006" - chainSolana = "0006" -) const ( OutOfSyncTimeout TimeoutReason = "out_of_sync_timeout" DataIntegrityTimeout TimeoutReason = "invalid_data_timeout" @@ -69,7 +65,7 @@ type QosNode struct { } func NewQosNode(morseNode *models.Node, pocketSession *models.Session, appSigner *models.Ed25519Account) *QosNode { - return &QosNode{MorseNode: morseNode, MorseSession: pocketSession, MorseSigner: appSigner, LatencyTracker: &LatencyTracker{tDigest: tdigest.NewWithCompression(1000)}} + return &QosNode{MorseNode: morseNode, MorseSession: pocketSession, MorseSigner: appSigner, LatencyTracker: &LatencyTracker{tDigest: tdigest.NewWithCompression(latencyCompression)}} } func (n *QosNode) IsHealthy() bool { @@ -129,15 +125,6 @@ func (n *QosNode) SetLastDataIntegrityCheckTime(lastDataIntegrityCheckTime time. n.lastDataIntegrityCheckTime = lastDataIntegrityCheckTime } -func (n *QosNode) IsSolanaChain() bool { - chainId := n.GetChain() - return chainId == chainSolana || chainId == chainSolanaCustom -} - -func (n *QosNode) IsEvmChain() bool { - return !n.IsSolanaChain() -} - func (n *QosNode) GetTimeoutReason() TimeoutReason { return n.timeoutReason } diff --git a/internal/node_selector_service/node_selector_service.go b/internal/node_selector_service/node_selector_service.go index 99c7d46..eaf1f5b 100644 --- a/internal/node_selector_service/node_selector_service.go +++ b/internal/node_selector_service/node_selector_service.go @@ -2,9 +2,14 @@ package node_selector_service import ( "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" + "github.com/pokt-network/gateway-server/internal/global_config" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_data_integrity_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/evm_height_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/pokt_height_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_data_integrity_check" + "github.com/pokt-network/gateway-server/internal/node_selector_service/checks/solana_height_check" "github.com/pokt-network/gateway-server/internal/node_selector_service/models" "github.com/pokt-network/gateway-server/internal/session_registry" "github.com/pokt-network/gateway-server/pkg/common" @@ -29,15 +34,19 @@ type NodeSelectorClient struct { checkJobs []checks.CheckJob } -func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryService, pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, logger *zap.Logger) *NodeSelectorClient { +func NewNodeSelectorService(sessionRegistry session_registry.SessionRegistryService, pocketRelayer pokt_v0.PocketRelayer, chainConfiguration chain_configurations_registry.ChainConfigurationsService, networkProvider global_config.ChainNetworkProvider, logger *zap.Logger) *NodeSelectorClient { // base checks will share same node list and pocket relayer - baseCheck := checks.NewCheck(pocketRelayer, chainConfiguration) + baseCheck := checks.NewCheck(pocketRelayer, chainConfiguration, networkProvider) // enabled checks enabledChecks := []checks.CheckJob{ evm_height_check.NewEvmHeightCheck(baseCheck, logger.Named("evm_height_checker")), evm_data_integrity_check.NewEvmDataIntegrityCheck(baseCheck, logger.Named("evm_data_integrity_checker")), + solana_height_check.NewSolanaHeightCheck(baseCheck, logger.Named("solana_height_check")), + solana_data_integrity_check.NewSolanaDataIntegrityCheck(baseCheck, logger.Named("solana_data_integrity_check")), + pokt_height_check.NewPoktHeightCheck(baseCheck, logger.Named("pokt_height_check")), + pokt_data_integrity_check.NewPoktDataIntegrityCheck(baseCheck, logger.Named("pokt_data_integrity_check")), } selectorService := &NodeSelectorClient{ sessionRegistry: sessionRegistry, @@ -113,8 +122,7 @@ func (q NodeSelectorClient) startJobChecker() { case <-ticker: for _, job := range q.checkJobs { if job.ShouldRun() { - for sessionChainKey, nodes := range q.sessionRegistry.GetNodesMap() { - q.logger.Sugar().Infow("running job", "job", job.Name(), "sessionChainKey", sessionChainKey) + for _, nodes := range q.sessionRegistry.GetNodesMap() { job.SetNodes(nodes.Value()) job.Perform() } diff --git a/internal/relayer/relayer.go b/internal/relayer/relayer.go index f72bc99..1cc0aa2 100644 --- a/internal/relayer/relayer.go +++ b/internal/relayer/relayer.go @@ -15,7 +15,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/valyala/fasthttp" "go.uber.org/zap" + "net/url" "strconv" + "strings" "time" ) @@ -41,7 +43,7 @@ func init() { Name: "relay_counter", Help: "Request to send an actual relay and if it succeeded", }, - []string{"success", "altruist", "reason", "chain_id"}, + []string{"success", "altruist", "reason", "chain_id", "service_host"}, ) histogramRelayRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -49,7 +51,7 @@ func init() { Name: "relay_latency", Help: "percentile on the request on latency to select a node, sign a request and send it to the network", }, - []string{"success", "altruist", "chain_id"}, + []string{"success", "altruist", "chain_id", "service_host"}, ) pocketClientHistogramRelayRequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -57,7 +59,7 @@ func init() { Name: "pocket_client_relay_latency", Help: "percentile on the request on latency to sign a request and send it to the network", }, - []string{"success", "altruist", "chain_id"}, + []string{"success", "chain_id", "service_host"}, ) prometheus.MustRegister(counterRelayRequest, histogramRelayRequestLatency, pocketClientHistogramRelayRequestLatency) } @@ -93,23 +95,26 @@ func (r *Relayer) SendRelay(req *models.SendRelayRequest) (*models.SendRelayResp success := false altruist := false + var nodeHost string startTime := time.Now() // Measure end to end latency for send relay defer func() { - histogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(success), strconv.FormatBool(altruist), req.Chain).Observe(time.Since(startTime).Seconds()) + histogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(success), strconv.FormatBool(altruist), req.Chain, nodeHost).Observe(time.Since(startTime).Seconds()) }() - rsp, err := r.sendNodeSelectorRelay(req) + rsp, host, err := r.sendNodeSelectorRelay(req) + // Set the host to record service domain + nodeHost = host // Node selector relay was successful if err == nil { success = true - counterRelayRequest.WithLabelValues("true", "false", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues("true", "false", "", req.Chain, nodeHost).Inc() return rsp, nil } altruist = true - counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedPocketErr, req.Chain).Inc() + counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedPocketErr, req.Chain, "").Inc() r.logger.Sugar().Errorw("failed to send to pokt", "poktErr", err) altruistRsp, altruistErr := r.altruistRelay(req) @@ -121,17 +126,17 @@ func (r *Relayer) SendRelay(req *models.SendRelayRequest) (*models.SendRelayResp return altruistRsp, nil } -func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, error) { +func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, string, error) { // find a node to send too first. node, ok := r.nodeSelector.FindNode(req.Chain) if !ok { - return nil, errSelectNodeFail + return nil, "", errSelectNodeFail } req.Signer = node.MorseSigner req.Session = node.MorseSession req.SelectedNodePubKey = node.GetPublicKey() if err := req.Validate(); err != nil { - return nil, err + return nil, "", err } startRequestTime := time.Now() @@ -140,14 +145,16 @@ func (r *Relayer) sendNodeSelectorRelay(req *models.SendRelayRequest) (*models.S // Record latency to prom and latency tracker latency := time.Now().Sub(startRequestTime) - pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), "false", req.Chain).Observe(latency.Seconds()) + + nodeHost := r.extractHostFromServiceUrl(node.MorseNode.ServiceUrl) + pocketClientHistogramRelayRequestLatency.WithLabelValues(strconv.FormatBool(err == nil), req.Chain, nodeHost).Observe(latency.Seconds()) node.GetLatencyTracker().RecordMeasurement(float64(latency.Milliseconds())) // Node returned an error, potentially penalize the node operator dependent on error if err != nil { checks.DefaultPunishNode(err, node, r.logger) } - return rsp, err + return rsp, nodeHost, err } func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.SendRelayResponse, error) { @@ -169,7 +176,7 @@ func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.Sen }) if err != nil { - counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedSessionErr, req.Chain).Inc() + counterRelayRequest.WithLabelValues("false", "true", reasonRelayFailedSessionErr, req.Chain, "").Inc() return nil, err } @@ -187,7 +194,7 @@ func (r *Relayer) sendRandomNodeRelay(req *models.SendRelayRequest) (*models.Sen rsp, err := r.pocketClient.SendRelay(req) // record if relay was successful - counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues(strconv.FormatBool(err == nil), "false", "", req.Chain, r.extractHostFromServiceUrl(randomNode.MorseNode.ServiceUrl)).Inc() return rsp, err } @@ -220,7 +227,7 @@ func (r *Relayer) altruistRelay(req *models.SendRelayRequest) (*models.SendRelay err := r.httpRequester.DoTimeout(request, response, requestTimeout) success := err == nil - counterRelayRequest.WithLabelValues(strconv.FormatBool(success), "true", "", req.Chain).Inc() + counterRelayRequest.WithLabelValues(strconv.FormatBool(success), "true", "", req.Chain, "").Inc() if !success { return nil, err @@ -253,3 +260,34 @@ func (r *Relayer) getPocketRequestTimeout(chainId string) time.Duration { } return configTime } + +func (r *Relayer) extractHostFromServiceUrl(urlStr string) string { + if !r.globalConfigProvider.ShouldEmitServiceUrlPromMetrics() { + return "" + } + parsedURL, err := url.Parse(urlStr) + if err != nil { + return "" // return empty string or handle error + } + + // Get the hostname from the parsed URL + hostname := parsedURL.Hostname() + + // Find the last occurrence of "." in the hostname + index := strings.LastIndex(hostname, ".") + + // If there is no "." or it's the first character, return the hostname itself + if index == -1 || index == 0 { + return hostname + } + + // Find the index of the second-to-last occurrence of "." (root domain separator) + index = strings.LastIndex(hostname[:index-1], ".") + if index == -1 { + // If there is only one ".", return the hostname itself + return hostname + } + + // Extract and return the root domain + return hostname[index+1:] +} diff --git a/internal/relayer/relayer_test.go b/internal/relayer/relayer_test.go index 641c82b..8ba82b9 100644 --- a/internal/relayer/relayer_test.go +++ b/internal/relayer/relayer_test.go @@ -49,6 +49,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { request *models.SendRelayRequest setupMocks func(*models.SendRelayRequest) expectedResponse *models.SendRelayResponse + expectedNodeHost string expectedError error }{ { @@ -61,6 +62,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { suite.mockNodeSelectorService.EXPECT().FindNode("1234").Return(nil, false) }, expectedResponse: nil, + expectedNodeHost: "", expectedError: errSelectNodeFail, }, { @@ -72,8 +74,9 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { setupMocks: func(request *models.SendRelayRequest) { signer := &models.Ed25519Account{} - node := &models.Node{PublicKey: "123"} + node := &models.Node{PublicKey: "123", ServiceUrl: "http://complex.subdomain.root.com/test/123"} session := &models.Session{} + suite.mockConfigProvider.EXPECT().ShouldEmitServiceUrlPromMetrics().Return(true) suite.mockNodeSelectorService.EXPECT().FindNode("1234").Return(qos_models.NewQosNode(node, session, signer), true) // expect sendRelay to have same parameters as find node, otherwise validation will fail suite.mockPocketService.EXPECT().SendRelay(&models.SendRelayRequest{ @@ -84,6 +87,7 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { Session: session, }).Return(expectedResponse, nil) }, + expectedNodeHost: "root.com", expectedResponse: expectedResponse, expectedError: nil, }, @@ -97,12 +101,12 @@ func (suite *RelayerTestSuite) TestNodeSelectorRelay() { tc.setupMocks(tc.request) // setup mocks - rsp, err := suite.relayer.sendNodeSelectorRelay(tc.request) + rsp, host, err := suite.relayer.sendNodeSelectorRelay(tc.request) // assert results suite.Equal(tc.expectedResponse, rsp) + suite.Equal(tc.expectedNodeHost, host) suite.Equal(tc.expectedError, err) - }) } diff --git a/mocks/apps_registry/app_registry_mock.go b/mocks/apps_registry/app_registry_mock.go index b0aa744..023d53c 100644 --- a/mocks/apps_registry/app_registry_mock.go +++ b/mocks/apps_registry/app_registry_mock.go @@ -4,7 +4,6 @@ package apps_registry_mock import ( models "github.com/pokt-network/gateway-server/internal/apps_registry/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/chain_configurations_registry/chain_configurations_registry_mock.go b/mocks/chain_configurations_registry/chain_configurations_registry_mock.go index aba5a8b..4a783d4 100644 --- a/mocks/chain_configurations_registry/chain_configurations_registry_mock.go +++ b/mocks/chain_configurations_registry/chain_configurations_registry_mock.go @@ -4,7 +4,6 @@ package chain_configurations_registry_mock import ( db_query "github.com/pokt-network/gateway-server/internal/db_query" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/global_config/config_provider.go b/mocks/global_config/config_provider.go index b353e51..6bcc083 100644 --- a/mocks/global_config/config_provider.go +++ b/mocks/global_config/config_provider.go @@ -3,6 +3,7 @@ package global_config_mock import ( + chain_network "github.com/pokt-network/gateway-server/internal/chain_network" global_config "github.com/pokt-network/gateway-server/internal/global_config" mock "github.com/stretchr/testify/mock" @@ -113,6 +114,51 @@ func (_c *GlobalConfigProvider_GetAltruistRequestTimeout_Call) RunAndReturn(run return _c } +// GetChainNetwork provides a mock function with given fields: +func (_m *GlobalConfigProvider) GetChainNetwork() chain_network.ChainNetwork { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetChainNetwork") + } + + var r0 chain_network.ChainNetwork + if rf, ok := ret.Get(0).(func() chain_network.ChainNetwork); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(chain_network.ChainNetwork) + } + + return r0 +} + +// GlobalConfigProvider_GetChainNetwork_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChainNetwork' +type GlobalConfigProvider_GetChainNetwork_Call struct { + *mock.Call +} + +// GetChainNetwork is a helper method to define mock.On call +func (_e *GlobalConfigProvider_Expecter) GetChainNetwork() *GlobalConfigProvider_GetChainNetwork_Call { + return &GlobalConfigProvider_GetChainNetwork_Call{Call: _e.mock.On("GetChainNetwork")} +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) Run(run func()) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) Return(_a0 chain_network.ChainNetwork) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GlobalConfigProvider_GetChainNetwork_Call) RunAndReturn(run func() chain_network.ChainNetwork) *GlobalConfigProvider_GetChainNetwork_Call { + _c.Call.Return(run) + return _c +} + // GetDatabaseConnectionUrl provides a mock function with given fields: func (_m *GlobalConfigProvider) GetDatabaseConnectionUrl() string { ret := _m.Called() @@ -338,6 +384,51 @@ func (_c *GlobalConfigProvider_GetPoktRPCRequestTimeout_Call) RunAndReturn(run f return _c } +// ShouldEmitServiceUrlPromMetrics provides a mock function with given fields: +func (_m *GlobalConfigProvider) ShouldEmitServiceUrlPromMetrics() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ShouldEmitServiceUrlPromMetrics") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ShouldEmitServiceUrlPromMetrics' +type GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call struct { + *mock.Call +} + +// ShouldEmitServiceUrlPromMetrics is a helper method to define mock.On call +func (_e *GlobalConfigProvider_Expecter) ShouldEmitServiceUrlPromMetrics() *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + return &GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call{Call: _e.mock.On("ShouldEmitServiceUrlPromMetrics")} +} + +func (_c *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) Run(run func()) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) Return(_a0 bool) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call) RunAndReturn(run func() bool) *GlobalConfigProvider_ShouldEmitServiceUrlPromMetrics_Call { + _c.Call.Return(run) + return _c +} + // NewGlobalConfigProvider creates a new instance of GlobalConfigProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewGlobalConfigProvider(t interface { diff --git a/mocks/node_selector/node_selector_mock.go b/mocks/node_selector/node_selector_mock.go index 155dacd..041d16a 100644 --- a/mocks/node_selector/node_selector_mock.go +++ b/mocks/node_selector/node_selector_mock.go @@ -4,7 +4,6 @@ package node_selector_mock import ( models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/pocket_service/pocket_service_mock.go b/mocks/pocket_service/pocket_service_mock.go index 589dba9..c906a03 100644 --- a/mocks/pocket_service/pocket_service_mock.go +++ b/mocks/pocket_service/pocket_service_mock.go @@ -4,7 +4,6 @@ package pocket_service_mock import ( models "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models" - mock "github.com/stretchr/testify/mock" ) diff --git a/mocks/session_registry/session_registry_mock.go b/mocks/session_registry/session_registry_mock.go index 1bff02e..60ca1f2 100644 --- a/mocks/session_registry/session_registry_mock.go +++ b/mocks/session_registry/session_registry_mock.go @@ -5,7 +5,6 @@ package session_registry_mock import ( models "github.com/pokt-network/gateway-server/internal/node_selector_service/models" pokt_v0models "github.com/pokt-network/gateway-server/pkg/pokt/pokt_v0/models" - mock "github.com/stretchr/testify/mock" session_registry "github.com/pokt-network/gateway-server/internal/session_registry" diff --git a/mocks/ttl_cache_service/ttl_cache_service_mock.go b/mocks/ttl_cache_service/ttl_cache_service_mock.go index 9d0a441..da85834 100644 --- a/mocks/ttl_cache_service/ttl_cache_service_mock.go +++ b/mocks/ttl_cache_service/ttl_cache_service_mock.go @@ -23,6 +23,38 @@ func (_m *TTLCacheService[K, V]) EXPECT() *TTLCacheService_Expecter[K, V] { return &TTLCacheService_Expecter[K, V]{mock: &_m.Mock} } +// DeleteExpired provides a mock function with given fields: +func (_m *TTLCacheService[K, V]) DeleteExpired() { + _m.Called() +} + +// TTLCacheService_DeleteExpired_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteExpired' +type TTLCacheService_DeleteExpired_Call[K comparable, V interface{}] struct { + *mock.Call +} + +// DeleteExpired is a helper method to define mock.On call +func (_e *TTLCacheService_Expecter[K, V]) DeleteExpired() *TTLCacheService_DeleteExpired_Call[K, V] { + return &TTLCacheService_DeleteExpired_Call[K, V]{Call: _e.mock.On("DeleteExpired")} +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) Run(run func()) *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) Return() *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Return() + return _c +} + +func (_c *TTLCacheService_DeleteExpired_Call[K, V]) RunAndReturn(run func()) *TTLCacheService_DeleteExpired_Call[K, V] { + _c.Call.Return(run) + return _c +} + // Get provides a mock function with given fields: key, opts func (_m *TTLCacheService[K, V]) Get(key K, opts ...ttlcache.Option[K, V]) *ttlcache.Item[K, V] { _va := make([]interface{}, len(opts))