diff --git a/examples/riverkfq/main.go b/examples/riverkfq/main.go index 50eec76..eb03c02 100644 --- a/examples/riverkfq/main.go +++ b/examples/riverkfq/main.go @@ -38,22 +38,20 @@ func main() { } pool := createPool() - - pq, err := riverkfq.NewPublishQueue(pool) - if err != nil { - panic(err) - } - producer := createProducer() - wq, err := riverkfq.NewRiverQueue(pool, producer) + pq, err := riverkfq.NewPublishQueue( + riverkfq.Pool(pool), + riverkfq.WithProducer(producer), + riverkfq.MaxWorkers(2), + ) if err != nil { panic(err) } ctx, cancel := context.WithCancel(context.Background()) - go wq.Run(ctx) + go pq.Run(ctx) messages := generateMessages(topic, 10) diff --git a/kfq/riverkfq/doc.go b/kfq/riverkfq/doc.go new file mode 100644 index 0000000..a4e729e --- /dev/null +++ b/kfq/riverkfq/doc.go @@ -0,0 +1,3 @@ +// Package riverkfq provides River based queues that offer +// Kafka publishing guarantees. +package riverkfq diff --git a/kfq/riverkfq/options.go b/kfq/riverkfq/options.go new file mode 100644 index 0000000..9e685d9 --- /dev/null +++ b/kfq/riverkfq/options.go @@ -0,0 +1,56 @@ +package riverkfq + +import ( + "context" + + "github.com/gojekfarm/xtools/xkafka" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" +) + +// Option defines interface for configuring riverkfq +type Option interface { + apply(*PublishQueue) +} + +type optionFunc func(*PublishQueue) + +func (f optionFunc) apply(pq *PublishQueue) { + f(pq) +} + +// Pool sets the database connection pool. +func Pool(p *pgxpool.Pool) Option { + return optionFunc(func(pq *PublishQueue) { + pq.pool = p + }) +} + +// Producer is a Kafka producer +type Producer interface { + Publish(ctx context.Context, msg *xkafka.Message) error +} + +// WithProducer sets the Kafka producer. +func WithProducer(p Producer) Option { + return optionFunc(func(pq *PublishQueue) { + pq.producer = p + }) +} + +// WithClient sets the river.Client. +// Useful for integrating with existing river queues. +func WithClient(c *river.Client[pgx.Tx]) Option { + return optionFunc(func(pq *PublishQueue) { + pq.queue = c + }) +} + +// MaxWorkers sets the maximum number of workers. +// Default is 1. +type MaxWorkers int + +func (m MaxWorkers) apply(pq *PublishQueue) { + pq.maxWorkers = int(m) +} diff --git a/kfq/riverkfq/river.go b/kfq/riverkfq/publish.go similarity index 51% rename from kfq/riverkfq/river.go rename to kfq/riverkfq/publish.go index 3869da1..0c56df2 100644 --- a/kfq/riverkfq/river.go +++ b/kfq/riverkfq/publish.go @@ -10,56 +10,47 @@ import ( "github.com/riverqueue/river/riverdriver/riverpgxv5" ) -// Producer is a Kafka producer -type Producer interface { - Publish(ctx context.Context, msg *xkafka.Message) error -} +// PublishQueue is a thin wrapper around river.Client +// that enqueues messages are asynchronously published to Kafka. +type PublishQueue struct { + queue *river.Client[pgx.Tx] + pool *pgxpool.Pool + producer Producer -// RiverQueue guarantees that messages are published -// to Kafka. -type RiverQueue struct { - queue *river.Client[pgx.Tx] + // config + maxWorkers int } -// NewPublishQueue creates a new RiverQueue which can only -// queue messages. -func NewPublishQueue(pool *pgxpool.Pool) (*RiverQueue, error) { - client, err := river.NewClient( - riverpgxv5.New(pool), - &river.Config{}, - ) - if err != nil { - return nil, err +// NewPublishQueue creates a new PublishQueue. +func NewPublishQueue(opts ...Option) (*PublishQueue, error) { + pq := &PublishQueue{} + + for _, opt := range opts { + opt.apply(pq) } - return &RiverQueue{ - queue: client, - }, nil -} + if pq.queue != nil { + workers := river.NewWorkers() + river.AddWorker(workers, NewPublishWorker(pq.producer)) + + client, err := river.NewClient(riverpgxv5.New(pq.pool), &river.Config{ + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: pq.maxWorkers}, + }, + Workers: workers, + }) + if err != nil { + return nil, err + } -// NewRiverQueue creates a new RiverQueue which can publish -// messages to Kafka. -func NewRiverQueue(pool *pgxpool.Pool, producer Producer) (*RiverQueue, error) { - workers := river.NewWorkers() - river.AddWorker(workers, &PublishWorker{producer: producer}) - - client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ - Queues: map[string]river.QueueConfig{ - river.QueueDefault: {MaxWorkers: 3}, - }, - Workers: workers, - }) - if err != nil { - return nil, err + pq.queue = client } - return &RiverQueue{ - queue: client, - }, nil + return pq, nil } // Add enqueues one or more messages. -func (q *RiverQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error { +func (q *PublishQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error { args := make([]river.InsertManyParams, len(msgs)) for i, msg := range msgs { @@ -74,7 +65,7 @@ func (q *RiverQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error { } // AddTx enqueues one or more messages in a transaction. -func (q *RiverQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Message) error { +func (q *PublishQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Message) error { args := make([]river.InsertManyParams, len(msgs)) for i, msg := range msgs { @@ -89,7 +80,7 @@ func (q *RiverQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Messa } // Run starts the queue and begins publishing messages to Kafka. -func (q *RiverQueue) Run(ctx context.Context) error { +func (q *PublishQueue) Run(ctx context.Context) error { if err := q.queue.Start(ctx); err != nil { return err } @@ -103,25 +94,28 @@ func (q *RiverQueue) Run(ctx context.Context) error { return nil } +// PublishArgs is the job payload for publishing messages to Kafka. type PublishArgs struct { Msg *xkafka.Message `json:"msg"` } +// Kind returns the kind of the job. func (args PublishArgs) Kind() string { return "publish_xkafka_message" } +// PublishWorker is a worker that publishes messages to Kafka. type PublishWorker struct { river.WorkerDefaults[PublishArgs] producer Producer } +// NewPublishWorker creates a new PublishWorker. func NewPublishWorker(producer Producer) *PublishWorker { - return &PublishWorker{ - producer: producer, - } + return &PublishWorker{producer: producer} } +// Work executes the job. func (w *PublishWorker) Work(ctx context.Context, job *river.Job[PublishArgs]) error { return w.producer.Publish(ctx, job.Args.Msg) } diff --git a/kfq/riverkfq/river_test.go b/kfq/riverkfq/publish_test.go similarity index 100% rename from kfq/riverkfq/river_test.go rename to kfq/riverkfq/publish_test.go