Skip to content

Commit

Permalink
feat: add support for app metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-karan committed Dec 8, 2023
1 parent 1849852 commit c62f312
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 28 deletions.
81 changes: 78 additions & 3 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ type App struct {
metricsMgr *metrics.Manager
nseMgr *nse.Manager

hardwareSvc *hardwareService
dbSvc *dbService
networkSvc *networkService
hardwareSvc *hardwareService
dbSvc *dbService
networkSvc *networkService
applicationSvc *applicationService
}

type Opts struct {
Expand All @@ -43,6 +44,11 @@ type networkService struct {
queries map[string]string
}

type applicationService struct {
hosts []string
queries map[string]string
}

// fetchHWMetrics fetches hardware metrics from the Prometheus HTTP API.
func (app *App) fetchHWMetrics() (map[string]models.HWPromResp, error) {
hwMetrics := make(map[string]models.HWPromResp)
Expand Down Expand Up @@ -178,6 +184,53 @@ func (app *App) fetchNetworkMetrics() (map[string]models.NetworkPromResp, error)
return networkMetrics, nil
}

// fetchApplicationMetrics fetches application metrics from the Prometheus HTTP API.
func (app *App) fetchApplicationMetrics() (map[string]models.AppPromResp, error) {
appMetrics := make(map[string]models.AppPromResp)

for _, host := range app.applicationSvc.hosts {
appMetricsResp := models.AppPromResp{}
for metric, query := range app.applicationSvc.queries {
switch metric {
case "throughput":
// NOTE: The query doesn't have a host parameter as it aggregates across all hosts.
value, err := app.metricsMgr.Query(query)
if err != nil {
app.lo.Error("Failed to query Prometheus",
"host", host,
"metric", metric,
"error", err)
continue
}
appMetricsResp.Throughput = value

case "failure_count":
// NOTE: The query doesn't have a host parameter as it aggregates across all hosts.
value, err := app.metricsMgr.Query(query)
if err != nil {
app.lo.Error("Failed to query Prometheus",
"host", host,
"metric", metric,
"error", err)
continue
}
appMetricsResp.FailureCount = value

default:
app.lo.Warn("Unknown application metric queried",
"host", host,
"metric", metric)
}
}

// Add host metrics to the map.
appMetrics[host] = appMetricsResp
app.lo.Debug("fetched metrics", "host", host, "data", appMetricsResp)
}

return appMetrics, nil
}

// pushHWMetrics pushes hardware metrics to the NSE.
func (app *App) pushHWMetrics(host string, data models.HWPromResp) error {
for i := 0; i < app.opts.MaxRetries; i++ {
Expand Down Expand Up @@ -249,3 +302,25 @@ func (app *App) pushNetworkMetrics(host string, data models.NetworkPromResp) err
}
return nil
}

func (app *App) pushApplicationMetrics(host string, data models.AppPromResp) error {
for i := 0; i < app.opts.MaxRetries; i++ {
if err := app.nseMgr.PushAppMetrics(host, data); err != nil {
if i < app.opts.MaxRetries-1 {
app.lo.Error("Failed to push application metrics to NSE. Retrying...",
"host", host,
"attempt", i+1,
"error", err)
time.Sleep(app.opts.RetryInterval)
continue
}
app.lo.Error("Failed to push application metrics to NSE after max retries",
"host", host,
"max_retries", app.opts.MaxRetries,
"error", err)
return err
}
break
}
return nil
}
28 changes: 28 additions & 0 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,34 @@ func initNetworkSvc(ko *koanf.Koanf) (*networkService, error) {
}, nil
}

func initApplicationSvc(ko *koanf.Koanf) (*applicationService, error) {
var (
queries = map[string]string{
"failure_count": ko.MustString("metrics.application.failure_count"),
"throughput": ko.MustString("metrics.application.throughput"),
}
hosts = ko.Strings("metrics.application.hosts")
cfgPath = ko.String("prometheus.config_path")
)

if len(hosts) == 0 && cfgPath != "" {
defaultHosts, err := initDefaultHosts(ko, cfgPath)
if err != nil {
return nil, err
}
hosts = defaultHosts
}

if len(hosts) == 0 {
return nil, fmt.Errorf("no hosts found in the config")
}

return &applicationService{
hosts: hosts,
queries: queries,
}, nil
}

