Skip to content

Commit

Permalink
Merge pull request #333 from porters-xyz/poktscan-metrics-endpoint
Browse files Browse the repository at this point in the history
Poktscan metrics endpoint
  • Loading branch information
scermat authored Aug 5, 2024
2 parents bc7e987 + e3118a4 commit 4d9c70f
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 124 deletions.
152 changes: 77 additions & 75 deletions gateway/common/config.go
Original file line number Diff line number Diff line change
@@ -1,113 +1,115 @@
package common

import (
log "log/slog"
"os"
"strconv"
"strings"
"sync"
log "log/slog"
"os"
"strconv"
"strings"
"sync"
)

// store config as constants for now
const (
SHUTDOWN_DELAY string = "SHUTDOWN_DELAY"
JOB_BUFFER_SIZE = "JOB_BUFFER_SIZE"
NUM_WORKERS = "NUM_WORKERS"
PROXY_TO = "PROXY_TO"
HOST = "HOST" // host to add subdomains to
PORT = "PORT"
DATABASE_URL = "DATABASE_URL"
REDIS_URL = "REDIS_URL"
REDIS_ADDR = "REDIS_ADDR"
REDIS_USER = "REDIS_USER"
REDIS_PASSWORD = "REDIS_PASSWORD"
INSTRUMENT_ENABLED = "ENABLE_INSTRUMENT"
LOG_LEVEL = "LOG_LEVEL"
SHUTDOWN_DELAY string = "SHUTDOWN_DELAY"
JOB_BUFFER_SIZE = "JOB_BUFFER_SIZE"
NUM_WORKERS = "NUM_WORKERS"
PROXY_TO = "PROXY_TO"
HOST = "HOST" // host to add subdomains to
PORT = "PORT"
DATABASE_URL = "DATABASE_URL"
REDIS_URL = "REDIS_URL"
REDIS_ADDR = "REDIS_ADDR"
REDIS_USER = "REDIS_USER"
REDIS_PASSWORD = "REDIS_PASSWORD"
INSTRUMENT_ENABLED = "ENABLE_INSTRUMENT"
LOG_LEVEL = "LOG_LEVEL"
LOG_HTTP_RESPONSE = "LOG_HTTP_RESPONSE"
)

// This may evolve to include config outside env, or use .env file for
// convenience
type Config struct {
defaults map[string]string
loglevel *log.LevelVar
defaults map[string]string
loglevel *log.LevelVar
}

var config *Config
var configMutex sync.Once

func setupConfig() *Config {
configMutex.Do(func() {
config = &Config{
defaults: make(map[string]string),
loglevel: &log.LevelVar{},
}
config.defaults[SHUTDOWN_DELAY] = "5"
config.defaults[JOB_BUFFER_SIZE] = "50"
config.defaults[NUM_WORKERS] = "10"
config.defaults[HOST] = "localhost"
config.defaults[PORT] = "9000"
config.defaults[INSTRUMENT_ENABLED] = "false"
configMutex.Do(func() {
config = &Config{
defaults: make(map[string]string),
loglevel: &log.LevelVar{},
}
config.defaults[SHUTDOWN_DELAY] = "5"
config.defaults[JOB_BUFFER_SIZE] = "50"
config.defaults[NUM_WORKERS] = "10"
config.defaults[HOST] = "localhost"
config.defaults[PORT] = "9000"
config.defaults[INSTRUMENT_ENABLED] = "false"
config.defaults[LOG_HTTP_RESPONSE] = "true"

level := parseLogLevel(os.Getenv(LOG_LEVEL))
config.SetLogLevel(level)
})
return config
level := parseLogLevel(os.Getenv(LOG_LEVEL))
config.SetLogLevel(level)
})
return config
}

func GetConfig(key string) string {
config := setupConfig()
value, ok := os.LookupEnv(key)
if ok {
return value
} else {
defaultval, ok := config.defaults[key]
if ok {
return defaultval
} else {
log.Warn("config not set, no default", "key", key)
return ""
}
}
config := setupConfig()
value, ok := os.LookupEnv(key)
if ok {
return value
} else {
defaultval, ok := config.defaults[key]
if ok {
return defaultval
} else {
log.Warn("config not set, no default", "key", key)
return ""
}
}
}

