Add include_keys and exclude_keys options under the sink #2975

Closed
daixba opened this issue Jul 3, 2023 · 22 comments · Fixed by #2989
Labels: enhancement (New feature or request)

Comments

@daixba (Contributor)

daixba commented Jul 3, 2023

Is your feature request related to a problem? Please describe.
Customers would like to allowlist some data to OpenSearch and send all of the discarded data to the S3 sink. Right now there is no easy way to do this; it would be nice to add include_keys and exclude_keys options under the sink.

Describe the solution you'd like
These additional options must be supported in the OpenSearch sink and the S3 sink, and can be extended to other sinks as well.

The minimal usage is as below:

sink:
  - opensearch:
      ...
      include_keys: [ "key-one", "key-two" ]

  - opensearch:
      ...
      exclude_keys: [ "key-three" ]

  - s3:
      ...
      exclude_keys: [ "key-one", "key-two" ]

The assumption is that customers should not use both options in one sink, so if include_keys is defined, exclude_keys will be ignored.

For the OpenSearch sink, this list of keys is relative to document_root_key if document_root_key is defined.

It would also be nice to be able to define a named list of keys to use as an allowlist in one sink and to reuse the same name, negated, as a denylist in another sink. For example:

sink:
  - opensearch:
      include_keys:
        waf_logs_desired: [ "key-one", "key-two" ]  # defines a named key list ("waf_logs_desired") that can be reused as a denylist

  - s3:
      exclude_keys: waf_logs_desired
@daixba (Contributor, Author)

daixba commented Jul 3, 2023

Additional limitations:

  • The value of keys in the additional options must not contain /. e.g. 'http' is allowed, but 'http/status' is not.
  • If a key does not exist, the result will not include it. A special case: if none of the keys in include_keys are found, the record will be discarded (see the sketch below). e.g. if the event is {"name": "a"} and include_keys is ["value"], this event record will be ignored by the sink. The metrics may be affected by this.
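
To make the discard rule concrete, here is a tiny sketch (shouldDiscard is a hypothetical helper for illustration, not the actual implementation):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class IncludeKeysDiscardRule {
    // A record is discarded when none of the include_keys exist on the event.
    static boolean shouldDiscard(final JsonNode event, final List<String> includeKeys) {
        return includeKeys.stream().noneMatch(event::has);
    }

    public static void main(String[] args) throws Exception {
        final JsonNode event = new ObjectMapper().readTree("{\"name\":\"a\"}");
        System.out.println(shouldDiscard(event, List.of("value"))); // true -> sink ignores the record
        System.out.println(shouldDiscard(event, List.of("name")));  // false -> sink keeps it
    }
}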

Also, some extra information on how these additional options work with document_root_key in the OpenSearch sink.

Taking the input below as an example:

{
    status: 200,
    message: null,
    metadata: {
        sourceIp: "123.212.49.58",
        destinationIp: "79.54.67.231",
        bytes: 3545,
        duration: "15 ms"
    }
}
  • If document_root_key is not set and include_keys is ["metadata"], the result is { "metadata": { sourceIp: ... } }.
  • If document_root_key is set to metadata and include_keys is not set, the result is { sourceIp: ..., ... } (metadata itself is not included).
  • If document_root_key is set to metadata and include_keys is set to ["duration"], the result is { "duration": "15 ms" }.

@graytaylor0 (Member)

graytaylor0 commented Jul 3, 2023

Overall, the result looks good. A couple of thoughts:

sink:
  - opensearch:
      include_keys:
        waf_logs_desired: [ "key-one", "key-two" ]  # defines a named key list ("waf_logs_desired") that can be reused as a denylist

  - s3:
      exclude_keys: waf_logs_desired

I see what this is trying to do, but I don't think it's very clean, and I know it won't be clean or easy to do in the code (sharing values between sinks is not going to be a simple add). I think we should just support the list of strings for now (it's still just a copy-paste to get this functionality).

One thing I might also suggest is that we make this an object in the sink context:

filtering:
  include_keys: []
  exclude_keys: []

I think this gives some more room for enhancements in the future (maybe we add a regex flag, etc.), but I am also OK with keeping these at the top level.

Lastly, there is a consideration to be made for how this is implemented (because Events are shared in memory between all sinks of a sub-pipeline). There are two options: one prioritizes memory usage, and the other prioritizes CPU time. I think they could both be implemented in the JSON string builder in JacksonEvent to include or exclude the keys in the JSON string.

Option 1 - Copy the Event's JsonNode without the keys that are being excluded, and then return it as a JSON string (this prioritizes CPU time and adds memory overhead).

Option 2 - Search through the JsonNode (BFS or DFS) and construct the JSON string as you search, excluding the undesired keys (this prioritizes memory and adds CPU overhead).

Given that we already support sending Events with different keys to different sinks via sub-pipelines, and that approach has a memory overhead because Events are copied between sub-pipelines, I am leaning towards favoring Option 2.
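
To illustrate, a minimal sketch of Option 2 under some simplifying assumptions (top-level keys only, keys needing no JSON escaping; FilteredJsonWriter and toJsonStringExcluding are hypothetical names, not the existing JacksonEvent API):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class FilteredJsonWriter {

    // Walks the node's top-level fields and writes the JSON string directly,
    // skipping excluded keys. The JsonNode itself is never modified, so other
    // sinks sharing the same Event are unaffected.
    public static String toJsonStringExcluding(final JsonNode root, final Set<String> excludeKeys) {
        final StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        final Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
        while (fields.hasNext()) {
            final Map.Entry<String, JsonNode> field = fields.next();
            if (excludeKeys.contains(field.getKey())) {
                continue; // read-only skip; nothing is removed from the node
            }
            if (!first) {
                sb.append(',');
            }
            first = false;
            // JsonNode.toString() serializes the nested value as JSON as-is.
            sb.append('"').append(field.getKey()).append("\":").append(field.getValue());
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) throws Exception {
        final JsonNode event = new ObjectMapper()
                .readTree("{\"status\":200,\"message\":null,\"metadata\":{\"bytes\":3545}}");
        System.out.println(toJsonStringExcluding(event, Set.of("message")));
        // -> {"status":200,"metadata":{"bytes":3545}}
    }
}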

@kkondaka (Collaborator)

kkondaka commented Jul 3, 2023

Agree with Taylor's suggestions. Taylor and I discussed this in detail. I suggest we do Option 2 as mentioned by Taylor, because memory is a precious resource and we cannot afford to duplicate the JsonNode.

@engechas (Collaborator)

engechas commented Jul 4, 2023

I agree on option 2 to avoid the copy.

The value of keys in the additional options must not contain /. e.g. 'http' is allowed, but 'http/status' is not.

Does this exclude supporting the DataPrepper JSONPointer notation? In the example given, it'd be nice to be able to do

include_keys: ["/status", "/metadata/sourceIp", "/metadata/bytes"]

to get a resulting object like

{
  "status": 200,
  "metadata": {
    "sourceIp": "123.212.49.58",
    "bytes": 3545
  }
}
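
A rough sketch of how this could work with Jackson's JsonNode.at() (assumptions: pointers start with /, address object fields only, and contain no array indices or escaped segments; JsonPointerInclude is a hypothetical helper, not existing Data Prepper code):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;

public class JsonPointerInclude {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Copies only the values found at the given JSON Pointer paths into a
    // fresh ObjectNode, recreating intermediate objects as needed. The source
    // node is only read, never mutated (leaf values are referenced, not deep-copied).
    public static ObjectNode includeKeys(final JsonNode source, final List<String> pointers) {
        final ObjectNode result = MAPPER.createObjectNode();
        for (final String pointer : pointers) {
            final JsonNode value = source.at(pointer); // MissingNode if absent
            if (value.isMissingNode()) {
                continue; // absent keys are simply skipped
            }
            final String[] parts = pointer.substring(1).split("/");
            ObjectNode current = result;
            for (int i = 0; i < parts.length - 1; i++) {
                current = current.has(parts[i])
                        ? (ObjectNode) current.get(parts[i])
                        : current.putObject(parts[i]);
            }
            current.set(parts[parts.length - 1], value);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        final JsonNode event = MAPPER.readTree(
                "{\"status\":200,\"metadata\":{\"sourceIp\":\"123.212.49.58\",\"bytes\":3545,\"duration\":\"15 ms\"}}");
        System.out.println(includeKeys(event,
                List.of("/status", "/metadata/sourceIp", "/metadata/bytes")));
        // -> {"status":200,"metadata":{"sourceIp":"123.212.49.58","bytes":3545}}
    }
}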

@daixba (Contributor, Author)

daixba commented Jul 4, 2023

Thanks for all the comments. I was trying Option 1 earlier (yes, it needs to make a copy of the nodes for each event record), but I will do more digging into Option 2. I had thought about it, but I am not so sure what JsonNode can do; so far, only very limited operations are exposed by JacksonEvent, and the JSON string builder currently is just string mutation. But I will see how I can extend this.

And I agree that we should just support a list of strings for now (as the minimal usage). To define a named list and share it among multiple sinks, it would have to be done similarly to route, which is an upper-level node, but that also sounds like bad design. Again, this is a nice-to-have; I agree we shouldn't support it for now, since customers can simply copy the list again in the YAML.

@daixba (Contributor, Author)

daixba commented Jul 4, 2023

I agree on option 2 to avoid the copy.

The value of keys in the additional options must not contain /. e.g. 'http' is allowed, but 'http/status' is not.

Does this exclude supporting the DataPrepper JSONPointer notation? In the example given, it'd be nice to be able to do

include_keys: ["/status", "/metadata/sourceIp", "/metadata/bytes"]

to get a resulting object like

{
  "status": 200,
  "metadata": {
    "sourceIp": "123.212.49.58",
    "bytes": 3545
  }
}

For Option 1, this is difficult. That is why I added the limitation earlier that the keys must not contain /. But Option 2 should make this easier.

@daixba (Contributor, Author)

daixba commented Jul 4, 2023

Also, I would suggest leaving include_keys and exclude_keys at the top level (under the sink) for now, as filtering sounds confusing. My first thought on seeing filtering is that I might start with 10 records but end up with 5 records in the sink, and that is not what we are doing here.

So until we have an enhancement in the future, let's keep these at the top level, or find an alternative node name rather than filtering. I hope this makes sense.

@asifsmohammed added the enhancement (New feature or request) label and removed the untriaged label on Jul 5, 2023
@daixba (Contributor, Author)

daixba commented Jul 6, 2023

I have almost finished the implementation of Option 2, but I found one issue during my testing and would like to get your thoughts. @graytaylor0 @kkondaka @engechas

I just noticed that the JacksonEvent is shared across all sinks in the same pipeline, meaning that they are dealing with the same memory instead of separate copies. The problem is that later operations won't see the original event message, as it has already been modified.

For example, take a simple message {"name": "hello", "age": 1}, where the first sink includes key name and the second sink includes key age.

Once the first sink has "processed" the event, the message is already modified to {"name": "hello"}, so the second sink can no longer get any info about age, and its result is empty. This is not what we expected.

Thanks.

@graytaylor0 (Member)

I just noticed that the JacksonEvent is shared across all sinks in the same pipeline, meaning that they are dealing with the same memory instead of separate copies. The problem is that later operations won't see the original event message, as it has already been modified.

This was the reason we had the two options (either we copy the JsonNode completely when creating the JSON string for the Event without the keys, or we search through it as in Option 2). To avoid hitting the issue described above with Option 2, you should not be modifying the JsonNode at all; you should just be searching through it and constructing the JSON string by reading from the JsonNode.
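
To make the contrast concrete, a small self-contained sketch (retain() is standard Jackson API; the deepCopy() exists only so this demo can show both outcomes side by side):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;

public class SharedNodePitfall {
    public static void main(String[] args) throws Exception {
        final ObjectNode shared = (ObjectNode) new ObjectMapper()
                .readTree("{\"name\":\"hello\",\"age\":1}");

        // WRONG: retain() mutates the node that every sink in the
        // sub-pipeline shares, so a later sink can no longer see "age".
        final ObjectNode mutated = shared.deepCopy(); // copy only for this demo
        mutated.retain(Set.of("name"));
        System.out.println(mutated); // {"name":"hello"}

        // RIGHT (Option 2): leave the shared node untouched and filter only
        // while serializing, reading fields and skipping the unwanted ones.
        final StringBuilder sb = new StringBuilder("{");
        shared.fields().forEachRemaining(field -> {
            if (!field.getKey().equals("name")) {
                return; // include only "name" in this sink's output
            }
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(field.getKey()).append("\":").append(field.getValue());
        });
        System.out.println(sb.append('}')); // {"name":"hello"}
        System.out.println(shared);         // original intact: {"name":"hello","age":1}
    }
}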

@daixba (Contributor, Author)

daixba commented Jul 6, 2023

Thanks Taylor, I misunderstood your point earlier; I thought you wanted to do in-place modification to avoid any extra space in memory.

@dlvenable (Member)

This is somewhat like using the remove keys processor, but on sinks. It seems that this solution won't scale, as the number of items that could go here is almost as variable as the processors themselves.

I think that using a sub-pipeline is the right solution here. What is the problem with using a sub-pipeline? Performance? Verbosity? Something else? I think we should fix that issue so that we don't have to keep adding processors into the sinks.

@sharraj

sharraj commented Jul 7, 2023

We need this capability as a complement to mutations in the pipelines via sub-pipelines. This solution does not preclude users from forking multiple sub-pipelines and creating copies of events (at the cost of a memory explosion) when events need different processing before being sent to sinks. Where events don't need different processing, there is no need to create sub-pipelines. The sub-pipeline approach is not optimal for every case, as it leads to a long YAML file and a memory explosion, and should be used only when needed. For the simpler case of just filtering while sending to sinks, an allowlist and denylist at the sink level is the better approach. Also, sinks already have many settings today, like bulk sizing and DLQs, and this allowlist/denylist is another valuable capability for filtering events.

@kkondaka (Collaborator)

kkondaka commented Jul 8, 2023

I have almost finished the implementation of Option 2, but I found one issue during my testing and would like to get your thoughts. @graytaylor0 @kkondaka @engechas

I just noticed that the JacksonEvent is shared across all sinks in the same pipeline, meaning that they are dealing with the same memory instead of separate copies. The problem is that later operations won't see the original event message, as it has already been modified.

For example, take a simple message {"name": "hello", "age": 1}, where the first sink includes key name and the second sink includes key age.

Once the first sink has "processed" the event, the message is already modified to {"name": "hello"}, so the second sink can no longer get any info about age, and its result is empty. This is not what we expected.

Thanks.

This is exactly what we want to avoid if possible. Only the string should include/exclude keys, without modifying the JsonNode itself.

@kkondaka (Collaborator)

kkondaka commented Jul 8, 2023

This is somewhat like using the remove keys processor, but on sinks. It seems that this solution won't scale, as the number of items that could go here is almost as variable as the processors themselves.

I think that using a sub-pipeline is the right solution here. What is the problem with using a sub-pipeline? Performance? Verbosity? Something else? I think we should fix that issue so that we don't have to keep adding processors into the sinks.

@dlvenable, yes, the issue is memory. If all we want is to write to two sinks with some keys excluded/included, the same event needs to be copied at least one more time when using a sub-pipeline. This is an optimization only in the sinks, where copying the event is unnecessary because the transformed event is going to be written to the sink anyway. There is a "copy" when an event is transformed to a String using event.toJsonString. We are trying to explore whether it's possible to exclude/include keys while converting from event to document, or from event to JSON/Avro/etc. when an output codec is used.

@dlvenable (Member)

@kkondaka, I do think the copy is a problem. And I suggest that we solve the underlying problem rather than throw a band-aid on one specific situation.

@daixba (Contributor, Author)

daixba commented Jul 12, 2023

@dlvenable When I first saw this requirement, my first thought was also that this should be in the scope of a processor rather than a sink; it makes more sense that way.

I am not sure if this helps, but consider it a formatter rather than a processor (as we are not changing the event); instead, we just print the event the way it's expected in the sink. Something like printing a JSON record as a CSV record and storing that in S3.

And this is definitely not the first one. If you check the document_root_key option in the OpenSearch sink, it performs the same way, and I am sure there are reasons why it's not a processor. (document_root_key doesn't change the event, but it does change the outcome in the sink relative to the original event; it's basically just a toString().)

Anyway, let me know the final decision on this. We may also add this as a processor in the future. (We can use delete_entries to do exclude_keys, but we can't do include_keys with any existing processor.)

Thanks.

@dlvenable (Member)

but we can't do include_keys with any existing processors.

This would make sense as a new processor. Basically, it would remove everything except for...

I am not sure if this helps, but consider it a formatter rather than a processor (as we are not changing the event); instead, we just print the event the way it's expected in the sink.

I don't think this is quite accurate. This proposal is to change the actual data that is saved. And there is no general reason why include/exclude is the only such option, so this list of pseudo-processors in front of sinks can grow without bound.

Codecs are the concept for sinks to determine the formatting within the sink.

Something like printing a JSON record as a CSV record and storing that in S3.

Again, this is what codecs are meant to provide. The opensearch sink does not support codecs because OpenSearch has a well-defined format that shouldn't vary.

@sharraj

sharraj commented Jul 16, 2023

@dlvenable we need to move forward with this feature. It provides value without causing the memory explosion of sub-pipelines, which is the main intent here for customers with large workloads where out-of-memory (OOM) situations can be triggered. Let's table this conversation here and help with the code review so that we can bring in this capability asked for by critical customers.

@dlvenable added this to the v2.4 milestone on Jul 31, 2023
@dlvenable reopened this on Aug 2, 2023
@dlvenable (Member)

@kkondaka , @graytaylor0 , @daixba

I raised this question in the latest PR on this work, but want to bring it up here.

Where should this configuration exist?

  • Is it part of the core model, which all sinks must implement?
  • Is it a non-core model, but we use this convention?
  • If it is a non-core model, how do codec-based sinks implement it? On the codec or on the sink?

If we implement it in the core model and a sink or codec doesn't implement it, then there is a behavioral difference that is not apparent until runtime.

@dlvenable (Member)

@daixba , Is there any work left for this, or should we close this issue?

@daixba (Contributor, Author)

daixba commented Aug 11, 2023

I am good to close this one.

@dlvenable (Member)

Resolved by #2989 and #3122.
