
[BUG] Avro schema shuts down pipeline when field is not defined in the schema #3161

Closed
engechas opened this issue Aug 14, 2023 · 2 comments

@engechas
Collaborator

Describe the bug
When the Avro schema and the actual data do not match, a RuntimeException is thrown, which shuts down the pipeline:

2023-08-14T17:19:53.568 [log-pipeline-sink-worker-2-thread-2] ERROR org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor - Pipeline [log-pipeline] process worker encountered a fatal exception, cannot proceed further
java.util.concurrent.ExecutionException: java.lang.RuntimeException: The event has a key ('s3') which is not included in the schema.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    at org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor.afterExecute(PipelineThreadPoolExecutor.java:70) ~[data-prepper-core-2.4.0-SNAPSHOT.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1129) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: The event has a key ('s3') which is not included in the schema.
    at org.opensearch.dataprepper.plugins.codec.avro.AvroOutputCodec.buildAvroRecord(AvroOutputCodec.java:181) ~[avro-codecs-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.codec.avro.AvroOutputCodec.writeEvent(AvroOutputCodec.java:152) ~[avro-codecs-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.s3.S3SinkService.output(S3SinkService.java:115) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.s3.S3Sink.doOutput(S3Sink.java:116) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:336) ~[data-prepper-core-2.4.0-SNAPSHOT.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    ... 2 more

Exception comes from here: https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java#L181

Potential solutions for undefined schema keys:

- Automap the field to a type and include it
- Ignore the field (essentially adding it to the exclude_keys list); a rough sketch of this option follows below
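
As a rough illustration of the second option, the codec could copy into the Avro record only the event keys the schema actually defines and drop the rest, instead of throwing. This is a hypothetical sketch using the plain Avro API; the class and method names are made up for illustration and are not the actual AvroOutputCodec code.

```java
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class LenientAvroRecordBuilder {

    // Copy only the event keys that the schema defines; silently drop the rest
    // instead of throwing a RuntimeException that kills the pipeline.
    public static GenericRecord buildRecord(final Schema schema, final Map<String, Object> eventData) {
        final GenericRecord record = new GenericData.Record(schema);
        for (final Map.Entry<String, Object> entry : eventData.entrySet()) {
            final Schema.Field field = schema.getField(entry.getKey());
            if (field == null) {
                continue; // e.g. the "s3" metadata key from the stack trace above
            }
            record.put(field.name(), entry.getValue());
        }
        return record;
    }

    public static void main(final String[] args) {
        final Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"message\",\"type\":\"string\"}]}");
        final GenericRecord record = buildRecord(schema, Map.of("message", "hello", "s3", "bucket-metadata"));
        System.out.println(record); // prints {"message": "hello"}; the undefined "s3" key was ignored
    }
}
```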

To Reproduce
Steps to reproduce the behavior:
Create an S3 sink pipeline that uses the avro output codec with a defined schema. Ingest data with one or more fields that are not defined in the schema.

Expected behavior
The pipeline should not shut down (the correct way to handle the mismatch is debatable).

@engechas engechas added bug Something isn't working untriaged labels Aug 14, 2023
@dlvenable
Member

Perhaps the schema should define exactly what is sent. Dropping undefined fields could be considered data loss. However, I think it is not data loss, because by defining the schema the user is stating what data matters to them.

So, I think the right solution is to use only the fields as defined by the schema. It also seems right to ignore the include/exclude keys directive in this case.

@daixba, any thoughts on this approach?
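
For reference, a minimal sketch of this schema-driven approach (a hypothetical helper, not the actual Data Prepper implementation): iterate over the schema's fields rather than the event's keys, so an undefined event key can never cause a failure.

```java
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class SchemaDrivenRecordBuilder {

    // Iterate over the schema's fields rather than the event's keys, so keys that are
    // not in the schema are never touched. Fields missing from the event are left null
    // here; a nullable union or default value in the schema would be needed for
    // serialization to succeed in that case.
    public static GenericRecord fromEvent(final Schema schema, final Map<String, Object> eventData) {
        final GenericRecord record = new GenericData.Record(schema);
        for (final Schema.Field field : schema.getFields()) {
            record.put(field.name(), eventData.get(field.name()));
        }
        return record;
    }
}
```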

@dlvenable dlvenable added this to the v2.4 milestone Aug 16, 2023
@dlvenable dlvenable self-assigned this Aug 16, 2023
@dlvenable
Member

Fixed by supporting exclude_keys and include_keys, which were corrected in #3181 and #3197.

For now, a user will need to manipulate the keys to match the schema.
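
For illustration, a workaround along these lines should avoid the failure by dropping the undefined key before the codec runs. This is a hypothetical sketch: the bucket name and schema are placeholders, and the exact placement of exclude_keys within the S3 sink configuration is an assumption that should be verified against the released documentation.

```yaml
sink:
  - s3:
      bucket: my-avro-bucket
      # Assumed placement: drop event keys (here the "s3" metadata key) that the
      # Avro schema below does not define, so the codec never sees them.
      exclude_keys: ["s3"]
      codec:
        avro:
          schema: >
            {
              "type": "record",
              "name": "Event",
              "fields": [
                { "name": "message", "type": "string" }
              ]
            }
```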
