Skip to content

Commit

Permalink
Split consume/produce into 2 separate commands (#7)
Browse files Browse the repository at this point in the history
## Call samples:

### Consume:

`kafkacli -b <broker> consume --pretty-print  <topic1>  <topic1>`

### Produce:

`echo "lol" | kafkacli -b <broker> produce  -H <key>=<value> <topic>`

`cat payload.json | kafkacli -b <broker> produce  -H T<key>=<value> <topic>`

`kafkacli -b <broker> produce  -H <key>=<value> -m="<payload>" <topic>`

## Pretty print
Also, rework the message formatting to (optionally) handle pretty print:

Sample output:

```
---------------- [2019-04-23 17:04:14.687 +0200 CEST]  <topic>/<partition> ----------------
(Headers):
- "Key": "Value"
- "X-Correlation-Id": "7c2cff80-8ea3-4c34-ae72-ef20190be4be"
- "PAYLOAD_TYPE": "TESt"

(Payload):
>big blob of text>
```
  • Loading branch information
jawher authored Apr 26, 2019
1 parent 4b38557 commit 435ccf9
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 169 deletions.
63 changes: 55 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,62 @@

Kafkacli is a consumer and producer client for Apache Kafka.

### Usage:
## Usage:
```
kafkacli [-b] -t... [--from-beginning] [-g] [-m] [-h...]
Usage: kafkacli [-b...] [--ssl-cafile] [--ssl-certfile] [--ssl-keyfile] COMMAND [arg...]
Options:
-b, --broker brokers (default "localhost:9092")
-t, --topic topic
Kafkacli
Options:
-b, --broker brokers (default ["localhost:9092"])
-s, --secure use SSL
--ssl-cafile filename of CA file to use in certificate verification (env $KAFKACLI_SSL_CA_FILE)
--ssl-certfile filename of file in PEM format containing the client certificate (env $KAFKACLI_SSL_CERT_FILE)
--ssl-keyfile filename containing the client private key (env $KAFKACLI_SSL_KEY_FILE)
Commands:
consume consume and display messages from 1 or more topics
produce produce a message into 1 or more topics
Run 'kafkacli COMMAND --help' for more information on a command.
```

The options listed above must be set before the command, e.g:

`kafkacli -b <broker> consume notification`

### Consume:

```
kafkacli consume --help
Usage: kafkacli consume [--from-beginning] [-g] [-e] [-p] TOPIC...
consume and display messages from 1 or more topics
Arguments:
TOPIC topic(s) to consume from
Options:
-p, --pretty-print pretty print the messages
--from-beginning start with the earliest message
-g, --consumer-group consumer group id
-m, --message message message
-h, --header message header <key=value>
-g, --consumer-group consumer group id. If unset, a random one will be generated
-e, --exit exit when last message received
```

### Produce

```
kafkacli produce --help
Usage: kafkacli produce [-H...] [-m=<message|->] TOPIC...
produce a message into 1 or more topics
Arguments:
TOPIC topic(s) to consume from
Options:
-H, --header message header <key=value>
-m, --message message message
```
139 changes: 139 additions & 0 deletions consume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package main

import (
"fmt"
"log"
"os"
"os/signal"
"strings"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
cli "github.com/jawher/mow.cli"
uuid "github.com/satori/go.uuid"
)

func consumeCmd(c *cli.Cmd) {
var (
prettyPrint = c.BoolOpt("p pretty-print", false, "pretty print the messages")
fromBeginning = c.BoolOpt("from-beginning", false, "start with the earliest message")
consumerGroupId = c.StringOpt("g consumer-group", "", "consumer group id. If unset, a random one will be generated")
existOnLastMessage = c.BoolOpt("e exit", false, "exit when last message received")

topics = c.Strings(cli.StringsArg{
Name: "TOPIC",
Desc: "topic(s) to consume from",
})
)

c.Spec = "[--from-beginning] [-g] [-e] [-p] TOPIC..."

c.Action = func() {
cfg := config(*useSSL, *sslCAFile, *sslCertFile, *sslKeyFile)
consume(*cfg, splitFlatten(*bootstrapServers), splitFlatten(*topics), *prettyPrint, *fromBeginning, *consumerGroupId, *existOnLastMessage)
}
}

func consume(config cluster.Config, bootstrapServers []string, topics []string, prettyPrint bool, fromBeginning bool, consumerGroupId string, existOnLastMessage bool) {
fmt.Printf("Consuming from topic(s) %q, broker(s) %q\n", strings.Join(topics, ", "), strings.Join(bootstrapServers, ", "))

if fromBeginning {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

if consumerGroupId == "" {
uuidString := uuid.NewV4().String()
consumerGroupId = uuidString
}
consumer, err := cluster.NewConsumer(bootstrapServers, consumerGroupId, topics, &config)
die(err)

defer func() {
if err := consumer.Close(); err != nil {
log.Printf("error while closing consumer: %+v\n", err)
}
}()

// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// consume errors
go func() {
for err := range consumer.Errors() {
fmt.Printf("Error: %v\n", err)
}
}()

startConsuming := make(chan struct{})
var partitionToRead int

// consume notifications
go func() {
for ntf := range consumer.Notifications() {
fmt.Printf("Rebalanced: %+v\n", ntf)
if len(ntf.Claimed) != 0 {
for _, topic := range ntf.Claimed {
partitionToRead += len(topic)
}
startConsuming <- struct{}{}
}
}
}()

// consume messages, watch signals
<-startConsuming

var messageCount int

for {
select {
case msg, ok := <-consumer.Messages():
if ok {
if prettyPrint {
displayMessagePretty(msg)
} else {
displayMessageUgly(msg)
}
messageCount++
marks := consumer.HighWaterMarks()
if existOnLastMessage && msg.Offset+1 == marks[msg.Topic][msg.Partition] {
partitionToRead -= 1
}
}
case <-signals:
partitionToRead = 0
}

if partitionToRead == 0 {
break
}
}
log.Printf("%d messages received\n", messageCount)
}

func displayMessagePretty(msg *sarama.ConsumerMessage) {
fmt.Printf("---------------- [%v] %s/%d ----------------\n", msg.Timestamp, msg.Topic, msg.Partition)
fmt.Printf("(Headers):\n")
for _, header := range msg.Headers {
fmt.Printf("- %q: %q\n", header.Key, header.Value)
}
if msg.Key != nil {
fmt.Printf("\n(Key): %s\n", msg.Key)
}
fmt.Printf("\n(Payload):\n%s\n\n", msg.Value)
}

func displayMessageUgly(msg *sarama.ConsumerMessage) {
fmt.Printf("[%s] %s/%d----------------\n", msg.Timestamp, msg.Topic, msg.Partition)
fmt.Printf("Headers:")
for _, header := range msg.Headers {
fmt.Printf(" %s=%s", header.Key, header.Value)
}
fmt.Printf("\n")
fmt.Printf("Message")
if msg.Key != nil {
fmt.Printf("[%s]%", msg.Key)
}
fmt.Printf(": %s\n", msg.Value)
}
Loading

0 comments on commit 435ccf9

Please sign in to comment.