Skip to content

Commit

Permalink
Kafka publishing queue using River (#47)
Browse files Browse the repository at this point in the history
Thin wrapper for publishing kafka messages using River queue
  • Loading branch information
sonnes authored Jul 9, 2024
1 parent 1446392 commit ef9e1d7
Show file tree
Hide file tree
Showing 17 changed files with 1,136 additions and 20 deletions.
16 changes: 13 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ jobs:
strategy:
matrix:
go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x]
services:
postgres:
image: postgres:16
env:
POSTGRES_PASSWORD: postgres
options: >-
--health-cmd pg_isready
--health-interval 2s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- name: Install Go
uses: actions/setup-go@v2
Expand All @@ -23,9 +35,7 @@ jobs:
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
key: ${{ runner.os }}-go${{ matrix.go-version }}
- name: Tools bin cache
uses: actions/cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ gocov-xml:

MOCKERY = $(BIN_DIR)/mockery
mockery:
$(call go-get-tool,$(MOCKERY),github.com/vektra/mockery/v2@v2.20.0)
$(call go-get-tool,$(MOCKERY),github.com/vektra/mockery/v2@v2.43.0)

PROTOC = $(BIN_DIR)/protoc
protoc:
Expand Down
11 changes: 11 additions & 0 deletions examples/riverkfq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Examples of how to use the `riverkfq` package to publish messages to a Kafka topic.

## Running Kafka & Postgres

Start a Kafka broker and a Postgres database using the provided `docker-compose.yml` file:

```bash
$ docker-compose up -d
```

## Scenarios
52 changes: 52 additions & 0 deletions examples/riverkfq/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version: "3"

services:
xkafka-zoo:
image: confluentinc/cp-zookeeper:7.3.2
hostname: xkafka-zoo
container_name: xkafka-zoo
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: xkafka-zoo:2888:3888

xkafka-1:
image: confluentinc/cp-kafka:7.3.2
hostname: xkafka-1
container_name: xkafka-1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://xkafka-1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "xkafka-zoo:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
# For testing small segments 16MB and retention of 128MB
KAFKA_LOG_SEGMENT_BYTES: 16777216
KAFKA_LOG_RETENTION_BYTES: 134217728
AUTO_CREATE_TOPICS: true
depends_on:
- xkafka-zoo
postgres:
image: postgres:16.0-alpine
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
39 changes: 39 additions & 0 deletions examples/riverkfq/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module github.com/gojekfarm/xtools/examples/riverkfq

go 1.21.4

toolchain go1.22.2

replace (
github.com/gojekfarm/xtools => ../../
github.com/gojekfarm/xtools/kfq/riverkfq => ../../kfq/riverkfq
github.com/gojekfarm/xtools/xkafka => ../../xkafka
github.com/gojekfarm/xtools/xkafka/middleware => ../../xkafka/middleware
)

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.0.2
github.com/gojekfarm/xtools/kfq/riverkfq v0.0.0-00010101000000-000000000000
github.com/gojekfarm/xtools/xkafka v0.8.1
github.com/gojekfarm/xtools/xkafka/middleware v0.0.0-00010101000000-000000000000
github.com/jackc/pgx/v5 v5.6.0
github.com/lmittmann/tint v1.0.4
github.com/riverqueue/river v0.7.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.7.0
github.com/rs/xid v1.5.0
)

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/riverqueue/river/riverdriver v0.7.0 // indirect
github.com/riverqueue/river/rivertype v0.7.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
)
Loading

0 comments on commit ef9e1d7

Please sign in to comment.