Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.firehose): Add new plugin #15988

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/all/firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.firehose

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/firehose" // register plugin
117 changes: 117 additions & 0 deletions plugins/inputs/firehose/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# AWS Data Firehose HTTP Listener Input Plugin

This is a service input plugin that listens for metrics sent
via HTTP from AWS Data Firehose. It strictly follows the request response
schema as describe in the official
[documentation](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html).

## Service Input <!-- @/docs/includes/service_input.md -->

This plugin is a service input. Normal plugins gather metrics determined by the
interval setting. Service plugins start a service to listens and waits for
metrics or events to occur. Service plugins have two key differences from
normal plugins:

1. The global or plugin specific `interval` setting may not apply
2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce
output for this plugin

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# AWS Data Firehose HTTP listener
[[inputs.firehose]]
## Address and port to host HTTP listener on
service_address = ":8080"

## Paths to listen to.
# paths = ["/telegraf"]

## Save path as firehose_http_path tag if set to true
# path_tag = false

## maximum duration before timing out read of the request
# read_timeout = "10s"
## maximum duration before timing out write of the response
# write_timeout = "10s"

## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

## Add service certificate and key
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"

## Minimal TLS version accepted by the server
# tls_min_version = "TLS12"

## Optional access key to accept for authentication.
## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key
# access_key = "foobar"

## Optional setting to add parameters as tags
## If the http header "x-amz-firehose-common-attributes" is not present on the request, no corresponding tag will be added
## The header value should be a json and should follow the schema as describe in the official documentation: https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat
# parameter_tags = ["env"]

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Metrics

Metrics are collected from the `records.[*].data` field in the request body.
The data must be base64 encoded and may be sent in any supported
[data format][data_format]. Metrics are parsed depending on the value of
`data_format`.

## Example Output

When run with this configuration:

```toml
[[inputs.firehose]]
service_address = ":8080"
paths = ["/telegraf"]
path_tag = true
data_format = "value"
data_type = "string"
```

the following curl command:

```sh
curl -i -XPOST 'localhost:8080/telegraf' \
--header 'x-amz-firehose-request-id: ed4acda5-034f-9f42-bba1-f29aea6d7d8f' \
--header 'Content-Type: application/json' \
--data '{
"requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
"timestamp": 1578090901599,
"records": [
{
"data": "aGVsbG8gd29ybGQK" // "hello world"
}
]
}'
```

produces:

```text
firehose,firehose_http_path=/telegraf,type=example message="hello" 1725001851000000000
```

[data_format]: /docs/DATA_FORMATS_INPUT.md
241 changes: 241 additions & 0 deletions plugins/inputs/firehose/firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
//go:generate ../../../tools/readme_config_includer/generator
package firehose

