From a5aed4a3bc5288f6faeb52ee38edbe99b8560540 Mon Sep 17 00:00:00 2001 From: Mikhail Barzilovich Date: Sat, 22 Apr 2017 16:32:20 +0700 Subject: [PATCH 1/4] Add kafka input --- config/v2/configV2.go | 19 ++++++-- grok_exporter.go | 12 +++++ release.sh | 4 +- tailer/kafkaTailer.go | 77 ++++++++++++++++++++++++++++++++ vendor/github.com/golang/snappy | 1 + vendor/github.com/optiopay/kafka | 1 + 6 files changed, 108 insertions(+), 6 deletions(-) create mode 100644 tailer/kafkaTailer.go create mode 160000 vendor/github.com/golang/snappy create mode 160000 vendor/github.com/optiopay/kafka diff --git a/config/v2/configV2.go b/config/v2/configV2.go index 0804081d..36513183 100644 --- a/config/v2/configV2.go +++ b/config/v2/configV2.go @@ -42,13 +42,17 @@ func (cfg *Config) String() string { } type GlobalConfig struct { - ConfigVersion int `yaml:"config_version,omitempty"` + ConfigVersion int `yaml:"config_version,omitempty"` + Debug bool `yaml:",omitempty"` } type InputConfig struct { - Type string `yaml:",omitempty"` - Path string `yaml:",omitempty"` - Readall bool `yaml:",omitempty"` + Type string `yaml:",omitempty"` + Path string `yaml:",omitempty"` + Readall bool `yaml:",omitempty"` + Brokers string `yaml:",omitempty"` + Topics string `yaml:",omitempty"` + Jsonfields string `yaml:",omitempty"` } type GrokConfig struct { @@ -167,6 +171,13 @@ func (c *InputConfig) validate() error { if c.Path == "" { return fmt.Errorf("Invalid input configuration: 'input.path' is required for input type \"file\".") } + case c.Type == "kafka": + if c.Brokers == "" { + return fmt.Errorf("Invalid input configuration: 'input.brokers' is required for input type \"kafka\".") + } + if c.Topics == "" { + return fmt.Errorf("Invalid input configuration: 'input.topics' is required for input type \"kafka\".") + } default: return fmt.Errorf("Unsupported 'input.type': %v", c.Type) } diff --git a/grok_exporter.go b/grok_exporter.go index f32799d5..9db6f8ef 100644 --- a/grok_exporter.go +++ 
b/grok_exporter.go @@ -21,9 +21,11 @@ import ( "github.com/fstab/grok_exporter/config/v2" "github.com/fstab/grok_exporter/exporter" "github.com/fstab/grok_exporter/tailer" + "github.com/optiopay/kafka" "github.com/prometheus/client_golang/prometheus" "net/http" "os" + "strings" "time" ) @@ -31,6 +33,7 @@ var ( printVersion = flag.Bool("version", false, "Print the grok_exporter version.") configPath = flag.String("config", "", "Path to the config file. Try '-config ./example/config.yml' to get started.") showConfig = flag.Bool("showconfig", false, "Print the current configuration to the console. Example: 'grok_exporter -showconfig -config ./exemple/config.yml'") + debug = flag.Bool("debug", false, "Print extra logs") ) const ( @@ -46,6 +49,7 @@ func main() { } validateCommandLineOrExit() cfg, warn, err := config.LoadConfigFile(*configPath) + cfg.Global.Debug = *debug if len(warn) > 0 && !*showConfig { // warning is suppressed when '-showconfig' is used fmt.Fprintf(os.Stderr, "%v\n", warn) @@ -244,6 +248,14 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) { tail = tailer.RunFileTailer(cfg.Input.Path, cfg.Input.Readall, nil) case cfg.Input.Type == "stdin": tail = tailer.RunStdinTailer() + case cfg.Input.Type == "kafka": + conf := kafka.NewBrokerConf("grok-exporter-client") + broker, err := kafka.Dial(strings.Split(cfg.Input.Brokers, ","), conf) + if err != nil { + // log.Fatalf("cannot connect to kafka cluster: %s", err) + fmt.Printf("cannot connect to kafka cluster: %s/n", err) + } + tail = tailer.RunKafkaTailer(broker, cfg) default: return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type) } diff --git a/release.sh b/release.sh index f01e4dc2..74648a34 100755 --- a/release.sh +++ b/release.sh @@ -51,6 +51,6 @@ function make_release { cd .. 
} -make_release native darwin-amd64 +#make_release native darwin-amd64 make_release docker linux-amd64 -make_release docker windows-amd64 .exe +#make_release docker windows-amd64 .exe diff --git a/tailer/kafkaTailer.go b/tailer/kafkaTailer.go new file mode 100644 index 00000000..38ec35e6 --- /dev/null +++ b/tailer/kafkaTailer.go @@ -0,0 +1,77 @@ +package tailer + +import ( + "encoding/json" + "github.com/fstab/grok_exporter/config/v2" + "github.com/optiopay/kafka" + "log" + "strings" +) + +type kafkaTailer struct { + lines chan string + errors chan error +} + +func (t *kafkaTailer) Lines() chan string { + return t.lines +} + +func (t *kafkaTailer) Errors() chan error { + return t.errors +} + +func (t *kafkaTailer) Close() { + // broker.Close() +} + +func RunKafkaTailer(broker kafka.Client, cfg *v2.Config) Tailer { + lineChan := make(chan string) + errorChan := make(chan error) + topics := strings.Split(cfg.Input.Topics, ",") + var partition int32 = 0 + var data map[string]interface{} + for _, topic := range topics { + go func() { + log.Printf("Creating consumer for topic %s", topic) + conf := kafka.NewConsumerConf(topic, partition) + conf.StartOffset = kafka.StartOffsetNewest + consumer, err := broker.Consumer(conf) + if err != nil { + log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) + errorChan <- err + } + log.Printf("Consumer for topic %s is ready", topic) + for { + msg, err := consumer.Consume() + if err != nil { + if err != kafka.ErrNoData { + log.Printf("cannot consume %q topic message: %s", topic, err) + } + errorChan <- err + return + } + s := []string{} + if cfg.Input.Jsonfields == "" { + s = append(s, string(msg.Value)) + } else { + if err := json.Unmarshal([]byte(msg.Value), &data); err != nil { + log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err) + } + for _, field := range strings.Split(cfg.Input.Jsonfields, ",") { + s = append(s, data[field].(string)) + } + } + line := 
strings.Join(s, " ") + if cfg.Global.Debug { + log.Println("Sending line to parcer: " + line) + } + lineChan <- line + } + }() + } + return &stdinTailer{ + lines: lineChan, + errors: errorChan, + } +} diff --git a/vendor/github.com/golang/snappy b/vendor/github.com/golang/snappy new file mode 160000 index 00000000..553a6414 --- /dev/null +++ b/vendor/github.com/golang/snappy @@ -0,0 +1 @@ +Subproject commit 553a641470496b2327abcac10b36396bd98e45c9 diff --git a/vendor/github.com/optiopay/kafka b/vendor/github.com/optiopay/kafka new file mode 160000 index 00000000..7b7b6a0d --- /dev/null +++ b/vendor/github.com/optiopay/kafka @@ -0,0 +1 @@ +Subproject commit 7b7b6a0d49740e36cd39226913b38cecf9ef76cb From e7ff3fc0be98ad10221408dab7a80c07e486a2d1 Mon Sep 17 00:00:00 2001 From: Mikhail Barzilovich Date: Tue, 9 May 2017 15:55:32 +0700 Subject: [PATCH 2/4] Improve code and docs --- CONFIG.md | 26 +++++++++++++++++++++++++- README.md | 8 ++++++++ build_image.sh | 31 +++++++++++++++++++++++++++++++ config/v2/configV2.go | 13 +++++++++---- docker/Dockerfile | 10 ++++++++++ vendor/update.sh | 13 +++++++++++++ 6 files changed, 96 insertions(+), 5 deletions(-) create mode 100755 build_image.sh create mode 100644 docker/Dockerfile diff --git a/CONFIG.md b/CONFIG.md index 173de77a..7f34c6e6 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -51,7 +51,7 @@ The following table shows which `grok_exporter` version uses which `config_versi Input Section ------------- -We currently support two input types: `file` and `stdin`. The following two sections describe the `file` input type and the `stdin` input type: +We currently support three input types: `file`, `stdin` and `kafka`. The following sections describe the `file`, `stdin` and `kafka` input types: ### File Input Type @@ -92,6 +92,30 @@ the exporter will terminate as soon as `sample.log` is processed, and we will not be able to access the result via HTTP(S) after that.
Always use a command that keeps the output open (like `tail -f`) when testing the `grok_exporter` with the `stdin` input. +### Kafka Input Type +Grok exporter can subscribe to kafka topics and parse message text. If messages are in JSON format, some fields can be extracted from the message. + +```yaml +input: + type: kafka + brokers: 'broker1:9092,broker2:9092' + topics: 'topic1,topic2' + jsonfields: 'loglevel,message,tag' +``` +The brokers parameter is a comma separated list of Kafka brokers (mandatory for input type kafka). The topics parameter is a comma separated list of topics to subscribe to (mandatory for input type kafka). +The jsonfields parameter is a comma separated list of JSON fields whose values will be extracted from the JSON message and concatenated into one line (space as separator). +Jsonfields is an optional parameter for input type kafka. If omitted, kafka messages will be sent to the parser as plain strings. +E.g. given this message from kafka +```json +{"message": "123", "loglevel": "INFO", "host": "myhost1", "tag": "live"} +``` +the parser will receive the text +```INFO 123 live``` +So the Grok regexp should be designed accordingly. + +Messages from all topics are processed in the same way (same grok parser). + + Grok Section ------------ diff --git a/README.md b/README.md index ecbde91e..9b66ce02 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,14 @@ git submodule update --init --recursive The resulting `grok_exporter` binary will be dynamically linked to the Oniguruma library, i.e. it needs the Oniguruma library to run. The [releases] are statically linked with Oniguruma, i.e. the releases don't require Oniguruma as a run-time dependency. The releases are built with `release.sh`. +**Building grok_exporter docker image** +Building the docker image requires docker to be installed.
+```bash +go get github.com/mbarzilovich/grok_exporter +cd $GOPATH/src/github.com/fstab/grok_exporter +./build_image.sh +``` + More Documentation ------------------ diff --git a/build_image.sh b/build_image.sh new file mode 100755 index 00000000..ad67cb6e --- /dev/null +++ b/build_image.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +set +e + +cd $GOPATH/src/github.com/fstab/grok_exporter +docker run --rm --net none -it -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter ubuntu:16.04 rm -rf /root/go/src/github.com/fstab/grok_exporter/dist +mkdir dist + +export VERSION=0.2.2 +export ARCH=linux-amd64 +export VERSION_FLAGS="\ + -X github.com/fstab/grok_exporter/exporter.Version=$VERSION \ + -X github.com/fstab/grok_exporter/exporter.BuildDate=$(date +%Y-%m-%d) \ + -X github.com/fstab/grok_exporter/exporter.Branch=$(git rev-parse --abbrev-ref HEAD) \ + -X github.com/fstab/grok_exporter/exporter.Revision=$(git rev-parse --short HEAD) \ +" + +#-------------------------------------------------------------- +# Make sure all tests run. +#-------------------------------------------------------------- + +go fmt $(go list ./... | grep -v /vendor/) +go test $(go list ./... | grep -v /vendor/) +cp -r docker/* dist +cp -a logstash-patterns-core/patterns dist +docker run -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter --net none --rm -ti fstab/grok_exporter-compiler compile-$ARCH.sh -ldflags "$VERSION_FLAGS" -o dist/grok_exporter + +cd dist +docker build -t grok_exporter:$VERSION -t grok_exporter:latest .
+ + diff --git a/config/v2/configV2.go b/config/v2/configV2.go index 36513183..a4699482 100644 --- a/config/v2/configV2.go +++ b/config/v2/configV2.go @@ -120,6 +120,7 @@ func (c *GlobalConfig) addDefaults() { if c.ConfigVersion == 0 { c.ConfigVersion = 2 } + c.Debug = false } func (c *InputConfig) addDefaults() { @@ -128,7 +129,11 @@ func (c *InputConfig) addDefaults() { } } -func (c *GrokConfig) addDefaults() {} +func (c *GrokConfig) addDefaults() { + if c.PatternsDir == "" { + c.PatternsDir = "/patterns" + } +} func (c *MetricsConfig) addDefaults() {} @@ -136,6 +141,9 @@ func (c *ServerConfig) addDefaults() { if c.Protocol == "" { c.Protocol = "http" } + if c.Host == "" { + c.Host = "0.0.0.0" + } if c.Port == 0 { c.Port = 9144 } @@ -185,9 +193,6 @@ func (c *InputConfig) validate() error { } func (c *GrokConfig) validate() error { - if c.PatternsDir == "" && len(c.AdditionalPatterns) == 0 { - return fmt.Errorf("Invalid grok configuration: no patterns defined: one of 'grok.patterns_dir' and 'grok.additional_patterns' must be configured.") - } return nil } diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..bdd54edc --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:16.04 + +ADD patterns /patterns +ADD grok_exporter /grok_exporter + +VOLUME /config.yml + +EXPOSE 9144 + +CMD /grok_exporter -config /config.yml \ No newline at end of file diff --git a/vendor/update.sh b/vendor/update.sh index e26b35d0..dee542d7 100755 --- a/vendor/update.sh +++ b/vendor/update.sh @@ -110,6 +110,19 @@ git checkout 7be2ce36128ef1337a5348a7cb5a599830b42ac3 find . -type f | grep -v winfsnotify.go | xargs rm -f find . 
-type d -empty -delete +########################################################################### +# github.com/optiopay/kafka +########################################################################### + +cd $VENDOR +mkdir -p github.com/optiopay/ +cd github.com/optiopay +git clone https://github.com/optiopay/kafka.git +cd kafka +git checkout 7b7b6a0d49740e36cd39226913b38cecf9ef76cb +rm -rf .git .gitignore .travis.yml LICENSE README.md + + ########################################################################### find $VENDOR -type f -name '*_test.go' | xargs rm From 78091a647b8b168e8f84cc6180e7196d9ed6f1a0 Mon Sep 17 00:00:00 2001 From: Mikhail Barzilovich Date: Thu, 18 May 2017 17:50:07 +0700 Subject: [PATCH 3/4] Fix multi topic consumption --- tailer/kafkaTailer.go | 82 ++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/tailer/kafkaTailer.go b/tailer/kafkaTailer.go index 38ec35e6..23b2e9e2 100644 --- a/tailer/kafkaTailer.go +++ b/tailer/kafkaTailer.go @@ -25,52 +25,54 @@ func (t *kafkaTailer) Close() { // broker.Close() } +func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error, topic string, cfg *v2.Config) { + var data map[string]interface{} + var partition int32 = 0 + log.Printf("Creating consumer for topic %s", topic) + conf := kafka.NewConsumerConf(topic, partition) + conf.StartOffset = kafka.StartOffsetNewest + consumer, err := broker.Consumer(conf) + if err != nil { + log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) + errorChan <- err + } + log.Printf("Consumer for topic %s is ready", topic) + for { + msg, err := consumer.Consume() + if err != nil { + if err != kafka.ErrNoData { + log.Printf("cannot consume %q topic message: %s", topic, err) + } + errorChan <- err + return + } + s := []string{} + if cfg.Input.Jsonfields == "" { + s = append(s, string(msg.Value)) + } else { + if err := json.Unmarshal([]byte(msg.Value), &data); err != nil { + 
log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err) + } + for _, field := range strings.Split(cfg.Input.Jsonfields, ",") { + s = append(s, data[field].(string)) + } + } + line := strings.Join(s, " ") + if cfg.Global.Debug { + log.Println("Sending line to parcer: " + line) + } + lineChan <- line + } +} + func RunKafkaTailer(broker kafka.Client, cfg *v2.Config) Tailer { lineChan := make(chan string) errorChan := make(chan error) topics := strings.Split(cfg.Input.Topics, ",") - var partition int32 = 0 - var data map[string]interface{} for _, topic := range topics { - go func() { - log.Printf("Creating consumer for topic %s", topic) - conf := kafka.NewConsumerConf(topic, partition) - conf.StartOffset = kafka.StartOffsetNewest - consumer, err := broker.Consumer(conf) - if err != nil { - log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) - errorChan <- err - } - log.Printf("Consumer for topic %s is ready", topic) - for { - msg, err := consumer.Consume() - if err != nil { - if err != kafka.ErrNoData { - log.Printf("cannot consume %q topic message: %s", topic, err) - } - errorChan <- err - return - } - s := []string{} - if cfg.Input.Jsonfields == "" { - s = append(s, string(msg.Value)) - } else { - if err := json.Unmarshal([]byte(msg.Value), &data); err != nil { - log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err) - } - for _, field := range strings.Split(cfg.Input.Jsonfields, ",") { - s = append(s, data[field].(string)) - } - } - line := strings.Join(s, " ") - if cfg.Global.Debug { - log.Println("Sending line to parcer: " + line) - } - lineChan <- line - } - }() + go RunConsumer(broker, lineChan, errorChan, topic, cfg) } - return &stdinTailer{ + return &kafkaTailer{ lines: lineChan, errors: errorChan, } From a9693391f5b7b7cdb2cf22bab9af3ee90c55d570 Mon Sep 17 00:00:00 2001 From: Mikhail Barzilovich Date: Wed, 21 Jun 2017 16:16:51 +0700 Subject: [PATCH 4/4] 
Improve multi partition kafka consumer --- build_image.sh | 2 +- config/v1/configV1_test.go | 2 ++ config/v2/configV2_test.go | 3 +++ grok_exporter.go | 10 +------- tailer/kafkaTailer.go | 48 ++++++++++++++++++++++++++------------ 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/build_image.sh b/build_image.sh index ad67cb6e..024f7697 100755 --- a/build_image.sh +++ b/build_image.sh @@ -6,7 +6,7 @@ cd $GOPATH/src/github.com/fstab/grok_exporter docker run --rm --net none -it -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter ubuntu:16.04 rm -rf /root/go/src/github.com/fstab/grok_exporter/dist mkdir dist -export VERSION=0.2.2 +export VERSION=0.2.2.3 export ARCH=linux-amd64 export VERSION_FLAGS="\ -X github.com/fstab/grok_exporter/exporter.Version=$VERSION \ diff --git a/config/v1/configV1_test.go b/config/v1/configV1_test.go index a67da6ea..969f63d5 100644 --- a/config/v1/configV1_test.go +++ b/config/v1/configV1_test.go @@ -65,6 +65,7 @@ metrics: prometheus_label: user server: protocol: https + host: 0.0.0.0 port: 1111 ` @@ -111,6 +112,7 @@ metrics: user: '{{.user}}' server: protocol: https + host: 0.0.0.0 port: 1111 ` diff --git a/config/v2/configV2_test.go b/config/v2/configV2_test.go index 1cd139f9..2b7d21d5 100644 --- a/config/v2/configV2_test.go +++ b/config/v2/configV2_test.go @@ -38,6 +38,7 @@ metrics: label_b: '{{.some_grok_field_b}}' server: protocol: https + host: 0.0.0.0 port: 1111 ` @@ -77,6 +78,7 @@ metrics: buckets: $BUCKETS server: protocol: http + host: 0.0.0.0 port: 9144 ` @@ -96,6 +98,7 @@ metrics: quantiles: $QUANTILES server: protocol: http + host: 0.0.0.0 port: 9144 ` diff --git a/grok_exporter.go b/grok_exporter.go index 9db6f8ef..79673d25 100644 --- a/grok_exporter.go +++ b/grok_exporter.go @@ -21,11 +21,9 @@ import ( "github.com/fstab/grok_exporter/config/v2" "github.com/fstab/grok_exporter/exporter" "github.com/fstab/grok_exporter/tailer" - "github.com/optiopay/kafka" 
"github.com/prometheus/client_golang/prometheus" "net/http" "os" - "strings" "time" ) @@ -249,13 +247,7 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) { case cfg.Input.Type == "stdin": tail = tailer.RunStdinTailer() case cfg.Input.Type == "kafka": - conf := kafka.NewBrokerConf("grok-exporter-client") - broker, err := kafka.Dial(strings.Split(cfg.Input.Brokers, ","), conf) - if err != nil { - // log.Fatalf("cannot connect to kafka cluster: %s", err) - fmt.Printf("cannot connect to kafka cluster: %s/n", err) - } - tail = tailer.RunKafkaTailer(broker, cfg) + tail = tailer.RunKafkaTailer(cfg) default: return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type) } diff --git a/tailer/kafkaTailer.go b/tailer/kafkaTailer.go index 23b2e9e2..987af66a 100644 --- a/tailer/kafkaTailer.go +++ b/tailer/kafkaTailer.go @@ -25,23 +25,28 @@ func (t *kafkaTailer) Close() { // broker.Close() } -func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error, topic string, cfg *v2.Config) { - var data map[string]interface{} - var partition int32 = 0 +func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error, topic string, partitions int32, cfg *v2.Config) { + fetchers := []kafka.Consumer{} log.Printf("Creating consumer for topic %s", topic) - conf := kafka.NewConsumerConf(topic, partition) - conf.StartOffset = kafka.StartOffsetNewest - consumer, err := broker.Consumer(conf) - if err != nil { - log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) - errorChan <- err + for partition := int32(0); partition < partitions; partition++ { + conf := kafka.NewConsumerConf(topic, partition) + conf.StartOffset = kafka.StartOffsetNewest + consumer, err := broker.Consumer(conf) + if err != nil { + log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err) + errorChan <- err + } + fetchers = append(fetchers, consumer) } + mx := kafka.Merge(fetchers...) 
+ defer mx.Close() log.Printf("Consumer for topic %s is ready", topic) for { - msg, err := consumer.Consume() + var data map[string]interface{} + msg, err := mx.Consume() if err != nil { - if err != kafka.ErrNoData { - log.Printf("cannot consume %q topic message: %s", topic, err) + if err != kafka.ErrMxClosed { + log.Printf("All consumers stopped. Cannot consume %q topic message: %s", topic, err) } errorChan <- err return @@ -54,7 +59,9 @@ func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err) } for _, field := range strings.Split(cfg.Input.Jsonfields, ",") { - s = append(s, data[field].(string)) + if data[field] != nil { + s = append(s, data[field].(string)) + } } } line := strings.Join(s, " ") @@ -65,12 +72,23 @@ func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error } } -func RunKafkaTailer(broker kafka.Client, cfg *v2.Config) Tailer { +func RunKafkaTailer(cfg *v2.Config) Tailer { lineChan := make(chan string) errorChan := make(chan error) topics := strings.Split(cfg.Input.Topics, ",") + brokerConf := kafka.NewBrokerConf("grok-exporter-client") + broker, err := kafka.Dial(strings.Split(cfg.Input.Brokers, ","), brokerConf) + if err != nil { + log.Fatalf("cannot connect to kafka cluster: %s", err) + errorChan <- err + } for _, topic := range topics { - go RunConsumer(broker, lineChan, errorChan, topic, cfg) + partitions, err := broker.PartitionCount(topic) + if err != nil { + log.Fatalf("Unable to fetch partitions from broker for topis %s\n", topic) + errorChan <- err + } + go RunConsumer(broker, lineChan, errorChan, topic, partitions, cfg) } return &kafkaTailer{ lines: lineChan,