Skip to content

Commit

Permalink
Correctly add compression extensions to the generated S3 sink keys. I…
Browse files Browse the repository at this point in the history
…f compression is internal, does not utilize. Resolves #3158.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Aug 18, 2023
1 parent f61604b commit d0f251c
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void verify_flushed_object_count_into_s3_bucket() {

void configureNewLineCodec() {
codec = new NdjsonOutputCodec(ndjsonOutputConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, codec);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}

@Test
Expand Down Expand Up @@ -356,7 +356,7 @@ private void configureParquetCodec() {
parquetOutputCodecConfig.setSchema(parseSchema().toString());
parquetOutputCodecConfig.setPathPrefix(PATH_PREFIX);
codec = new ParquetOutputCodec(parquetOutputCodecConfig);
keyGenerator = new KeyGenerator(s3SinkConfig, codec);
keyGenerator = new KeyGenerator(s3SinkConfig, StandardExtensionProvider.create(codec, CompressionOption.NONE));
}

private Collection<Record<Event>> getRecordList() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opensearch.dataprepper.plugins.sink.s3;

public interface ExtensionProvider {
String getExtension();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@

package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;

public class KeyGenerator {
private final S3SinkConfig s3SinkConfig;
private final OutputCodec outputCodec;
private final ExtensionProvider extensionProvider;

public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) {
public KeyGenerator(S3SinkConfig s3SinkConfig, ExtensionProvider extensionProvider) {
this.s3SinkConfig = s3SinkConfig;
this.outputCodec = outputCodec;
this.extensionProvider = extensionProvider;
}

/**
Expand All @@ -24,7 +23,7 @@ public KeyGenerator(S3SinkConfig s3SinkConfig, OutputCodec outputCodec) {
*/
String generateKey() {
final String pathPrefix = ObjectKey.buildingPathPrefix(s3SinkConfig);
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, outputCodec.getExtension());
final String namePattern = ObjectKey.objectFileName(s3SinkConfig, extensionProvider.getExtension());
return (!pathPrefix.isEmpty()) ? pathPrefix + namePattern : namePattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -64,12 +65,15 @@ public S3Sink(final PluginSetting pluginSetting,
sinkInitialized = Boolean.FALSE;

final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, codec);
final BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory();
final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine();
CompressionOption compressionOption = s3SinkConfig.getCompression();
final CompressionEngine compressionEngine = compressionOption.getCompressionEngine();
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec);

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), s3SinkConfig.getCompression());
ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption);
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider);

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, pluginMetrics);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.opensearch.dataprepper.plugins.sink.s3;

import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

