Skip to content

Commit

Permalink
Non-blocking subscriber teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 6, 2024
1 parent 9d5cb2c commit 2d7c074
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
defer sub.lk.Unlock()

// Don't emit events to subscribers that are replaying and are too far behind
if sub.cursor != nil && sub.seq < e.TimeUS-cutoverThresholdUS {
if sub.cursor != nil && sub.seq < e.TimeUS-cutoverThresholdUS || sub.tearingDown {
return
}

Expand Down
32 changes: 19 additions & 13 deletions pkg/server/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ type WantedCollections struct {
}

type Subscriber struct {
ws *websocket.Conn
conLk sync.Mutex
realIP string
lk sync.Mutex
seq int64
outbox chan *[]byte
hello chan struct{}
id int64
ws *websocket.Conn
conLk sync.Mutex
realIP string
lk sync.Mutex
seq int64
outbox chan *[]byte
hello chan struct{}
id int64
tearingDown bool

// Subscriber options

Expand Down Expand Up @@ -99,11 +100,16 @@ func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, ti
default:
// Drop slow subscribers if they're live tailing and fall too far behind
log.Error("failed to send event to subscriber, dropping", "error", "buffer full", "subscriber", sub.id)
sub.Terminate("consuming too slowly")
err := sub.ws.Close()
if err != nil {
log.Error("failed to close subscriber connection", "error", err)
}

// Tearing down a subscriber can block, so do it in a goroutine
go func() {
sub.tearingDown = true
sub.Terminate("consuming too slowly")
err := sub.ws.Close()
if err != nil {
log.Error("failed to close subscriber connection", "error", err)
}
}()
}
}

Expand Down

0 comments on commit 2d7c074

Please sign in to comment.