// initNSEManager initialises the NSE manager.
func initNSEManager(ko *koanf.Koanf, lo *slog.Logger) (*nse.Manager, error) {
nseMgr, err := nse.New(lo, nse.Opts{
Expand Down
61 changes: 54 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func main() {
exit()
}

// Load queries for application metrics.
applicationSvc, err := initApplicationSvc(ko)
if err != nil {
lo.Error("failed to init application service", "error", err)
exit()
}

// Initialise the NSE manager.
nseMgr, err := initNSEManager(ko, lo)
if err != nil {
Expand All @@ -61,13 +68,14 @@ func main() {

// Init the app.
app := &App{
lo: lo,
opts: initOpts(ko),
metricsMgr: metricsMgr,
nseMgr: nseMgr,
hardwareSvc: hardwareSvc,
dbSvc: dbSvc,
networkSvc: networkSvc,
lo: lo,
opts: initOpts(ko),
metricsMgr: metricsMgr,
nseMgr: nseMgr,
hardwareSvc: hardwareSvc,
dbSvc: dbSvc,
networkSvc: networkSvc,
applicationSvc: applicationSvc,
}

// Create a new context which is cancelled when `SIGINT`/`SIGTERM` is received.
Expand All @@ -85,6 +93,9 @@ func main() {
wg.Add(1)
go app.syncNetworkMetricsWorker(ctx, wg)

wg.Add(1)
go app.syncApplicationMetricsWorker(ctx, wg)

// Listen on the close channel indefinitely until a
// `SIGINT` or `SIGTERM` is received.
<-ctx.Done()
Expand Down Expand Up @@ -201,3 +212,39 @@ func (app *App) syncNetworkMetricsWorker(ctx context.Context, wg *sync.WaitGroup
}
}
}

// Add a new worker function for the application service.
func (app *App) syncApplicationMetricsWorker(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

ticker := time.NewTicker(app.opts.SyncInterval)
defer ticker.Stop()

app.lo.Info("Starting application metrics worker", "interval", app.opts.SyncInterval)
for {
select {
case <-ticker.C:
data, err := app.fetchApplicationMetrics()
if err != nil {
app.lo.Error("Failed to fetch application metrics", "error", err)
continue
}

// Push to upstream LAMA APIs.
for host, hostData := range data {
if err := app.pushApplicationMetrics(host, hostData); err != nil {
app.lo.Error("Failed to push application metrics to NSE", "host", host, "error", err)
continue
}

// FIXME: Currently the LAMA API does not support multiple hosts.
// Once we've pushed the data for the first host, break the loop.
// Once the LAMA API supports multiple hosts, remove this.
break
}
case <-ctx.Done():
app.lo.Info("Stopping application metrics worker")
return
}
}
}
41 changes: 23 additions & 18 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
[app]
log_level = "debug" # To enable debug logging, level should be `debug`.
sync_interval = "5m" # Interval at which the app should fetch data from metrics store.
log_level = "debug" # To enable debug logging, level should be `debug`.
max_retries = 3 # Maximum number of retries for a failed request.
retry_interval = "5s" # Interval at which the app should retry if the previous request failed.
max_retries = 3 # Maximum number of retries for a failed request.
sync_interval = "5m" # Interval at which the app should fetch data from metrics store.

[lama.nse]
exchange_id = 1 # 1=National Stock Exchange
idle_timeout = "5m" # Idle timeout for HTTP requests
login_id = "redacted"
member_id = "redacted"
password = "redacted"
timeout = "30s" # Timeout for HTTP requests
url = "https://lama.nse.internal" # Endpoint for NSE LAMA API Gateway
login_id = "redacted"
member_id = "redacted"
password = "redacted"
timeout = "30s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
exchange_id = 1 # 1=National Stock Exchange

[prometheus]
endpoint = "http://prometheus:9090" # Endpoint for Prometheus API
query_path = "/api/v1/query" # Endpoint for Prometheus query API
username = "redacted" # HTTP Basic Auth username
password = "redacted" # HTTP Basic Auth password
timeout = "10s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
max_idle_conns = 10
config_path = "/etc/prometheus/prometheus.yml" # Path to Prometheus config file. This is used to load a list of hosts to fetch metrics for.
endpoint = "http://prometheus:9090" # Endpoint for Prometheus API
idle_timeout = "5m" # Idle timeout for HTTP requests
max_idle_conns = 10
password = "redacted" # HTTP Basic Auth password
query_path = "/api/v1/query" # Endpoint for Prometheus query API
timeout = "10s" # Timeout for HTTP requests
username = "redacted" # HTTP Basic Auth username

[metrics.hardware] # Define Prometheus queries for hardware metrics
# List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
hosts = []
cpu = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle", hostname="%s"}[5m])))'
memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100'
disk = '100 - ((node_filesystem_avail_bytes{hostname="%s",device!~"rootfs"} * 100) / node_filesystem_size_bytes{hostname="%s",device!~"rootfs"})'
hosts = []
memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100'
uptime = '(node_time_seconds{hostname="%s"} - node_boot_time_seconds{hostname="%s"}) / 60'

[metrics.database] # Define Prometheus queries for db metrics
hosts = []
status = 'up{hostname="%s"}'

[metrics.network]
hosts = []
packet_errors = 'sum(rate(node_network_receive_errs_total{hostname="%s"}[5m])) + sum(rate(node_network_transmit_errs_total{hostname="%s"}[5m]))'

[metrics.application]
failure_count = 'sum(sum without (hostname, instance, server) (rate(haproxy_server_http_responses_total{job="my-app",code="5xx",proxy="my-backend"}[5m]))) by (code)'
hosts = []
throughput = 'sum(sum without (hostname, instance, server) (rate(haproxy_server_http_responses_total{job="my-app",proxy="my-backend"}[5m]))) by (proxy)'
Loading

0 comments on commit c62f312

Please sign in to comment.