Skip to content

Commit

Permalink
Add a list topic (with size) command
Browse files Browse the repository at this point in the history
  • Loading branch information
mpicque committed Jul 8, 2024
1 parent e45c3b6 commit 39f7cf3
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 98 deletions.
208 changes: 140 additions & 68 deletions consume.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
cli "github.com/jawher/mow.cli"
"github.com/mouminoux/kafkacli/filter"
uuid "github.com/satori/go.uuid"
"log"
"os"
"os/signal"
"strings"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
cli "github.com/jawher/mow.cli"
"github.com/mouminoux/kafkacli/filter"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"sync"
"syscall"
)

type stack []uintptr
type consumeError struct {
message string
cause error
}

func (f *consumeError) Error() string { return f.message }

var messageCount int

func consumeCmd(c *cli.Cmd) {
var (
prettyPrint = c.BoolOpt("p pretty-print", false, "pretty print the messages")
Expand All @@ -37,11 +48,11 @@ The currently supported filters:
cfg := config(*useSSL, *sslCAFile, *sslCertFile, *sslKeyFile)
f, err := parseFilters(*filters)
die(err)
consume(*cfg, splitFlatten(*bootstrapServers), splitFlatten(*topics), *prettyPrint, *fromBeginning, *consumerGroupId, *existOnLastMessage, f)
consume(cfg, splitFlatten(*bootstrapServers), splitFlatten(*topics), *prettyPrint, *fromBeginning, *consumerGroupId, *existOnLastMessage, f)
}
}

