Skip to content

Commit

Permalink
feat: add network service metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-karan committed Jul 25, 2023
1 parent 4febb70 commit 42f1cc9
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 26 deletions.
64 changes: 64 additions & 0 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type App struct {

hardwareSvc *hardwareService
dbSvc *dbService
networkSvc *networkService
}

type Opts struct {
Expand All @@ -37,6 +38,11 @@ type dbService struct {
queries map[string]string
}

// networkService holds the configuration needed to collect network metrics.
type networkService struct {
	// hosts lists the hosts for which network metrics are queried.
	hosts []string
	// queries maps a metric name (e.g. "packet_errors") to its query template.
	queries map[string]string
}

// fetchHWMetrics fetches hardware metrics from the Prometheus HTTP API.
func (app *App) fetchHWMetrics() (map[string]models.HWPromResp, error) {
hwMetrics := make(map[string]models.HWPromResp)
Expand Down Expand Up @@ -138,6 +144,40 @@ func (app *App) fetchDBMetrics() (map[string]models.DBPromResp, error) {
return dbMetrics, nil
}

// fetchNetworkMetrics runs every configured network query against the metrics
// manager for each configured host and returns the results keyed by host.
//
// A failed query is logged and skipped, leaving the corresponding field at its
// zero value; unknown metric names are logged as warnings. The returned error
// is always nil.
func (app *App) fetchNetworkMetrics() (map[string]models.NetworkPromResp, error) {
	result := make(map[string]models.NetworkPromResp)

	for _, host := range app.networkSvc.hosts {
		var resp models.NetworkPromResp

		for name, tmpl := range app.networkSvc.queries {
			// Only "packet_errors" is understood at the moment.
			if name != "packet_errors" {
				app.lo.Warn("Unknown network metric queried",
					"host", host,
					"metric", name)
				continue
			}

			// The host is substituted twice into the query template.
			val, err := app.metricsMgr.Query(fmt.Sprintf(tmpl, host, host))
			if err != nil {
				app.lo.Error("Failed to query Prometheus",
					"host", host,
					"metric", name,
					"error", err)
				continue
			}
			resp.PacketErrors = val
		}

		// Record whatever was collected for this host, even if a query failed.
		result[host] = resp
		app.lo.Debug("fetched metrics", "host", host, "data", resp)
	}

	return result, nil
}

// pushHWMetrics pushes hardware metrics to the NSE.
func (app *App) pushHWMetrics(host string, data models.HWPromResp) error {
for i := 0; i < app.opts.MaxRetries; i++ {
Expand Down Expand Up @@ -185,3 +225,27 @@ func (app *App) pushDBMetrics(host string, data models.DBPromResp) error {
}
return nil
}

// pushNetworkMetrics sends the network metrics of a single host to the NSE
// manager, making up to app.opts.MaxRetries attempts and sleeping for
// app.opts.RetryInterval between failed attempts.
//
// It returns nil as soon as a push succeeds, and the error of the final
// attempt once all attempts have failed.
func (app *App) pushNetworkMetrics(host string, data models.NetworkPromResp) error {
	limit := app.opts.MaxRetries

	for attempt := 1; attempt <= limit; attempt++ {
		err := app.nseMgr.PushNetworkMetrics(host, data)
		if err == nil {
			return nil
		}

		// Last attempt exhausted: report and surface the error to the caller.
		if attempt == limit {
			app.lo.Error("Failed to push network metrics to NSE after max retries",
				"host", host,
				"max_retries", limit,
				"error", err)
			return err
		}

		// Attempts remain: log, back off, and try again.
		app.lo.Error("Failed to push network metrics to NSE. Retrying...",
			"host", host,
			"attempt", attempt,
			"error", err)
		time.Sleep(app.opts.RetryInterval)
	}

	return nil
}
41 changes: 37 additions & 4 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func initMetricsManager(ko *koanf.Koanf) (*metrics.Manager, error) {
func inithardwareSvc(ko *koanf.Koanf) (*hardwareService, error) {
var (
queries = map[string]string{
"cpu": ko.MustString("metrics.hardware.cpu"),
"memory": ko.MustString("metrics.hardware.memory"),
"disk": ko.MustString("metrics.hardware.disk"),
"uptime": ko.MustString("metrics.hardware.uptime"),
"cpu": ko.MustString("metrics.hardware.cpu"),
"memory": ko.MustString("metrics.hardware.memory"),
"disk": ko.MustString("metrics.hardware.disk"),
"uptime": ko.MustString("metrics.hardware.uptime"),
"network_packet_erro": ko.MustString("metrics.hardware.uptime"),
}
hosts = ko.Strings("metrics.hardware.hosts")
cfgPath = ko.String("prometheus.config_path")
Expand Down Expand Up @@ -175,6 +176,38 @@ func initDBSvc(ko *koanf.Koanf) (*dbService, error) {
}, nil
}

// initNetworkSvc builds the network service from the loaded configuration.
//
// The query templates are read from `metrics.network.*` and the host list from
// `metrics.network.hosts`. When no hosts are configured and
// `prometheus.config_path` is set, the hosts are loaded via initDefaultHosts
// instead. An error is returned if that lookup fails or if no hosts are
// available at all.
func initNetworkSvc(ko *koanf.Koanf) (*networkService, error) {
	// NOTE(review): MustString presumably panics when the key is unset, as
	// Must* helpers conventionally do — confirm against koanf.
	queries := map[string]string{
		"packet_errors": ko.MustString("metrics.network.packet_errors"),
	}
	hosts := ko.Strings("metrics.network.hosts")
	cfgPath := ko.String("prometheus.config_path")

	// No explicit hosts: fall back to the ones derived from the Prometheus
	// config file, if a path to it is configured.
	if cfgPath != "" && len(hosts) == 0 {
		fallback, err := initDefaultHosts(ko, cfgPath)
		if err != nil {
			return nil, err
		}
		hosts = fallback
	}

	// Without any host there is nothing to query.
	if len(hosts) == 0 {
		return nil, fmt.Errorf("no hosts found in the config")
	}

	svc := &networkService{
		hosts:   hosts,
		queries: queries,
	}
	return svc, nil
}

// initNSEManager initialises the NSE manager.
func initNSEManager(ko *koanf.Koanf, lo *slog.Logger) (*nse.Manager, error) {
nseMgr, err := nse.New(lo, nse.Opts{
Expand Down
46 changes: 46 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func main() {
exit()
}

// Load queries for network metrics.
networkSvc, err := initNetworkSvc(ko)
if err != nil {
lo.Error("failed to init network service", "error", err)
exit()
}

// Initialise the NSE manager.
nseMgr, err := initNSEManager(ko, lo)
if err != nil {
Expand All @@ -60,6 +67,7 @@ func main() {
nseMgr: nseMgr,
hardwareSvc: hardwareSvc,
dbSvc: dbSvc,
networkSvc: networkSvc,
}

// Create a new context which is cancelled when `SIGINT`/`SIGTERM` is received.
Expand All @@ -74,6 +82,9 @@ func main() {
wg.Add(1)
go app.syncDBMetricsWorker(ctx, wg)

wg.Add(1)
go app.syncNetworkMetricsWorker(ctx, wg)

// Listen on the close channel indefinitely until a
// `SIGINT` or `SIGTERM` is received.
<-ctx.Done()
Expand Down Expand Up @@ -155,3 +166,38 @@ func (app *App) syncDBMetricsWorker(ctx context.Context, wg *sync.WaitGroup) {
}
}
}

// syncNetworkMetricsWorker fetches network metrics every app.opts.SyncInterval
// and pushes them upstream until ctx is cancelled. It marks wg as done when it
// returns.
//
// Only one host is pushed per tick (see the FIXME below). Hosts are tried in
// the order they appear in app.networkSvc.hosts, so the host that gets
// reported is stable across ticks instead of depending on Go's unspecified
// map iteration order.
func (app *App) syncNetworkMetricsWorker(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	ticker := time.NewTicker(app.opts.SyncInterval)
	defer ticker.Stop()

	app.lo.Info("Starting network metrics worker", "interval", app.opts.SyncInterval)
	for {
		select {
		case <-ticker.C:
			data, err := app.fetchNetworkMetrics()
			if err != nil {
				app.lo.Error("Failed to fetch network metrics", "error", err)
				continue
			}

			// Push to upstream LAMA APIs.
			// Walk the configured host list rather than ranging over the
			// map: map iteration order is randomised, which would make the
			// "first host" below a different host on every tick.
			for _, host := range app.networkSvc.hosts {
				hostData, ok := data[host]
				if !ok {
					// No metrics were collected for this host.
					continue
				}

				if err := app.pushNetworkMetrics(host, hostData); err != nil {
					app.lo.Error("Failed to push network metrics to NSE", "host", host, "error", err)
					// Try the next host instead of giving up on this tick.
					continue
				}

				// FIXME: Currently the LAMA API does not support multiple hosts.
				// Once we've pushed the data for the first host, break the loop.
				// Once the LAMA API supports multiple hosts, remove this.
				break
			}
		case <-ctx.Done():
			app.lo.Info("Stopping network metrics worker")
			return
		}
	}
}
33 changes: 19 additions & 14 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,35 +1,40 @@
[app]
log_level = "debug" # To enable debug logging, level should be `debug`.
sync_interval = "5m" # Interval at which the app should fetch data from metrics store.
log_level = "debug" # To enable debug logging, level should be `debug`.
sync_interval = "5m" # Interval at which the app should fetch data from metrics store.
retry_interval = "5s" # Interval at which the app should retry if the previous request failed.
max_retries = 3 # Maximum number of retries for a failed request.
max_retries = 3 # Maximum number of retries for a failed request.

[lama.nse]
url = "https://lama.nse.internal" # Endpoint for NSE LAMA API Gateway
login_id = "redacted"
member_id = "redacted"
password = "redacted"
timeout = "30s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
exchange_id = 1 # 1=National Stock Exchange
timeout = "30s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
exchange_id = 1 # 1=National Stock Exchange

[prometheus]
endpoint = "http://prometheus:9090" # Endpoint for Prometheus API
query_path = "/api/v1/query" # Endpoint for Prometheus query API
username = "redacted" # HTTP Basic Auth username
password = "redacted" # HTTP Basic Auth password
timeout = "10s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
endpoint = "http://prometheus:9090" # Endpoint for Prometheus API
query_path = "/api/v1/query" # Endpoint for Prometheus query API
username = "redacted" # HTTP Basic Auth username
password = "redacted" # HTTP Basic Auth password
timeout = "10s" # Timeout for HTTP requests
idle_timeout = "5m" # Idle timeout for HTTP requests
max_idle_conns = 10
config_path = "/etc/prometheus/prometheus.yml" # Path to Prometheus config file. This is used to load a list of hosts to fetch metrics for.

[metrics.hardware] # Define Prometheus queries for hardware metrics
hosts = [] # List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
# List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
hosts = []
cpu = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle", hostname="%s"}[5m])))'
memory = '(1 - ((node_memory_MemFree_bytes{hostname="%s"} + node_memory_Buffers_bytes{hostname="%s"} + node_memory_Cached_bytes{hostname="%s"}) / node_memory_MemTotal_bytes{hostname="%s"})) * 100'
disk = '100 - ((node_filesystem_avail_bytes{hostname="%s",device!~"rootfs"} * 100) / node_filesystem_size_bytes{hostname="%s",device!~"rootfs"})'
uptime = '(node_time_seconds{hostname="%s"} - node_boot_time_seconds{hostname="%s"}) / 60'

[metrics.database] # Define Prometheus queries for db metrics
hosts = [] # List of hosts to fetch metrics for. Keep this empty to fetch metrics for all hosts defined in `prometheus.config_path` file.
hosts = []
status = 'up{hostname="%s"}'

[metrics.network]
packet_errors = 'sum(rate(node_network_receive_errs_total{hostname="%s"}[5m])) + sum(rate(node_network_transmit_errs_total{hostname="%s"}[5m]))'
hosts = []
Loading

0 comments on commit 42f1cc9

Please sign in to comment.