Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue with usage metrics not being reported or logged #332

Merged
merged 40 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b93f01f
updated logging
scermat Jul 25, 2024
7ddd027
Merge branch 'develop' of https://github.com/porters-xyz/gateway-demo…
scermat Jul 26, 2024
3320556
added temporary logging to investigate issue
scermat Jul 26, 2024
7ae8bdb
updated logging for further investigation
scermat Jul 26, 2024
861d643
various modifications to ensure safety of execution and identify sour…
scermat Jul 26, 2024
1aaf5a2
more debugging changes
scermat Jul 26, 2024
2ae26a4
added checks in UsageUpdater
scermat Jul 26, 2024
3f70933
further logging for debugging
scermat Jul 26, 2024
0f43b59
further temporary logging
scermat Jul 26, 2024
81b95d5
switched proxy to single threaded to aid in debugging...
scermat Jul 26, 2024
4b62421
added more logging to identify why tenant context is empty
scermat Jul 26, 2024
946dd8f
reverted previous change
scermat Jul 26, 2024
2e9fa21
attempted bugfix
scermat Jul 26, 2024
d46e74b
more logging to try to understand tenant null behaviour
scermat Jul 26, 2024
1b9a245
fixed typo in redis key
scermat Jul 26, 2024
f4b34ce
added logging to investigate redis discrepancy
scermat Jul 26, 2024
f2371d4
more logs... trying to see why context is not updating on prod
scermat Jul 26, 2024
d01aa9b
removed tenant from UsageUpdater
scermat Jul 26, 2024
66fe08e
more debugging...
scermat Jul 26, 2024
a42fff0
testing out changes in UsageUpdater
scermat Jul 26, 2024
f86d22a
logging cause of failure in error handler
scermat Jul 26, 2024
ca15c64
usage issues appears to have been resolved, now investigating balance…
scermat Jul 26, 2024
0756658
reverted back to original, now that the source of the issue has been …
scermat Jul 26, 2024
8d64777
cleaned some logs, and added others to identify cause of pointer issue
scermat Jul 26, 2024
40a0b62
revised logs to better analyse state change of updater
scermat Jul 26, 2024
0d2204c
trying revised usage updater
scermat Jul 26, 2024
66dc5c8
yet more logging
scermat Jul 26, 2024
ff22be6
fixed missing argument in logContext
scermat Jul 26, 2024
37cb8fe
more logging changes
scermat Jul 26, 2024
bbb8993
fixed potential issue with cache mechanism, that may have been trigge…
scermat Jul 27, 2024
4daa8aa
revised handling of context, since it was being updated and not retur…
scermat Jul 27, 2024
a5cdd1b
updated context management, added additional logging
scermat Jul 27, 2024
46fb337
fixed broken import
scermat Jul 27, 2024
ec80f8a
changes to ErrorHandler since it may be triggered when context is not…
scermat Jul 27, 2024
c322fcd
added logs to investigate tenant context
scermat Jul 27, 2024
04c22e2
added missing tenant Id set
scermat Jul 27, 2024
32672c8
reverted id set, since it was using redis key
scermat Jul 27, 2024
a32800d
removed redundant tenant from usage.go
scermat Jul 27, 2024
8deecaf
cleaned up logs, reset worker config
scermat Jul 27, 2024
bc7e987
Merge pull request #331 from porters-xyz/poktscan-metrics-endpoint
scermat Jul 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions gateway/common/context.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,48 @@
package common

import (
"context"
"time"
"context"
log "log/slog"
"time"
)

const (
INSTRUMENT string = "INSTRUMENT_START"
INSTRUMENT string = "INSTRUMENT_START"
)

type Contextable interface {
ContextKey() string
ContextKey() string
}

type Instrument struct {
Timestamp time.Time
Timestamp time.Time
}

// UpdateContext stores entity in ctx under the entity's own context key
// and returns the derived context (the input ctx is not mutated).
func UpdateContext(ctx context.Context, entity Contextable) context.Context {
	log.Debug("Updating context", "key", entity.ContextKey(), "entity", entity)
	return context.WithValue(ctx, entity.ContextKey(), entity)
}