import (
"crypto/tls"
_ "embed"
"encoding/json"
"errors"
"net"
"net/http"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/choice"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

//go:embed sample.conf
var sampleConfig string

var allowedMethods = []string{http.MethodPost, http.MethodPut}
var statusCodeToMessage = map[int]string{
http.StatusBadRequest: "bad request",
http.StatusMethodNotAllowed: "method not allowed",
http.StatusRequestEntityTooLarge: "request body too large",
http.StatusUnauthorized: "unauthorized",
http.StatusOK: "",
}

// Firehose is an input plugin that collects external metrics sent via HTTP from AWS Data Firhose
type Firehose struct {
ServiceAddress string `toml:"service_address"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
AccessKey config.Secret `toml:"access_key"`
ParameterTags []string `toml:"parameter_tags"`

tlsint.ServerConfig
tlsConf *tls.Config

once sync.Once
Log telegraf.Logger

wg sync.WaitGroup
close chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whenever I see this it calls for a Context. ;-)


listener net.Listener

parser telegraf.Parser
acc telegraf.Accumulator
}

func (*Firehose) SampleConfig() string {
return sampleConfig
}

func (f *Firehose) Gather(_ telegraf.Accumulator) error {
return nil
}

func (f *Firehose) SetParser(parser telegraf.Parser) {
f.parser = parser
}

func (f *Firehose) Init() error {
if f.ServiceAddress == "" {
f.ServiceAddress = ":8080"
}
if len(f.Paths) == 0 {
f.Paths = []string{"/telegraf"}
}
if f.ReadTimeout < config.Duration(time.Second) {
f.ReadTimeout = config.Duration(time.Second * 10)
}
if f.WriteTimeout < config.Duration(time.Second) {
f.WriteTimeout = config.Duration(time.Second * 10)
}
Comment on lines +78 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done in the Init() function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


var err error
f.tlsConf, err = f.ServerConfig.TLSConfig()
if err != nil {
return err
}

return nil
}

// Start starts the http listener service.
func (f *Firehose) Start(acc telegraf.Accumulator) error {
f.acc = acc

var err error
if f.tlsConf != nil {
f.listener, err = tls.Listen("tcp", f.ServiceAddress, f.tlsConf)
} else {
f.listener, err = net.Listen("tcp", f.ServiceAddress)
}
if err != nil {
return err
}

server := &http.Server{
Addr: f.ServiceAddress,
Handler: f,
ReadTimeout: time.Duration(f.ReadTimeout),
WriteTimeout: time.Duration(f.WriteTimeout),
TLSConfig: f.tlsConf,
}

f.wg.Add(1)
go func() {
defer f.wg.Done()
if err := server.Serve(f.listener); err != nil {
if !errors.Is(err, net.ErrClosed) {
f.Log.Errorf("starting server failed: %v", err)
}
close(f.close)
}
}()

f.Log.Infof("Listening on %s", f.listener.Addr().String())

return nil
}

// Stop cleans up all resources
func (f *Firehose) Stop() {
if f.listener != nil {
f.listener.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using f.server.Shutdown()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh, not sure. Again just copied this block from http_listener_v2 plugin and does work as expected. I haven't yet inspected why this is used instead of f.server.Shutdown, but I'll definitely have a look.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please take everything from http_listener_v2 with care as this is a really old plugin. Check everything against current go standards!

}
f.wg.Wait()
}

func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
srebhan marked this conversation as resolved.
Show resolved Hide resolved
if !choice.Contains(req.URL.Path, f.Paths) {
res.WriteHeader(http.StatusNotFound)
return
}

r := &firehoseRequest{req: req}
requestID := req.Header.Get("x-amz-firehose-request-id")

if err := r.authenticate(f.AccessKey); err != nil {
f.Log.Error(err.Error())
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("error sending response to request %s, %v", requestID, err.Error())
}
return
}

if err := r.validate(); err != nil {
f.Log.Error(err.Error())
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("error sending response to request %s, %v", requestID, err.Error())
}
return
}

decodedBytesData, ok := r.decodeData()
if !ok {
f.Log.Errorf("failed to base64 decode record data from request %s", requestID)
if err := r.sendResponse(res); err != nil {
f.Log.Errorf("error sending response to request %s, %v", requestID, err.Error())
}
return
}

var metrics []telegraf.Metric
for _, bytes := range decodedBytesData {
m, err := f.parser.Parse(bytes)
if err != nil {
f.Log.Errorf("unable to parse data from request %s", requestID)
// respond with bad request status code to inform firehose about the failure
r.responseStatusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("error sending response to request %s, %v", requestID, err.Error())
}
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please think about continuing parsing if that makes sense in your scenario to get as much information as we can...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what you're saying but I do think we should return here because we are sending an http.StatusBadRequest response just above this code block so that firehose knows that the request was not able to go through and there are issues with the sent data (which I think is important)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to add that as a comment to the code!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
metrics = append(metrics, m...)
}

if len(metrics) == 0 {
f.once.Do(func() {
f.Log.Info(internal.NoMetricsCreatedMsg)
})
return
}

attributesHeader := req.Header.Get("x-amz-firehose-common-attributes")
if len(attributesHeader) != 0 && len(f.ParameterTags) != 0 {
var parameters map[string]interface{}
if err := json.Unmarshal([]byte(attributesHeader), &parameters); err != nil {
f.Log.Warnf("x-amz-firehose-common-attributes header's value is not a valid json in request %s", requestID)
}

parameters, ok := parameters["commonAttributes"].(map[string]interface{})
if !ok {
f.Log.Warnf("Invalid value for header x-amz-firehose-common-attributes in request %s", requestID)
} else {
for _, parameter := range f.ParameterTags {
if value, ok := parameters[parameter]; ok {
for _, m := range metrics {
m.AddTag(parameter, value.(string))
}
}
}
}
}

if f.PathTag {
for _, m := range metrics {
m.AddTag("firehose_http_path", req.URL.Path)
}
}

for _, m := range metrics {
f.acc.AddMetric(m)
}

r.responseStatusCode = http.StatusOK
if err := r.sendResponse(res); err != nil {
f.Log.Errorf("error sending response to request %s, %v", requestID, err.Error())
}
}

func init() {
inputs.Add("firehose", func() telegraf.Input {
return &Firehose{
ServiceAddress: ":8080",
Paths: []string{"/telegraf"},
close: make(chan struct{}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also do in Init().

}
})
}
Loading
Loading