Skip to content

Commit

Permalink
Do not commit offset anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
mouminoux committed Jul 26, 2018
1 parent cd77853 commit 04d0b22
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions kafkacli.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,7 @@ func main() {
select {
case msg, ok := <-consumer.Messages():
if ok {

fmt.Printf("[%s]----------------\n", msg.Timestamp)
fmt.Printf("Headers on %s/%d:", msg.Topic, msg.Partition)
for _, header := range msg.Headers {
fmt.Printf(" %s=%s", header.Key, header.Value)
}
fmt.Printf("\n")

fmt.Printf("Message on %s/%d: [%s]%s\n", msg.Topic, msg.Partition, msg.Key, msg.Value)

// fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\t%v\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value, msg.Headers)
consumer.MarkOffset(msg, "") // mark message as processed
displayMessage(msg)
}
case <-signals:
return
Expand All @@ -88,6 +77,16 @@ func main() {
app.Run(os.Args)
}

func displayMessage(msg *sarama.ConsumerMessage) {
fmt.Printf("[%s]----------------\n", msg.Timestamp)
fmt.Printf("Headers on %s/%d:", msg.Topic, msg.Partition)
for _, header := range msg.Headers {
fmt.Printf(" %s=%s", header.Key, header.Value)
}
fmt.Printf("\n")
fmt.Printf("Message on %s/%d: [%s]%s\n", msg.Topic, msg.Partition, msg.Key, msg.Value)
}

func die(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
Expand Down

0 comments on commit 04d0b22

Please sign in to comment.