func GetConfigInt(key string) int {
configval := GetConfig(key)
intval, err := strconv.Atoi(configval)
if err != nil {
log.Error("Error parsing config", "err", err)
intval = -1
}
return intval
configval := GetConfig(key)
intval, err := strconv.Atoi(configval)
if err != nil {
log.Error("Error parsing config", "err", err)
intval = -1
}
return intval
}

func Enabled(key string) bool {
configval := GetConfig(key)
boolval, err := strconv.ParseBool(configval)
if err != nil {
boolval = false
}
return boolval
configval := GetConfig(key)
boolval, err := strconv.ParseBool(configval)
if err != nil {
boolval = false
}
return boolval
}

func GetLogLevel() log.Level {
configval := GetConfig(LOG_LEVEL)
return parseLogLevel(configval)
configval := GetConfig(LOG_LEVEL)
return parseLogLevel(configval)
}

func parseLogLevel(level string) log.Level {
if strings.EqualFold(level, "ERROR") {
return log.LevelError
} else if strings.EqualFold(level, "WARN") {
return log.LevelWarn
} else if strings.EqualFold(level, "DEBUG") {
return log.LevelDebug
} else {
return log.LevelInfo
}
if strings.EqualFold(level, "ERROR") {
return log.LevelError
} else if strings.EqualFold(level, "WARN") {
return log.LevelWarn
} else if strings.EqualFold(level, "DEBUG") {
return log.LevelDebug
} else {
return log.LevelInfo
}
}

