From 8327310d5256c8877fc9012634ca6728eef60b29 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 19 Sep 2024 12:28:42 -0300 Subject: [PATCH] improve SimplePool authHandler, rename IncomingEvent to RelayEvent so we can use it there. --- pool.go | 53 +++++++++++++++++++++++++++----------------------- sdk/system.go | 2 +- sdk/tracker.go | 2 +- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/pool.go b/pool.go index 846d176..d91954c 100644 --- a/pool.go +++ b/pool.go @@ -21,10 +21,10 @@ type SimplePool struct { Relays *xsync.MapOf[string, *Relay] Context context.Context - authHandler func(*Event) error + authHandler func(context.Context, RelayEvent) error cancel context.CancelFunc - eventMiddleware []func(IncomingEvent) + eventMiddleware []func(RelayEvent) // custom things not often used signatureChecker func(Event) bool @@ -37,12 +37,12 @@ type DirectedFilters struct { Relay string } -type IncomingEvent struct { +type RelayEvent struct { *Event Relay *Relay } -func (ie IncomingEvent) String() string { +func (ie RelayEvent) String() string { return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event) } @@ -70,7 +70,7 @@ func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool { // WithAuthHandler must be a function that signs the auth event when called. // it will be called whenever any relay in the pool returns a `CLOSED` message // with the "auth-required:" prefix, only once for each relay -type WithAuthHandler func(authEvent *Event) error +type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) { pool.authHandler = h @@ -114,7 +114,7 @@ func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) { // WithEventMiddleware is a function that will be called with all events received. // more than one can be passed at a time. -type WithEventMiddleware func(IncomingEvent) +type WithEventMiddleware func(RelayEvent) func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) { pool.eventMiddleware = append(pool.eventMiddleware, h) @@ -173,19 +173,19 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { +func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan RelayEvent { return pool.subMany(ctx, urls, filters, true) } // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { +func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { return pool.subMany(ctx, urls, filters, false) } -func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { +func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { ctx, cancel := context.WithCancel(ctx) _ = cancel // do this so `go vet` will stop complaining - events := make(chan IncomingEvent) + events := make(chan RelayEvent) seenAlready := xsync.NewMapOf[string, Timestamp]() ticker := time.NewTicker(seenAlreadyDropTick) @@ -255,7 +255,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt goto reconnect } - ie := IncomingEvent{Event: evt, Relay: relay} + ie := RelayEvent{Event: evt, Relay: relay} for _, mh := range pool.eventMiddleware { mh(ie) } @@ -284,7 +284,10 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt case reason := <-sub.ClosedReason: if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { // relay is requesting auth. if we can we will perform auth and try again - if err := relay.Auth(ctx, pool.authHandler); err == nil { + err := relay.Auth(ctx, func(event *Event) error { + return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay}) + }) + if err == nil { hasAuthed = true // so we don't keep doing AUTH again and again goto subscribe } @@ -310,19 +313,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE -func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { +func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan RelayEvent { return pool.subManyEose(ctx, urls, filters, true) } // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { +func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { return pool.subManyEose(ctx, urls, filters, false) } -func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { +func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { ctx, cancel := context.WithCancel(ctx) - events := make(chan IncomingEvent) + events := make(chan RelayEvent) seenAlready := xsync.NewMapOf[string, bool]() wg := sync.WaitGroup{} wg.Add(len(urls)) @@ -361,7 +364,9 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters case reason := <-sub.ClosedReason: if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { // relay is requesting auth. if we can we will perform auth and try again - err := relay.Auth(ctx, pool.authHandler) + err := relay.Auth(ctx, func(event *Event) error { + return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay}) + }) if err == nil { hasAuthed = true // so we don't keep doing AUTH again and again goto subscribe @@ -374,7 +379,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters return } - ie := IncomingEvent{Event: evt, Relay: relay} + ie := RelayEvent{Event: evt, Relay: relay} for _, mh := range pool.eventMiddleware { mh(ie) } @@ -399,7 +404,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters } // QuerySingle returns the first event returned by the first relay, cancels everything else. -func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent { +func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent { ctx, cancel := context.WithCancel(ctx) defer cancel() for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) { @@ -411,9 +416,9 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F func (pool *SimplePool) batchedSubMany( ctx context.Context, dfs []DirectedFilters, - subFn func(context.Context, []string, Filters, bool) chan IncomingEvent, -) chan IncomingEvent { - res := make(chan IncomingEvent) + subFn func(context.Context, []string, Filters, bool) chan RelayEvent, +) chan RelayEvent { + res := make(chan RelayEvent) for _, df := range dfs { go func(df DirectedFilters) { @@ -427,11 +432,11 @@ func (pool *SimplePool) batchedSubMany( } // BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. -func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent { +func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { return pool.batchedSubMany(ctx, dfs, pool.subMany) } // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. -func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent { +func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { return pool.batchedSubMany(ctx, dfs, pool.subManyEose) } diff --git a/sdk/system.go b/sdk/system.go index 8bee234..3ab7fdf 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -70,7 +70,7 @@ func NewSystem(mods ...SystemModifier) *System { } sys.Pool = nostr.NewSimplePool(context.Background(), - nostr.WithEventMiddleware(sys.trackEventHints), + nostr.WithEventMiddleware(sys.TrackEventHints), nostr.WithPenaltyBox(), ) diff --git a/sdk/tracker.go b/sdk/tracker.go index 71868cc..90cf6a9 100644 --- a/sdk/tracker.go +++ b/sdk/tracker.go @@ -7,7 +7,7 @@ import ( "github.com/nbd-wtf/go-nostr/sdk/hints" ) -func (sys *System) trackEventHints(ie nostr.IncomingEvent) { +func (sys *System) TrackEventHints(ie nostr.RelayEvent) { if IsVirtualRelay(ie.Relay.URL) { return }