New Kafka tailer #16

Open · wants to merge 4 commits into master
Conversation

mbarzilovich

At my company we have our log stream in a Kafka topic.
That is why I added a kafkaTailer module, so grok_exporter can connect to Kafka and feed grok with logs from Kafka.

@fstab
Owner

fstab commented Jun 22, 2017

Hi Mikhail, thanks a lot for your PR. I think it's a great idea to provide different inputs, and Kafka could be a good addition (especially for Microservice architectures).
Please allow me some time to review the code, I will get back to it in two weeks or so.

@fstab
Owner

fstab commented Jul 3, 2017

Hi Mikhail, I tried out your Kafka tailer and it worked well. However, I am new to Kafka, and I have a question regarding the message format. From what I understand, Kafka does not enforce any particular format for event data. The first example in the official quickstart (step 4) uses unstructured text messages, and generally Kafka seems to work fine with JSON, XML, or other formats.

So to me, Kafka and JSON seem to be independent of each other. It might be a good idea to implement JSON support for grok_exporter (parsing log files containing structured JSON data), but I don't think it should be coupled with the Kafka tailer. That means, if you have JSON messages coming out of Kafka, and grok_exporter doesn't explicitly support JSON yet, you would need to parse JSON with grok expressions.
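To illustrate the workaround mentioned above: a JSON message can be matched with a grok expression, though it is brittle. The following is a hypothetical metrics entry in the usual grok_exporter config style; the metric name, the message layout, and the key order are all assumptions for the sake of the example.

```yaml
metrics:
  - type: counter
    name: kafka_error_messages_total
    help: Counts Kafka messages whose JSON "level" field is "error".
    # Assumes messages like {"level":"error","msg":"..."} with exactly
    # this key order; a real JSON parser would be far more robust.
    match: '\{"level":"error",.*'
```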

What do you think, can we remove JSON support from the Kafka tailer and maybe introduce it later as a general grok_exporter feature?

@hartfordfive
Contributor

Any updates on this PR? I also have a hard requirement for this feature. Thanks!

@fstab
Owner

fstab commented Aug 6, 2020

@hartfordfive unfortunately no update. I still think that "tailers" should be independent of the "log line format", and therefore I'd find it better to decouple the tailer/kafkaTailer.go from the JSON format (my understanding is that there could be Kafka messages in other formats like XML or unstructured text messages).

Regarding your question on the Kafka client library in #43: I agree that if sarama is more widely used and more frequently maintained, it would be preferable. In grok_exporter the only dependency on the Kafka client library should be in tailer/kafkaTailer.go, so replacing optiopay with sarama should not have any side effects.

Would you like to work on this? That'd be awesome.

@hartfordfive
Contributor

I could definitely look into this, and yes, I agree the integration should not depend on a specific message format, since messages could be in any format (although JSON is typically used with an ELK stack). I myself would definitely benefit from this Kafka integration, as most logs I work with from all hosts and applications are published to their respective Kafka topics and later indexed in Elasticsearch. Instead of having a second application on each host reading the same byte stream (and consuming additional resources), I can simply consume the logs from these topics on completely separate processing hosts.

I'll base my work on what was submitted to give myself a head start and switch the client library over to sarama. I'll get back to you once I have a PR ready.

@hartfordfive
Contributor

hartfordfive commented Aug 27, 2020

@fstab

So I've started working on the code (please see my changes in the kafka-tailer branch in my fork), although now when I'm trying to build, the following error occurs:

# github.com/fstab/grok_exporter/exporter
exporter/metrics.go:474:8: cfg.MaxAge undefined (type *v3.MetricConfig has no field or method MaxAge)
exporter/metrics.go:475:27: cfg.MaxAge undefined (type *v3.MetricConfig has no field or method MaxAge)
# github.com/fstab/grok_exporter/tailer
vet: tailer/webhookTailer_test.go:24:6: TestWebhookTextSingle redeclared in this block
# github.com/fstab/grok_exporter/exporter
vet: exporter/metrics.go:474:9: cfg.MaxAge undefined (type *v3.MetricConfig has no field or method MaxAge)

Seems as if that struct property doesn't exist in the v3 config. Am I missing something, or should I not have based my PR off your master branch? Keep in mind the PR isn't completed yet; I'm just trying to get past this issue.

I appreciate the help!

@fstab
Owner

fstab commented Aug 27, 2020

You forked off your new branch from the "configurable sliding window" commit that added the MaxAge property, but your configV3.go is based on an older version of that file that does not have MaxAge. As a result, your commit removed MaxAge from configV3.go. The fastest way to fix this is probably to revert to the original configV3.go from master (which has MaxAge) and to copy your changes that you need for the Kafka tailer.
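A minimal sketch of that recovery, assuming the fork has the original repository configured as a remote named `upstream` and that the file lives at `config/v3/configV3.go` (both the remote name and the path are assumptions):

```shell
# Fetch upstream master, which still contains the MaxAge fields.
git fetch upstream
# Restore only configV3.go from upstream/master onto the feature branch.
git checkout upstream/master -- config/v3/configV3.go
# Re-apply the Kafka-tailer-specific config changes by hand, then commit.
git commit -m "Restore MaxAge in configV3.go"
```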

[screenshot]

@hartfordfive
Contributor

hartfordfive commented Aug 30, 2020

Ok, so I've got an initial version working on this branch, although there are a few issues I still need to solve:

  1. When terminating the app with SIGINT or SIGTERM, I gracefully shut down the Kafka consumer. Unfortunately, the application doesn't terminate; it just continues in the for/select loop in the main goroutine. Is there an existing function I can use to return from this loop? Would calling the Close() method of the fileTailer cause the app to exit the loop? The only issue with that is that the struct is not public, so it couldn't be called anyway.
  2. Each tailer must implement the Lines(), Errors(), and Close() methods, although it seems that the Close() function is never called for my tailer. Can you give me a bit more detail about this function?
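The contract from point 2 can be sketched as a small Go interface. This is a hedged sketch, not grok_exporter's actual definition: the real interface likely uses richer line/error types, so the channel element types here are assumptions, and stubTailer is a toy implementation only.

```go
package main

import "fmt"

// Tailer sketches the contract described above: every tailer exposes
// its lines and errors as channels and can be shut down via Close().
type Tailer interface {
	Lines() chan string // one log line per channel message
	Errors() chan error // fatal tailer errors
	Close()             // stop the tailer (mostly used by tests)
}

// stubTailer is a toy implementation illustrating one plausible
// Close() semantic: after Close(), the channels are closed so
// readers can detect shutdown.
type stubTailer struct {
	lines  chan string
	errors chan error
}

func (t *stubTailer) Lines() chan string { return t.lines }
func (t *stubTailer) Errors() chan error { return t.errors }
func (t *stubTailer) Close() {
	close(t.lines)
	close(t.errors)
}

func main() {
	var tail Tailer = &stubTailer{lines: make(chan string, 1), errors: make(chan error, 1)}
	tail.Lines() <- "a log line from kafka"
	fmt.Println(<-tail.Lines())
	tail.Close()
	_, ok := <-tail.Lines()
	fmt.Println("channel open after Close():", ok)
}
```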

Thanks!

@fstab
Owner

fstab commented Aug 30, 2020

  1. The current behavior of grok_exporter is that you can terminate it with SIGINT (typing Ctrl-C in a terminal). If that doesn't work on your branch, the reason is probably that you implemented your own signal handler, so the default behavior (process termination) isn't triggered anymore. I think you don't need your own signal handler. If you don't have one, grok_exporter terminates by default on SIGINT, which is what you want.
  2. Close() is not called in production because the tailers run forever, but it's called a lot in tests. As soon as you write a test for the Kafka tailer, you will find the Close() function handy to terminate your test.

@hartfordfive
Contributor

hartfordfive commented Sep 1, 2020

The issue is that the Kafka tailer should be notified when grok_exporter intercepts the kill signal, so that it can gracefully terminate the consumer client. The following cancel() function would get called upon interception of the kill signal:
https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go#L127

and then consequently cause the for loop within the goroutine to return:
https://github.com/hartfordfive/grok_exporter/blob/kafka-tailer-integration/tailer/kafkaTailer.go#L112

which then eventually allows the client to gracefully close:
https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go#L129

If the tailer's Close() function was called, I could add the necessary code there to accomplish this. I'm not sure about the exact specifics of the sarama client.Close() function, although I do know that there could be some memory leaks if it's not called. In this case the process would be terminated anyway, so I'm not sure how big of a deal that is. I could always raise an issue in the sarama repo and see what the maintainers say about it. To sum it up, would it eventually be possible to have Close() called when a tailer is terminated? I can imagine other integrations might also need some type of cleanup operations before exiting. Let me know your thoughts about this.

@fstab
Owner

fstab commented Sep 1, 2020

It is risky to rely on the file tailers' Close() function in grok_exporter, because these Close() functions are very hard to implement and test. There are different tailers for different operating systems (Windows, BSD, Linux), and Close() potentially needs to interrupt system calls (inotify, kevent, winfsnotify), which is hard to get right. Moreover, there are a couple more tailers (webhook, polling file tailer, etc.), and people will likely add more in the future, so there are a lot of places where Close() could be implemented incorrectly.

There is even a known issue in Windows, and the shutdown test is skipped on Windows because Close() is never called in production:

if runtime.GOOS == "windows" {
    t.Skip("The shutdown tests are flaky on Windows. We skip them until either golang.org/x/exp/winfsnotify is fixed, or until we do our own implementation. This shouldn't be a problem when running grok_exporter, because in grok_exporter the file system watcher is never stopped.")
    return
}

We should avoid relying on Close() for proper termination.

I suspect that graceful shutdown of the sarama client is not necessary if all we want is to terminate the application. We don't need to care about potential memory leaks or hung goroutines, because grok_exporter will be terminated. Kafka should be fine with a client disappearing without saying goodbye.

I'm pretty sure it's safe not to install a signal handler for SIGTERM and to just terminate. But of course asking the sarama maintainers or verifying this in their code is a good idea.
