From c62f312d412f478bb2ca00c6aec8d66588b5e413 Mon Sep 17 00:00:00 2001 From: Karan Sharma Date: Fri, 8 Dec 2023 11:20:25 +0530 Subject: [PATCH] feat: add support for app metrics --- cmd/app.go | 81 ++++++++++++++++++++++++++++-- cmd/init.go | 28 +++++++++++ cmd/main.go | 61 +++++++++++++++++++--- config.sample.toml | 41 ++++++++------- internal/nse/nse.go | 117 +++++++++++++++++++++++++++++++++++++++++++ pkg/models/models.go | 12 +++++ 6 files changed, 312 insertions(+), 28 deletions(-) diff --git a/cmd/app.go b/cmd/app.go index b283d91..152c884 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -17,9 +17,10 @@ type App struct { metricsMgr *metrics.Manager nseMgr *nse.Manager - hardwareSvc *hardwareService - dbSvc *dbService - networkSvc *networkService + hardwareSvc *hardwareService + dbSvc *dbService + networkSvc *networkService + applicationSvc *applicationService } type Opts struct { @@ -43,6 +44,11 @@ type networkService struct { queries map[string]string } +type applicationService struct { + hosts []string + queries map[string]string +} + // fetchHWMetrics fetches hardware metrics from the Prometheus HTTP API. func (app *App) fetchHWMetrics() (map[string]models.HWPromResp, error) { hwMetrics := make(map[string]models.HWPromResp) @@ -178,6 +184,53 @@ func (app *App) fetchNetworkMetrics() (map[string]models.NetworkPromResp, error) return networkMetrics, nil } +// fetchApplicationMetrics fetches application metrics from the Prometheus HTTP API. +func (app *App) fetchApplicationMetrics() (map[string]models.AppPromResp, error) { + appMetrics := make(map[string]models.AppPromResp) + + for _, host := range app.applicationSvc.hosts { + appMetricsResp := models.AppPromResp{} + for metric, query := range app.applicationSvc.queries { + switch metric { + case "throughput": + // NOTE: The query doesn't have a host parameter as it aggregates across all hosts. + value, err := app.metricsMgr.Query(query) + if err != nil { + app.lo.Error("Failed to query Prometheus", + "host", host, + "metric", metric, + "error", err) + continue + } + appMetricsResp.Throughput = value + + case "failure_count": + // NOTE: The query doesn't have a host parameter as it aggregates across all hosts. + value, err := app.metricsMgr.Query(query) + if err != nil { + app.lo.Error("Failed to query Prometheus", + "host", host, + "metric", metric, + "error", err) + continue + } + appMetricsResp.FailureCount = value + + default: + app.lo.Warn("Unknown application metric queried", + "host", host, + "metric", metric) + } + } + + // Add host metrics to the map. + appMetrics[host] = appMetricsResp + app.lo.Debug("fetched metrics", "host", host, "data", appMetricsResp) + } + + return appMetrics, nil +} + // pushHWMetrics pushes hardware metrics to the NSE. func (app *App) pushHWMetrics(host string, data models.HWPromResp) error { for i := 0; i < app.opts.MaxRetries; i++ { @@ -249,3 +302,25 @@ func (app *App) pushNetworkMetrics(host string, data models.NetworkPromResp) err } return nil } + +func (app *App) pushApplicationMetrics(host string, data models.AppPromResp) error { + for i := 0; i < app.opts.MaxRetries; i++ { + if err := app.nseMgr.PushAppMetrics(host, data); err != nil { + if i < app.opts.MaxRetries-1 { + app.lo.Error("Failed to push application metrics to NSE. Retrying...", + "host", host, + "attempt", i+1, + "error", err) + time.Sleep(app.opts.RetryInterval) + continue + } + app.lo.Error("Failed to push application metrics to NSE after max retries", + "host", host, + "max_retries", app.opts.MaxRetries, + "error", err) + return err + } + break + } + return nil +} diff --git a/cmd/init.go b/cmd/init.go index ab46a52..dec145a 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -207,6 +207,34 @@ func initNetworkSvc(ko *koanf.Koanf) (*networkService, error) { }, nil } +func initApplicationSvc(ko *koanf.Koanf) (*applicationService, error) { + var ( + queries = map[string]string{ + "failure_count": ko.MustString("metrics.application.failure_count"), + "throughput": ko.MustString("metrics.application.throughput"), + } + hosts = ko.Strings("metrics.application.hosts") + cfgPath = ko.String("prometheus.config_path") + ) + + if len(hosts) == 0 && cfgPath != "" { + defaultHosts, err := initDefaultHosts(ko, cfgPath) + if err != nil { + return nil, err + } + hosts = defaultHosts + } + + if len(hosts) == 0 { + return nil, fmt.Errorf("no hosts found in the config") + } + + return &applicationService{ + hosts: hosts, + queries: queries, + }, nil +} + // initNSEManager initialises the NSE manager. func initNSEManager(ko *koanf.Koanf, lo *slog.Logger) (*nse.Manager, error) { nseMgr, err := nse.New(lo, nse.Opts{ diff --git a/cmd/main.go b/cmd/main.go index 8a80293..70ee860 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -52,6 +52,13 @@ func main() { exit() } + // Load queries for application metrics. + applicationSvc, err := initApplicationSvc(ko) + if err != nil { + lo.Error("failed to init application service", "error", err) + exit() + } + // Initialise the NSE manager. nseMgr, err := initNSEManager(ko, lo) if err != nil { @@ -61,13 +68,14 @@ func main() { // Init the app. app := &App{ - lo: lo, - opts: initOpts(ko), - metricsMgr: metricsMgr, - nseMgr: nseMgr, - hardwareSvc: hardwareSvc, - dbSvc: dbSvc, - networkSvc: networkSvc, + lo: lo, + opts: initOpts(ko), + metricsMgr: metricsMgr, + nseMgr: nseMgr, + hardwareSvc: hardwareSvc, + dbSvc: dbSvc, + networkSvc: networkSvc, + applicationSvc: applicationSvc, } // Create a new context which is cancelled when `SIGINT`/`SIGTERM` is received. @@ -85,6 +93,9 @@ func main() { wg.Add(1) go app.syncNetworkMetricsWorker(ctx, wg) + wg.Add(1) + go app.syncApplicationMetricsWorker(ctx, wg) + // Listen on the close channel indefinitely until a // `SIGINT` or `SIGTERM` is received. <-ctx.Done() @@ -201,3 +212,39 @@ func (app *App) syncNetworkMetricsWorker(ctx context.Context, wg *sync.WaitGroup } } } + +// Add a new worker function for the application service. +func (app *App) syncApplicationMetricsWorker(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + ticker := time.NewTicker(app.opts.SyncInterval) + defer ticker.Stop() + + app.lo.Info("Starting application metrics worker", "interval", app.opts.SyncInterval) + for { + select { + case <-ticker.C: + data, err := app.fetchApplicationMetrics() + if err != nil { + app.lo.Error("Failed to fetch application metrics", "error", err) + continue + } + + // Push to upstream LAMA APIs. + for host, hostData := range data { + if err := app.pushApplicationMetrics(host, hostData); err != nil { + app.lo.Error("Failed to push application metrics to NSE", "host", host, "error", err) + continue + } + + // FIXME: Currently the LAMA API does not support multiple hosts. + // Once we've pushed the data for the first host, break the loop. + // Once the LAMA API supports multiple hosts, remove this. + break + } + case <-ctx.Done(): + app.lo.Info("Stopping application metrics worker") + return + } + } +} diff --git a/config.sample.toml b/config.sample.toml index 0c3c57f..ecfe4af 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -1,34 +1,34 @@ [app] -log_level = "debug" # To enable debug logging, level should be `debug`. -sync_interval = "5m" # Interval at which the app should fetch data from metrics store. +log_level = "debug" # To enable debug logging, level should be `debug`. +max_retries = 3 # Maximum number of retries for a failed request. retry_interval = "5s" # Interval at which the app should retry if the previous request failed. -max_retries = 3 # Maximum number of retries for a failed request. +sync_interval = "5m" # Interval at which the app should fetch data from metrics store. [lama.nse] +exchange_id = 1 # 1=National Stock Exchange +idle_timeout = "5m" # Idle timeout for HTTP requests +login_id = "redacted" +member_id = "redacted" +password = "redacted" +timeout = "30s" # Timeout for HTTP requests url = "https://lama.nse.internal" # Endpoint for NSE LAMA API Gateway -login_id = "redacted" -member_id = "redacted" -password = "redacted" -timeout = "30s" # Timeout for HTTP requests -idle_timeout = "5m" # Idle timeout for HTTP requests -exchange_id = 1 # 1=National Stock Exchange [prometheus] -endpoint = "http://prometheus:9090" # Endpoint for Prometheus API -query_path = "/api/v1/query" # Endpoint for Prometheus query API -username = "redacted" # HTTP Basic Auth username -password = "redacted" # HTTP Basic Auth password -timeout = "10s" # Timeout for HTTP requests -idle_timeout = "5m" # Idle timeout for HTTP requests -max_idle_conns = 10 config_path = "/etc/prometheus/prometheus.yml" # Path to Prometheus config file. This is used to load a list of hosts to fetch metrics for. +endpoint = "http://prometheus:9090" # Endpoint for Prometheus API +idle_timeout = "5m" # Idle timeout for HTTP requests +max_idle_conns = 10 +password = "redacted" # HTTP Basic Auth password +query_path = "/api/v1/query" # Endpoint for Prometheus query API +timeout = "10s" # Timeout for HTTP requests +username = "redacted" # HTTP Basic Auth username [metrics.hardware] # Define Prometheus queries for hardware metrics # List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file. -hosts = [] cpu = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle", hostname="%s"}[5m])))' -memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100' disk = '100 - ((node_filesystem_avail_bytes{hostname="%s",device!~"rootfs"} * 100) / node_filesystem_size_bytes{hostname="%s",device!~"rootfs"})' +hosts = [] +memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100' uptime = '(node_time_seconds{hostname="%s"} - node_boot_time_seconds{hostname="%s"}) / 60' [metrics.database] # Define Prometheus queries for db metrics @@ -36,5 +36,10 @@ hosts = [] status = 'up{hostname="%s"}' [metrics.network] +hosts = [] packet_errors = 'sum(rate(node_network_receive_errs_total{hostname="%s"}[5m])) + sum(rate(node_network_transmit_errs_total{hostname="%s"}[5m]))' + +[metrics.application] +failure_count = 'sum(sum without (hostname, instance, server) (rate(haproxy_server_http_responses_total{job="my-app",code="5xx",proxy="my-backend"}[5m]))) by (code)' hosts = [] +throughput = 'sum(sum without (hostname, instance, server) (rate(haproxy_server_http_responses_total{job="my-app",proxy="my-backend"}[5m]))) by (proxy)' diff --git a/internal/nse/nse.go b/internal/nse/nse.go index 8c536dc..64d5136 100644 --- a/internal/nse/nse.go +++ b/internal/nse/nse.go @@ -124,6 +124,14 @@ type NetworkReq struct { Payload []MetricPayload `json:"payload"` } +type AppReq struct { + MemberID string `json:"memberId"` + ExchangeID int `json:"exchangeId"` + SequenceID int `json:"sequenceId"` + Timestamp int64 `json:"timestamp"` + Payload []MetricPayload `json:"payload"` +} + func New(lo *slog.Logger, opts Opts) (*Manager, error) { client := &http.Client{ Timeout: opts.Timeout, @@ -527,6 +535,97 @@ func (mgr *Manager) PushNetworkMetrics(host string, data models.NetworkPromResp) return nil } +// PushAppMetrics sends app metrics to NSE LAMA API. +func (mgr *Manager) PushAppMetrics(host string, data models.AppPromResp) error { + endpoint := fmt.Sprintf("%s%s", mgr.opts.URL, "/api/V1/metrics/application") + + // Acquire read lock to safely read token and sequence ID. + mgr.RLock() + token := mgr.token + seqID := mgr.netSeqID + mgr.RUnlock() + + appPayload := createAppReq(data, mgr.opts.MemberID, mgr.opts.ExchangeID, seqID, 1) + + payload, err := json.Marshal(appPayload) + if err != nil { + mgr.lo.Error("Failed to marshal app metrics payload", "error", err) + return fmt.Errorf("failed to marshal app metrics payload: %v", err) + } + + mgr.lo.Info("Preparing to send app metrics", "host", host, "URL", endpoint, "payload", string(payload), "headers", mgr.headers) + + // Initialize new HTTP request for metrics push. + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(payload)) + if err != nil { + mgr.lo.Error("Failed to create HTTP request", "error", err) + return fmt.Errorf("failed to create HTTP request: %v", err) + } + + // Set headers for the request. + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + for k, v := range mgr.headers { + req.Header.Set(k, strings.Join(v, ",")) + } + + // Execute HTTP request using the HTTP client. + resp, err := mgr.client.Do(req) + if err != nil { + mgr.lo.Error("App metrics HTTP request failed", "error", err) + return fmt.Errorf("app metrics HTTP request failed: %v", err) + } + defer resp.Body.Close() + + // Unmarshal the response into MetricsResp object. + var r MetricsResp + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + mgr.lo.Error("Failed to unmarshal app metrics response", "error", err) + return fmt.Errorf("failed to unmarshal app metrics response: %v", err) + } + + mgr.lo.Info("Received response for app metrics push", "response_code", r.ResponseCode, "response_description", r.ResponseDesc, "http_status", resp.StatusCode) + + if resp.StatusCode != http.StatusOK { + mgr.lo.Error("App metrics push failed", "response_code", r.ResponseCode, "response_desc", r.ResponseDesc, "errors", r.Errors) + switch r.ResponseCode { + case NSE_RESP_CODE_INVALID_TOKEN, NSE_RESP_CODE_EXPIRED_TOKEN: + mgr.lo.Warn("Token is invalid or expired, attempting to log in again") + if err := mgr.Login(); err != nil { + mgr.lo.Error("Relogin attempt failed", "error", err) + return fmt.Errorf("failed to log in again: %v", err) + } + return fmt.Errorf("new token obtained after relogin, retrying app metrics push") + + case NSE_RESP_CODE_INVALID_SEQ_ID: + mgr.lo.Warn("Sequence ID is invalid, attempting to update") + expectedSeqID, err := extractExpectedSequenceID(r.ResponseDesc) + if err != nil { + mgr.lo.Error("Failed to extract expected sequence ID", "error", err) + return fmt.Errorf("failed to extract expected sequence ID: %v", err) + } + mgr.lo.Info("Expected sequence ID identified", "expected_seq_id", expectedSeqID) + mgr.Lock() + mgr.netSeqID = expectedSeqID + mgr.Unlock() + return fmt.Errorf("sequence ID has been updated, retrying app metrics push") + + default: + mgr.lo.Error("App metrics push failed with unhandled response code", "response_code", r.ResponseCode) + return fmt.Errorf("app metrics push failed with unhandled response code: %d", r.ResponseCode) + } + } + + // Increase sequence ID if metrics push was successful or partially successful. + if r.ResponseCode == NSE_RESP_CODE_SUCCESS || r.ResponseCode == NSE_RESP_CODE_PARTIAL_SUCCESS { + mgr.Lock() + mgr.netSeqID++ + mgr.Unlock() + } + + return nil +} + func createNetworkReq(metrics models.NetworkPromResp, memberId string, exchangeId, sequenceId, applicationId int) NetworkReq { return NetworkReq{ MemberID: memberId, @@ -544,6 +643,24 @@ func createNetworkReq(metrics models.NetworkPromResp, memberId string, exchangeI } } +func createAppReq(metrics models.AppPromResp, memberId string, exchangeId, sequenceId, applicationId int) AppReq { + return AppReq{ + MemberID: memberId, + ExchangeID: exchangeId, + SequenceID: sequenceId, + Timestamp: time.Now().Unix(), + Payload: []MetricPayload{ + { + ApplicationID: applicationId, + MetricData: []MetricData{ + newMetricData("throughput", float64(metrics.Throughput), false), + newMetricData("failureTradeApi", float64(metrics.FailureCount), true), + }, + }, + }, + } +} + func createHardwareReq(metrics models.HWPromResp, memberId string, exchangeId, sequenceId, applicationId int) HardwareReq { return HardwareReq{ MemberID: memberId, diff --git a/pkg/models/models.go b/pkg/models/models.go index a982985..c0ae2ca 100644 --- a/pkg/models/models.go +++ b/pkg/models/models.go @@ -17,3 +17,15 @@ type DBPromResp struct { type NetworkPromResp struct { PacketErrors float64 `json:"packet_errors"` } + +// AppPromResp is the response from the Prometheus HTTP API for application metrics. +type AppPromResp struct { + Throughput float64 `json:"throughput"` + FailureCount float64 `json:"failure_count"` +} + +// AppMetric represents an individual application metric. +type AppMetric struct { + Name string `json:"name"` + Value float64 `json:"value"` +}