New Kafka tailer #16

Open: wants to merge 4 commits into master
26 changes: 25 additions & 1 deletion CONFIG.md
@@ -51,7 +51,7 @@ The following table shows which `grok_exporter` version uses which `config_version`:
Input Section
-------------

We currently support two input types: `file` and `stdin`. The following two sections describe the `file` input type and the `stdin` input type:
We currently support three input types: `file`, `stdin`, and `kafka`. The following sections describe the `file`, `stdin`, and `kafka` input types:

### File Input Type

@@ -92,6 +92,30 @@ the exporter will terminate as soon as `sample.log` is processed,
and we will not be able to access the result via HTTP(S) after that.
Always use a command that keeps the output open (like `tail -f`) when testing the `grok_exporter` with the `stdin` input.

### Kafka Input Type

`grok_exporter` can subscribe to Kafka topics and parse the text of each message. If the messages are in JSON format, selected fields can be extracted from each message.

```yaml
input:
type: kafka
brokers: 'broker1:9092,broker2:9092'
topics: 'topic1,topic2'
jsonfields: 'loglevel,message,tag'
```
The `brokers` parameter is a comma-separated list of Kafka brokers (mandatory for input type `kafka`). The `topics` parameter is a comma-separated list of topics to subscribe to (mandatory for input type `kafka`).
The `jsonfields` parameter is a comma-separated list of JSON fields whose values are extracted from the JSON message and concatenated into a single line, separated by spaces.
`jsonfields` is optional for input type `kafka`. If it is omitted, Kafka messages are passed to the parser as plain strings.
For example, given this message from Kafka:
```json
{"message": "123", "loglevel": "INFO", "host": "myhost1", "tag": "live"}
```
the parser will receive the text
```
INFO 123 live
```
and the Grok regular expression should be designed accordingly.
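For example, a counter metric matching that line could look like the following sketch; the metric name, labels, and patterns are illustrative and not part of this change:

```yaml
grok:
    patterns_dir: ./logstash-patterns-core/patterns
metrics:
    - type: counter
      name: kafka_messages_by_loglevel_total
      help: Kafka messages counted by log level.
      match: '%{LOGLEVEL:loglevel} %{NUMBER:message} %{WORD:tag}'
      labels:
          loglevel: '{{.loglevel}}'
```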

Messages from all topics are processed in the same way, i.e. by the same Grok parser.


Grok Section
------------

8 changes: 8 additions & 0 deletions README.md
@@ -119,6 +119,14 @@ git submodule update --init --recursive

The resulting `grok_exporter` binary will be dynamically linked to the Oniguruma library, i.e. it needs the Oniguruma library to run. The [releases] are statically linked with Oniguruma, i.e. the releases don't require Oniguruma as a run-time dependency. The releases are built with `release.sh`.

**Building the grok_exporter Docker image**

Building the Docker image requires Docker to be installed.
```bash
go get github.com/mbarzilovich/grok_exporter
cd $GOPATH/src/github.com/fstab/grok_exporter
./build_image.sh
```
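The resulting image expects a configuration file mounted at `/config.yml` and exposes port 9144 (see `docker/Dockerfile`). One way to run it, assuming a `config.yml` in the current directory — the host path and image tag below are just an illustration:

```bash
docker run -d \
    -v "$(pwd)/config.yml:/config.yml" \
    -p 9144:9144 \
    grok_exporter:latest
```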

More Documentation
------------------

31 changes: 31 additions & 0 deletions build_image.sh
@@ -0,0 +1,31 @@
#!/bin/bash

set -e

cd $GOPATH/src/github.com/fstab/grok_exporter
docker run --rm --net none -it -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter ubuntu:16.04 rm -rf /root/go/src/github.com/fstab/grok_exporter/dist
mkdir dist

export VERSION=0.2.2.3
export ARCH=linux-amd64
export VERSION_FLAGS="\
-X github.com/fstab/grok_exporter/exporter.Version=$VERSION \
-X github.com/fstab/grok_exporter/exporter.BuildDate=$(date +%Y-%m-%d) \
-X github.com/fstab/grok_exporter/exporter.Branch=$(git rev-parse --abbrev-ref HEAD) \
-X github.com/fstab/grok_exporter/exporter.Revision=$(git rev-parse --short HEAD) \
"

#--------------------------------------------------------------
# Make sure all tests run.
#--------------------------------------------------------------

go fmt $(go list ./... | grep -v /vendor/)
go test $(go list ./... | grep -v /vendor/)
cp -r docker/* dist
cp -a logstash-patterns-core/patterns dist
docker run -v $GOPATH/src/github.com/fstab/grok_exporter:/root/go/src/github.com/fstab/grok_exporter --net none --rm -ti fstab/grok_exporter-compiler compile-$ARCH.sh -ldflags "$VERSION_FLAGS" -o dist/grok_exporter

cd dist
docker build -t grok_exporter:$VERSION -t grok_exporter:latest .


2 changes: 2 additions & 0 deletions config/v1/configV1_test.go
@@ -65,6 +65,7 @@ metrics:
prometheus_label: user
server:
protocol: https
host: 0.0.0.0
port: 1111
`

@@ -111,6 +112,7 @@ metrics:
user: '{{.user}}'
server:
protocol: https
host: 0.0.0.0
port: 1111
`

32 changes: 24 additions & 8 deletions config/v2/configV2.go
@@ -42,13 +42,17 @@ func (cfg *Config) String() string {
}

type GlobalConfig struct {
ConfigVersion int `yaml:"config_version,omitempty"`
ConfigVersion int `yaml:"config_version,omitempty"`
Debug bool `yaml:",omitempty"`
}

type InputConfig struct {
Type string `yaml:",omitempty"`
Path string `yaml:",omitempty"`
Readall bool `yaml:",omitempty"`
Type string `yaml:",omitempty"`
Path string `yaml:",omitempty"`
Readall bool `yaml:",omitempty"`
Brokers string `yaml:",omitempty"`
Topics string `yaml:",omitempty"`
Jsonfields string `yaml:",omitempty"`
}

type GrokConfig struct {
@@ -116,6 +120,7 @@ func (c *GlobalConfig) addDefaults() {
if c.ConfigVersion == 0 {
c.ConfigVersion = 2
}
c.Debug = false
}

func (c *InputConfig) addDefaults() {
@@ -124,14 +129,21 @@ func (c *InputConfig) addDefaults() {
}
}

func (c *GrokConfig) addDefaults() {}
func (c *GrokConfig) addDefaults() {
if c.PatternsDir == "" {
c.PatternsDir = "/patterns"
}
}

func (c *MetricsConfig) addDefaults() {}

func (c *ServerConfig) addDefaults() {
if c.Protocol == "" {
c.Protocol = "http"
}
if c.Host == "" {
c.Host = "0.0.0.0"
}
if c.Port == 0 {
c.Port = 9144
}
@@ -167,16 +179,20 @@ func (c *InputConfig) validate() error {
if c.Path == "" {
return fmt.Errorf("Invalid input configuration: 'input.path' is required for input type \"file\".")
}
case c.Type == "kafka":
if c.Brokers == "" {
return fmt.Errorf("Invalid input configuration: 'input.brokers' is required for input type \"kafka\".")
}
if c.Topics == "" {
return fmt.Errorf("Invalid input configuration: 'input.topics' is required for input type \"kafka\".")
}
default:
return fmt.Errorf("Unsupported 'input.type': %v", c.Type)
}
return nil
}

func (c *GrokConfig) validate() error {
if c.PatternsDir == "" && len(c.AdditionalPatterns) == 0 {
return fmt.Errorf("Invalid grok configuration: no patterns defined: one of 'grok.patterns_dir' and 'grok.additional_patterns' must be configured.")
}
return nil
}

3 changes: 3 additions & 0 deletions config/v2/configV2_test.go
@@ -38,6 +38,7 @@ metrics:
label_b: '{{.some_grok_field_b}}'
server:
protocol: https
host: 0.0.0.0
port: 1111
`

@@ -77,6 +78,7 @@ metrics:
buckets: $BUCKETS
server:
protocol: http
host: 0.0.0.0
port: 9144
`

@@ -96,6 +98,7 @@ metrics:
quantiles: $QUANTILES
server:
protocol: http
host: 0.0.0.0
port: 9144
`

10 changes: 10 additions & 0 deletions docker/Dockerfile
@@ -0,0 +1,10 @@
FROM ubuntu:16.04

ADD patterns /patterns
ADD grok_exporter /grok_exporter

VOLUME /config.yml

EXPOSE 9144

CMD /grok_exporter -config /config.yml
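Since the image ships the logstash patterns at `/patterns` — which the patched `GrokConfig.addDefaults` in `configV2.go` uses as the default `patterns_dir` — a mounted `/config.yml` only needs input, metrics, and (optionally) server settings. A minimal sketch for a Kafka-based container config; the broker, topic, metric name, and pattern are illustrative:

```yaml
global:
    config_version: 2
input:
    type: kafka
    brokers: 'broker1:9092'
    topics: 'topic1'
    jsonfields: 'loglevel,message,tag'
metrics:
    - type: counter
      name: kafka_messages_total
      help: Messages consumed from Kafka topics.
      match: '%{LOGLEVEL:loglevel} %{GREEDYDATA:message} %{WORD:tag}'
server:
    port: 9144
```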
4 changes: 4 additions & 0 deletions grok_exporter.go
@@ -31,6 +31,7 @@ var (
printVersion = flag.Bool("version", false, "Print the grok_exporter version.")
configPath = flag.String("config", "", "Path to the config file. Try '-config ./example/config.yml' to get started.")
showConfig = flag.Bool("showconfig", false, "Print the current configuration to the console. Example: 'grok_exporter -showconfig -config ./example/config.yml'")
debug = flag.Bool("debug", false, "Print extra logs")
)

const (
@@ -46,6 +47,7 @@ func main() {
}
validateCommandLineOrExit()
cfg, warn, err := config.LoadConfigFile(*configPath)
if cfg != nil { cfg.Global.Debug = *debug } // avoid a nil pointer dereference if config loading failed
if len(warn) > 0 && !*showConfig {
// warning is suppressed when '-showconfig' is used
fmt.Fprintf(os.Stderr, "%v\n", warn)
@@ -244,6 +246,8 @@ func startTailer(cfg *v2.Config) (tailer.Tailer, error) {
tail = tailer.RunFileTailer(cfg.Input.Path, cfg.Input.Readall, nil)
case cfg.Input.Type == "stdin":
tail = tailer.RunStdinTailer()
case cfg.Input.Type == "kafka":
tail = tailer.RunKafkaTailer(cfg)
default:
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
}
4 changes: 2 additions & 2 deletions release.sh
@@ -51,6 +51,6 @@ function make_release {
cd ..
}

make_release native darwin-amd64
#make_release native darwin-amd64
make_release docker linux-amd64
make_release docker windows-amd64 .exe
#make_release docker windows-amd64 .exe
97 changes: 97 additions & 0 deletions tailer/kafkaTailer.go
@@ -0,0 +1,97 @@
package tailer

import (
"encoding/json"
"github.com/fstab/grok_exporter/config/v2"
"github.com/optiopay/kafka"
"log"
"strings"
)

type kafkaTailer struct {
lines chan string
errors chan error
}

func (t *kafkaTailer) Lines() chan string {
return t.lines
}

func (t *kafkaTailer) Errors() chan error {
return t.errors
}

func (t *kafkaTailer) Close() {
// broker.Close()
}

func RunConsumer(broker kafka.Client, lineChan chan string, errorChan chan error, topic string, partitions int32, cfg *v2.Config) {
fetchers := []kafka.Consumer{}
log.Printf("Creating consumer for topic %s", topic)
for partition := int32(0); partition < partitions; partition++ {
conf := kafka.NewConsumerConf(topic, partition)
conf.StartOffset = kafka.StartOffsetNewest
consumer, err := broker.Consumer(conf)
if err != nil {
log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
errorChan <- err
}
fetchers = append(fetchers, consumer)
}
mx := kafka.Merge(fetchers...)
defer mx.Close()
log.Printf("Consumer for topic %s is ready", topic)
for {
var data map[string]interface{}
msg, err := mx.Consume()
if err != nil {
if err != kafka.ErrMxClosed {
log.Printf("All consumers stopped. Cannot consume %q topic message: %s", topic, err)
}
errorChan <- err
return
}
s := []string{}
if cfg.Input.Jsonfields == "" {
s = append(s, string(msg.Value))
} else {
if err := json.Unmarshal([]byte(msg.Value), &data); err != nil {
log.Fatalf("Cannot unmarshal JSON message %s because of the error: %s", string(msg.Value), err)
}
for _, field := range strings.Split(cfg.Input.Jsonfields, ",") {
// Comma-ok assertion: skip fields whose values are not strings instead of panicking.
if value, ok := data[field].(string); ok {
s = append(s, value)
}
}
}
line := strings.Join(s, " ")
if cfg.Global.Debug {
log.Println("Sending line to parcer: " + line)
}
lineChan <- line
}
}

func RunKafkaTailer(cfg *v2.Config) Tailer {
lineChan := make(chan string)
errorChan := make(chan error)
topics := strings.Split(cfg.Input.Topics, ",")
brokerConf := kafka.NewBrokerConf("grok-exporter-client")
broker, err := kafka.Dial(strings.Split(cfg.Input.Brokers, ","), brokerConf)
if err != nil {
log.Fatalf("cannot connect to kafka cluster: %s", err)
errorChan <- err
}
for _, topic := range topics {
partitions, err := broker.PartitionCount(topic)
if err != nil {
log.Fatalf("Unable to fetch partitions from broker for topis %s\n", topic)
errorChan <- err
}
go RunConsumer(broker, lineChan, errorChan, topic, partitions, cfg)
}
return &kafkaTailer{
lines: lineChan,
errors: errorChan,
}
}
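The returned value implements the same `Tailer` interface as the file and stdin tailers, so callers consume it through its `Lines()` and `Errors()` channels. A minimal consumption sketch — `handleLine` is a hypothetical callback, not part of this change:

```go
package kafkaexample

import (
	"log"

	"github.com/fstab/grok_exporter/config/v2"
	"github.com/fstab/grok_exporter/tailer"
)

// consumeKafka forwards every line read from Kafka to handleLine
// until the tailer reports an error.
func consumeKafka(cfg *v2.Config, handleLine func(string)) {
	tail := tailer.RunKafkaTailer(cfg)
	defer tail.Close()
	for {
		select {
		case line := <-tail.Lines():
			handleLine(line)
		case err := <-tail.Errors():
			log.Printf("kafka tailer error: %v", err)
			return
		}
	}
}
```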
1 change: 1 addition & 0 deletions vendor/github.com/golang/snappy
Submodule snappy added at 553a64
1 change: 1 addition & 0 deletions vendor/github.com/optiopay/kafka
Submodule kafka added at 7b7b6a
13 changes: 13 additions & 0 deletions vendor/update.sh
@@ -110,6 +110,19 @@ git checkout 7be2ce36128ef1337a5348a7cb5a599830b42ac3
find . -type f | grep -v winfsnotify.go | xargs rm -f
find . -type d -empty -delete

###########################################################################
# github.com/optiopay/kafka
###########################################################################

cd $VENDOR
mkdir -p github.com/optiopay/
cd github.com/optiopay
git clone https://github.com/optiopay/kafka.git
cd kafka
git checkout 7b7b6a0d49740e36cd39226913b38cecf9ef76cb
rm -rf .git .gitignore .travis.yml LICENSE README.md


###########################################################################

find $VENDOR -type f -name '*_test.go' | xargs rm