func FromContext(ctx context.Context, contextkey string) (any, bool) {
value := ctx.Value(contextkey)
if value != nil {
return value, true
} else {
return nil, false
}
value := ctx.Value(contextkey)
if value != nil {
return value, true
} else {
return nil, false
}
}

// Leaving here for debugging purposes
func LogContext(ctx context.Context, contextkey string) {
log.Debug("Context Value for", "key", contextkey, "val", ctx.Value(contextkey))
}

// StartInstrument returns an Instrument stamped with the current time.
func StartInstrument() *Instrument {
	return &Instrument{
		Timestamp: time.Now(),
	}
}

// ContextKey implements Contextable; all instruments share a single key,
// so only one Instrument can live on a context at a time.
func (i *Instrument) ContextKey() string {
	return INSTRUMENT
}
230 changes: 121 additions & 109 deletions gateway/common/tasks.go
Original file line number Diff line number Diff line change
@@ -1,180 +1,192 @@
package common

import (
"errors"
log "log/slog"
"sync"
"time"
"errors"
log "log/slog"
"sync"
"time"
)

type Runnable interface {
error
Run()
error
Run()
}

type Delayable interface {
Runnable
Ready() bool
Runnable
Ready() bool
}

// contains bits necessary to run later
type SimpleTask struct {
run func()
runtime time.Time
run func()
runtime time.Time
}

type RetryTask struct {
SimpleTask
runWithSuccess func() bool
retryCount int
retryEvery time.Duration
retryGen int
SimpleTask
runWithSuccess func() bool
retryCount int
retryEvery time.Duration
retryGen int
}

type TaskQueue struct {
closed bool
tasks chan Runnable
delayed chan Delayable
errors chan error
closed bool
tasks chan Runnable
delayed chan Delayable
errors chan error
}

var qInst *TaskQueue
var qmutex sync.Once

// GetTaskQueue returns the process-wide TaskQueue singleton, creating it
// on first use with channel capacities taken from JOB_BUFFER_SIZE.
func GetTaskQueue() *TaskQueue {
	qmutex.Do(func() {
		bufferSize := GetConfigInt(JOB_BUFFER_SIZE)
		qInst = &TaskQueue{
			closed:  false,
			tasks:   make(chan Runnable, bufferSize),
			delayed: make(chan Delayable, bufferSize),
			errors:  make(chan error, bufferSize),
		}
	})
	return qInst
}

// SetupWorkers starts NUM_WORKERS task workers plus one delay worker and
// one error worker. When debugging, it may be worth configuring a single
// worker to avoid multiple goroutines cross-emitting logs.
func (q *TaskQueue) SetupWorkers() {
	numWorkers := GetConfigInt(NUM_WORKERS)

	for i := 0; i < numWorkers; i++ {
		go worker(q)
	}
	go delayWorker(q)
	go errWorker(q)
}

// CloseQueue begins a graceful shutdown: it closes the work channels,
// then waits for the task channel to drain, giving up (and warning that
// work may be lost) after SHUTDOWN_DELAY seconds.
func (q *TaskQueue) CloseQueue() {
	close(q.tasks)
	close(q.delayed)
	q.closed = true

	shutdownTime := time.Duration(GetConfigInt(SHUTDOWN_DELAY)) * time.Second
	// The deadline must be created once, outside the loop: evaluating
	// time.After inside the select would restart the timeout on every
	// 100ms ticker tick, so the give-up branch could never fire.
	deadline := time.After(shutdownTime)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if len(q.tasks) == 0 {
				return
			}
		case <-deadline:
			log.Warn("workers not finished, work may be lost")
			return
		}
	}
}

// Add enqueues a task for immediate execution by the worker pool and
// bumps the task gauge.
func (q *TaskQueue) Add(runnable Runnable) {
	q.tasks <- runnable
	JobGauge.WithLabelValues("task").Inc()
}

