Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.firehose): Add new plugin #15988

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

syedmhashim
Copy link

Summary

The firehose input plugin would be used to receive data from AWS Data Firehose.

Checklist

  • No AI generated code was used in this PR

Related issues

resolves #15870

@telegraf-tiger telegraf-tiger bot added the feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin label Oct 7, 2024
@telegraf-tiger
Copy link
Contributor

telegraf-tiger bot commented Oct 7, 2024

@srebhan srebhan changed the title feat(inputs/firehose): add input plugin for AWS Data Firehose feat(inputs.firehose): Add new plugin Oct 8, 2024
@telegraf-tiger telegraf-tiger bot added the plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins label Oct 8, 2024
Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syedmhashim I do have some comments, mostly on style, so overall this looks quite promising. I urge you to keep the code as simple and less nested as possible to ease review and debugging. And of course some unit-tests would be great!

@@ -0,0 +1,123 @@
# AWS Data Firehose HTTP Listener Input Plugin

Firehose HTTP Listener is a service input plugin that listens for metrics sent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Firehose HTTP Listener is a service input plugin that listens for metrics sent
This is a service input plugin that listens for metrics sent

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 90 to 97
data_format = "json_v2"
[[inputs.firehose.json_v2]]
timestamp_path = "timestamp"
timestamp_format = "unix_ms"
[[inputs.firehose.json_v2.tag]]
path = "type"
[[inputs.firehose.json_v2.field]]
path = "message"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please either use the xpath parser here or use a simple string value or somethin? Let's not promote the json_v2 parser more than necessary, especially when dealing with arrays...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 57 to 191
for _, method := range allowedMethods {
if r.req.Method == method {
isAcceptedMethod = true
break
}
}
if !isAcceptedMethod {
r.responseStatusCode = http.StatusMethodNotAllowed
return fmt.Errorf("%s method not allowed", r.req.Method)
}

contentType := r.req.Header.Get("content-type")
if contentType != "application/json" {
r.responseStatusCode = http.StatusBadRequest
return fmt.Errorf("%s content type not accepted", contentType)
}

contentEncoding := r.req.Header.Get("content-encoding")
if contentEncoding != "" && contentEncoding != "gzip" {
r.responseStatusCode = http.StatusBadRequest
return fmt.Errorf("%s content encoding not accepted", contentEncoding)
}

err := r.extractBody()
if err != nil {
return err
}

requestID := r.req.Header.Get(requestIDHeader)
if requestID == "" {
r.responseStatusCode = http.StatusBadRequest
return fmt.Errorf("%s header is not set", requestIDHeader)
}

if requestID != r.body.RequestID {
r.responseStatusCode = http.StatusBadRequest
return fmt.Errorf("requestId in the body does not match the value of the request header, %s", requestIDHeader)
}

return nil
}

func (r *firehoseRequest) extractBody() error {
encoding := r.req.Header.Get("content-encoding")
switch encoding {
case "gzip":
g, err := gzip.NewReader(r.req.Body)
if err != nil {
r.responseStatusCode = http.StatusBadRequest
return fmt.Errorf("unable to decode body - %s", err.Error())
}
defer g.Close()
err = json.NewDecoder(g).Decode(&r.body)
if err != nil {
r.responseStatusCode = http.StatusBadRequest
return err
}
default:
defer r.req.Body.Close()
err := json.NewDecoder(r.req.Body).Decode(&r.body)
if err != nil {
r.responseStatusCode = http.StatusBadRequest
return err
}
}
return nil
}

func (r *firehoseRequest) decodeData() ([][]byte, bool) {
// decode base64-encoded data and return them as a slice of byte slices
decodedData := make([][]byte, 0)
for _, record := range r.body.Records {
data, err := base64.StdEncoding.DecodeString(record.EncodedData)
if err != nil {
return nil, false
}
decodedData = append(decodedData, data)
}
return decodedData, true
}

func (r *firehoseRequest) sendResponse(res http.ResponseWriter) error {
responseBody := struct {
RequestID string `json:"requestId"`
Timestamp int64 `json:"timestamp"`
ErrorMessage string `json:"errorMessage,omitempty"`
}{
RequestID: r.req.Header.Get(requestIDHeader),
Timestamp: time.Now().Unix(),
ErrorMessage: statusCodeToMessage[r.responseStatusCode],
}
response, err := json.Marshal(responseBody)
if err != nil {
return err
}
res.Header().Set("content-type", "application/json")
res.WriteHeader(r.responseStatusCode)
_, err = res.Write(response)
return err
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving this to an own file to make the plugin file more readable. Is there an upstream definition of those types in the AWS SDK v2?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I'll have a look. If I'm not able to find something in the AWS SDK v2, then I'll definitely move this to a new file

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. moved to a separate file

//go:embed sample.conf
var sampleConfig string

var once sync.Once
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be per plugin-instance and not globally.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. Btw I do see this declared globally in every other (input) plugins

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)

