From d0f251c7c32a307ba06947e8c52f20cdc123e1a0 Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 18 Aug 2023 16:51:02 -0500 Subject: [PATCH] Correctly add compression extensions to the generated S3 sink keys. If compression is internal, does not utilize. Resolves #3158. Signed-off-by: David Venable --- .../plugins/sink/s3/S3SinkServiceIT.java | 4 +- .../plugins/sink/s3/ExtensionProvider.java | 5 ++ .../plugins/sink/s3/KeyGenerator.java | 9 ++- .../dataprepper/plugins/sink/s3/S3Sink.java | 10 ++- .../sink/s3/StandardExtensionProvider.java | 33 +++++++++ .../s3/compression/CompressionOption.java | 15 ++-- .../plugins/sink/s3/KeyGeneratorTest.java | 18 ++++- .../s3/StandardExtensionProviderTest.java | 72 +++++++++++++++++++ .../s3/compression/CompressionOptionTest.java | 35 +++++++++ 9 files changed, 184 insertions(+), 17 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ExtensionProvider.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProvider.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProviderTest.java diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java index ef01125aff..562194e8cd 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java @@ -173,7 +173,7 @@ void verify_flushed_object_count_into_s3_bucket() { void configureNewLineCodec() { codec = new NdjsonOutputCodec(ndjsonOutputConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, codec); + keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE)); } @Test @@ -356,7 +356,7 @@ private void configureParquetCodec() { parquetOutputCodecConfig.setSchema(parseSchema().toString()); parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX); codec = new ParquetOutputCodec(parquetOutputCodecConfig); - keyGenerator = new KeyGenerator(s3SinkConfig, codec); + keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE)); } private Collection> getRecordList() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ExtensionProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ExtensionProvider.java new file mode 100644 index 0000000000..6ec71f3b73 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ExtensionProvider.java @@ -0,0 +1,5 @@ +package org.opensearch.dataprepper.plugins.sink.s3; + +public interface ExtensionProvider { + String getExtension(); +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java index 2087554c7c..5281921cee 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGenerator.java @@ -5,16 +5,15 @@ package org.opensearch.dataprepper.plugins.sink.s3; -import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey; public class KeyGenerator { private final S3SinkConfig s3SinkConfig; - private final OutputCodec outputCodec; + private final ExtensionProvider extensionProvider; - public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) { + public KeyGenerator(S3SinkConfig s3SinkConfig, ExtensionProvider extensionProvider) { this.s3SinkConfig = s3SinkConfig; - this.outputCodec = outputCodec; + this.extensionProvider = extensionProvider; } /** @@ -24,7 +23,7 @@ public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) { */ String generateKey() { final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig); - final String namePattern = ObjectKey.objectFileName(s3SinkConfig, outputCodec.getExtension()); + final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension()); return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index 351ebcf0e1..bb03f8be2e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; @@ -64,12 +65,15 @@ public S3Sink(final PluginSetting pluginSetting, sinkInitialized = Boolean.FALSE; final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier); - KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, codec); final BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory(); - final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine(); + CompressionOption compressionOption = s3SinkConfig.getCompression(); + final CompressionEngine compressionEngine = compressionOption.getCompressionEngine(); bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec); - S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), s3SinkConfig.getCompression()); + ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption); + KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider); + + S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProvider.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProvider.java new file mode 100644 index 0000000000..e23cd386ed --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProvider.java @@ -0,0 +1,33 @@ +package org.opensearch.dataprepper.plugins.sink.s3; + +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; + +class StandardExtensionProvider implements ExtensionProvider { + private final String extension; + + static ExtensionProvider create(OutputCodec outputCodec, CompressionOption compressionOption) { + + String codecExtension = outputCodec.getExtension(); + + if(outputCodec.isCompressionInternal()) { + return new StandardExtensionProvider(codecExtension); + } + + String extension = compressionOption.getExtension() + .map(compressionExtension -> codecExtension + "." + compressionExtension) + .orElse(codecExtension); + + + return new StandardExtensionProvider(extension); + } + + private StandardExtensionProvider(String extension) { + this.extension = extension; + } + + @Override + public String getExtension() { + return extension; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java index 86715f806d..98c4eceaa6 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java @@ -9,13 +9,14 @@ import java.util.Arrays; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; public enum CompressionOption { - NONE("none", NoneCompressionEngine::new), - GZIP("gzip", GZipCompressionEngine::new), - SNAPPY("snappy", SnappyCompressionEngine::new); + NONE("none", null, NoneCompressionEngine::new), + GZIP("gzip", "gz", GZipCompressionEngine::new), + SNAPPY("snappy", "snappy", SnappyCompressionEngine::new); private static final Map OPTIONS_MAP = Arrays.stream(CompressionOption.values()) .collect(Collectors.toMap( @@ -25,9 +26,11 @@ public enum CompressionOption { private final String option; + private final String extension; private final Supplier compressionEngineSupplier; - CompressionOption(final String option, final Supplier compressionEngineSupplier) { + CompressionOption(final String option, String extension, final Supplier compressionEngineSupplier) { this.option = option.toLowerCase(); + this.extension = extension; this.compressionEngineSupplier = compressionEngineSupplier; } @@ -39,6 +42,10 @@ public String getOption() { return option; } + public Optional getExtension() { + return Optional.ofNullable(extension); + } + @JsonCreator public static CompressionOption fromOptionValue(final String option) { return OPTIONS_MAP.get(option); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java index f080163914..8b9b5f99ed 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/KeyGeneratorTest.java @@ -10,7 +10,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import java.time.LocalDateTime; @@ -18,8 +17,10 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.TimeZone; +import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.when; @@ -32,7 +33,7 @@ class KeyGeneratorTest { private S3SinkConfig s3SinkConfig; @Mock - private OutputCodec outputCodec; + private ExtensionProvider extensionProvider; @Mock private ObjectKeyOptions objectKeyOptions; @@ -44,7 +45,7 @@ void setUp() { } private KeyGenerator createObjectUnderTest() { - return new KeyGenerator(s3SinkConfig, outputCodec); + return new KeyGenerator(s3SinkConfig, extensionProvider); } @Test @@ -74,4 +75,15 @@ void test_generateKey_with_date_prefix() { assertThat(key, true); assertThat(key, key.contains(pathPrefix + dateString)); } + + @Test + void generateKey_ends_with_extension() { + String extension = UUID.randomUUID().toString(); + when(extensionProvider.getExtension()).thenReturn(extension); + String pathPrefix = "events/"; + when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix); + String key = createObjectUnderTest().generateKey(); + assertThat(key, notNullValue()); + assertThat(key, key.endsWith("." + extension)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProviderTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProviderTest.java new file mode 100644 index 0000000000..f0580c6a48 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/StandardExtensionProviderTest.java @@ -0,0 +1,72 @@ +package org.opensearch.dataprepper.plugins.sink.s3; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; + +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class StandardExtensionProviderTest { + + @Mock + private OutputCodec outputCodec; + + @Mock + private CompressionOption compressionOption; + + private String codecExtension; + + @BeforeEach + void setUp() { + codecExtension = UUID.randomUUID().toString(); + } + + @Test + void getExtension_returns_extension_of_codec_when_compression_internal() { + when(outputCodec.getExtension()).thenReturn(codecExtension); + when(outputCodec.isCompressionInternal()).thenReturn(true); + + ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption); + assertThat(extensionProvider, notNullValue()); + assertThat(extensionProvider.getExtension(), equalTo(codecExtension)); + + verify(compressionOption, never()).getExtension(); + } + + @Test + void getExtension_returns_extension_of_codec_compression_has_no_extension() { + when(outputCodec.getExtension()).thenReturn(codecExtension); + when(compressionOption.getExtension()).thenReturn(Optional.empty()); + + ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption); + assertThat(extensionProvider, notNullValue()); + assertThat(extensionProvider.getExtension(), equalTo(codecExtension)); + + verify(compressionOption).getExtension(); + } + + @Test + void getExtension_returns_extension_of_codec_compression_has_extension() { + String compressionExtension = UUID.randomUUID().toString(); + when(outputCodec.getExtension()).thenReturn(codecExtension); + when(compressionOption.getExtension()).thenReturn(Optional.of(compressionExtension)); + + ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption); + assertThat(extensionProvider, notNullValue()); + assertThat(extensionProvider.getExtension(), equalTo(codecExtension + "." + compressionExtension)); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java index 15a13b31db..d869d30d25 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java @@ -12,10 +12,12 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.EnumSource; +import java.util.Optional; import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -38,6 +40,29 @@ void getCompressionEngine_returns_expected_engine_type(final CompressionOption o assertThat(option.getCompressionEngine(), instanceOf(expectedEngineType)); } + @ParameterizedTest + @EnumSource(CompressionOption.class) + void getExtension_returns_non_null_Optional(final CompressionOption option) { + assertThat(option.getExtension(), notNullValue()); + } + + @ParameterizedTest + @ArgumentsSource(OptionToExpectedExtension.class) + void getExtension_returns_expected_extension(final CompressionOption option, final String expectedExtension) { + Optional extension = option.getExtension(); + assertThat(extension, notNullValue()); + assertThat(extension.isEmpty(), equalTo(false)); + assertThat(extension.get(), equalTo(expectedExtension)); + } + + @ParameterizedTest + @EnumSource(value = CompressionOption.class, names = {"NONE"}) + void getExtension_returns_empty_Optional_when_no_extension(final CompressionOption option) { + Optional extension = option.getExtension(); + assertThat(extension, notNullValue()); + assertThat(extension.isEmpty(), equalTo(true)); + } + static class OptionToExpectedEngine implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { @@ -48,4 +73,14 @@ public Stream provideArguments(final ExtensionContext conte ); } } + + static class OptionToExpectedExtension implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(CompressionOption.GZIP, "gz"), + arguments(CompressionOption.SNAPPY, "snappy") + ); + } + } } \ No newline at end of file