From d9dc28c0ab776d31fef287c5494f8e0433f34fe6 Mon Sep 17 00:00:00 2001
From: Ravi Atluri
Date: Mon, 15 Jul 2024 15:47:14 +0530
Subject: [PATCH] Add Batch Consumer

---
 xkafka/batch.go            | 106 ++++++++++++++
 xkafka/batch_consumer.go   | 290 +++++++++++++++++++++++++++++++++++++
 xkafka/consumer.go         |   2 +-
 xkafka/consumer_options.go |  18 +++
 xkafka/generate.go         |   1 +
 xkafka/handler.go          |  42 +++++-
 xkafka/producer.go         |   2 +-
 7 files changed, 451 insertions(+), 10 deletions(-)
 create mode 100644 xkafka/batch.go
 create mode 100644 xkafka/batch_consumer.go

diff --git a/xkafka/batch.go b/xkafka/batch.go
new file mode 100644
index 0000000..d292499
--- /dev/null
+++ b/xkafka/batch.go
@@ -0,0 +1,106 @@
+package xkafka
+
+import (
+    "sync"
+
+    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+    "github.com/rs/xid"
+)
+
+// Batch is a group of messages that are processed together.
+type Batch struct {
+    ID       string
+    Messages []*Message
+    Status   Status
+
+    err  error
+    lock sync.Mutex
+}
+
+// NewBatch creates a new Batch.
+func NewBatch() *Batch {
+    return &Batch{
+        ID:       xid.New().String(),
+        Messages: make([]*Message, 0),
+    }
+}
+
+// AckSuccess marks the batch as successfully processed.
+func (b *Batch) AckSuccess() {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    b.Status = Success
+}
+
+// AckFail marks the batch as failed to process.
+func (b *Batch) AckFail(err error) {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    b.Status = Fail
+    b.err = err
+}
+
+// AckSkip marks the batch as skipped.
+func (b *Batch) AckSkip() {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    b.Status = Skip
+}
+
+// Err returns the error that caused the batch to fail.
+func (b *Batch) Err() error {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    return b.err
+}
+
+// MaxOffset returns the maximum offset among the
+// messages in the batch.
+func (b *Batch) MaxOffset() int64 {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    var max int64
+    for _, m := range b.Messages {
+        if m.Offset > max {
+            max = m.Offset
+        }
+    }
+
+    return max
+}
+
+// GroupMaxOffset returns the maximum offset for each
+// topic-partition in the batch.
+func (b *Batch) GroupMaxOffset() []kafka.TopicPartition {
+    b.lock.Lock()
+    defer b.lock.Unlock()
+
+    offsets := make(map[string]map[int32]int64)
+    for _, m := range b.Messages {
+        if _, ok := offsets[m.Topic]; !ok {
+            offsets[m.Topic] = make(map[int32]int64)
+        }
+
+        if m.Offset > offsets[m.Topic][m.Partition] {
+            offsets[m.Topic][m.Partition] = m.Offset
+        }
+    }
+
+    var tps []kafka.TopicPartition
+    for topic, partitions := range offsets {
+        for partition, offset := range partitions {
+            tps = append(tps, kafka.TopicPartition{
+                Topic:     &topic,
+                Partition: partition,
+                Offset:    kafka.Offset(offset + 1),
+            })
+        }
+    }
+
+    return tps
+}
diff --git a/xkafka/batch_consumer.go b/xkafka/batch_consumer.go
new file mode 100644
index 0000000..815c7d9
--- /dev/null
+++ b/xkafka/batch_consumer.go
@@ -0,0 +1,290 @@
+package xkafka
+
+import (
+    "context"
+    "errors"
+    "strings"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+    "github.com/sourcegraph/conc/pool"
+    "github.com/sourcegraph/conc/stream"
+)
+
+// BatchConsumer manages consumption & processing of messages
+// from kafka topics in batches.
+type BatchConsumer struct {
+    name        string
+    kafka       consumerClient
+    handler     BatchHandler
+    middlewares []BatchMiddlewarer
+    config      *consumerConfig
+    batch       *BatchManager
+    cancelCtx   atomic.Pointer[context.CancelFunc]
+}
+
+// NewBatchConsumer creates a new BatchConsumer instance.
+func NewBatchConsumer(name string, handler BatchHandler, opts ...ConsumerOption) (*BatchConsumer, error) {
+    cfg, err := newConsumerConfig(opts...)
+    if err != nil {
+        return nil, err
+    }
+
+    // override kafka configs
+    _ = cfg.configMap.SetKey("enable.auto.offset.store", false)
+    _ = cfg.configMap.SetKey("bootstrap.servers", strings.Join(cfg.brokers, ","))
+    _ = cfg.configMap.SetKey("group.id", name)
+
+    if cfg.manualCommit {
+        _ = cfg.configMap.SetKey("enable.auto.commit", false)
+    }
+
+    consumer, err := cfg.consumerFn(&cfg.configMap)
+    if err != nil {
+        return nil, err
+    }
+
+    return &BatchConsumer{
+        name:    name,
+        config:  cfg,
+        kafka:   consumer,
+        handler: handler,
+        batch:   NewBatchManager(cfg.batchSize, cfg.batchTimeout),
+    }, nil
+}
+
+// GetMetadata returns the metadata for the consumer.
+func (c *BatchConsumer) GetMetadata() (*Metadata, error) {
+    return c.kafka.GetMetadata(nil, false, int(c.config.metadataTimeout.Milliseconds()))
+}
+
+// Use appends a BatchMiddleware to the chain.
+func (c *BatchConsumer) Use(mws ...BatchMiddlewarer) {
+    c.middlewares = append(c.middlewares, mws...)
+}
+
+// Run starts the consumer and blocks until context is cancelled.
+func (c *BatchConsumer) Run(ctx context.Context) error {
+    if err := c.start(ctx); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (c *BatchConsumer) start(ctx context.Context) error {
+    c.handler = c.concatMiddlewares(c.handler)
+
+    pool := pool.New().
+        WithContext(ctx).
+        WithMaxGoroutines(2).
+        WithCancelOnError()
+
+    pool.Go(func(ctx context.Context) error {
+        if c.config.concurrency > 1 {
+            return c.processAsync(ctx)
+        }
+
+        return c.process(ctx)
+    })
+
+    pool.Go(func(ctx context.Context) error {
+        return c.consume(ctx)
+    })
+
+    return pool.Wait()
+}
+
+func (c *BatchConsumer) process(ctx context.Context) error {
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        case batch := <-c.batch.Receive():
+            err := c.handler.HandleBatch(ctx, batch)
+            if ferr := c.config.errorHandler(err); ferr != nil {
+                return ferr
+            }
+
+            err = c.saveOffset(batch)
+            if err != nil {
+                return err
+            }
+        }
+    }
+}
+
+func (c *BatchConsumer) processAsync(ctx context.Context) error {
+    st := stream.New().WithMaxGoroutines(c.config.concurrency)
+    ctx, cancel := context.WithCancelCause(ctx)
+
+    for {
+        select {
+        case <-ctx.Done():
+            st.Wait()
+
+            return context.Cause(ctx)
+        case batch := <-c.batch.Receive():
+            st.Go(func() stream.Callback {
+                err := c.handler.HandleBatch(ctx, batch)
+                if ferr := c.config.errorHandler(err); ferr != nil {
+                    cancel(ferr)
+
+                    return func() {}
+                }
+
+                return func() {
+                    if err := c.saveOffset(batch); err != nil {
+                        cancel(err)
+                    }
+                }
+            })
+        }
+    }
+}
+
+func (c *BatchConsumer) consume(ctx context.Context) (err error) {
+    if err := c.subscribe(); err != nil {
+        return err
+    }
+
+    defer func() {
+        if uerr := c.unsubscribe(); uerr != nil {
+            err = errors.Join(err, uerr)
+        }
+    }()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return err
+        default:
+            km, err := c.kafka.ReadMessage(c.config.pollTimeout)
+            if err != nil {
+                var kerr kafka.Error
+                if errors.As(err, &kerr) && kerr.Code() == kafka.ErrTimedOut {
+                    continue
+                }
+
+                if ferr := c.config.errorHandler(err); ferr != nil {
+                    err = ferr
+
+                    return err
+                }
+
+                continue
+            }
+
+            msg := newMessage(c.name, km)
+
+            c.batch.Add(msg)
+        }
+    }
+}
+
+func (c *BatchConsumer) subscribe() error {
+    return c.kafka.SubscribeTopics(c.config.topics, nil)
+}
+
+func (c *BatchConsumer) unsubscribe() error {
+    return c.kafka.Unsubscribe()
+}
+
+func (c *BatchConsumer) concatMiddlewares(handler BatchHandler) BatchHandler {
+    for i := len(c.middlewares) - 1; i >= 0; i-- {
+        handler = c.middlewares[i].BatchMiddleware(handler)
+    }
+
+    return handler
+}
+
+func (c *BatchConsumer) saveOffset(batch *Batch) error {
+    if batch.Status == Fail {
+        return nil
+    }
+
+    offsets := batch.GroupMaxOffset()
+
+    _, err := c.kafka.StoreOffsets(offsets)
+    if err != nil {
+        return err
+    }
+
+    if c.config.manualCommit {
+        if _, err := c.kafka.Commit(); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+// BatchManager manages aggregation and processing of Message batches.
+type BatchManager struct {
+    size      int
+    timeout   time.Duration
+    batch     *Batch
+    mutex     *sync.RWMutex
+    flushChan chan *Batch
+}
+
+// NewBatchManager creates a new BatchManager.
+func NewBatchManager(size int, timeout time.Duration) *BatchManager {
+    b := &BatchManager{
+        size:    size,
+        timeout: timeout,
+        mutex:   &sync.RWMutex{},
+        batch: &Batch{
+            Messages: make([]*Message, 0, size),
+        },
+        flushChan: make(chan *Batch),
+    }
+
+    go b.runFlushByTime()
+
+    return b
+}
+
+// Add adds a message to the batch and flushes when the batch size is reached.
+func (b *BatchManager) Add(m *Message) {
+    b.mutex.Lock()
+    b.batch.Messages = append(b.batch.Messages, m)
+
+    if len(b.batch.Messages) >= b.size {
+        b.flush()
+    }
+
+    b.mutex.Unlock()
+}
+
+// Receive returns a channel to read batched Messages.
+func (b *BatchManager) Receive() <-chan *Batch {
+    return b.flushChan
+}
+
+func (b *BatchManager) runFlushByTime() {
+    t := time.NewTicker(b.timeout)
+
+    for range t.C {
+        b.mutex.Lock()
+        b.flush()
+        b.mutex.Unlock()
+    }
+}
+
+// flush sends the batch to the flush channel and resets the batch.
+// DESIGN: flush does NOT acquire a mutex lock. Locks should be managed by the caller.
+// nolint:gosimple
+func (b *BatchManager) flush() {
+    if len(b.batch.Messages) == 0 {
+        return
+    }
+
+    b.flushChan <- b.batch
+
+    b.batch = &Batch{
+        Messages: make([]*Message, 0, b.size),
+    }
+}
diff --git a/xkafka/consumer.go b/xkafka/consumer.go
index 2ba0c9d..1506751 100644
--- a/xkafka/consumer.go
+++ b/xkafka/consumer.go
@@ -17,7 +17,7 @@ type Consumer struct {
     name        string
     kafka       consumerClient
     handler     Handler
-    middlewares []middleware
+    middlewares []Middlewarer
     config      *consumerConfig
     cancelCtx   atomic.Pointer[context.CancelFunc]
 }
diff --git a/xkafka/consumer_options.go b/xkafka/consumer_options.go
index b3fd49d..e3b3e97 100644
--- a/xkafka/consumer_options.go
+++ b/xkafka/consumer_options.go
@@ -21,6 +21,10 @@ type consumerConfig struct {
     pollTimeout  time.Duration
     concurrency  int
     manualCommit bool
+
+    // batch options
+    batchSize    int
+    batchTimeout time.Duration
 }
 
 func newConsumerConfig(opts ...ConsumerOption) (*consumerConfig, error) {
@@ -100,3 +104,17 @@ type ManualCommit bool
 func (mc ManualCommit) setConsumerConfig(o *consumerConfig) {
     o.manualCommit = bool(mc)
 }
+
+// BatchSize defines the maximum number of messages in a batch.
+type BatchSize int
+
+func (bs BatchSize) setConsumerConfig(o *consumerConfig) {
+    o.batchSize = int(bs)
+}
+
+// BatchTimeout defines the maximum time to wait for a batch to be filled.
+type BatchTimeout time.Duration
+
+func (bt BatchTimeout) setConsumerConfig(o *consumerConfig) {
+    o.batchTimeout = time.Duration(bt)
+}
diff --git a/xkafka/generate.go b/xkafka/generate.go
index 3f87faf..e119be1 100644
--- a/xkafka/generate.go
+++ b/xkafka/generate.go
@@ -2,3 +2,4 @@
 //go:generate mockery --name producerClient --structname MockProducerClient --filename producer_mock_test.go --outpkg xkafka --output .
 
 package xkafka
+
diff --git a/xkafka/handler.go b/xkafka/handler.go
index 6f2bd2b..e13c6bf 100644
--- a/xkafka/handler.go
+++ b/xkafka/handler.go
@@ -4,12 +4,12 @@ import (
     "context"
 )
 
-// Handler responds to a Message from a Kafka topic.
+// Handler defines a message handler.
 type Handler interface {
     Handle(ctx context.Context, m *Message) error
 }
 
-// HandlerFunc defines signature of a message handler function.
+// HandlerFunc defines a function for handling messages.
 type HandlerFunc func(ctx context.Context, m *Message) error
 
 // Handle implements Handler interface on HandlerFunc.
@@ -17,15 +17,41 @@ func (h HandlerFunc) Handle(ctx context.Context, m *Message) error {
     return h(ctx, m)
 }
 
-// MiddlewareFunc functions are closures that intercept Messages.
-type MiddlewareFunc func(Handler) Handler
-
-// middleware interface is anything which implements a MiddlewareFunc named Middleware.
-type middleware interface {
+// Middlewarer is an interface for message handler middleware.
+type Middlewarer interface {
     Middleware(handler Handler) Handler
 }
 
-// Middleware allows MiddlewareFunc to implement the middleware interface.
+// MiddlewareFunc defines a function for middleware.
+type MiddlewareFunc func(Handler) Handler
+
+// Middleware implements Middlewarer interface.
 func (mw MiddlewareFunc) Middleware(handler Handler) Handler {
     return mw(handler)
 }
+
+// BatchHandler defines a handler for a batch of messages.
+type BatchHandler interface {
+    HandleBatch(ctx context.Context, b *Batch) error
+}
+
+// BatchHandlerFunc defines a function for handling a batch.
+type BatchHandlerFunc func(ctx context.Context, b *Batch) error
+
+// HandleBatch implements BatchHandler interface.
+func (h BatchHandlerFunc) HandleBatch(ctx context.Context, b *Batch) error {
+    return h(ctx, b)
+}
+
+// BatchMiddlewarer is an interface for batch message handler middleware.
+type BatchMiddlewarer interface {
+    BatchMiddleware(handler BatchHandler) BatchHandler
+}
+
+// BatchMiddlewareFunc defines a function for batch middleware.
+type BatchMiddlewareFunc func(BatchHandler) BatchHandler
+
+// BatchMiddleware implements BatchMiddlewarer interface.
+func (mw BatchMiddlewareFunc) BatchMiddleware(handler BatchHandler) BatchHandler {
+    return mw(handler)
+}
diff --git a/xkafka/producer.go b/xkafka/producer.go
index 0b8c837..3b70a69 100644
--- a/xkafka/producer.go
+++ b/xkafka/producer.go
@@ -15,7 +15,7 @@ type Producer struct {
     config              *producerConfig
     kafka               producerClient
     events              chan kafka.Event
-    middlewares         []middleware
+    middlewares         []Middlewarer
     wrappedPublish      Handler
     wrappedAsyncPublish Handler
 }
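
Example usage (not part of the patch): a minimal sketch of wiring up the new BatchConsumer with a BatchHandlerFunc and the BatchSize / BatchTimeout options added above. The module path github.com/gojekfarm/xtools/xkafka, the Brokers and Topics options, and the broker and topic names are assumptions about the surrounding package, not anything introduced by this diff.

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/gojekfarm/xtools/xkafka" // assumed module path
    )

    func main() {
        // Handle one batch at a time; Ack* records the status that
        // saveOffset checks before storing offsets.
        handler := xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
            log.Printf("handling %d messages", len(b.Messages))
            b.AckSuccess()
            return nil
        })

        consumer, err := xkafka.NewBatchConsumer(
            "orders-batch-consumer",
            handler,
            xkafka.Brokers{"localhost:9092"},   // assumed existing option
            xkafka.Topics{"orders"},            // assumed existing option
            xkafka.BatchSize(100),              // flush after 100 messages...
            xkafka.BatchTimeout(2*time.Second), // ...or when the timeout ticker fires
        )
        if err != nil {
            log.Fatal(err)
        }

        // Run blocks until the context is cancelled or the configured
        // error handler returns a non-nil error.
        if err := consumer.Run(context.Background()); err != nil {
            log.Fatal(err)
        }
    }

With these options a batch reaches the handler either when 100 messages have accumulated or when the BatchManager's timeout ticker fires, whichever happens first.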
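
A batch middleware sketch built on the BatchMiddlewareFunc, BatchHandler and Batch types added by this patch; the package name, the BatchLogger wrapper and the log format are illustrative only, and the xkafka import path is assumed as above.

    package example

    import (
        "context"
        "log"
        "time"

        "github.com/gojekfarm/xtools/xkafka" // assumed module path
    )

    // BatchLogger returns a middleware that reports how long each batch
    // took and the status it finished with.
    func BatchLogger(logger *log.Logger) xkafka.BatchMiddlewareFunc {
        return func(next xkafka.BatchHandler) xkafka.BatchHandler {
            return xkafka.BatchHandlerFunc(func(ctx context.Context, b *xkafka.Batch) error {
                start := time.Now()
                err := next.HandleBatch(ctx, b)
                logger.Printf("batch %s: %d messages, status=%v, err=%v, took=%s",
                    b.ID, len(b.Messages), b.Status, err, time.Since(start))
                return err
            })
        }
    }

Registered with consumer.Use(BatchLogger(log.Default())), the middleware is applied by concatMiddlewares, with the first middleware added ending up outermost.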