
[BUG] Exception writing S3 file - S3 Sink + Avro Codec #3160

Closed
engechas opened this issue Aug 14, 2023 · 2 comments · Fixed by #3170
engechas (Collaborator) commented:

Describe the bug
The S3 sink throws this exception part-way through processing the data:

2023-08-14T20:31:16.966 [log-pipeline-sink-worker-2-thread-2] ERROR org.opensearch.dataprepper.plugins.sink.s3.S3SinkService - Exception while write event into buffer :
java.io.IOException: Cannot write more data, the end of the compressed data stream has been reached
    at org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream.write(GzipCompressorOutputStream.java:178) ~[commons-compress-1.23.0.jar:1.23.0]
    at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:476) ~[avro-1.11.1.jar:1.11.1]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81) ~[?:?]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142) ~[?:?]
    at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream.flush(DataFileWriter.java:493) ~[avro-1.11.1.jar:1.11.1]
    at org.apache.avro.io.DirectBinaryEncoder.flush(DirectBinaryEncoder.java:63) ~[avro-1.11.1.jar:1.11.1]
    at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:175) ~[avro-1.11.1.jar:1.11.1]
    at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:145) ~[avro-1.11.1.jar:1.11.1]
    at org.opensearch.dataprepper.plugins.codec.avro.AvroOutputCodec.start(AvroOutputCodec.java:75) ~[avro-codecs-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.s3.S3SinkService.output(S3SinkService.java:111) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.plugins.sink.s3.S3Sink.doOutput(S3Sink.java:116) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
    at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
    at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:336) ~[data-prepper-core-2.4.0-SNAPSHOT.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]

I am not sure whether all of the data was processed before this point. No file was generated after this exception was encountered, and it prevented the E2E ack callback from executing.
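
For context on where the message comes from: it is the guard inside commons-compress's GzipCompressorOutputStream, which rejects any write() once the stream has been finished (via finish() or close()). The behavior can be reproduced in isolation with a few lines of Java; the class name below is mine, not anything from the sink code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class GzipStreamReuseRepro {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        GzipCompressorOutputStream gzip = new GzipCompressorOutputStream(buffer);

        gzip.write("first batch\n".getBytes(StandardCharsets.UTF_8));
        gzip.finish(); // ends the compressed stream, as an earlier flush/upload path might

        // Any subsequent write fails with the exact message seen above:
        // "Cannot write more data, the end of the compressed data stream has been reached"
        gzip.write("second batch\n".getBytes(StandardCharsets.UTF_8));
    }
}

This suggests the sink is writing events through a gzip stream that was already finished by an earlier flush, rather than opening a new one.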

To Reproduce
Steps to reproduce the behavior:
Sink config:

sink:
    - s3:
        aws:
          region: "us-west-2"
          sts_role_arn: "<my role>"
        bucket: "my-sink-bucket"
        object_key:
          path_prefix: "s3-sink"
        threshold:
          event_collect_timeout: 600s
        compression: "gzip"
        codec:
          avro:
            schema: >
              {
                "type" : "record",
                "namespace" : "org.opensearch.dataprepper.examples",
                "name" : "VpcFlowLog",
                "fields" : [
                  { "name" : "version", "type" : ["null", "string"]},
                  { "name" : "srcport", "type": ["null", "int"]},
                  { "name" : "dstport", "type": ["null", "int"]},
                  { "name" : "accountId", "type" : ["null", "string"]},
                  { "name" : "interfaceId", "type" : ["null", "string"]},
                  { "name" : "srcaddr", "type" : ["null", "string"]},
                  { "name" : "dstaddr", "type" : ["null", "string"]},
                  { "name" : "start", "type": ["null", "int"]},
                  { "name" : "end", "type": ["null", "int"]},
                  { "name" : "protocol", "type": ["null", "int"]},
                  { "name" : "packets", "type": ["null", "int"]},
                  { "name" : "bytes", "type": ["null", "int"]},
                  { "name" : "action", "type": ["null", "string"]},
                  { "name" : "logStatus", "type" : ["null", "string"]}
                ]
              }
engechas added the bug and untriaged labels on Aug 14, 2023.
engechas (Collaborator, Author) commented:

This is seen with the ndjson codec as well and is causing data loss.

engechas (Collaborator, Author) commented:

Exception from the ndjson codec:

2023-08-14T23:03:34.476 [log-pipeline-sink-worker-2-thread-1] ERROR org.opensearch.dataprepper.plugins.sink.s3.S3SinkService - Exception while write event into buffer :
java.io.IOException: Cannot write more data, the end of the compressed data stream has been reached
	at org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream.write(GzipCompressorOutputStream.java:178) ~[commons-compress-1.23.0.jar:1.23.0]
	at org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream.write(GzipCompressorOutputStream.java:167) ~[commons-compress-1.23.0.jar:1.23.0]
	at org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec.writeEvent(NdjsonOutputCodec.java:51) ~[parse-json-processor-2.4.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.plugins.sink.s3.S3SinkService.output(S3SinkService.java:115) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.plugins.sink.s3.S3Sink.doOutput(S3Sink.java:116) ~[s3-sink-2.4.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141) ~[micrometer-core-1.10.5.jar:1.10.5]
	at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:64) ~[data-prepper-api-2.4.0-SNAPSHOT.jar:?]
	at org.opensearch.dataprepper.pipeline.Pipeline.lambda$publishToSinks$5(Pipeline.java:336) ~[data-prepper-core-2.4.0-SNAPSHOT.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
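
The same frame in GzipCompressorOutputStream.write points at the same root cause for both codecs: the compressed stream is reused after it has been finished. A minimal sketch of the pattern that avoids this, assuming per-object buffering; the PerObjectGzipWriter name and structure are hypothetical, not the actual change in #3170:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class PerObjectGzipWriter {
    // Open a fresh gzip stream for each S3 object so a stream finished by a
    // previous flush can never be written to again.
    static byte[] compressObject(byte[] payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream gzip = new GzipCompressorOutputStream(buffer)) {
            gzip.write(payload);
        } // close() finishes this object's stream; the next object gets its own
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 3; i++) {
            byte[] out = compressObject(("batch " + i + "\n").getBytes(StandardCharsets.UTF_8));
            System.out.println("object " + i + ": " + out.length + " compressed bytes");
        }
    }
}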

dlvenable self-assigned this on Aug 16, 2023.
dlvenable added this to the v2.4 milestone on Aug 21, 2023.