// TimeFunc provides a timestamp for the metrics
type TimeFunc func() time.Time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this in http_listener_v2 as well as in influxdb_v2_listener. Thought it's used somewhere downstream in telegraf. If this is not the case, please let me know and I'll remove this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove. We had this in the dark age where we could not test metrics without pinning the timestamp...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


parameters, ok := parameters["commonAttributes"].(map[string]interface{})
if !ok {
f.Log.Warn(formatLog(r.body.RequestID, "Invalid value for header "+commonAttributesHeader))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I don't think you should simply continue here rather return here...

Copy link
Author

@syedmhashim syedmhashim Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of parameter tags as something optional. So if were able to extract and find them, we'd add to the metrics otherwise we'll ignore. Thus, I'm just emitting a warning log. Now that you mentioned, it does makes sense to maybe respond with bad request status and return here. Agree?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, but I would then at least skip the for loop below as this is known to not return anything but just burns some cycles...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Comment on lines 377 to 391
for _, parameter := range f.ParameterTags {
if value, ok := parameters[parameter]; ok {
for _, m := range metrics {
m.AddTag(parameter, value.(string))
}
}
}
}

for _, m := range metrics {
if f.PathTag {
m.AddTag(pathTag, req.URL.Path)
}
f.acc.AddMetric(m)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really would collect all tags first and then iterate over the metrics once to add them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Does the following makes sense:

        if f.PathTag {
		for _, m := range metrics {
			m.AddTag(pathTag, req.URL.Path)
		}
	}
	
	for _, m := range metrics {
		f.acc.AddMetric(m)
	}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I want is (pseudocode)

	tags := make(map[string]string, len(f.ParameterTags)+1)
	if f.pathTag {
		tags[f.pathTag] = req.URL.Path
	}
	for _, parameter := range f.ParameterTags {
		if value, found := parameters[parameter]; found {
			if v, ok := value.(string); ok {
				tags[parameter] = v
			}
		}
	}

	for _, m := range metrics {
		for k, v := range tags {
			m.AddTag(k , v)
		}
		f.acc.AddMetric(m)
	}

Copy link
Author

@syedmhashim syedmhashim Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually f.PathTag variable is a boolean. When set to True, we add a tag, firehose_http_path to each metric with value set to the request path, i.e, req.URL.Path. It's unrelated to f.ParameterTags. The purpose of this was to allow the plugin to receive request on more than one request paths. So with the following configuration:

[[inputs.firehose]]
  ## Address and port to host HTTP listener on
  service_address = ":8080"

  ## Paths to listen to.
  paths = ["/telegraf/dev", "/telegraf/stage"]

  ## Save path as firehose_http_path tag if set to true
  path_tag = true

our input plugin would accept requests for both of these paths. This would allow us to have a single telegraf server running, serving both our dev and stage environments. If we want, we can also add the prod environment here or have a separate telegraf server running for it. And if the f.PathTag is set to True we'll add a tag firehose_http_path to each metric with it's value set to the actual request path e.g telegraf/dev. This is something I thought could be useful and is optional therefore. I did add comments about this in the readme and sample config

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Comment on lines 400 to 402
func formatLog(requestID, message string) string {
return fmt.Sprintf(logFormat, requestID, message)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please get rid of this function!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 407 to 409
ServiceAddress: ":8080",
TimeFunc: time.Now,
Paths: []string{"/telegraf"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please set the default values of ServiceAddress and Paths in Init() to reuse it in tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Btw if it's only for tests can't we do it the way it's done here? Just a thought

Copy link
Member

@srebhan srebhan Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I hate these types of indirections. Imagine a test fails and you quickly want to look at what's going on. You immediately need to jump around in code to see how a simple test case is set up... :-( So please avoid this if possible!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ServiceAddress: ":8080",
TimeFunc: time.Now,
Paths: []string{"/telegraf"},
close: make(chan struct{}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also do in Init().

@srebhan srebhan self-assigned this Oct 8, 2024
@syedmhashim
Copy link
Author

@syedmhashim I do have some comments, mostly on style, so overall this looks quite promising. I urge you to keep the code as simple and less nested as possible to ease review and debugging. And of course some unit-tests would be great!

Thanks for the review. Will definitely go over the comments and update accordingly. Just FYI, I have been using http_listener_v2 input plugin as a reference to write the code. The unit tests will come soon as well

@srebhan
Copy link
Member

srebhan commented Oct 9, 2024

Yeah we do have some older code we didn't adapt yet but things changed both on the golang side as well as on us being more strict with the way things are done. ;-) No worries, we will figure it out together. :-)

@srebhan
Copy link
Member

srebhan commented Oct 10, 2024

@syedmhashim I'm loosing a bit the track in the discussion above, could you please push an update with the stuff that is clear and then we discuss the unclear parts?

@syedmhashim
Copy link
Author

@srebhan Hey! Just pushed some changes and commented "done" on the threads that I resolved. Sorry for the delay. I got occupied with my job

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AWS Data Firehose HTTP Endpoint Input Plugin
2 participants