func (c *Config) SetLogLevel(level log.Level) {
c.loglevel.Set(level)
logger := log.New(log.NewTextHandler(os.Stdout, &log.HandlerOptions{Level: c.loglevel}))
log.SetDefault(logger)
c.loglevel.Set(level)
logger := log.New(log.NewTextHandler(os.Stdout, &log.HandlerOptions{Level: c.loglevel}))
log.SetDefault(logger)
}
37 changes: 29 additions & 8 deletions gateway/db/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ func (a *App) refresh(ctx context.Context) error {
log.Debug("Refreshing app", "appId", a.Id)
err := a.fetch(ctx)
if err != nil {
log.Error("Failed to fetch app", "appId", a.Id, "error", err)
//Can be set to log.Error, but lots of false positives will show due to context cancellation
log.Debug("Failed to fetch app", "appId", a.Id, "error", err)
a.MissedAt = time.Now()
} else {
log.Debug("Updating Tenant via Lookup")
Expand All @@ -400,7 +401,8 @@ func (a *App) refresh(ctx context.Context) error {

err = a.cache(ctx)
if err != nil {
log.Error("Failed to cache app", "appId", a.Id, "error", err)
//Can be set to log.Error, but lots of false positives will show due to context cancellation
log.Debug("Failed to cache app", "appId", a.Id, "error", err)
return err
}

Expand Down Expand Up @@ -513,27 +515,46 @@ func DecrementField(ctx context.Context, decr Decrementable, amount int) int {
return int(newVal)
}

func IncrementCounter(ctx context.Context, key string, amount int) int {
func IncrementCounterKey(ctx context.Context, key string, amount int) int {
incrBy := int64(amount)
newVal, err := getCache().IncrBy(ctx, key, incrBy).Result()
if err != nil {
log.Error("Error incrementing counter (key)", "key", key, "amount", amount, "error", err)
return -1
}
return int(newVal)
}

func DecrementCounter(ctx context.Context, key string, amount int) int {
func IncrementCounterField(ctx context.Context, key string, field string, amount int) int {
incrBy := int64(amount)

newVal, err := getCache().HIncrBy(ctx, key, field, incrBy).Result()
if err != nil {
log.Error("Error incrementing counter (field)", "key", key, "field", field, "amount", amount, "error", err)
return -1
}
return int(newVal)
}

func DecrementCounterField(ctx context.Context, key string, field string, amount int) int {
decrBy := int64(amount)
newVal, err := getCache().DecrBy(ctx, key, decrBy).Result()

newVal, err := getCache().HIncrBy(ctx, key, field, -decrBy).Result()
if err != nil {
log.Error("Error decrementing counter", "key", key, "field", field, "amount", amount, "error", err)
return -1
}
return int(newVal)
}

// returns false if counter already exists
func InitCounter(ctx context.Context, key string, initValue int) (bool, error) {
return getCache().SetNX(ctx, key, initValue, 2*time.Minute).Result()
func InitCounter(ctx context.Context, key string, field string, initValue int) (bool, error) {
success, err := getCache().HSetNX(ctx, key, field, initValue).Result()
if err != nil {
log.Error("Error initializing counter", "key", key, "field", field, "initValue", initValue, "error", err)
return false, err
}
log.Info("Initialized counter", "key", key, "field", field, "success", success)
return success, nil
}

func ReconcileRelays(ctx context.Context, rtx *Relaytx) (func() bool, error) {
Expand Down
7 changes: 4 additions & 3 deletions gateway/db/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type UsageUpdater struct {

func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater {
updater := &UsageUpdater{
status: status,
status: status,
balancekey: "balance",
}

entity, ok := common.FromContext(ctx, PRODUCT)
Expand Down Expand Up @@ -53,13 +54,13 @@ func (u *UsageUpdater) Run() {

if u.status == "success" {
ctx := context.Background()
DecrementCounter(ctx, u.balancekey, u.product.Weight)
DecrementCounterField(ctx, u.app.Tenant.Key(), u.balancekey, u.product.Weight)

use := &Relaytx{
AppId: u.app.Id,
ProductName: u.product.Name,
}
IncrementCounter(ctx, use.Key(), u.product.Weight)
IncrementCounterKey(ctx, use.Key(), u.product.Weight)
}
common.EndpointUsage.WithLabelValues(u.app.HashId(), u.app.Tenant.Id, u.product.Name, u.status).Inc()
}
Expand Down
30 changes: 15 additions & 15 deletions gateway/main.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package main

import (
"os"
"os"

"porters/plugins"
"porters/proxy"
"porters/plugins"
"porters/proxy"
)

// command line runner
func main() {

arg := os.Args[1]
if arg == "gateway" {
arg := os.Args[1]
if arg == "gateway" {

// currently registering plugins via main
proxy.Register(&plugins.ApiKeyAuth{"X-API"})
proxy.Register(&plugins.BalanceTracker{})
proxy.Register(&plugins.LeakyBucketPlugin{"APP"})
proxy.Register(&plugins.ProductFilter{})
proxy.Register(&plugins.UserAgentFilter{})
proxy.Register(&plugins.AllowedOriginFilter{})
proxy.Register(proxy.NewReconciler(300)) // seconds
// currently registering plugins via main
proxy.Register(&plugins.ApiKeyAuth{"X-API"})
proxy.Register(&plugins.BalanceTracker{})
proxy.Register(&plugins.LeakyBucketPlugin{"APP"})
proxy.Register(&plugins.ProductFilter{})
proxy.Register(&plugins.UserAgentFilter{})
proxy.Register(&plugins.AllowedOriginFilter{})
proxy.Register(proxy.NewReconciler(300)) // seconds

gateway()
}
gateway()
}
}
16 changes: 5 additions & 11 deletions gateway/plugins/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ func (b *BalanceTracker) HandleRequest(req *http.Request) error {
if err != nil {
return proxy.NewHTTPError(http.StatusNotFound)
}

ctx = common.UpdateContext(ctx, bal)
ctx = common.UpdateContext(ctx, app)
// TODO Check that balance is greater than or equal to req weight

log.Info("Balance for app", "appId", app.Id, "bal", bal.cachedBalance)

if bal.cachedBalance > 0 {
lifecycle := proxy.SetStageComplete(ctx, proxy.BalanceCheck|proxy.AccountLookup)
ctx = common.UpdateContext(ctx, lifecycle)
} else {
log.Error("no balance remaining", "app", app.HashId())
log.Error("no balance remaining", "app", app.Id)
return proxy.BalanceExceededError
}

Expand All @@ -84,14 +86,6 @@ func (c *balancecache) ContextKey() string {
}

func (c *balancecache) Lookup(ctx context.Context) error {
created, err := db.InitCounter(ctx, c.Key(), c.tenant.Balance)
if err != nil {
return err
}
if created {
c.cachedBalance = c.tenant.Balance
} else {
c.cachedBalance = db.GetIntVal(ctx, c.Key())
}
c.cachedBalance = c.tenant.Balance
return nil
}
Loading

0 comments on commit 4d9c70f

Please sign in to comment.