class StandardExtensionProvider implements ExtensionProvider {
private final String extension;

static ExtensionProvider create(OutputCodec outputCodec, CompressionOption compressionOption) {

String codecExtension = outputCodec.getExtension();

if(outputCodec.isCompressionInternal()) {
return new StandardExtensionProvider(codecExtension);
}

String extension = compressionOption.getExtension()
.map(compressionExtension -> codecExtension + "." + compressionExtension)
.orElse(codecExtension);


return new StandardExtensionProvider(extension);
}

private StandardExtensionProvider(String extension) {
this.extension = extension;
}

@Override
public String getExtension() {
return extension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public enum CompressionOption {
NONE("none", NoneCompressionEngine::new),
GZIP("gzip", GZipCompressionEngine::new),
SNAPPY("snappy", SnappyCompressionEngine::new);
NONE("none", null, NoneCompressionEngine::new),
GZIP("gzip", "gz", GZipCompressionEngine::new),
SNAPPY("snappy", "snappy", SnappyCompressionEngine::new);

private static final Map<String, CompressionOption> OPTIONS_MAP = Arrays.stream(CompressionOption.values())
.collect(Collectors.toMap(
Expand All @@ -25,9 +26,11 @@ public enum CompressionOption {

private final String option;

private final String extension;
private final Supplier<CompressionEngine> compressionEngineSupplier;
CompressionOption(final String option, final Supplier<CompressionEngine> compressionEngineSupplier) {
CompressionOption(final String option, String extension, final Supplier<CompressionEngine> compressionEngineSupplier) {
this.option = option.toLowerCase();
this.extension = extension;
this.compressionEngineSupplier = compressionEngineSupplier;
}

Expand All @@ -39,6 +42,10 @@ public String getOption() {
return option;
}

public Optional<String> getExtension() {
return Optional.ofNullable(extension);
}

@JsonCreator
public static CompressionOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.when;

Expand All @@ -32,7 +33,7 @@ class KeyGeneratorTest {
private S3SinkConfig s3SinkConfig;

@Mock
private OutputCodec outputCodec;
private ExtensionProvider extensionProvider;

@Mock
private ObjectKeyOptions objectKeyOptions;
Expand All @@ -44,7 +45,7 @@ void setUp() {
}

private KeyGenerator createObjectUnderTest() {
return new KeyGenerator(s3SinkConfig, outputCodec);
return new KeyGenerator(s3SinkConfig, extensionProvider);
}

@Test
Expand Down Expand Up @@ -74,4 +75,15 @@ void test_generateKey_with_date_prefix() {
assertThat(key, true);
assertThat(key, key.contains(pathPrefix + dateString));
}

@Test
void generateKey_ends_with_extension() {
String extension = UUID.randomUUID().toString();
when(extensionProvider.getExtension()).thenReturn(extension);
String pathPrefix = "events/";
when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix);
String key = createObjectUnderTest().generateKey();
assertThat(key, notNullValue());
assertThat(key, key.endsWith("." + extension));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.opensearch.dataprepper.plugins.sink.s3;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class StandardExtensionProviderTest {

@Mock
private OutputCodec outputCodec;

@Mock
private CompressionOption compressionOption;

private String codecExtension;

@BeforeEach
void setUp() {
codecExtension = UUID.randomUUID().toString();
}

@Test
void getExtension_returns_extension_of_codec_when_compression_internal() {
when(outputCodec.getExtension()).thenReturn(codecExtension);
when(outputCodec.isCompressionInternal()).thenReturn(true);

ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption);
assertThat(extensionProvider, notNullValue());
assertThat(extensionProvider.getExtension(), equalTo(codecExtension));

verify(compressionOption, never()).getExtension();
}

@Test
void getExtension_returns_extension_of_codec_compression_has_no_extension() {
when(outputCodec.getExtension()).thenReturn(codecExtension);
when(compressionOption.getExtension()).thenReturn(Optional.empty());

ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption);
assertThat(extensionProvider, notNullValue());
assertThat(extensionProvider.getExtension(), equalTo(codecExtension));

verify(compressionOption).getExtension();
}

@Test
void getExtension_returns_extension_of_codec_compression_has_extension() {
String compressionExtension = UUID.randomUUID().toString();
when(outputCodec.getExtension()).thenReturn(codecExtension);
when(compressionOption.getExtension()).thenReturn(Optional.of(compressionExtension));

ExtensionProvider extensionProvider = StandardExtensionProvider.create(outputCodec, compressionOption);
assertThat(extensionProvider, notNullValue());
assertThat(extensionProvider.getExtension(), equalTo(codecExtension + "." + compressionExtension));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Optional;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

Expand All @@ -38,6 +40,29 @@ void getCompressionEngine_returns_expected_engine_type(final CompressionOption o
assertThat(option.getCompressionEngine(), instanceOf(expectedEngineType));
}

@ParameterizedTest
@EnumSource(CompressionOption.class)
void getExtension_returns_non_null_Optional(final CompressionOption option) {
assertThat(option.getExtension(), notNullValue());
}

@ParameterizedTest
@ArgumentsSource(OptionToExpectedExtension.class)
void getExtension_returns_expected_extension(final CompressionOption option, final String expectedExtension) {
Optional<String> extension = option.getExtension();
assertThat(extension, notNullValue());
assertThat(extension.isEmpty(), equalTo(false));
assertThat(extension.get(), equalTo(expectedExtension));
}

@ParameterizedTest
@EnumSource(value = CompressionOption.class, names = {"NONE"})
void getExtension_returns_empty_Optional_when_no_extension(final CompressionOption option) {
Optional<String> extension = option.getExtension();
assertThat(extension, notNullValue());
assertThat(extension.isEmpty(), equalTo(true));
}

static class OptionToExpectedEngine implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand All @@ -48,4 +73,14 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
);
}
}

static class OptionToExpectedExtension implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
return Stream.of(
arguments(CompressionOption.GZIP, "gz"),
arguments(CompressionOption.SNAPPY, "snappy")
);
}
}
}

0 comments on commit d0f251c

Please sign in to comment.