Skip to content

Commit

Permalink
improve SimplePool authHandler, rename IncomingEvent to RelayEvent so…
Browse files Browse the repository at this point in the history
… we can use it there.
  • Loading branch information
fiatjaf committed Sep 19, 2024
1 parent c07528e commit 8327310
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 26 deletions.
53 changes: 29 additions & 24 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type SimplePool struct {
Relays *xsync.MapOf[string, *Relay]
Context context.Context

authHandler func(*Event) error
authHandler func(context.Context, RelayEvent) error
cancel context.CancelFunc

eventMiddleware []func(IncomingEvent)
eventMiddleware []func(RelayEvent)

// custom things not often used
signatureChecker func(Event) bool
Expand All @@ -37,12 +37,12 @@ type DirectedFilters struct {
Relay string
}

type IncomingEvent struct {
type RelayEvent struct {
*Event
Relay *Relay
}

func (ie IncomingEvent) String() string {
func (ie RelayEvent) String() string {
return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event)
}

Expand Down Expand Up @@ -70,7 +70,7 @@ func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
// WithAuthHandler must be a function that signs the auth event when called.
// it will be called whenever any relay in the pool returns a `CLOSED` message
// with the "auth-required:" prefix, only once for each relay
type WithAuthHandler func(authEvent *Event) error
type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error

func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
pool.authHandler = h
Expand Down Expand Up @@ -114,7 +114,7 @@ func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {

// WithEventMiddleware is a function that will be called with all events received.
// more than one can be passed at a time.
type WithEventMiddleware func(IncomingEvent)
type WithEventMiddleware func(RelayEvent)

func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
pool.eventMiddleware = append(pool.eventMiddleware, h)
Expand Down Expand Up @@ -173,19 +173,19 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {

// SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subMany(ctx, urls, filters, true)
}

// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subMany(ctx, urls, filters, false)
}

func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining
events := make(chan IncomingEvent)
events := make(chan RelayEvent)
seenAlready := xsync.NewMapOf[string, Timestamp]()
ticker := time.NewTicker(seenAlreadyDropTick)

Expand Down Expand Up @@ -255,7 +255,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
goto reconnect
}

ie := IncomingEvent{Event: evt, Relay: relay}
ie := RelayEvent{Event: evt, Relay: relay}
for _, mh := range pool.eventMiddleware {
mh(ie)
}
Expand Down Expand Up @@ -284,7 +284,10 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again
if err := relay.Auth(ctx, pool.authHandler); err == nil {
err := relay.Auth(ctx, func(event *Event) error {
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
})
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe
}
Expand All @@ -310,19 +313,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
}

// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, true)
}

// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, false)
}

func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx)

events := make(chan IncomingEvent)
events := make(chan RelayEvent)
seenAlready := xsync.NewMapOf[string, bool]()
wg := sync.WaitGroup{}
wg.Add(len(urls))
Expand Down Expand Up @@ -361,7 +364,9 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again
err := relay.Auth(ctx, pool.authHandler)
err := relay.Auth(ctx, func(event *Event) error {
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
})
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe
Expand All @@ -374,7 +379,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
return
}

ie := IncomingEvent{Event: evt, Relay: relay}
ie := RelayEvent{Event: evt, Relay: relay}
for _, mh := range pool.eventMiddleware {
mh(ie)
}
Expand All @@ -399,7 +404,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
}

// QuerySingle returns the first event returned by the first relay, cancels everything else.
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent {
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) {
Expand All @@ -411,9 +416,9 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany(
ctx context.Context,
dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool) chan IncomingEvent,
) chan IncomingEvent {
res := make(chan IncomingEvent)
subFn func(context.Context, []string, Filters, bool) chan RelayEvent,
) chan RelayEvent {
res := make(chan RelayEvent)

for _, df := range dfs {
go func(df DirectedFilters) {
Expand All @@ -427,11 +432,11 @@ func (pool *SimplePool) batchedSubMany(
}

// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent {
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany)
}

// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent {
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose)
}
2 changes: 1 addition & 1 deletion sdk/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSystem(mods ...SystemModifier) *System {
}

sys.Pool = nostr.NewSimplePool(context.Background(),
nostr.WithEventMiddleware(sys.trackEventHints),
nostr.WithEventMiddleware(sys.TrackEventHints),
nostr.WithPenaltyBox(),
)

Expand Down
2 changes: 1 addition & 1 deletion sdk/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/nbd-wtf/go-nostr/sdk/hints"
)

func (sys *System) trackEventHints(ie nostr.IncomingEvent) {
func (sys *System) TrackEventHints(ie nostr.RelayEvent) {
if IsVirtualRelay(ie.Relay.URL) {
return
}
Expand Down

0 comments on commit 8327310

Please sign in to comment.