From b93f01f9992232bfff957545304fd6ce0b3c8fd5 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Thu, 25 Jul 2024 23:59:29 +0200 Subject: [PATCH 01/38] updated logging --- gateway/proxy/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index f6b0470..0652e7c 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -26,7 +26,7 @@ func Start() { log.Error("unable to parse proxy to", "err", err) panic("unable to start with invalid remote url") } - log.Debug("proxying to remote", "url", remote) + log.Info("proxying to remote", "url", remote) handler := func(proxy *httputil.ReverseProxy) func(http.ResponseWriter, *http.Request) { return func(resp http.ResponseWriter, req *http.Request) { From 33205563a7853ba2fb0c75b30897c86d4531202c Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 15:26:39 +0200 Subject: [PATCH 02/38] added t emporary logging to investigate issue --- gateway/common/tasks.go | 218 ++++++++++++++++++------------------ gateway/db/cache.go | 44 ++++---- gateway/proxy/proxy.go | 4 + gateway/proxy/reconciler.go | 6 + 4 files changed, 144 insertions(+), 128 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index e816ee8..386859b 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -1,41 +1,41 @@ package common import ( - "errors" - log "log/slog" - "sync" - "time" + "errors" + log "log/slog" + "sync" + "time" ) type Runnable interface { - error - Run() + error + Run() } type Delayable interface { - Runnable - Ready() bool + Runnable + Ready() bool } // contains bits necessary to run later type SimpleTask struct { - run func() - runtime time.Time + run func() + runtime time.Time } type RetryTask struct { - SimpleTask - runWithSuccess func() bool - retryCount int - retryEvery time.Duration - retryGen int + SimpleTask + runWithSuccess func() bool + retryCount int + retryEvery time.Duration + retryGen int } type TaskQueue struct { - closed bool - tasks chan Runnable - delayed chan Delayable - errors chan error + closed bool + tasks chan Runnable + delayed chan Delayable + errors chan error } var qInst *TaskQueue @@ -43,138 +43,138 @@ var qmutex sync.Once // Another singleton func GetTaskQueue() *TaskQueue { - qmutex.Do(func() { - bufferSize := GetConfigInt(JOB_BUFFER_SIZE) - qInst = &TaskQueue{ - closed: false, - tasks: make(chan Runnable, bufferSize), - delayed: make(chan Delayable, bufferSize), - errors: make(chan error, bufferSize), - } - }) - return qInst + qmutex.Do(func() { + bufferSize := GetConfigInt(JOB_BUFFER_SIZE) + qInst = &TaskQueue{ + closed: false, + tasks: make(chan Runnable, bufferSize), + delayed: make(chan Delayable, bufferSize), + errors: make(chan error, bufferSize), + } + }) + return qInst } func (q *TaskQueue) SetupWorkers() { - numWorkers := GetConfigInt(NUM_WORKERS) - for i := 0; i < numWorkers; i++ { - go worker(q) - } - go delayWorker(q) - go errWorker(q) + numWorkers := GetConfigInt(NUM_WORKERS) + for i := 0; i < numWorkers; i++ { + go worker(q) + } + go delayWorker(q) + go errWorker(q) } // use this for graceful shutdown func (q *TaskQueue) CloseQueue() { - close(q.tasks) - close(q.delayed) - q.closed = true - - shutdownTime := time.Duration(GetConfigInt(SHUTDOWN_DELAY)) * time.Second - ticker := time.NewTicker(100 * time.Millisecond) - for { - select { - case <-ticker.C: - if len(q.tasks) == 0 { - return - } - case <-time.After(shutdownTime): - log.Warn("workers not finished, work may be lost") - return - } - } + close(q.tasks) + close(q.delayed) + q.closed = true + + shutdownTime := time.Duration(GetConfigInt(SHUTDOWN_DELAY)) * time.Second + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-ticker.C: + if len(q.tasks) == 0 { + return + } + case <-time.After(shutdownTime): + log.Warn("workers not finished, work may be lost") + return + } + } } func (q *TaskQueue) Add(runnable Runnable) { - q.tasks <- runnable - JobGauge.WithLabelValues("task").Inc() + q.tasks <- runnable + JobGauge.WithLabelValues("task").Inc() } func (q *TaskQueue) Delay(delayable Delayable) { - q.delayed <- delayable - JobGauge.WithLabelValues("delayed").Inc() + q.delayed <- delayable + JobGauge.WithLabelValues("delayed").Inc() } func (q *TaskQueue) ReportError(err error) { - q.errors <- err - JobGauge.WithLabelValues("error").Inc() + q.errors <- err + JobGauge.WithLabelValues("error").Inc() } func worker(q *TaskQueue) { - for task := range q.tasks { - switch t := task.(type) { - case Combinable: - task.(Combinable).Combine(q.tasks) - case Runnable: - task.Run() - default: - log.Debug("unspecified task", "task", task, "type", t) - } - JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) - } + for task := range q.tasks { + switch t := task.(type) { + case Combinable: + task.(Combinable).Combine(q.tasks) + case Runnable: + task.Run() + default: + log.Debug("unspecified task", "task", task, "type", t) + } + JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) + } } func errWorker(q *TaskQueue) { - for err := range q.errors { - log.Error("error encountered", "err", err) - JobGauge.WithLabelValues("error").Dec() - } + for err := range q.errors { + log.Error("error encountered", "err", err) + JobGauge.WithLabelValues("error").Dec() + } } func delayWorker(q *TaskQueue) { - for i:=len(q.delayed); i>0; i-- { - task := <- q.delayed - if q.closed { - q.ReportError(errors.New("Shutting down")) - } else if task.Ready() { - q.Add(task) - JobGauge.WithLabelValues("delayed").Dec() - } else { - q.delayed <- task - } - } - time.Sleep(1 * time.Second) + for i := len(q.delayed); i > 0; i-- { + task := <-q.delayed + if q.closed { + q.ReportError(errors.New("Shutting down")) + } else if task.Ready() { + q.Add(task) + JobGauge.WithLabelValues("delayed").Dec() + } else { + q.delayed <- task + } + } + time.Sleep(1 * time.Second) } func NewSimpleTask(run func()) *SimpleTask { - return &SimpleTask{ - run: run, - runtime: time.Now(), - } + return &SimpleTask{ + run: run, + runtime: time.Now(), + } } // SimpleTask can be extended if needed func (t *SimpleTask) Run() { - t.run() + t.run() } func (t *SimpleTask) Ready() bool { - return time.Now().After(t.runtime) + return time.Now().After(t.runtime) } func (t *SimpleTask) Error() string { - // Override to include more details - return "error processing async task" + // Override to include more details + return "error processing async task" } func NewRetryTask(run func() bool, count int, every time.Duration) *RetryTask { - return &RetryTask{ - runWithSuccess: run, - retryCount: count, - retryEvery: every, - } + return &RetryTask{ + runWithSuccess: run, + retryCount: count, + retryEvery: every, + } } func (r *RetryTask) Run() { - ok := r.runWithSuccess() - if !ok { - q := GetTaskQueue() - if r.retryGen < r.retryCount { - r.runtime = time.Now().Add(r.retryEvery) - r.retryGen++ - q.delayed <- r - } else { - q.errors <- r - } - } + ok := r.runWithSuccess() + if !ok { + q := GetTaskQueue() + if r.retryGen < r.retryCount { + r.runtime = time.Now().Add(r.retryEvery) + r.retryGen++ + q.delayed <- r + } else { + q.errors <- r + } + } } diff --git a/gateway/db/cache.go b/gateway/db/cache.go index c7ee5bd..b3db859 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -18,14 +18,13 @@ import ( ) const ( - ACCOUNT_SET = "VALID_ACCOUNTS" - REDIS = "redis" + ACCOUNT_SET = "VALID_ACCOUNTS" + REDIS = "redis" MISSED_FALSE = "0001-01-01T00:00:00Z" ) // access redis functions through this object type Cache struct { - } type RefreshTask struct { @@ -56,10 +55,10 @@ func getCache() *redis.Client { if err != nil { log.Warn("valid REDIS_URL not provided", "err", err) opts = &redis.Options{ - Addr: common.GetConfig(common.REDIS_ADDR), + Addr: common.GetConfig(common.REDIS_ADDR), Username: common.GetConfig(common.REDIS_USER), Password: common.GetConfig(common.REDIS_PASSWORD), - DB: 0, + DB: 0, } } client = redis.NewClient(opts) @@ -209,24 +208,24 @@ func (a *App) Rules(ctx context.Context) (Apprules, error) { continue } - // Extract the actual ID from the Redis key - parts := strings.Split(key, ":") - if len(parts) != 3 || parts[0] != APPRULE { - log.Error("Invalid key format", "key", key) - continue - } - appId := parts[1] - appRuleId := parts[2] + // Extract the actual ID from the Redis key + parts := strings.Split(key, ":") + if len(parts) != 3 || parts[0] != APPRULE { + log.Error("Invalid key format", "key", key) + continue + } + appId := parts[1] + appRuleId := parts[2] active, _ := strconv.ParseBool(result["active"]) cachedAt, _ := time.Parse(time.RFC3339, result["cachedAt"]) ar := Apprule{ - Id: appRuleId, - Active: active, - Value: result["value"], + Id: appRuleId, + Active: active, + Value: result["value"], RuleType: result["ruleType"], CachedAt: cachedAt, - App: App{Id: appId}, + App: App{Id: appId}, } // Check if the Apprule needs to be refreshed @@ -279,7 +278,14 @@ func (p *Product) Lookup(ctx context.Context) error { func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { relaycount := GetIntVal(ctx, key) + log.Info("Relay count", "relaycount", relaycount) + rtx := reverseRelaytxKey(key) + log.Info("Reverse relay key", "rtx", rtx) + + log.Info("AppId", "AppId", rtx.AppId) + log.Info("ProductName", "ProductName", rtx.ProductName) + if relaycount > 0 && rtx.AppId != "" && rtx.ProductName != "" { uuid := uuid.New() rtx.Id = uuid.String() @@ -440,14 +446,14 @@ func DecrementCounter(ctx context.Context, key string, amount int) int { // returns false if counter already exists func InitCounter(ctx context.Context, key string, initValue int) (bool, error) { - return getCache().SetNX(ctx, key, initValue, 2 * time.Minute).Result() + return getCache().SetNX(ctx, key, initValue, 2*time.Minute).Result() } func ReconcileRelays(ctx context.Context, rtx *Relaytx) (func() bool, error) { // Can ignore new value _, err := getCache().DecrBy(ctx, rtx.Key(), int64(rtx.Amount)).Result() if err != nil { - return func() bool {return false}, err + return func() bool { return false }, err } updateFunc := func() bool { diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 0652e7c..18eebc0 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -90,6 +90,10 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { cancel(ChainNotSupportedError) } target := utils.NewTarget(remote, poktId) + + //TODO + log.Info("new target", "remote", remote, "poktId", poktId); + req.URL = target.URL() for _, p := range (*reg).plugins { diff --git a/gateway/proxy/reconciler.go b/gateway/proxy/reconciler.go index e7ab642..d420e20 100644 --- a/gateway/proxy/reconciler.go +++ b/gateway/proxy/reconciler.go @@ -46,12 +46,18 @@ func (r *Reconciler) spawnTasks() { for iter.Next(ctx) { rtxkey := iter.Val() // use for building relaytx + log.Info("Reconciling tasks for key", "rtxkey", rtxkey) + rtx, ok := db.RelaytxFromKey(ctx, rtxkey) if ok { task := &reconcileTask{ relaytx: rtx, } queue.Add(task) + + log.Info("Queued task", "rtxkey", rtxkey) + } else { + log.Error("Failed to queue task", "rtxkey", rtxkey) } } } From 7ae8bdb306c47d3f716e403f559cf27ec2f1a0f3 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 16:50:01 +0200 Subject: [PATCH 03/38] updated logging for further investigation --- gateway/common/tasks.go | 3 ++- gateway/db/cache.go | 8 ++++---- gateway/proxy/proxy.go | 4 ---- gateway/proxy/reconciler.go | 1 + 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 386859b..6f0b2b6 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -102,13 +102,14 @@ func (q *TaskQueue) ReportError(err error) { func worker(q *TaskQueue) { for task := range q.tasks { + log.Info("Processing task", "task", task) switch t := task.(type) { case Combinable: task.(Combinable).Combine(q.tasks) case Runnable: task.Run() default: - log.Debug("unspecified task", "task", task, "type", t) + log.Warn("unspecified task", "task", task, "type", t) } JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) } diff --git a/gateway/db/cache.go b/gateway/db/cache.go index b3db859..e8758c3 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -278,13 +278,13 @@ func (p *Product) Lookup(ctx context.Context) error { func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { relaycount := GetIntVal(ctx, key) - log.Info("Relay count", "relaycount", relaycount) + //log.Info("Relay count", "relaycount", relaycount) rtx := reverseRelaytxKey(key) - log.Info("Reverse relay key", "rtx", rtx) + //log.Info("Reverse relay key", "rtx", rtx) - log.Info("AppId", "AppId", rtx.AppId) - log.Info("ProductName", "ProductName", rtx.ProductName) + //log.Info("AppId", "AppId", rtx.AppId) + //log.Info("ProductName", "ProductName", rtx.ProductName) if relaycount > 0 && rtx.AppId != "" && rtx.ProductName != "" { uuid := uuid.New() diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 18eebc0..0652e7c 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -90,10 +90,6 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { cancel(ChainNotSupportedError) } target := utils.NewTarget(remote, poktId) - - //TODO - log.Info("new target", "remote", remote, "poktId", poktId); - req.URL = target.URL() for _, p := range (*reg).plugins { diff --git a/gateway/proxy/reconciler.go b/gateway/proxy/reconciler.go index d420e20..2d1a271 100644 --- a/gateway/proxy/reconciler.go +++ b/gateway/proxy/reconciler.go @@ -64,6 +64,7 @@ func (r *Reconciler) spawnTasks() { } func (t *reconcileTask) Run() { + log.Info("Running reconcile task", "relaytx", t.relaytx) ctx := context.Background() replayfunc, err := db.ReconcileRelays(ctx, t.relaytx) if err != nil { From 861d64352b5660d93fcb2be14b5e4ce699ae2280 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 18:31:29 +0200 Subject: [PATCH 04/38] various modifications to ensure safety of execution and identify source of issue. --- gateway/common/tasks.go | 44 ++++++++++++++++++++++++++++++-------- gateway/db/cache.go | 47 +++++++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 20 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 6f0b2b6..4e72003 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -100,21 +100,47 @@ func (q *TaskQueue) ReportError(err error) { JobGauge.WithLabelValues("error").Inc() } +// func worker(q *TaskQueue) { +// for task := range q.tasks { +// log.Info("Processing task", "task", task) +// switch t := task.(type) { +// case Combinable: +// task.(Combinable).Combine(q.tasks) +// case Runnable: +// task.Run() +// default: +// log.Warn("unspecified task", "task", task, "type", t) +// } +// JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) +// } +// } + func worker(q *TaskQueue) { for task := range q.tasks { - log.Info("Processing task", "task", task) - switch t := task.(type) { - case Combinable: - task.(Combinable).Combine(q.tasks) - case Runnable: - task.Run() - default: - log.Warn("unspecified task", "task", task, "type", t) - } + processTask(task, q) JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) } } +func processTask(task Runnable, q *TaskQueue) { + defer func() { + if r := recover(); r != nil { + log.Error("Recovered in worker", "error", r) + } + }() + + log.Info("Processing task", "task", task) + + switch t := task.(type) { + case Combinable: + task.(Combinable).Combine(q.tasks) + case Runnable: + task.Run() + default: + log.Debug("unspecified task", "task", task, "type", t) + } +} + func errWorker(q *TaskQueue) { for err := range q.errors { log.Error("error encountered", "err", err) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index e8758c3..cdeb98d 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -299,38 +299,61 @@ func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { // Refresh does the psql calls to build cache func (t *Tenant) refresh(ctx context.Context) error { + log.Info("Refreshing tenant", "tenantId", t.Id) err := t.fetch(ctx) if err != nil { - log.Error("something's wrong", "tenant", t.Id, "err", err) + log.Error("Failed to fetch tenant", "tenantId", t.Id, "error", err) + return err + } + + err = t.canonicalBalance(ctx) + if err != nil { + log.Error("Failed to get canonical balance", "tenantId", t.Id, "error", err) + return err + } + + err = t.cache(ctx) + if err != nil { + log.Error("Failed to cache tenant", "tenantId", t.Id, "error", err) return err - } else { - err := t.canonicalBalance(ctx) - if err != nil { - log.Error("error getting balance", "tenant", t.Id, "err", err) - } - t.cache(ctx) } + return nil } func (a *App) refresh(ctx context.Context) error { + if a == nil { + log.Error("App is nil, cannot refresh") + return errors.New("App is nil") + } + + log.Info("Refreshing app", "appId", a.Id) err := a.fetch(ctx) if err != nil { - log.Error("err seen refreshing app", "app", a.HashId(), "err", err) + log.Error("Failed to fetch app", "appId", a.Id, "error", err) a.MissedAt = time.Now() } else { a.Tenant.Lookup(ctx) } - a.cache(ctx) + + err = a.cache(ctx) + if err != nil { + log.Error("Failed to cache app", "appId", a.Id, "error", err) + return err + } rules, err := a.fetchRules(ctx) if err != nil { - log.Error("error accessing rules", "app", a.HashId(), "err", err) + log.Error("Failed to fetch rules", "appId", a.Id, "error", err) return err } for _, r := range rules { - r.cache(ctx) + err := r.cache(ctx) + if err != nil { + log.Error("Failed to cache rule", "ruleId", r.Id, "error", err) + } } + return nil } @@ -355,9 +378,11 @@ func (p *Product) refresh(ctx context.Context) error { } func (t *RefreshTask) Run() { + log.Info("Running refresh task", "refreshable", t.ref) ctx := context.Background() err := t.ref.refresh(ctx) if err != nil { + log.Error("Failed to refresh", "error", err) common.GetTaskQueue().ReportError(errors.New(t.Error())) } } From 1aaf5a298f19ad89dfbce06efe886e5d0eca37ff Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 19:14:42 +0200 Subject: [PATCH 05/38] mroe debugging changes --- gateway/common/tasks.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 4e72003..5f19a0a 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -3,6 +3,7 @@ package common import ( "errors" log "log/slog" + "reflect" "sync" "time" ) @@ -86,16 +87,19 @@ func (q *TaskQueue) CloseQueue() { } func (q *TaskQueue) Add(runnable Runnable) { + log.Info("Adding runnable to queue", "delayable", runnable) q.tasks <- runnable JobGauge.WithLabelValues("task").Inc() } func (q *TaskQueue) Delay(delayable Delayable) { + log.Info("Adding delayed task to queue", "delayable", delayable) q.delayed <- delayable JobGauge.WithLabelValues("delayed").Inc() } func (q *TaskQueue) ReportError(err error) { + log.Info("Adding error task to queue", "err", err) q.errors <- err JobGauge.WithLabelValues("error").Inc() } @@ -129,7 +133,8 @@ func processTask(task Runnable, q *TaskQueue) { } }() - log.Info("Processing task", "task", task) + taskType := reflect.TypeOf(task).String() + log.Info("Processing task", "task", task, "taskType", taskType) switch t := task.(type) { case Combinable: From 2ae26a4716abde3e7aff80278d0dd0a96967d710 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 19:33:13 +0200 Subject: [PATCH 06/38] added checks in UsageUpdater --- gateway/common/tasks.go | 2 +- gateway/db/usage.go | 98 ++++++++++++++++++++++------------------- 2 files changed, 54 insertions(+), 46 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 5f19a0a..81b44ad 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -87,7 +87,7 @@ func (q *TaskQueue) CloseQueue() { } func (q *TaskQueue) Add(runnable Runnable) { - log.Info("Adding runnable to queue", "delayable", runnable) + log.Info("Adding runnable to queue", "runnable", runnable) q.tasks <- runnable JobGauge.WithLabelValues("task").Inc() } diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 286531f..57a2fc8 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -1,63 +1,71 @@ package db import ( - "context" - "fmt" - log "log/slog" + "context" + "fmt" + log "log/slog" - "porters/common" + "porters/common" ) // Implements Runnable type UsageUpdater struct { - status string - balancekey string - app *App - tenant *Tenant - product *Product + status string + balancekey string + app *App + tenant *Tenant + product *Product } func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { - updater := &UsageUpdater{ - status: status, - } - - entity, ok := common.FromContext(ctx, PRODUCT) - if ok { - updater.product = entity.(*Product) - } - entity, ok = common.FromContext(ctx, APP) - if ok { - updater.app = entity.(*App) - } - entity, ok = common.FromContext(ctx, TENANT) - if ok { - tenant := entity.(*Tenant) - updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - updater.tenant = tenant - } - - return updater + updater := &UsageUpdater{ + status: status, + } + + entity, ok := common.FromContext(ctx, PRODUCT) + if !ok || entity == nil { + log.Error("Failed to get product from context") + } else { + updater.product = entity.(*Product) + } + + entity, ok = common.FromContext(ctx, APP) + if !ok || entity == nil { + log.Error("Failed to get app from context") + } else { + updater.app = entity.(*App) + } + + entity, ok = common.FromContext(ctx, TENANT) + if !ok || entity == nil { + log.Error("Failed to get tenant from context") + } else { + tenant := entity.(*Tenant) + updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) + updater.tenant = tenant + } + + return updater } func (u *UsageUpdater) Run() { - if u.app == nil || u.tenant == nil || u.product == nil { - log.Debug("invalid request, usage not reported") - return - } - if u.status == "success" { - ctx := context.Background() - DecrementCounter(ctx, u.balancekey, u.product.Weight) - - use := &Relaytx{ - AppId: u.app.Id, - ProductName: u.product.Name, - } - IncrementCounter(ctx, use.Key(), u.product.Weight) - } - common.EndpointUsage.WithLabelValues(u.app.HashId(), u.tenant.Id, u.product.Name, u.status).Inc() + if u.app == nil || u.tenant == nil || u.product == nil { + log.Error("Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) + return + } + if u.status == "success" { + ctx := context.Background() + DecrementCounter(ctx, u.balancekey, u.product.Weight) + + use := &Relaytx{ + AppId: u.app.Id, + ProductName: u.product.Name, + } + IncrementCounter(ctx, use.Key(), u.product.Weight) + } + common.EndpointUsage.WithLabelValues(u.app.HashId(), u.tenant.Id, u.product.Name, u.status).Inc() } func (u *UsageUpdater) Error() string { - return fmt.Sprintf("BAL: unable to use relays for %s", u.tenant.Id) + return fmt.Sprintf("BAL: unable to use relays for %s", u.tenant.Id) } From 3f709332f30f95ad13f5ecdf07c334ad694e9905 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 20:11:27 +0200 Subject: [PATCH 07/38] further logging for debugging --- gateway/common/tasks.go | 6 +++--- gateway/db/cache.go | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 81b44ad..05a6887 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -87,19 +87,19 @@ func (q *TaskQueue) CloseQueue() { } func (q *TaskQueue) Add(runnable Runnable) { - log.Info("Adding runnable to queue", "runnable", runnable) + log.Info("Adding runnable to queue") q.tasks <- runnable JobGauge.WithLabelValues("task").Inc() } func (q *TaskQueue) Delay(delayable Delayable) { - log.Info("Adding delayed task to queue", "delayable", delayable) + log.Info("Adding delayed task to queue") q.delayed <- delayable JobGauge.WithLabelValues("delayed").Inc() } func (q *TaskQueue) ReportError(err error) { - log.Info("Adding error task to queue", "err", err) + log.Info("Adding error task to queue") q.errors <- err JobGauge.WithLabelValues("error").Inc() } diff --git a/gateway/db/cache.go b/gateway/db/cache.go index cdeb98d..6c66cbf 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -154,6 +154,7 @@ func (t *Tenant) Lookup(ctx context.Context) error { common.UpdateContext(ctx, t) if expired(t) { + log.Info("Tenant cache expired. Refreshing", "tenantId", t.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: t, }) @@ -187,6 +188,7 @@ func (a *App) Lookup(ctx context.Context) error { common.UpdateContext(ctx, a) if expired(a) { + log.Info("App cache expired. Refreshing", "appId", a.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: a, }) @@ -230,7 +232,7 @@ func (a *App) Rules(ctx context.Context) (Apprules, error) { // Check if the Apprule needs to be refreshed if expired(&ar) { - log.Debug("Apprule is expired, adding refresh task", "apprule", ar.Id) + log.Info("Apprule cache is expired. Refreshing", "apprule", ar.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: &ar, }) @@ -268,6 +270,7 @@ func (p *Product) Lookup(ctx context.Context) error { common.UpdateContext(ctx, p) if expired(p) { + log.Info("Product cache expired. Refreshing", "productId", p.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: p, }) From 0f43b591c8259f094577de41febf111488e33010 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 20:41:08 +0200 Subject: [PATCH 08/38] further temporary logging --- gateway/common/tasks.go | 2 +- gateway/db/cache.go | 15 ++++++++++----- gateway/db/usage.go | 6 +++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 05a6887..477d4f2 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -87,7 +87,7 @@ func (q *TaskQueue) CloseQueue() { } func (q *TaskQueue) Add(runnable Runnable) { - log.Info("Adding runnable to queue") + log.Info("tasks.go > Add > Adding runnable to queue") q.tasks <- runnable JobGauge.WithLabelValues("task").Inc() } diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 6c66cbf..835b2f9 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -136,6 +136,7 @@ func (p *Product) cache(ctx context.Context) error { } func (t *Tenant) Lookup(ctx context.Context) error { + log.Info("cache.go > Lookup > Looking up Tenant from context") fromContext, ok := common.FromContext(ctx, TENANT) if ok { *t = *fromContext.(*Tenant) @@ -151,10 +152,11 @@ func (t *Tenant) Lookup(ctx context.Context) error { t.CachedAt, _ = time.Parse(time.RFC3339, result["cachedAt"]) } + log.Info("cache.go > Lookup > Setting Tenant context", "tenantId", t.Id) common.UpdateContext(ctx, t) if expired(t) { - log.Info("Tenant cache expired. Refreshing", "tenantId", t.Id) + log.Info("cache.go > Lookup > Tenant cache expired. Refreshing", "tenantId", t.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: t, }) @@ -185,6 +187,8 @@ func (a *App) Lookup(ctx context.Context) error { a.Tenant.Id = result["tenant"] a.Tenant.Lookup(ctx) } + + log.Info("cache.go > Lookup > Setting App context", "appId", a.Id) common.UpdateContext(ctx, a) if expired(a) { @@ -267,6 +271,7 @@ func (p *Product) Lookup(ctx context.Context) error { p.Active, _ = strconv.ParseBool(result["active"]) } + log.Info("cache.go > Lookup > Setting Product context", "productId", p.Id) common.UpdateContext(ctx, p) if expired(p) { @@ -302,22 +307,22 @@ func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { // Refresh does the psql calls to build cache func (t *Tenant) refresh(ctx context.Context) error { - log.Info("Refreshing tenant", "tenantId", t.Id) + log.Info("cache.go > refresh > Refreshing tenant", "tenantId", t.Id) err := t.fetch(ctx) if err != nil { - log.Error("Failed to fetch tenant", "tenantId", t.Id, "error", err) + log.Error("cache.go > refresh > Failed to fetch tenant", "tenantId", t.Id, "error", err) return err } err = t.canonicalBalance(ctx) if err != nil { - log.Error("Failed to get canonical balance", "tenantId", t.Id, "error", err) + log.Error("cache.go > refresh > Failed to get canonical balance", "tenantId", t.Id, "error", err) return err } err = t.cache(ctx) if err != nil { - log.Error("Failed to cache tenant", "tenantId", t.Id, "error", err) + log.Error("cache.go > refresh > Failed to cache tenant", "tenantId", t.Id, "error", err) return err } diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 57a2fc8..f1c6677 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -24,21 +24,21 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { entity, ok := common.FromContext(ctx, PRODUCT) if !ok || entity == nil { - log.Error("Failed to get product from context") + log.Error("usage.go > NewUsageUpdater > Failed to get product from context") } else { updater.product = entity.(*Product) } entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { - log.Error("Failed to get app from context") + log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) } entity, ok = common.FromContext(ctx, TENANT) if !ok || entity == nil { - log.Error("Failed to get tenant from context") + log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context") } else { tenant := entity.(*Tenant) updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) From 81b95d50a6c2912b3bed571790678e7b4606a023 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 20:49:15 +0200 Subject: [PATCH 09/38] switched proxy to single threaded to aid in debugging... --- gateway/common/tasks.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 477d4f2..8b031f8 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -56,13 +56,19 @@ func GetTaskQueue() *TaskQueue { return qInst } +// TODO: Undo once stable!! +// Currently operates as single threaded to debug issue with prod environment func (q *TaskQueue) SetupWorkers() { - numWorkers := GetConfigInt(NUM_WORKERS) - for i := 0; i < numWorkers; i++ { - go worker(q) - } - go delayWorker(q) - go errWorker(q) + // numWorkers := GetConfigInt(NUM_WORKERS) + // for i := 0; i < numWorkers; i++ { + // go worker(q) + // } + // go delayWorker(q) + // go errWorker(q) + + worker(q) + delayWorker(q) + errWorker(q) } // use this for graceful shutdown From 4b624210891562e2b11d7690b9aceb7f09375838 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 20:59:47 +0200 Subject: [PATCH 10/38] added more logging to identify why tenant context is empty --- gateway/common/context.go | 34 ++++++++++++++++++---------------- gateway/proxy/proxy.go | 2 ++ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/gateway/common/context.go b/gateway/common/context.go index 9654b6d..1bce1b1 100644 --- a/gateway/common/context.go +++ b/gateway/common/context.go @@ -1,41 +1,43 @@ package common import ( - "context" - "time" + "context" + log "log/slog" + "time" ) const ( - INSTRUMENT string = "INSTRUMENT_START" + INSTRUMENT string = "INSTRUMENT_START" ) type Contextable interface { - ContextKey() string + ContextKey() string } type Instrument struct { - Timestamp time.Time + Timestamp time.Time } func UpdateContext(ctx context.Context, entity Contextable) context.Context { - return context.WithValue(ctx, entity.ContextKey(), entity) + log.Info("context.go > UpdateContext > Updating context", "entity", entity.ContextKey()) + return context.WithValue(ctx, entity.ContextKey(), entity) } func FromContext(ctx context.Context, contextkey string) (any, bool) { - value := ctx.Value(contextkey) - if value != nil { - return value, true - } else { - return nil, false - } + value := ctx.Value(contextkey) + if value != nil { + return value, true + } else { + return nil, false + } } func StartInstrument() *Instrument { - return &Instrument{ - Timestamp: time.Now(), - } + return &Instrument{ + Timestamp: time.Now(), + } } func (i *Instrument) ContextKey() string { - return INSTRUMENT + return INSTRUMENT } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 0652e7c..d92a140 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -155,6 +155,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { } if resp.StatusCode < 400 && err == nil { + log.Info("proxy.go > revProxy.ModifyResponse > New Usage Updater Success") updater := db.NewUsageUpdater(ctx, "success") common.GetTaskQueue().Add(updater) } @@ -167,6 +168,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { var httpErr *HTTPError cause := context.Cause(ctx) + log.Info("proxy.go > revProxy.ErrorHandler > New Usage Updater Failure") updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) From 946dd8f4e19e61d72738c32736d1529a980da220 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 21:00:57 +0200 Subject: [PATCH 11/38] reverted previous change --- gateway/common/tasks.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 8b031f8..acef04a 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -56,19 +56,15 @@ func GetTaskQueue() *TaskQueue { return qInst } -// TODO: Undo once stable!! -// Currently operates as single threaded to debug issue with prod environment func (q *TaskQueue) SetupWorkers() { + // TODO: Undo once stable!! // numWorkers := GetConfigInt(NUM_WORKERS) - // for i := 0; i < numWorkers; i++ { - // go worker(q) - // } - // go delayWorker(q) - // go errWorker(q) - - worker(q) - delayWorker(q) - errWorker(q) + numWorkers := 1 + for i := 0; i < numWorkers; i++ { + go worker(q) + } + go delayWorker(q) + go errWorker(q) } // use this for graceful shutdown From 2e9fa21cc49824ecd17504963985884b7f102aba Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 21:24:11 +0200 Subject: [PATCH 12/38] attempted bugfix --- gateway/db/usage.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index f1c6677..d94ff2c 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -34,11 +34,15 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) + + //Ensure Tenant is loaded in context + updater.app.Tenant.Lookup(ctx) } entity, ok = common.FromContext(ctx, TENANT) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context") + } else { tenant := entity.(*Tenant) updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) @@ -50,7 +54,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { func (u *UsageUpdater) Run() { if u.app == nil || u.tenant == nil || u.product == nil { - log.Error("Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) + log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) return } if u.status == "success" { From d46e74b733c8b121e3330968cf2fb50058daef92 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 21:32:46 +0200 Subject: [PATCH 13/38] more logging to try to understand tenant null behaviour --- gateway/db/usage.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index d94ff2c..4f7b11f 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -34,15 +34,16 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) - - //Ensure Tenant is loaded in context - updater.app.Tenant.Lookup(ctx) } + //Ensure Tenant is loaded in context + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + updater.app.Tenant.Lookup(ctx) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + entity, ok = common.FromContext(ctx, TENANT) if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context") - + log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) } else { tenant := entity.(*Tenant) updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) From 1b9a245ac4d09de6960c3d5fe10ebb400e05dd25 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 21:55:18 +0200 Subject: [PATCH 14/38] fixed typo in redis key --- gateway/db/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 835b2f9..a148113 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -89,7 +89,7 @@ func (t *Tenant) cache(ctx context.Context) error { err := getCache().HSet(ctx, t.Key(), "active", t.Active, "balance", t.Balance, - "cached", cached).Err() + "cachedAt", cached).Err() if err != nil { return err } @@ -101,7 +101,7 @@ func (a *App) cache(ctx context.Context) error { err := getCache().HSet(ctx, a.Key(), "active", a.Active, "tenant", a.Tenant.Id, - "cached", cached, + "cachedAt", cached, "missedAt", a.MissedAt).Err() if err != nil { return err From f4b34ced40382fb8892bfeaf7b27ba05f00cc70f Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 22:05:50 +0200 Subject: [PATCH 15/38] added logging to investigate redis discrepancy --- gateway/db/cache.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index a148113..b98c49a 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -149,7 +149,10 @@ func (t *Tenant) Lookup(ctx context.Context) error { } else { t.Active, _ = strconv.ParseBool(result["active"]) t.Balance, _ = strconv.Atoi(result["balance"]) - t.CachedAt, _ = time.Parse(time.RFC3339, result["cachedAt"]) + t.CachedAt, err = time.Parse(time.RFC3339, result["cachedAt"]) + if err != nil { + log.Error("Failed retrieving cache for tenant", "tenantId", t.Key()) + } } log.Info("cache.go > Lookup > Setting Tenant context", "tenantId", t.Id) From f2371d4eb060e833bb4666ab8421e9f7f7089868 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 22:22:39 +0200 Subject: [PATCH 16/38] more logs... trying to see why context is not updating on prod --- gateway/db/cache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index b98c49a..a29984f 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -149,14 +149,14 @@ func (t *Tenant) Lookup(ctx context.Context) error { } else { t.Active, _ = strconv.ParseBool(result["active"]) t.Balance, _ = strconv.Atoi(result["balance"]) - t.CachedAt, err = time.Parse(time.RFC3339, result["cachedAt"]) - if err != nil { - log.Error("Failed retrieving cache for tenant", "tenantId", t.Key()) - } + t.CachedAt, _ = time.Parse(time.RFC3339, result["cachedAt"]) + + log.Info("cache.go > Lookup > retrieved tenant", "tenant", t) } log.Info("cache.go > Lookup > Setting Tenant context", "tenantId", t.Id) common.UpdateContext(ctx, t) + log.Info("cache.go > Lookup > Finished setting Tenant context", "tenantId", t.Id) if expired(t) { log.Info("cache.go > Lookup > Tenant cache expired. Refreshing", "tenantId", t.Id) From d01aa9b7d74275930ce6e4d4fc16724f8ad7c84d Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 22:34:28 +0200 Subject: [PATCH 17/38] removed tenant from UsageUpdater --- gateway/db/usage.go | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 4f7b11f..b1f3464 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -13,7 +13,6 @@ type UsageUpdater struct { status string balancekey string app *App - tenant *Tenant product *Product } @@ -34,43 +33,34 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) - } - - //Ensure Tenant is loaded in context - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - - entity, ok = common.FromContext(ctx, TENANT) - if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) - } else { - tenant := entity.(*Tenant) - updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - updater.tenant = tenant + updater.balancekey = fmt.Sprintf("BALANCE:%s", updater.app.Tenant.Id) } return updater } func (u *UsageUpdater) Run() { - if u.app == nil || u.tenant == nil || u.product == nil { - log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) + if u.app == nil || u.product == nil { + log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) return } + if u.status == "success" { ctx := context.Background() - DecrementCounter(ctx, u.balancekey, u.product.Weight) + decCounter := DecrementCounter(ctx, u.balancekey, u.product.Weight) + log.Info("usage.go > UsageUpdated > Decremented by ", "decCounter", decCounter, "app", u.app) use := &Relaytx{ AppId: u.app.Id, ProductName: u.product.Name, } - IncrementCounter(ctx, use.Key(), u.product.Weight) + + incCounter := IncrementCounter(ctx, use.Key(), u.product.Weight) + log.Info("usage.go > UsageUpdated > Incremented by ", "incCounter", incCounter, "app", u.app) } - common.EndpointUsage.WithLabelValues(u.app.HashId(), u.tenant.Id, u.product.Name, u.status).Inc() + common.EndpointUsage.WithLabelValues(u.app.HashId(), u.app.Tenant.Id, u.product.Name, u.status).Inc() } func (u *UsageUpdater) Error() string { - return fmt.Sprintf("BAL: unable to use relays for %s", u.tenant.Id) + return fmt.Sprintf("BAL: unable to use relays for %s", u.app.Tenant.Id) } From 66fe08eb59abe6b9c49a0a29db529812abef90d3 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Fri, 26 Jul 2024 23:59:48 +0200 Subject: [PATCH 18/38] more debugging... --- gateway/db/usage.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index b1f3464..6476bfd 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -13,6 +13,7 @@ type UsageUpdater struct { status string balancekey string app *App + tenant *Tenant product *Product } @@ -33,7 +34,20 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) - updater.balancekey = fmt.Sprintf("BALANCE:%s", updater.app.Tenant.Id) + } + + //Ensure Tenant is loaded in context + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + updater.app.Tenant.Lookup(ctx) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + + entity, ok = common.FromContext(ctx, TENANT) + if !ok || entity == nil { + log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) + } else { + tenant := entity.(*Tenant) + updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) + updater.tenant = tenant } return updater From a42fff0514b4cd1feeb301eebddf9cd9aad6a464 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 00:12:56 +0200 Subject: [PATCH 19/38] testing out changes in UsageUpdater --- gateway/db/cache.go | 3 +-- gateway/db/usage.go | 43 ++++++++++++++++++++++--------------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index a29984f..13ec320 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" log "log/slog" + "porters/common" "strconv" "strings" "sync" @@ -13,8 +14,6 @@ import ( rl "github.com/go-redis/redis_rate/v10" "github.com/google/uuid" "github.com/redis/go-redis/v9" - - "porters/common" ) const ( diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 6476bfd..ee0d2cb 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -12,9 +12,8 @@ import ( type UsageUpdater struct { status string balancekey string - app *App - tenant *Tenant - product *Product + app App + product Product } func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { @@ -26,38 +25,40 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") } else { - updater.product = entity.(*Product) + updater.product = entity.(Product) } entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { - updater.app = entity.(*App) + updater.app = entity.(App) + updater.balancekey = fmt.Sprintf("BALANCE:%s", updater.app.Tenant.Id) } - //Ensure Tenant is loaded in context - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + // //Ensure Tenant is loaded in context + // log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + // updater.app.Tenant.Lookup(ctx) + // log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - entity, ok = common.FromContext(ctx, TENANT) - if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) - } else { - tenant := entity.(*Tenant) - updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - updater.tenant = tenant - } + // entity, ok = common.FromContext(ctx, TENANT) + // if !ok || entity == nil { + // log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) + // } else { + // tenant := entity.(Tenant) + // updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) + // updater.tenant = tenant + // } return updater } func (u *UsageUpdater) Run() { - if u.app == nil || u.product == nil { - log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) - return - } + // if u.app == nil || u.product == nil { + // log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) + // return + // } + log.Error("usage.go > UsageUpdated > Attempting usage reporte", "app", u.app, "product", u.product) if u.status == "success" { ctx := context.Background() From f86d22a7e50832e6af71bb739677ac8896e424de Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 00:20:17 +0200 Subject: [PATCH 20/38] logging cause of failure in error handler --- gateway/proxy/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index d92a140..3cf87fe 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -168,7 +168,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { var httpErr *HTTPError cause := context.Cause(ctx) - log.Info("proxy.go > revProxy.ErrorHandler > New Usage Updater Failure") + log.Info("proxy.go > revProxy.ErrorHandler > New Usage Updater Failure", "cause", cause) updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) From ca15c643a58d87bfca6bd53b7532b6508656cbfd Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 00:30:03 +0200 Subject: [PATCH 21/38] usage issues appears to have been resolved, now investigating balance issue --- gateway/plugins/balance.go | 115 +++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 57 deletions(-) diff --git a/gateway/plugins/balance.go b/gateway/plugins/balance.go index 692ca5a..4fa9c46 100644 --- a/gateway/plugins/balance.go +++ b/gateway/plugins/balance.go @@ -3,93 +3,94 @@ package plugins // This is the top level bucket where available relays are sync'ed with balance import ( - "context" - "fmt" - log "log/slog" - "net/http" + "context" + "fmt" + log "log/slog" + "net/http" - "porters/common" - "porters/db" - "porters/proxy" + "porters/common" + "porters/db" + "porters/proxy" ) type BalanceTracker struct { } type balancecache struct { - tracker *BalanceTracker - tenant *db.Tenant - app *db.App - product *db.Product - cachedBalance int + tracker *BalanceTracker + tenant *db.Tenant + app *db.App + product *db.Product + cachedBalance int } const ( - BALANCE string = "BALANCE" + BALANCE string = "BALANCE" ) func (b *BalanceTracker) Name() string { - return "Relay Balance Limiter" + return "Relay Balance Limiter" } func (b *BalanceTracker) Key() string { - return BALANCE + return BALANCE } func (b *BalanceTracker) Load() { - // Setup any plugin state - log.Debug("Loading plugin", "plugin", b.Name()) + // Setup any plugin state + log.Debug("Loading plugin", "plugin", b.Name()) } // TODO optim: script this to avoid multi-hops func (b *BalanceTracker) HandleRequest(req *http.Request) error { - ctx := req.Context() - appId := proxy.PluckAppId(req) - app := &db.App{Id: appId} - err := app.Lookup(ctx) - if err != nil { - return proxy.NewHTTPError(http.StatusNotFound) - } - bal := &balancecache{ - tracker: b, - tenant: &app.Tenant, - } - err = bal.Lookup(ctx) - if err != nil { - return proxy.NewHTTPError(http.StatusNotFound) - } - ctx = common.UpdateContext(ctx, bal) - ctx = common.UpdateContext(ctx, app) - // TODO Check that balance is greater than or equal to req weight - if bal.cachedBalance > 0 { - lifecycle := proxy.SetStageComplete(ctx, proxy.BalanceCheck|proxy.AccountLookup) - ctx = common.UpdateContext(ctx, lifecycle) - *req = *req.WithContext(ctx) - } else { - log.Debug("no balance remaining", "app", app.HashId()) - return proxy.BalanceExceededError - } - return nil + ctx := req.Context() + appId := proxy.PluckAppId(req) + app := &db.App{Id: appId} + err := app.Lookup(ctx) + if err != nil { + return proxy.NewHTTPError(http.StatusNotFound) + } + bal := &balancecache{ + tracker: b, + tenant: &app.Tenant, + } + err = bal.Lookup(ctx) + if err != nil { + return proxy.NewHTTPError(http.StatusNotFound) + } + ctx = common.UpdateContext(ctx, bal) + ctx = common.UpdateContext(ctx, app) + // TODO Check that balance is greater than or equal to req weight + log.Info("balance", "bal", bal) + + if bal.cachedBalance > 0 { + lifecycle := proxy.SetStageComplete(ctx, proxy.BalanceCheck|proxy.AccountLookup) + ctx = common.UpdateContext(ctx, lifecycle) + *req = *req.WithContext(ctx) + } else { + log.Error("no balance remaining", "app", app.HashId()) + return proxy.BalanceExceededError + } + return nil } func (c *balancecache) Key() string { - return fmt.Sprintf("%s:%s", c.tracker.Key(), c.tenant.Id) + return fmt.Sprintf("%s:%s", c.tracker.Key(), c.tenant.Id) } func (c *balancecache) ContextKey() string { - return BALANCE + return BALANCE } func (c *balancecache) Lookup(ctx context.Context) error { - created, err := db.InitCounter(ctx, c.Key(), c.tenant.Balance) - if err != nil { - return err - } - if created { - c.cachedBalance = c.tenant.Balance - } else { - c.cachedBalance = db.GetIntVal(ctx, c.Key()) - } - return nil + created, err := db.InitCounter(ctx, c.Key(), c.tenant.Balance) + if err != nil { + return err + } + if created { + c.cachedBalance = c.tenant.Balance + } else { + c.cachedBalance = db.GetIntVal(ctx, c.Key()) + } + return nil } - From 07566581b0dd703e0de93e4bdfadc7d61eeb8e08 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 00:45:28 +0200 Subject: [PATCH 22/38] reverted back to original, now that the source of the issue has been identified --- gateway/db/usage.go | 53 +++++++++++++++++++++--------------------- gateway/proxy/proxy.go | 2 +- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index ee0d2cb..81ac104 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -12,8 +12,9 @@ import ( type UsageUpdater struct { status string balancekey string - app App - product Product + app *App + tenant *Tenant + product *Product } func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { @@ -25,53 +26,51 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") } else { - updater.product = entity.(Product) + updater.product = entity.(*Product) + log.Info("usage.go > retrieved product entity", "product", updater.product) } entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { - updater.app = entity.(App) - updater.balancekey = fmt.Sprintf("BALANCE:%s", updater.app.Tenant.Id) + updater.app = entity.(*App) + log.Info("usage.go > retrieved app entity", "app", updater.app) } - // //Ensure Tenant is loaded in context - // log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - // updater.app.Tenant.Lookup(ctx) - // log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + //Ensure Tenant is loaded in context + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + updater.app.Tenant.Lookup(ctx) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - // entity, ok = common.FromContext(ctx, TENANT) - // if !ok || entity == nil { - // log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) - // } else { - // tenant := entity.(Tenant) - // updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - // updater.tenant = tenant - // } + entity, ok = common.FromContext(ctx, TENANT) + if !ok || entity == nil { + log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) + } else { + tenant := entity.(*Tenant) + log.Info("usage.go > retrieved tenant entity", "tenant", tenant) + updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) + updater.tenant = tenant + } return updater } func (u *UsageUpdater) Run() { - // if u.app == nil || u.product == nil { - // log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) - // return - // } - log.Error("usage.go > UsageUpdated > Attempting usage reporte", "app", u.app, "product", u.product) + if u.app == nil || u.tenant == nil || u.product == nil { + log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) + return + } if u.status == "success" { ctx := context.Background() - decCounter := DecrementCounter(ctx, u.balancekey, u.product.Weight) - log.Info("usage.go > UsageUpdated > Decremented by ", "decCounter", decCounter, "app", u.app) + DecrementCounter(ctx, u.balancekey, u.product.Weight) use := &Relaytx{ AppId: u.app.Id, ProductName: u.product.Name, } - - incCounter := IncrementCounter(ctx, use.Key(), u.product.Weight) - log.Info("usage.go > UsageUpdated > Incremented by ", "incCounter", incCounter, "app", u.app) + IncrementCounter(ctx, use.Key(), u.product.Weight) } common.EndpointUsage.WithLabelValues(u.app.HashId(), u.app.Tenant.Id, u.product.Name, u.status).Inc() } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 3cf87fe..0bdc555 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -168,7 +168,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { var httpErr *HTTPError cause := context.Cause(ctx) - log.Info("proxy.go > revProxy.ErrorHandler > New Usage Updater Failure", "cause", cause) + log.Info("proxy.go > revProxy.ErrorHandler", "cause", cause) updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) From 8d64777d82c4c122f2865ebe5eedf6cf3cb2cd26 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 00:59:27 +0200 Subject: [PATCH 23/38] cleaned some logs, and added others to identify cause of pointer issue --- gateway/common/tasks.go | 24 +---------------- gateway/db/usage.go | 6 +++-- gateway/gateway.go | 54 +++++++++++++++++++------------------- gateway/plugins/balance.go | 1 - gateway/proxy/proxy.go | 3 +-- 5 files changed, 33 insertions(+), 55 deletions(-) diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index acef04a..643b82f 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -3,7 +3,6 @@ package common import ( "errors" log "log/slog" - "reflect" "sync" "time" ) @@ -89,38 +88,20 @@ func (q *TaskQueue) CloseQueue() { } func (q *TaskQueue) Add(runnable Runnable) { - log.Info("tasks.go > Add > Adding runnable to queue") q.tasks <- runnable JobGauge.WithLabelValues("task").Inc() } func (q *TaskQueue) Delay(delayable Delayable) { - log.Info("Adding delayed task to queue") q.delayed <- delayable JobGauge.WithLabelValues("delayed").Inc() } func (q *TaskQueue) ReportError(err error) { - log.Info("Adding error task to queue") q.errors <- err JobGauge.WithLabelValues("error").Inc() } -// func worker(q *TaskQueue) { -// for task := range q.tasks { -// log.Info("Processing task", "task", task) -// switch t := task.(type) { -// case Combinable: -// task.(Combinable).Combine(q.tasks) -// case Runnable: -// task.Run() -// default: -// log.Warn("unspecified task", "task", task, "type", t) -// } -// JobGauge.WithLabelValues("task").Set(float64(len(q.tasks))) -// } -// } - func worker(q *TaskQueue) { for task := range q.tasks { processTask(task, q) @@ -135,16 +116,13 @@ func processTask(task Runnable, q *TaskQueue) { } }() - taskType := reflect.TypeOf(task).String() - log.Info("Processing task", "task", task, "taskType", taskType) - switch t := task.(type) { case Combinable: task.(Combinable).Combine(q.tasks) case Runnable: task.Run() default: - log.Debug("unspecified task", "task", task, "type", t) + log.Warn("unspecified task", "task", task, "type", t) } } diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 81ac104..af86e1b 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -22,6 +22,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { status: status, } + log.Info("usage.go > NewUsageUpdater > Attempting to read product from context") entity, ok := common.FromContext(ctx, PRODUCT) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") @@ -30,6 +31,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Info("usage.go > retrieved product entity", "product", updater.product) } + log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") @@ -39,9 +41,9 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { } //Ensure Tenant is loaded in context - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "updater", updater) updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "updater", updater) entity, ok = common.FromContext(ctx, TENANT) if !ok || entity == nil { diff --git a/gateway/gateway.go b/gateway/gateway.go index 5456e05..e304c34 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -1,41 +1,41 @@ package main import ( - log "log/slog" - "os" - "os/signal" - "sync" - "syscall" + log "log/slog" + "os" + "os/signal" + "sync" + "syscall" - "porters/common" - "porters/proxy" + "porters/common" + "porters/proxy" ) func gateway() { - // Start job queue - common.GetTaskQueue().SetupWorkers() - level := common.GetLogLevel() + // Start job queue + common.GetTaskQueue().SetupWorkers() + level := common.GetLogLevel() - log.Info("starting gateway", "level", level) - proxy.Start() + log.Info("starting gateway", "level", level) + proxy.Start() - done := make(chan os.Signal, 1) - signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) - <-done - shutdown() + done := make(chan os.Signal, 1) + signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) + <-done + shutdown() } func shutdown() { - var wg sync.WaitGroup + var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - proxy.Stop() - }() - go func() { - defer wg.Done() - common.GetTaskQueue().CloseQueue() - }() - wg.Wait() + wg.Add(2) + go func() { + defer wg.Done() + proxy.Stop() + }() + go func() { + defer wg.Done() + common.GetTaskQueue().CloseQueue() + }() + wg.Wait() } diff --git a/gateway/plugins/balance.go b/gateway/plugins/balance.go index 4fa9c46..e09cfec 100644 --- a/gateway/plugins/balance.go +++ b/gateway/plugins/balance.go @@ -61,7 +61,6 @@ func (b *BalanceTracker) HandleRequest(req *http.Request) error { ctx = common.UpdateContext(ctx, bal) ctx = common.UpdateContext(ctx, app) // TODO Check that balance is greater than or equal to req weight - log.Info("balance", "bal", bal) if bal.cachedBalance > 0 { lifecycle := proxy.SetStageComplete(ctx, proxy.BalanceCheck|proxy.AccountLookup) diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 0bdc555..4b2e454 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -155,7 +155,6 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { } if resp.StatusCode < 400 && err == nil { - log.Info("proxy.go > revProxy.ModifyResponse > New Usage Updater Success") updater := db.NewUsageUpdater(ctx, "success") common.GetTaskQueue().Add(updater) } @@ -168,7 +167,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { var httpErr *HTTPError cause := context.Cause(ctx) - log.Info("proxy.go > revProxy.ErrorHandler", "cause", cause) + log.Error("Error during relay attempt", "cause", cause) updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) From 40a0b62ccbe959982719c32a6d90c692e59f60bc Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 01:01:46 +0200 Subject: [PATCH 24/38] revised logs to better analyse state change of updater --- gateway/db/usage.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index af86e1b..a5345ca 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -28,7 +28,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") } else { updater.product = entity.(*Product) - log.Info("usage.go > retrieved product entity", "product", updater.product) + log.Info("usage.go > retrieved product entity", "updater", updater) } log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") @@ -37,7 +37,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) - log.Info("usage.go > retrieved app entity", "app", updater.app) + log.Info("usage.go > retrieved app entity", "updater", updater) } //Ensure Tenant is loaded in context From 0d2204cff156cbc7848180fabdab89ddc7b5e0a2 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 01:12:24 +0200 Subject: [PATCH 25/38] trying revised usage updater --- gateway/db/usage.go | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index a5345ca..959bbd1 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -28,7 +28,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") } else { updater.product = entity.(*Product) - log.Info("usage.go > retrieved product entity", "updater", updater) + log.Info("usage.go > NewUsageUpdater > retrieved product entity", "product", updater.product) } log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") @@ -37,22 +37,31 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") } else { updater.app = entity.(*App) - log.Info("usage.go > retrieved app entity", "updater", updater) - } + log.Info("usage.go > NewUsageUpdater > retrieved app entity", "app", updater.app) - //Ensure Tenant is loaded in context - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "updater", updater) - updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "updater", updater) + // Ensure Tenant is loaded in context + if updater.app.Tenant.Id != "" { // Check if Tenant Id is not empty to ensure it's initialized + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + updater.app.Tenant.Lookup(ctx) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + } else { + log.Error("usage.go > NewUsageUpdater > app.Tenant is not initialized", "appId", updater.app.Id) + } + } - entity, ok = common.FromContext(ctx, TENANT) - if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product.Id) + // Ensure tenant is retrieved from context + if updater.app.Tenant.Id != "" { + entity, ok = common.FromContext(ctx, TENANT) + if !ok || entity == nil { + log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product) + } else { + tenant := entity.(*Tenant) + log.Info("usage.go > retrieved tenant entity", "tenant", tenant) + updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) + updater.tenant = tenant + } } else { - tenant := entity.(*Tenant) - log.Info("usage.go > retrieved tenant entity", "tenant", tenant) - updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - updater.tenant = tenant + log.Error("usage.go > NewUsageUpdater > app.Tenant is not properly initialized", "appId", updater.app.Id) } return updater From 66dc5c89ec297d18c33833ed8c2a887fdf7433cb Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 01:20:57 +0200 Subject: [PATCH 26/38] yet more logging --- gateway/db/usage.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 959bbd1..4725f6c 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -31,6 +31,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Info("usage.go > NewUsageUpdater > retrieved product entity", "product", updater.product) } + logContext(ctx) log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { @@ -38,15 +39,21 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { } else { updater.app = entity.(*App) log.Info("usage.go > NewUsageUpdater > retrieved app entity", "app", updater.app) + } - // Ensure Tenant is loaded in context - if updater.app.Tenant.Id != "" { // Check if Tenant Id is not empty to ensure it's initialized - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - } else { - log.Error("usage.go > NewUsageUpdater > app.Tenant is not initialized", "appId", updater.app.Id) - } + // Ensure app is not nil before accessing Tenant + if updater.app == nil { + log.Error("usage.go > NewUsageUpdater > app is nil") + return updater + } + + // Ensure Tenant is loaded in context only if app is not nil + if updater.app.Tenant.Id != "" { // Check if Tenant Id is not empty to ensure it's initialized + log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + updater.app.Tenant.Lookup(ctx) + log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) + } else { + log.Error("usage.go > NewUsageUpdater > app.Tenant is not initialized", "appId", updater.app.Id) } // Ensure tenant is retrieved from context @@ -67,6 +74,12 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { return updater } +func logContext(ctx context.Context) { + for k, v := range ctx.Value().(map[string]interface{}) { + log.Info("context value", "key", k, "value", v) + } +} + func (u *UsageUpdater) Run() { if u.app == nil || u.tenant == nil || u.product == nil { log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) From ff22be6ae563445955df509f6f8e026270bbf499 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 01:25:27 +0200 Subject: [PATCH 27/38] fixed missing argument in logContext --- gateway/db/usage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 4725f6c..08862de 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -75,7 +75,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { } func logContext(ctx context.Context) { - for k, v := range ctx.Value().(map[string]interface{}) { + for k, v := range ctx.Value(ctx).(map[string]interface{}) { log.Info("context value", "key", k, "value", v) } } From 37cb8fea91eff9631c77f86c35520f01f4f325c6 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 01:36:24 +0200 Subject: [PATCH 28/38] more logging changes --- gateway/common/context.go | 2 +- gateway/db/usage.go | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/gateway/common/context.go b/gateway/common/context.go index 1bce1b1..65802a9 100644 --- a/gateway/common/context.go +++ b/gateway/common/context.go @@ -19,7 +19,7 @@ type Instrument struct { } func UpdateContext(ctx context.Context, entity Contextable) context.Context { - log.Info("context.go > UpdateContext > Updating context", "entity", entity.ContextKey()) + log.Info("*** Updating context ***", "entity", entity) return context.WithValue(ctx, entity.ContextKey(), entity) } diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 08862de..3a7926b 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -31,7 +31,6 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Info("usage.go > NewUsageUpdater > retrieved product entity", "product", updater.product) } - logContext(ctx) log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { @@ -74,12 +73,6 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { return updater } -func logContext(ctx context.Context) { - for k, v := range ctx.Value(ctx).(map[string]interface{}) { - log.Info("context value", "key", k, "value", v) - } -} - func (u *UsageUpdater) Run() { if u.app == nil || u.tenant == nil || u.product == nil { log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) From bbb899334c2760e7278878abf10a4a469a5efab2 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 02:05:54 +0200 Subject: [PATCH 29/38] fixed potential issue with cache mechanism, that may have been triggering pointer to be nil --- gateway/common/context.go | 6 +++- gateway/db/cache.go | 58 +++++++++++++++++++++++++++++++++++---- gateway/db/usage.go | 16 ++++------- 3 files changed, 63 insertions(+), 17 deletions(-) diff --git a/gateway/common/context.go b/gateway/common/context.go index 65802a9..5680c34 100644 --- a/gateway/common/context.go +++ b/gateway/common/context.go @@ -19,7 +19,7 @@ type Instrument struct { } func UpdateContext(ctx context.Context, entity Contextable) context.Context { - log.Info("*** Updating context ***", "entity", entity) + log.Info("*** Updating context ***", "key", entity.ContextKey(), "entity", entity) return context.WithValue(ctx, entity.ContextKey(), entity) } @@ -32,6 +32,10 @@ func FromContext(ctx context.Context, contextkey string) (any, bool) { } } +func LogContext(ctx context.Context, contextkey string) { + log.Info("Context Value for", "key", contextkey, "val", ctx.Value(contextkey)) +} + func StartInstrument() *Instrument { return &Instrument{ Timestamp: time.Now(), diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 13ec320..55d358a 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -135,7 +135,6 @@ func (p *Product) cache(ctx context.Context) error { } func (t *Tenant) Lookup(ctx context.Context) error { - log.Info("cache.go > Lookup > Looking up Tenant from context") fromContext, ok := common.FromContext(ctx, TENANT) if ok { *t = *fromContext.(*Tenant) @@ -148,14 +147,27 @@ func (t *Tenant) Lookup(ctx context.Context) error { } else { t.Active, _ = strconv.ParseBool(result["active"]) t.Balance, _ = strconv.Atoi(result["balance"]) - t.CachedAt, _ = time.Parse(time.RFC3339, result["cachedAt"]) - log.Info("cache.go > Lookup > retrieved tenant", "tenant", t) + //We have to do this because in an older version some redis keys were "cached" not "cachedAt" due to a bug + if cachedAtStr := result["cachedAt"]; cachedAtStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedAtStr) + if err != nil { + log.Error("Failed to parse cachedAt date", "tenantId", t.Id, "error", err) + } else { + t.CachedAt = cachedAt + } + } else if cachedStr := result["cached"]; cachedStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedStr) + if err != nil { + log.Error("Failed to parse cached date", "tenantId", t.Id, "error", err) + } else { + t.CachedAt = cachedAt + } + } } - log.Info("cache.go > Lookup > Setting Tenant context", "tenantId", t.Id) common.UpdateContext(ctx, t) - log.Info("cache.go > Lookup > Finished setting Tenant context", "tenantId", t.Id) + common.LogContext(ctx, TENANT) if expired(t) { log.Info("cache.go > Lookup > Tenant cache expired. Refreshing", "tenantId", t.Id) @@ -186,12 +198,31 @@ func (a *App) Lookup(ctx context.Context) error { } else { log.Debug("got app from cache", "app", a.HashId()) a.Active, _ = strconv.ParseBool(result["active"]) + + //We have to do this because in an older version some redis keys were "cached" not "cachedAt" due to a bug + if cachedAtStr := result["cachedAt"]; cachedAtStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedAtStr) + if err != nil { + log.Error("Failed to parse cachedAt date", "appId", a.Id, "error", err) + } else { + a.CachedAt = cachedAt + } + } else if cachedStr := result["cached"]; cachedStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedStr) + if err != nil { + log.Error("Failed to parse cached date", "appId", a.Id, "error", err) + } else { + a.CachedAt = cachedAt + } + } + a.Tenant.Id = result["tenant"] a.Tenant.Lookup(ctx) } log.Info("cache.go > Lookup > Setting App context", "appId", a.Id) common.UpdateContext(ctx, a) + common.LogContext(ctx, APP) if expired(a) { log.Info("App cache expired. Refreshing", "appId", a.Id) @@ -271,6 +302,23 @@ func (p *Product) Lookup(ctx context.Context) error { p.PoktId, _ = result["poktId"] p.Weight, _ = strconv.Atoi(result["weight"]) p.Active, _ = strconv.ParseBool(result["active"]) + + //We have to do this because in an older version some redis keys were "cached" not "cachedAt" due to a bug + if cachedAtStr := result["cachedAt"]; cachedAtStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedAtStr) + if err != nil { + log.Error("Failed to parse cachedAt product", "product", p.Id, "error", err) + } else { + p.CachedAt = cachedAt + } + } else if cachedStr := result["cached"]; cachedStr != "" { + cachedAt, err := time.Parse(time.RFC3339, cachedStr) + if err != nil { + log.Error("Failed to parse cached product", "product", p.Id, "error", err) + } else { + p.CachedAt = cachedAt + } + } } log.Info("cache.go > Lookup > Setting Product context", "productId", p.Id) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 3a7926b..1ec2d1c 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -22,7 +22,11 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { status: status, } - log.Info("usage.go > NewUsageUpdater > Attempting to read product from context") + log.Info("NewUsageUpdater Being") + common.LogContext(ctx, PRODUCT) + common.LogContext(ctx, APP) + common.LogContext(ctx, TENANT) + entity, ok := common.FromContext(ctx, PRODUCT) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get product from context") @@ -31,7 +35,6 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Info("usage.go > NewUsageUpdater > retrieved product entity", "product", updater.product) } - log.Info("usage.go > NewUsageUpdater > Attempting to read app from context") entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { log.Error("usage.go > NewUsageUpdater > Failed to get app from context") @@ -46,15 +49,6 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { return updater } - // Ensure Tenant is loaded in context only if app is not nil - if updater.app.Tenant.Id != "" { // Check if Tenant Id is not empty to ensure it's initialized - log.Info("usage.go > NewUsageUpdater > Begin Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - updater.app.Tenant.Lookup(ctx) - log.Info("usage.go > NewUsageUpdater > Finished Tenant Lookup for tenant with id", "tenantId", updater.app.Tenant.Id) - } else { - log.Error("usage.go > NewUsageUpdater > app.Tenant is not initialized", "appId", updater.app.Id) - } - // Ensure tenant is retrieved from context if updater.app.Tenant.Id != "" { entity, ok = common.FromContext(ctx, TENANT) From 4daa8aa651602f5f8b061fdedd77ad5c65a55260 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 02:32:52 +0200 Subject: [PATCH 30/38] revised handling of context, since it was being updated and not returned in Lookup functions --- gateway/db/cache.go | 23 ++++---- gateway/db/usage.go | 2 +- gateway/plugins/apikeyauth.go | 2 +- gateway/plugins/balance.go | 2 +- gateway/plugins/leaky.go | 2 +- gateway/plugins/origin.go | 4 +- gateway/plugins/productfilter.go | 2 +- gateway/plugins/useragent.go | 97 ++++++++++++++++---------------- gateway/proxy/proxy.go | 2 +- 9 files changed, 68 insertions(+), 68 deletions(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 55d358a..d698932 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -134,7 +134,7 @@ func (p *Product) cache(ctx context.Context) error { return nil } -func (t *Tenant) Lookup(ctx context.Context) error { +func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { fromContext, ok := common.FromContext(ctx, TENANT) if ok { *t = *fromContext.(*Tenant) @@ -166,7 +166,7 @@ func (t *Tenant) Lookup(ctx context.Context) error { } } - common.UpdateContext(ctx, t) + ctx = common.UpdateContext(ctx, t) common.LogContext(ctx, TENANT) if expired(t) { @@ -176,10 +176,11 @@ func (t *Tenant) Lookup(ctx context.Context) error { }) } } - return nil + + return ctx, nil } -func (a *App) Lookup(ctx context.Context) error { +func (a *App) Lookup(ctx context.Context) (context.Context, error) { fromContext, ok := common.FromContext(ctx, APP) if ok { *a = *fromContext.(*App) @@ -217,11 +218,11 @@ func (a *App) Lookup(ctx context.Context) error { } a.Tenant.Id = result["tenant"] - a.Tenant.Lookup(ctx) + ctx, _ = a.Tenant.Lookup(ctx) } log.Info("cache.go > Lookup > Setting App context", "appId", a.Id) - common.UpdateContext(ctx, a) + ctx = common.UpdateContext(ctx, a) common.LogContext(ctx, APP) if expired(a) { @@ -231,7 +232,7 @@ func (a *App) Lookup(ctx context.Context) error { }) } } - return nil + return ctx, nil } func (a *App) Rules(ctx context.Context) (Apprules, error) { @@ -281,7 +282,7 @@ func (a *App) Rules(ctx context.Context) (Apprules, error) { } // Lookup by name, p should have a valid "Name" set before lookup -func (p *Product) Lookup(ctx context.Context) error { +func (p *Product) Lookup(ctx context.Context) (context.Context, error) { fromContext, ok := common.FromContext(ctx, PRODUCT) if ok { *p = *fromContext.(*Product) @@ -322,7 +323,7 @@ func (p *Product) Lookup(ctx context.Context) error { } log.Info("cache.go > Lookup > Setting Product context", "productId", p.Id) - common.UpdateContext(ctx, p) + ctx = common.UpdateContext(ctx, p) if expired(p) { log.Info("Product cache expired. Refreshing", "productId", p.Id) @@ -331,7 +332,7 @@ func (p *Product) Lookup(ctx context.Context) error { }) } } - return nil + return ctx, nil } func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { @@ -391,7 +392,7 @@ func (a *App) refresh(ctx context.Context) error { log.Error("Failed to fetch app", "appId", a.Id, "error", err) a.MissedAt = time.Now() } else { - a.Tenant.Lookup(ctx) + ctx, _ = a.Tenant.Lookup(ctx) } err = a.cache(ctx) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 1ec2d1c..7e0b307 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -22,7 +22,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { status: status, } - log.Info("NewUsageUpdater Being") + log.Info("NewUsageUpdater Begin") common.LogContext(ctx, PRODUCT) common.LogContext(ctx, APP) common.LogContext(ctx, TENANT) diff --git a/gateway/plugins/apikeyauth.go b/gateway/plugins/apikeyauth.go index 94d49ec..7f71d83 100644 --- a/gateway/plugins/apikeyauth.go +++ b/gateway/plugins/apikeyauth.go @@ -39,7 +39,7 @@ func (a *ApiKeyAuth) HandleRequest(req *http.Request) error { if validApiKey(apiKey) { appId := proxy.PluckAppId(req) app := &db.App{Id: appId} - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { return proxy.NewHTTPError(http.StatusBadGateway) } diff --git a/gateway/plugins/balance.go b/gateway/plugins/balance.go index e09cfec..13e5587 100644 --- a/gateway/plugins/balance.go +++ b/gateway/plugins/balance.go @@ -46,7 +46,7 @@ func (b *BalanceTracker) HandleRequest(req *http.Request) error { ctx := req.Context() appId := proxy.PluckAppId(req) app := &db.App{Id: appId} - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { return proxy.NewHTTPError(http.StatusNotFound) } diff --git a/gateway/plugins/leaky.go b/gateway/plugins/leaky.go index ce4e0d5..ce869ed 100644 --- a/gateway/plugins/leaky.go +++ b/gateway/plugins/leaky.go @@ -43,7 +43,7 @@ func (l *LeakyBucketPlugin) HandleRequest(req *http.Request) error { ctx := req.Context() appId := proxy.PluckAppId(req) app := &db.App{Id: appId} - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { log.Error("unable to lookup app", "app", app.HashId(), "err", err) } diff --git a/gateway/plugins/origin.go b/gateway/plugins/origin.go index eda2608..62a97d3 100644 --- a/gateway/plugins/origin.go +++ b/gateway/plugins/origin.go @@ -36,7 +36,7 @@ func (a *AllowedOriginFilter) HandleRequest(req *http.Request) error { app := &db.App{ Id: proxy.PluckAppId(req), } - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { return proxy.NewHTTPError(http.StatusNotFound) } @@ -56,7 +56,7 @@ func (a *AllowedOriginFilter) HandleResponse(resp *http.Response) error { app := &db.App{ Id: proxy.PluckAppId(resp.Request), } - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { return nil // don't modify header } diff --git a/gateway/plugins/productfilter.go b/gateway/plugins/productfilter.go index f4179e4..5399dcd 100644 --- a/gateway/plugins/productfilter.go +++ b/gateway/plugins/productfilter.go @@ -36,7 +36,7 @@ func (p *ProductFilter) HandleRequest(req *http.Request) error { Id: proxy.PluckAppId(req), } - err := app.Lookup(ctx) + ctx, err := app.Lookup(ctx) if err != nil { return proxy.NewHTTPError(http.StatusNotFound) } diff --git a/gateway/plugins/useragent.go b/gateway/plugins/useragent.go index 0aa6a24..0cf9624 100644 --- a/gateway/plugins/useragent.go +++ b/gateway/plugins/useragent.go @@ -1,78 +1,77 @@ package plugins import ( - "context" - log "log/slog" - "net/http" - "regexp" + "context" + log "log/slog" + "net/http" + "regexp" - "porters/db" - "porters/proxy" + "porters/db" + "porters/proxy" ) const ( - UA_TYPE_ID string = "allowed-user-agents" + UA_TYPE_ID string = "allowed-user-agents" ) type UserAgentFilter struct { - } func (u *UserAgentFilter) Name() string { - return "User Agent Filter" + return "User Agent Filter" } func (u *UserAgentFilter) Key() string { - return "USERAGENT" + return "USERAGENT" } func (u *UserAgentFilter) Load() { - log.Debug("Loading plugin", "plugin", u.Name()) + log.Debug("Loading plugin", "plugin", u.Name()) } func (u *UserAgentFilter) HandleRequest(req *http.Request) error { - ctx := req.Context() - ua := req.UserAgent() - appId := proxy.PluckAppId(req) - app := &db.App{Id: appId} - err := app.Lookup(ctx) - if err != nil { - return proxy.NewHTTPError(http.StatusNotFound) - } + ctx := req.Context() + ua := req.UserAgent() + appId := proxy.PluckAppId(req) + app := &db.App{Id: appId} + ctx, err := app.Lookup(ctx) + if err != nil { + return proxy.NewHTTPError(http.StatusNotFound) + } - rules := u.getRulesForScope(ctx, app) - success := (len(rules) == 0) + rules := u.getRulesForScope(ctx, app) + success := (len(rules) == 0) - for _, rule := range rules { - if (rule.MatchString(ua)) { - success = true - break - } - } + for _, rule := range rules { + if rule.MatchString(ua) { + success = true + break + } + } - if !success { - return proxy.NewHTTPError(http.StatusUnauthorized) - } - return nil + if !success { + return proxy.NewHTTPError(http.StatusUnauthorized) + } + return nil } func (u *UserAgentFilter) getRulesForScope(ctx context.Context, app *db.App) []regexp.Regexp { - useragents := make([]regexp.Regexp, 0) - rules, err := app.Rules(ctx) - if err != nil { - log.Error("couldn't get rules", "err", err) - } else { - for _, rule := range rules { - if rule.RuleType != UA_TYPE_ID || !rule.Active { - continue - } - matcher, err := regexp.Compile(rule.Value) - if err != nil { - log.Error("unable to compile regexp", "regex", rule.Value, "err", err) - } else { - useragents = append(useragents, *matcher) - } - } - } - return useragents + useragents := make([]regexp.Regexp, 0) + rules, err := app.Rules(ctx) + if err != nil { + log.Error("couldn't get rules", "err", err) + } else { + for _, rule := range rules { + if rule.RuleType != UA_TYPE_ID || !rule.Active { + continue + } + matcher, err := regexp.Compile(rule.Value) + if err != nil { + log.Error("unable to compile regexp", "regex", rule.Value, "err", err) + } else { + useragents = append(useragents, *matcher) + } + } + } + return useragents } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 4b2e454..06ee42c 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -205,7 +205,7 @@ func lookupPoktId(req *http.Request) (string, bool) { ctx := req.Context() name := PluckProductName(req) product := &db.Product{Name: name} - err := product.Lookup(ctx) + ctx, err := product.Lookup(ctx) if err != nil { log.Error("product not found", "product", product.Name, "err", err) return "", false From a5cdd1bc9ab7b48843488eebc88002fb7834f059 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 03:10:56 +0200 Subject: [PATCH 31/38] updated context management, added additional logging --- gateway/db/usage.go | 10 +-- gateway/plugins/apikeyauth.go | 115 +++++++++++++++++----------------- gateway/plugins/balance.go | 4 +- gateway/plugins/origin.go | 2 + gateway/proxy/proxy.go | 1 + 5 files changed, 68 insertions(+), 64 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 7e0b307..c9bf203 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -29,18 +29,18 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { entity, ok := common.FromContext(ctx, PRODUCT) if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get product from context") + log.Error("Failed to get product from context") } else { updater.product = entity.(*Product) - log.Info("usage.go > NewUsageUpdater > retrieved product entity", "product", updater.product) + log.Debug("Retrieved product entity", "product", updater.product) } entity, ok = common.FromContext(ctx, APP) if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get app from context") + log.Error("Failed to get app from context") } else { updater.app = entity.(*App) - log.Info("usage.go > NewUsageUpdater > retrieved app entity", "app", updater.app) + log.Debug("Retrieved app entity", "app", updater.app) } // Ensure app is not nil before accessing Tenant @@ -56,7 +56,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product) } else { tenant := entity.(*Tenant) - log.Info("usage.go > retrieved tenant entity", "tenant", tenant) + log.Debug("Retrieved tenant entity", "tenant", tenant) updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) updater.tenant = tenant } diff --git a/gateway/plugins/apikeyauth.go b/gateway/plugins/apikeyauth.go index 7f71d83..3e1e573 100644 --- a/gateway/plugins/apikeyauth.go +++ b/gateway/plugins/apikeyauth.go @@ -4,92 +4,91 @@ package plugins // Implements Filter interface import ( - "context" - log "log/slog" - "net/http" + "context" + log "log/slog" + "net/http" - "porters/common" - "porters/db" - "porters/proxy" - "porters/utils" + "porters/common" + "porters/db" + "porters/proxy" + "porters/utils" ) type ApiKeyAuth struct { - ApiKeyName string + ApiKeyName string } func (a *ApiKeyAuth) Name() string { - return "API Key Auth" + return "API Key Auth" } func (a *ApiKeyAuth) Load() { - // load plugin - log.Debug("loading plugin", "plugin", a.Name()) + // load plugin + log.Debug("loading plugin", "plugin", a.Name()) } func (a *ApiKeyAuth) Key() string { - return "API_KEY_AUTH" + return "API_KEY_AUTH" } func (a *ApiKeyAuth) HandleRequest(req *http.Request) error { - ctx := req.Context() - apiKey := req.Header.Get(a.ApiKeyName) - newCtx := context.WithValue(req.Context(), proxy.AUTH_VAL, apiKey) + ctx := req.Context() + apiKey := req.Header.Get(a.ApiKeyName) + newCtx := context.WithValue(req.Context(), proxy.AUTH_VAL, apiKey) - if validApiKey(apiKey) { - appId := proxy.PluckAppId(req) - app := &db.App{Id: appId} - ctx, err := app.Lookup(ctx) - if err != nil { - return proxy.NewHTTPError(http.StatusBadGateway) - } + if validApiKey(apiKey) { + appId := proxy.PluckAppId(req) + app := &db.App{Id: appId} + ctx, err := app.Lookup(ctx) + if err != nil { + return proxy.NewHTTPError(http.StatusBadGateway) + } - hashedKey := utils.Hash(apiKey) - rules := a.getRulesForScope(ctx, app) - success := (len(rules) == 0) + hashedKey := utils.Hash(apiKey) + rules := a.getRulesForScope(ctx, app) + success := (len(rules) == 0) - for _, rule := range rules { - if hashedKey == rule { - success = true - break - } - } - if success { - lifecycle := proxy.SetStageComplete(newCtx, proxy.Auth) - newCtx = common.UpdateContext(newCtx, lifecycle) - *req = *req.WithContext(newCtx) - return nil - } - } else { - return proxy.APIKeyInvalidError - } - return proxy.APIKeyInvalidError + for _, rule := range rules { + if hashedKey == rule { + success = true + break + } + } + if success { + lifecycle := proxy.SetStageComplete(newCtx, proxy.Auth) + newCtx = common.UpdateContext(newCtx, lifecycle) + *req = *req.WithContext(newCtx) + return nil + } + } else { + return proxy.APIKeyInvalidError + } + return proxy.APIKeyInvalidError } func (a *ApiKeyAuth) getRulesForScope(ctx context.Context, app *db.App) []string { - apirules := make([]string, 0) - rules, err := app.Rules(ctx) - if err != nil { - log.Error("error getting rules", "app", app.HashId(), "err", err) - } else { - for _, rule := range rules { - if rule.RuleType != "secret-key" || !rule.Active { - continue - } - apirules = append(apirules, rule.Value) - } - } - return apirules + apirules := make([]string, 0) + rules, err := app.Rules(ctx) + if err != nil { + log.Error("error getting rules", "app", app.HashId(), "err", err) + } else { + for _, rule := range rules { + if rule.RuleType != "secret-key" || !rule.Active { + continue + } + apirules = append(apirules, rule.Value) + } + } + return apirules } // TODO check api key is in valid format to quickly determine errant requests func validApiKey(apiKey string) bool { - // TODO add other checks - return checksumApiKey(apiKey) + // TODO add other checks + return checksumApiKey(apiKey) } // TODO implement CRC or something to quickly check api key as in spec func checksumApiKey(apiKey string) bool { - return true + return true } - diff --git a/gateway/plugins/balance.go b/gateway/plugins/balance.go index 13e5587..e416729 100644 --- a/gateway/plugins/balance.go +++ b/gateway/plugins/balance.go @@ -65,11 +65,13 @@ func (b *BalanceTracker) HandleRequest(req *http.Request) error { if bal.cachedBalance > 0 { lifecycle := proxy.SetStageComplete(ctx, proxy.BalanceCheck|proxy.AccountLookup) ctx = common.UpdateContext(ctx, lifecycle) - *req = *req.WithContext(ctx) } else { log.Error("no balance remaining", "app", app.HashId()) return proxy.BalanceExceededError } + + *req = *req.WithContext(ctx) + return nil } diff --git a/gateway/plugins/origin.go b/gateway/plugins/origin.go index 62a97d3..6d6c463 100644 --- a/gateway/plugins/origin.go +++ b/gateway/plugins/origin.go @@ -48,6 +48,8 @@ func (a *AllowedOriginFilter) HandleRequest(req *http.Request) error { return proxy.NewHTTPError(http.StatusUnauthorized) } + *req = *req.WithContext(ctx) + return nil } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 06ee42c..259eb16 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -155,6 +155,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { } if resp.StatusCode < 400 && err == nil { + log.Info("Response", "resp", resp) updater := db.NewUsageUpdater(ctx, "success") common.GetTaskQueue().Add(updater) } From 46fb3373c06a8b7b911b1a16505d8bdbf937a8f3 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 03:11:44 +0200 Subject: [PATCH 32/38] fixed broken import --- gateway/proxy/health.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/gateway/proxy/health.go b/gateway/proxy/health.go index 3d1d21e..1d4c27c 100644 --- a/gateway/proxy/health.go +++ b/gateway/proxy/health.go @@ -1,15 +1,14 @@ package proxy import ( - "io" - "net/http" - - "porters/db" + "io" + "net/http" + "porters/db" ) // TODO other healthchecks should be added func healthHandler(resp http.ResponseWriter, req *http.Request) { - hc := (&db.Cache{}).Healthcheck() - resp.Header().Set("Content-Type", "application/json") - io.WriteString(resp, hc.ToJson()) + hc := (&db.Cache{}).Healthcheck() + resp.Header().Set("Content-Type", "application/json") + io.WriteString(resp, hc.ToJson()) } From ec80f8a2d7e75cb3db8d6fb581bb305e1948b5d9 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 03:26:12 +0200 Subject: [PATCH 33/38] changes to ErrorHandler since it may be triggered when context is not fully set. --- gateway/db/usage.go | 3 ++- gateway/proxy/proxy.go | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index c9bf203..35b377a 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -22,7 +22,8 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { status: status, } - log.Info("NewUsageUpdater Begin") + log.Info("NewUsageUpdater Begin", "status", status) + common.LogContext(ctx, PRODUCT) common.LogContext(ctx, APP) common.LogContext(ctx, TENANT) diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index 259eb16..fd455e3 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -155,7 +155,7 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { } if resp.StatusCode < 400 && err == nil { - log.Info("Response", "resp", resp) + log.Info("Success response", "resp", resp) updater := db.NewUsageUpdater(ctx, "success") common.GetTaskQueue().Add(updater) } @@ -168,6 +168,14 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { var httpErr *HTTPError cause := context.Cause(ctx) + //While we are getting the context, when handling the error state we have no guarantee the app has been set + //So we must assume it has not... + appId := PluckAppId(req) + log.Info("ErrorHandler Pluck App Id", "appId", appId) + + app := &db.App{Id: appId} + ctx, err = app.Lookup(ctx) + log.Error("Error during relay attempt", "cause", cause) updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) From c322fcd551a5f007b4f97cd46625c864c3be8b9a Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 03:36:12 +0200 Subject: [PATCH 34/38] added logs to investigate tenant context --- gateway/db/cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index d698932..464d9f5 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -137,6 +137,7 @@ func (p *Product) cache(ctx context.Context) error { func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { fromContext, ok := common.FromContext(ctx, TENANT) if ok { + log.Info("cache.go > Tenant Lookup > Retrieving from context", "t", t) *t = *fromContext.(*Tenant) } else { key := t.Key() @@ -166,6 +167,7 @@ func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { } } + log.Info("cache.go > Tenant Lookup > Updating Context", "t", t) ctx = common.UpdateContext(ctx, t) common.LogContext(ctx, TENANT) @@ -218,6 +220,8 @@ func (a *App) Lookup(ctx context.Context) (context.Context, error) { } a.Tenant.Id = result["tenant"] + + log.Info("cache.go > App Lookup > Updating Tenant") ctx, _ = a.Tenant.Lookup(ctx) } @@ -392,6 +396,7 @@ func (a *App) refresh(ctx context.Context) error { log.Error("Failed to fetch app", "appId", a.Id, "error", err) a.MissedAt = time.Now() } else { + log.Info("cache.go > App refresh > Updating Tenant") ctx, _ = a.Tenant.Lookup(ctx) } From 04c22e28c9f6c9487abaf4b5c63a9b3a7b5d9881 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 03:57:19 +0200 Subject: [PATCH 35/38] added missing tenant Id set --- gateway/db/cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 464d9f5..53719c1 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -141,6 +141,7 @@ func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { *t = *fromContext.(*Tenant) } else { key := t.Key() + t.Id = key result, err := getCache().HGetAll(ctx, key).Result() if err != nil || len(result) == 0 { log.Debug("tenant cache missing", "key", key) From 32672c817411d9423efc6e430f3e7a85683fbf45 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 04:07:25 +0200 Subject: [PATCH 36/38] reverted id set, since it was using redis key --- gateway/db/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/db/cache.go b/gateway/db/cache.go index 53719c1..ae5a74c 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -140,8 +140,8 @@ func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { log.Info("cache.go > Tenant Lookup > Retrieving from context", "t", t) *t = *fromContext.(*Tenant) } else { + log.Info("cache.go > Tenant Lookup > Tenant not in context, retrieving", "t", t) key := t.Key() - t.Id = key result, err := getCache().HGetAll(ctx, key).Result() if err != nil || len(result) == 0 { log.Debug("tenant cache missing", "key", key) From a32800dd81ef0a315d7c04df643e074b22e61c54 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 04:15:38 +0200 Subject: [PATCH 37/38] removed redundant tenant from usage.go --- gateway/db/usage.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/gateway/db/usage.go b/gateway/db/usage.go index 35b377a..b3fef3c 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -13,7 +13,6 @@ type UsageUpdater struct { status string balancekey string app *App - tenant *Tenant product *Product } @@ -50,27 +49,12 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { return updater } - // Ensure tenant is retrieved from context - if updater.app.Tenant.Id != "" { - entity, ok = common.FromContext(ctx, TENANT) - if !ok || entity == nil { - log.Error("usage.go > NewUsageUpdater > Failed to get tenant from context", "tenantId", updater.app.Tenant.Id, "appId", updater.app.Id, "product", updater.product) - } else { - tenant := entity.(*Tenant) - log.Debug("Retrieved tenant entity", "tenant", tenant) - updater.balancekey = fmt.Sprintf("BALANCE:%s", tenant.Id) - updater.tenant = tenant - } - } else { - log.Error("usage.go > NewUsageUpdater > app.Tenant is not properly initialized", "appId", updater.app.Id) - } - return updater } func (u *UsageUpdater) Run() { - if u.app == nil || u.tenant == nil || u.product == nil { - log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "tenant", u.tenant, "product", u.product) + if u.app == nil || u.product == nil { + log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) return } From 8deecaf3007e52055749823d71b1d70e64ec66e3 Mon Sep 17 00:00:00 2001 From: Matthew Scerri <32162885+scermat@users.noreply.github.com> Date: Sat, 27 Jul 2024 04:36:31 +0200 Subject: [PATCH 38/38] cleaned up logs, reset worker config --- gateway/common/context.go | 5 +++-- gateway/common/tasks.go | 6 +++--- gateway/db/cache.go | 43 +++++++++++++++++-------------------- gateway/db/usage.go | 11 ++-------- gateway/proxy/proxy.go | 4 +--- gateway/proxy/reconciler.go | 6 ++---- 6 files changed, 31 insertions(+), 44 deletions(-) diff --git a/gateway/common/context.go b/gateway/common/context.go index 5680c34..58de89e 100644 --- a/gateway/common/context.go +++ b/gateway/common/context.go @@ -19,7 +19,7 @@ type Instrument struct { } func UpdateContext(ctx context.Context, entity Contextable) context.Context { - log.Info("*** Updating context ***", "key", entity.ContextKey(), "entity", entity) + log.Debug("Updating context", "key", entity.ContextKey(), "entity", entity) return context.WithValue(ctx, entity.ContextKey(), entity) } @@ -32,8 +32,9 @@ func FromContext(ctx context.Context, contextkey string) (any, bool) { } } +// Leaving here for debugging purposes func LogContext(ctx context.Context, contextkey string) { - log.Info("Context Value for", "key", contextkey, "val", ctx.Value(contextkey)) + log.Debug("Context Value for", "key", contextkey, "val", ctx.Value(contextkey)) } func StartInstrument() *Instrument { diff --git a/gateway/common/tasks.go b/gateway/common/tasks.go index 643b82f..f11a0d6 100644 --- a/gateway/common/tasks.go +++ b/gateway/common/tasks.go @@ -56,9 +56,9 @@ func GetTaskQueue() *TaskQueue { } func (q *TaskQueue) SetupWorkers() { - // TODO: Undo once stable!! - // numWorkers := GetConfigInt(NUM_WORKERS) - numWorkers := 1 + // When debugging, may be worth setting to 1 worker, to avoid multiple threads cross emitting logs + numWorkers := GetConfigInt(NUM_WORKERS) + for i := 0; i < numWorkers; i++ { go worker(q) } diff --git a/gateway/db/cache.go b/gateway/db/cache.go index ae5a74c..1b9b113 100644 --- a/gateway/db/cache.go +++ b/gateway/db/cache.go @@ -137,10 +137,10 @@ func (p *Product) cache(ctx context.Context) error { func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { fromContext, ok := common.FromContext(ctx, TENANT) if ok { - log.Info("cache.go > Tenant Lookup > Retrieving from context", "t", t) + log.Debug("Retrieving Tenant from context", "t", t) *t = *fromContext.(*Tenant) } else { - log.Info("cache.go > Tenant Lookup > Tenant not in context, retrieving", "t", t) + log.Debug("Tenant not in context, retrieving from cache or db", "t", t) key := t.Key() result, err := getCache().HGetAll(ctx, key).Result() if err != nil || len(result) == 0 { @@ -168,12 +168,10 @@ func (t *Tenant) Lookup(ctx context.Context) (context.Context, error) { } } - log.Info("cache.go > Tenant Lookup > Updating Context", "t", t) ctx = common.UpdateContext(ctx, t) - common.LogContext(ctx, TENANT) if expired(t) { - log.Info("cache.go > Lookup > Tenant cache expired. Refreshing", "tenantId", t.Id) + log.Debug("Tenant cache expired. Queing refresh", "tenantId", t.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: t, }) @@ -222,16 +220,15 @@ func (a *App) Lookup(ctx context.Context) (context.Context, error) { a.Tenant.Id = result["tenant"] - log.Info("cache.go > App Lookup > Updating Tenant") + log.Debug("Updating Tenant via Lookup") ctx, _ = a.Tenant.Lookup(ctx) } - log.Info("cache.go > Lookup > Setting App context", "appId", a.Id) + log.Debug("Setting App context", "appId", a.Id) ctx = common.UpdateContext(ctx, a) - common.LogContext(ctx, APP) if expired(a) { - log.Info("App cache expired. Refreshing", "appId", a.Id) + log.Debug("App cache expired. Refreshing", "appId", a.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: a, }) @@ -275,7 +272,7 @@ func (a *App) Rules(ctx context.Context) (Apprules, error) { // Check if the Apprule needs to be refreshed if expired(&ar) { - log.Info("Apprule cache is expired. Refreshing", "apprule", ar.Id) + log.Debug("Apprule cache is expired. Refreshing", "apprule", ar.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: &ar, }) @@ -327,11 +324,11 @@ func (p *Product) Lookup(ctx context.Context) (context.Context, error) { } } - log.Info("cache.go > Lookup > Setting Product context", "productId", p.Id) + log.Debug("Setting Product context", "productId", p.Id) ctx = common.UpdateContext(ctx, p) if expired(p) { - log.Info("Product cache expired. Refreshing", "productId", p.Id) + log.Debug("Product cache expired. Refreshing", "productId", p.Id) common.GetTaskQueue().Add(&RefreshTask{ ref: p, }) @@ -342,13 +339,13 @@ func (p *Product) Lookup(ctx context.Context) (context.Context, error) { func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { relaycount := GetIntVal(ctx, key) - //log.Info("Relay count", "relaycount", relaycount) + log.Debug("RelaytxFromKey, Relay count", "relaycount", relaycount) rtx := reverseRelaytxKey(key) - //log.Info("Reverse relay key", "rtx", rtx) + log.Debug("Reverse relay key", "rtx", rtx) - //log.Info("AppId", "AppId", rtx.AppId) - //log.Info("ProductName", "ProductName", rtx.ProductName) + log.Debug("AppId", "AppId", rtx.AppId) + log.Debug("ProductName", "ProductName", rtx.ProductName) if relaycount > 0 && rtx.AppId != "" && rtx.ProductName != "" { uuid := uuid.New() @@ -363,22 +360,22 @@ func RelaytxFromKey(ctx context.Context, key string) (*Relaytx, bool) { // Refresh does the psql calls to build cache func (t *Tenant) refresh(ctx context.Context) error { - log.Info("cache.go > refresh > Refreshing tenant", "tenantId", t.Id) + log.Debug("Refreshing tenant", "tenantId", t.Id) err := t.fetch(ctx) if err != nil { - log.Error("cache.go > refresh > Failed to fetch tenant", "tenantId", t.Id, "error", err) + log.Error("Failed to fetch tenant", "tenantId", t.Id, "error", err) return err } err = t.canonicalBalance(ctx) if err != nil { - log.Error("cache.go > refresh > Failed to get canonical balance", "tenantId", t.Id, "error", err) + log.Error("Failed to get canonical balance", "tenantId", t.Id, "error", err) return err } err = t.cache(ctx) if err != nil { - log.Error("cache.go > refresh > Failed to cache tenant", "tenantId", t.Id, "error", err) + log.Error("Failed to cache tenant", "tenantId", t.Id, "error", err) return err } @@ -391,13 +388,13 @@ func (a *App) refresh(ctx context.Context) error { return errors.New("App is nil") } - log.Info("Refreshing app", "appId", a.Id) + log.Debug("Refreshing app", "appId", a.Id) err := a.fetch(ctx) if err != nil { log.Error("Failed to fetch app", "appId", a.Id, "error", err) a.MissedAt = time.Now() } else { - log.Info("cache.go > App refresh > Updating Tenant") + log.Debug("Updating Tenant via Lookup") ctx, _ = a.Tenant.Lookup(ctx) } @@ -443,7 +440,7 @@ func (p *Product) refresh(ctx context.Context) error { } func (t *RefreshTask) Run() { - log.Info("Running refresh task", "refreshable", t.ref) + log.Debug("Running refresh task", "refreshable", t.ref) ctx := context.Background() err := t.ref.refresh(ctx) if err != nil { diff --git a/gateway/db/usage.go b/gateway/db/usage.go index b3fef3c..6193424 100644 --- a/gateway/db/usage.go +++ b/gateway/db/usage.go @@ -21,12 +21,6 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { status: status, } - log.Info("NewUsageUpdater Begin", "status", status) - - common.LogContext(ctx, PRODUCT) - common.LogContext(ctx, APP) - common.LogContext(ctx, TENANT) - entity, ok := common.FromContext(ctx, PRODUCT) if !ok || entity == nil { log.Error("Failed to get product from context") @@ -43,9 +37,8 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { log.Debug("Retrieved app entity", "app", updater.app) } - // Ensure app is not nil before accessing Tenant if updater.app == nil { - log.Error("usage.go > NewUsageUpdater > app is nil") + log.Error("App is nil when constructing new UsageUpdater") return updater } @@ -54,7 +47,7 @@ func NewUsageUpdater(ctx context.Context, status string) *UsageUpdater { func (u *UsageUpdater) Run() { if u.app == nil || u.product == nil { - log.Error("usage.go > UsageUpdated > Invalid request, usage not reported", "app", u.app, "product", u.product) + log.Error("Invalid request, usage not reported", "app", u.app, "product", u.product) return } diff --git a/gateway/proxy/proxy.go b/gateway/proxy/proxy.go index fd455e3..7cd9786 100644 --- a/gateway/proxy/proxy.go +++ b/gateway/proxy/proxy.go @@ -167,20 +167,18 @@ func setupProxy(remote *url.URL) *httputil.ReverseProxy { ctx := req.Context() var httpErr *HTTPError cause := context.Cause(ctx) + log.Error("Error during relay attempt", "cause", cause) //While we are getting the context, when handling the error state we have no guarantee the app has been set //So we must assume it has not... appId := PluckAppId(req) - log.Info("ErrorHandler Pluck App Id", "appId", appId) app := &db.App{Id: appId} ctx, err = app.Lookup(ctx) - log.Error("Error during relay attempt", "cause", cause) updater := db.NewUsageUpdater(ctx, "failure") common.GetTaskQueue().Add(updater) - log.Debug("cancel cause", "cause", cause) if errors.As(cause, &httpErr) { status := httpErr.code http.Error(resp, http.StatusText(status), status) diff --git a/gateway/proxy/reconciler.go b/gateway/proxy/reconciler.go index 2d1a271..88db16e 100644 --- a/gateway/proxy/reconciler.go +++ b/gateway/proxy/reconciler.go @@ -46,7 +46,7 @@ func (r *Reconciler) spawnTasks() { for iter.Next(ctx) { rtxkey := iter.Val() // use for building relaytx - log.Info("Reconciling tasks for key", "rtxkey", rtxkey) + log.Debug("Reconciling tasks for key", "rtxkey", rtxkey) rtx, ok := db.RelaytxFromKey(ctx, rtxkey) if ok { @@ -55,9 +55,7 @@ func (r *Reconciler) spawnTasks() { } queue.Add(task) - log.Info("Queued task", "rtxkey", rtxkey) - } else { - log.Error("Failed to queue task", "rtxkey", rtxkey) + log.Debug("Queued task", "rtxkey", rtxkey) } } }