func consume(config cluster.Config,
func consume(config *sarama.Config,
bootstrapServers []string,
topics []string,
prettyPrint bool,
Expand All @@ -59,76 +70,84 @@ func consume(config cluster.Config,
uuidString := uuid.NewV4().String()
consumerGroupId = uuidString
}
consumer, err := cluster.NewConsumer(bootstrapServers, consumerGroupId, topics, &config)
die(err)

defer func() {
if err := consumer.Close(); err != nil {
log.Printf("error while closing consumer: %+v\n", err)
}
}()

// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// consume errors
go func() {
for err := range consumer.Errors() {
fmt.Printf("Error: %v\n", err)
}
}()
/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{
ready: make(chan bool),
filter: f,
prettyPrint: prettyPrint,
}

startConsuming := make(chan struct{})
var partitionToRead int
ctx, cancel := context.WithCancel(context.Background())
consumerGroup, err := sarama.NewConsumerGroup(bootstrapServers, consumerGroupId, config)
die(err)

// consume notifications
consumptionIsPaused := false
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
for ntf := range consumer.Notifications() {
fmt.Printf("Rebalanced: %+v\n", ntf)
if len(ntf.Claimed) != 0 {
for _, topic := range ntf.Claimed {
partitionToRead += len(topic)
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := consumerGroup.Consume(ctx, topics, &consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
startConsuming <- struct{}{}
log.Panicf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

// consume messages, watch signals
<-startConsuming
sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)

var messageCount int
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

for {
keepRunning := true
for keepRunning {
select {
case msg, ok := <-consumer.Messages():
if !ok {
continue
}
if !f(msg) {
continue
}
if prettyPrint {
displayMessagePretty(msg)
} else {
displayMessageUgly(msg)
}
messageCount++
marks := consumer.HighWaterMarks()
if existOnLastMessage && msg.Offset+1 == marks[msg.Topic][msg.Partition] {
partitionToRead -= 1
}

case <-signals:
partitionToRead = 0
}

if partitionToRead == 0 {
break
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(consumerGroup, &consumptionIsPaused)
}
}

log.Printf("%d messages received\n", messageCount)

cancel()
wg.Wait()
if err = consumerGroup.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
if *isPaused {
client.ResumeAll()
log.Println("Resuming consumption")
} else {
client.PauseAll()
log.Println("Pausing consumption")
}

*isPaused = !*isPaused
}

func displayMessagePretty(msg *sarama.ConsumerMessage) {
Expand Down Expand Up @@ -162,17 +181,24 @@ func parseFilters(ffs []string) (filter.Filter, error) {
for i, fs := range ffs {
parts := strings.SplitN(fs, ":", 2)
if len(parts) != 2 {
return nil, errors.Errorf("Invalid filter %q. must be in <filter-type>:<filter-condition> format", fs)
return nil, &consumeError{
message: fmt.Sprintf("Invalid filter %q. must be in <filter-type>:<filter-condition> format", fs),
}
}
switch parts[0] {
case "header", "h":
k, v, err := parseKEqV(parts[1])
if err != nil {
return nil, errors.Wrapf(err, "Invalid filter %q", fs)
return nil, &consumeError{
message: fmt.Sprintf("Invalid filter %q", fs),
cause: err,
}
}
ff[i] = filter.Header(k, v)
default:
return nil, errors.Errorf("Unknown filter type %q in filter %q", parts[0], fs)
return nil, &consumeError{
message: fmt.Sprintf("Unknown filter type %q in filter %q", parts[0], fs),
}
}

}
Expand All @@ -182,7 +208,53 @@ func parseFilters(ffs []string) (filter.Filter, error) {
func parseKEqV(s string) (string, string, error) {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
return "", "", errors.Errorf("Invalid filter condition %q. must be in <x>=<y> format", s)
return "", "", &consumeError{
message: fmt.Sprintf("Invalid filter condition %q. must be in <x>=<y> format", s),
}
}
return parts[0], parts[1], nil
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
filter filter.Filter
prettyPrint bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case msg, ok := <-claim.Messages():
if !ok {
continue
}
if !consumer.filter(msg) {
continue
}
if consumer.prettyPrint {
displayMessagePretty(msg)
} else {
displayMessageUgly(msg)
}
messageCount++
case <-session.Context().Done():
return nil
}
}
}
9 changes: 4 additions & 5 deletions consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package main

import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/IBM/sarama"
cli "github.com/jawher/mow.cli"
"log"
"sort"
Expand All @@ -18,14 +17,14 @@ func consumerGroupsCmd(c *cli.Cmd) {

c.Action = func() {
cfg := config(*useSSL, *sslCAFile, *sslCertFile, *sslKeyFile)
consumerGroups(*cfg, splitFlatten(*bootstrapServers), sorted)
consumerGroups(cfg, splitFlatten(*bootstrapServers), sorted)
}
}

func consumerGroups(config cluster.Config, bootstrapServers []string, sorted *bool) {
func consumerGroups(config *sarama.Config, bootstrapServers []string, sorted *bool) {
fmt.Printf("Listing consumer groups for all topics on broker(s) %q\n", strings.Join(bootstrapServers, ", "))

newAdmin, err := sarama.NewClusterAdmin(bootstrapServers, &config.Config)
newAdmin, err := sarama.NewClusterAdmin(bootstrapServers, config)
die(err)

defer func() {
Expand Down
2 changes: 1 addition & 1 deletion filter/filter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package filter

import (
"github.com/Shopify/sarama"
"github.com/IBM/sarama"
)

type Filter func(message *sarama.ConsumerMessage) bool
Expand Down
2 changes: 1 addition & 1 deletion filter/header.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package filter

import (
"github.com/Shopify/sarama"
"github.com/IBM/sarama"
)

func Header(key, value string) Filter {
Expand Down
14 changes: 3 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,10 @@ module github.com/mouminoux/kafkacli
go 1.12

require (
github.com/DataDog/zstd v1.3.5
github.com/Shopify/sarama v1.21.0
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/davecgh/go-spew v1.1.1
github.com/eapache/go-resiliency v1.1.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/golang/snappy v0.0.1
github.com/IBM/sarama v1.43.2
github.com/docker/go-units v0.5.0
github.com/jawher/mow.cli v1.1.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.8.1
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
)
Loading

0 comments on commit 39f7cf3

Please sign in to comment.