Skip to content

Commit

Permalink
Refactored API
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Jun 21, 2024
1 parent b7fb09e commit fd4dae3
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 52 deletions.
14 changes: 6 additions & 8 deletions examples/riverkfq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,20 @@ func main() {
}

pool := createPool()

pq, err := riverkfq.NewPublishQueue(pool)
if err != nil {
panic(err)
}

producer := createProducer()

wq, err := riverkfq.NewRiverQueue(pool, producer)
pq, err := riverkfq.NewPublishQueue(
riverkfq.Pool(pool),
riverkfq.WithProducer(producer),
riverkfq.MaxWorkers(2),
)
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())

go wq.Run(ctx)
go pq.Run(ctx)

messages := generateMessages(topic, 10)

Expand Down
3 changes: 3 additions & 0 deletions kfq/riverkfq/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package riverkfq provides River based queues that offer
// Kafka publishing guarantees.
package riverkfq
56 changes: 56 additions & 0 deletions kfq/riverkfq/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package riverkfq

import (
"context"

"github.com/gojekfarm/xtools/xkafka"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
)

// Option defines interface for configuring riverkfq
type Option interface {
apply(*PublishQueue)
}

type optionFunc func(*PublishQueue)

func (f optionFunc) apply(pq *PublishQueue) {
f(pq)
}

// Pool sets the database connection pool.
func Pool(p *pgxpool.Pool) Option {
return optionFunc(func(pq *PublishQueue) {
pq.pool = p
})
}

// Producer is a Kafka producer
type Producer interface {
Publish(ctx context.Context, msg *xkafka.Message) error
}

// WithProducer sets the Kafka producer.
func WithProducer(p Producer) Option {
return optionFunc(func(pq *PublishQueue) {
pq.producer = p
})
}

// WithClient sets the river.Client.
// Useful for integrating with existing river queues.
func WithClient(c *river.Client[pgx.Tx]) Option {
return optionFunc(func(pq *PublishQueue) {
pq.queue = c
})
}

// MaxWorkers sets the maximum number of workers.
// Default is 1.
type MaxWorkers int

func (m MaxWorkers) apply(pq *PublishQueue) {
pq.maxWorkers = int(m)
}
82 changes: 38 additions & 44 deletions kfq/riverkfq/river.go → kfq/riverkfq/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,47 @@ import (
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// Producer is a Kafka producer
type Producer interface {
Publish(ctx context.Context, msg *xkafka.Message) error
}
// PublishQueue is a thin wrapper around river.Client
// that enqueues messages are asynchronously published to Kafka.
type PublishQueue struct {
queue *river.Client[pgx.Tx]
pool *pgxpool.Pool
producer Producer

// RiverQueue guarantees that messages are published
// to Kafka.
type RiverQueue struct {
queue *river.Client[pgx.Tx]
// config
maxWorkers int
}

// NewPublishQueue creates a new RiverQueue which can only
// queue messages.
func NewPublishQueue(pool *pgxpool.Pool) (*RiverQueue, error) {
client, err := river.NewClient(
riverpgxv5.New(pool),
&river.Config{},
)
if err != nil {
return nil, err
// NewPublishQueue creates a new PublishQueue.
func NewPublishQueue(opts ...Option) (*PublishQueue, error) {
pq := &PublishQueue{}

for _, opt := range opts {
opt.apply(pq)
}

return &RiverQueue{
queue: client,
}, nil
}
if pq.queue != nil {
workers := river.NewWorkers()
river.AddWorker(workers, NewPublishWorker(pq.producer))

client, err := river.NewClient(riverpgxv5.New(pq.pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: pq.maxWorkers},
},
Workers: workers,
})
if err != nil {
return nil, err
}

// NewRiverQueue creates a new RiverQueue which can publish
// messages to Kafka.
func NewRiverQueue(pool *pgxpool.Pool, producer Producer) (*RiverQueue, error) {
workers := river.NewWorkers()
river.AddWorker(workers, &PublishWorker{producer: producer})

client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 3},
},
Workers: workers,
})
if err != nil {
return nil, err
pq.queue = client
}

return &RiverQueue{
queue: client,
}, nil
return pq, nil
}

// Add enqueues one or more messages.
func (q *RiverQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error {
func (q *PublishQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error {
args := make([]river.InsertManyParams, len(msgs))

for i, msg := range msgs {
Expand All @@ -74,7 +65,7 @@ func (q *RiverQueue) Add(ctx context.Context, msgs ...*xkafka.Message) error {
}

// AddTx enqueues one or more messages in a transaction.
func (q *RiverQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Message) error {
func (q *PublishQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Message) error {
args := make([]river.InsertManyParams, len(msgs))

for i, msg := range msgs {
Expand All @@ -89,7 +80,7 @@ func (q *RiverQueue) AddTx(ctx context.Context, tx pgx.Tx, msgs ...*xkafka.Messa
}

// Run starts the queue and begins publishing messages to Kafka.
func (q *RiverQueue) Run(ctx context.Context) error {
func (q *PublishQueue) Run(ctx context.Context) error {
if err := q.queue.Start(ctx); err != nil {
return err
}
Expand All @@ -103,25 +94,28 @@ func (q *RiverQueue) Run(ctx context.Context) error {
return nil
}

// PublishArgs is the job payload for publishing messages to Kafka.
type PublishArgs struct {
Msg *xkafka.Message `json:"msg"`
}

// Kind returns the kind of the job.
func (args PublishArgs) Kind() string {
return "publish_xkafka_message"
}

// PublishWorker is a worker that publishes messages to Kafka.
type PublishWorker struct {
river.WorkerDefaults[PublishArgs]
producer Producer
}

// NewPublishWorker creates a new PublishWorker.
func NewPublishWorker(producer Producer) *PublishWorker {
return &PublishWorker{
producer: producer,
}
return &PublishWorker{producer: producer}
}

// Work executes the job.
func (w *PublishWorker) Work(ctx context.Context, job *river.Job[PublishArgs]) error {
return w.producer.Publish(ctx, job.Args.Msg)
}
File renamed without changes.

0 comments on commit fd4dae3

Please sign in to comment.