Skip to content

Commit

Permalink
Add context for valuer interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Sep 23, 2024
1 parent 3627d33 commit 9a01159
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
4 changes: 4 additions & 0 deletions cmd/jetstream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
collection = e.Commit.Collection
}

// Wrap the valuer functions for more lightweight event filtering
getJSONEvent := func() []byte { return asJSON }
getCompressedEvent := func() []byte { return compBytes }

Expand Down Expand Up @@ -125,6 +126,9 @@ func (s *Server) Emit(ctx context.Context, e *models.Event, asJSON, compBytes []
return nil
}

// emitToSubscriber sends an event to a subscriber if the subscriber wants the event
// It takes a valuer function to get the event bytes so that the caller can avoid
// unnecessary allocations and/or reading from the playback DB if the subscriber doesn't want the event
func emitToSubscriber(ctx context.Context, log *slog.Logger, sub *Subscriber, timeUS int64, did, collection string, playback bool, getEventBytes func() []byte) error {
if !sub.WantsCollection(collection) {
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/consumer/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func (c *Consumer) ReplayEvents(ctx context.Context, compressed bool, cursor int
collection = parts[2]
}

// Emit the event
// Emit the event with the valuer function so the subscriber can decide if it wants to filter it out
// without having to read the entire event from the database
err = emit(ctx, timeUS, parts[1], collection, iter.Value)
if err != nil {
log.Error("failed to emit event", "error", err)
Expand Down

0 comments on commit 9a01159

Please sign in to comment.