// Delay enqueues a task for later execution (swept by delayWorker) and
// bumps the delayed gauge.
func (q *TaskQueue) Delay(delayable Delayable) {
	q.delayed <- delayable
	JobGauge.WithLabelValues("delayed").Inc()
}

// ReportError queues an error for asynchronous logging by errWorker and
// bumps the error gauge.
func (q *TaskQueue) ReportError(err error) {
	q.errors <- err
	JobGauge.WithLabelValues("error").Inc()
}

// worker drains the task channel until it is closed, running each task
// via processTask (which contains per-task panics) and refreshing the
// task gauge with the remaining backlog.
func worker(q *TaskQueue) {
	for task := range q.tasks {
		processTask(task, q)
		JobGauge.WithLabelValues("task").Set(float64(len(q.tasks)))
	}
}

// processTask runs a single task, recovering from any panic so one bad
// task cannot kill the worker goroutine.
func processTask(task Runnable, q *TaskQueue) {
	defer func() {
		if r := recover(); r != nil {
			log.Error("Recovered in worker", "error", r)
		}
	}()

	switch t := task.(type) {
	case Combinable:
		// Combinable tasks merge with queued siblings instead of running.
		t.Combine(q.tasks)
	case Runnable:
		// NOTE(review): task is statically Runnable, so this case always
		// matches and the default below is unreachable.
		t.Run()
	default:
		log.Warn("unspecified task", "task", task, "type", t)
	}
}

// errWorker logs every error reported to the queue and decrements the
// error gauge. It exits when the errors channel is closed.
func errWorker(q *TaskQueue) {
	for err := range q.errors {
		log.Error("error encountered", "err", err)
		JobGauge.WithLabelValues("error").Dec()
	}
}

// delayWorker periodically sweeps the delayed channel, promoting tasks
// that have become ready onto the main queue and requeueing the rest.
func delayWorker(q *TaskQueue) {
	// NOTE(review): the original body ran a single sweep followed by a
	// one-second sleep and then returned, which made the sleep dead code
	// and meant rescheduled RetryTasks were never redelivered. The sweep
	// is now repeated until the queue shuts down.
	for {
		for i := len(q.delayed); i > 0; i-- {
			task := <-q.delayed
			if q.closed {
				q.ReportError(errors.New("Shutting down"))
			} else if task.Ready() {
				q.Add(task)
				JobGauge.WithLabelValues("delayed").Dec()
			} else {
				q.delayed <- task
			}
		}
		if q.closed {
			return
		}
		time.Sleep(1 * time.Second)
	}
}

// NewSimpleTask wraps run in a task that is ready to execute immediately.
func NewSimpleTask(run func()) *SimpleTask {
	return &SimpleTask{
		run:     run,
		runtime: time.Now(),
	}
}

// Run executes the wrapped function. SimpleTask can be extended if needed.
func (t *SimpleTask) Run() {
	t.run()
}

// Ready reports whether the task's scheduled runtime has passed.
func (t *SimpleTask) Ready() bool {
	return time.Now().After(t.runtime)
}

// Error satisfies the error interface so a failed task can be sent to
// the error channel. Override to include more details.
func (t *SimpleTask) Error() string {
	return "error processing async task"
}

// NewRetryTask wraps run in a task that is retried up to count times,
// waiting every between attempts.
func NewRetryTask(run func() bool, count int, every time.Duration) *RetryTask {
	return &RetryTask{
		runWithSuccess: run,
		retryCount:     count,
		retryEvery:     every,
	}
}

// Run invokes the wrapped function once. On failure it either
// reschedules itself on the delayed queue (bumping the retry
// generation) or, once the retry budget is spent, reports itself as an
// error.
func (r *RetryTask) Run() {
	if r.runWithSuccess() {
		return
	}
	q := GetTaskQueue()
	if r.retryGen < r.retryCount {
		r.runtime = time.Now().Add(r.retryEvery)
		r.retryGen++
		q.delayed <- r
	} else {
		q.errors <- r
	}
}
Loading