diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java index afb9b975fe..a38224a3fb 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java @@ -74,6 +74,8 @@ default boolean isCompressionInternal() { return false; } + default void validateAgainstCodecContext(OutputCodecContext outputCodecContext) { } + default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException { String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString(); Map eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java index 193f0ff196..76426262f6 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertNotEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; public class OutputCodecTest { @Test @@ -32,6 +33,17 @@ void isCompressionInternal_returns_false() { assertThat(objectUnderTest.isCompressionInternal(), equalTo(false)); } + @Test + void validateAgainstCodecContext_does_not_throw_or_interact_with_outputCodecContext() { + OutputCodec objectUnderTest = mock(OutputCodec.class, InvocationOnMock::callRealMethod); + + OutputCodecContext outputCodecContext = mock(OutputCodecContext.class); + + objectUnderTest.validateAgainstCodecContext(outputCodecContext); + + verifyNoInteractions(outputCodecContext); + } + @Test public void testWriteMetrics() throws JsonProcessingException { OutputCodec outputCodec = new OutputCodec() { diff --git a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java index ef6d9bce7d..82f7056b0e 100644 --- a/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java +++ b/data-prepper-plugins/avro-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodec.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,6 +108,17 @@ public String getExtension() { return AVRO; } + @Override + public void validateAgainstCodecContext(OutputCodecContext outputCodecContext) { + if (config.isAutoSchema()) + return; + + if ((outputCodecContext.getIncludeKeys() != null && !outputCodecContext.getIncludeKeys().isEmpty()) || + (outputCodecContext.getExcludeKeys() != null && !outputCodecContext.getExcludeKeys().isEmpty())) { + throw new InvalidPluginConfigurationException("Providing a user-defined schema and using sink include or exclude keys is not an allowed configuration."); + } + } + Schema parseSchema(final String schemaString) { try { Objects.requireNonNull(schemaString); diff --git a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java index 2ec27b73ab..e3b030c37f 100644 --- a/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java +++ b/data-prepper-plugins/avro-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/avro/AvroOutputCodecTest.java @@ -12,11 +12,13 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import java.io.ByteArrayInputStream; @@ -41,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; public class AvroOutputCodecTest { private static final String EXPECTED_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":" + @@ -274,6 +277,117 @@ public void testInlineSchemaBuilder() throws IOException { assertThat(actualSchema, equalTo(expectedSchema)); } + @Nested + class ValidateWithSchema { + private OutputCodecContext codecContext; + private List keys; + + @BeforeEach + void setUp() { + config.setSchema(createStandardSchemaNullable().toString()); + codecContext = mock(OutputCodecContext.class); + keys = List.of(UUID.randomUUID().toString()); + } + + @Test + void validateAgainstCodecContext_throws_when_user_defined_schema_and_includeKeys_non_empty() { + when(codecContext.getIncludeKeys()).thenReturn(keys); + + AvroOutputCodec objectUnderTest = createObjectUnderTest(); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.validateAgainstCodecContext(codecContext)); + } + + @Test + void validateAgainstCodecContext_throws_when_user_defined_schema_and_excludeKeys_non_empty() { + when(codecContext.getExcludeKeys()).thenReturn(keys); + + AvroOutputCodec objectUnderTest = createObjectUnderTest(); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.validateAgainstCodecContext(codecContext)); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isNull() { + when(codecContext.getIncludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isEmpty() { + when(codecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isNull() { + when(codecContext.getExcludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isEmpty() { + when(codecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + } + + @Nested + class ValidateWithAutoSchema { + private OutputCodecContext codecContext; + private List keys; + + @BeforeEach + void setUp() { + config.setAutoSchema(true); + codecContext = mock(OutputCodecContext.class, withSettings().lenient()); + keys = List.of(UUID.randomUUID().toString()); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_non_empty() { + when(codecContext.getIncludeKeys()).thenReturn(keys); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_non_empty() { + when(codecContext.getExcludeKeys()).thenReturn(keys); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isNull() { + when(codecContext.getIncludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isEmpty() { + when(codecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isNull() { + when(codecContext.getExcludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isEmpty() { + when(codecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + } private static Event createEventRecord(final Map eventData) { return JacksonLog.builder().withData(eventData).build(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java index 07e2572759..3cd99574c1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodec.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext; import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec; @@ -123,6 +124,17 @@ public String getExtension() { return PARQUET; } + @Override + public void validateAgainstCodecContext(OutputCodecContext outputCodecContext) { + if (config.isAutoSchema()) + return; + + if ((outputCodecContext.getIncludeKeys() != null && !outputCodecContext.getIncludeKeys().isEmpty()) || + (outputCodecContext.getExcludeKeys() != null && !outputCodecContext.getExcludeKeys().isEmpty())) { + throw new InvalidPluginConfigurationException("Providing a user-defined schema and using sink include or exclude keys is not an allowed configuration."); + } + } + static Schema parseSchema(final String schemaString) { return new Schema.Parser().parse(schemaString); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java index c33bdcd9de..5266cdb36f 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java @@ -86,6 +86,8 @@ public S3Sink(final PluginSetting pluginSetting, S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption); + codec.validateAgainstCodecContext(s3OutputCodecContext); + s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java index 2b694d76f0..cd55e8e646 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetOutputCodecTest.java @@ -23,6 +23,7 @@ import org.apache.parquet.schema.Type; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; @@ -32,6 +33,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.plugins.fs.LocalFilePositionOutputStream; import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext; import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; @@ -62,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; @ExtendWith(MockitoExtension.class) public class ParquetOutputCodecTest { @@ -351,6 +355,119 @@ void getSize_returns_non_zero_after_close_and_new_writes(int writeCount) throws assertThat(actualSizeOptional.get(), lessThanOrEqualTo(roughMultiplierMax * writeCount)); } + + @Nested + class ValidateWithSchema { + private OutputCodecContext codecContext; + private List keys; + + @BeforeEach + void setUp() { + config.setSchema(createStandardSchemaNullable().toString()); + codecContext = mock(OutputCodecContext.class); + keys = List.of(UUID.randomUUID().toString()); + } + + @Test + void validateAgainstCodecContext_throws_when_user_defined_schema_and_includeKeys_non_empty() { + when(codecContext.getIncludeKeys()).thenReturn(keys); + + ParquetOutputCodec objectUnderTest = createObjectUnderTest(); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.validateAgainstCodecContext(codecContext)); + } + + @Test + void validateAgainstCodecContext_throws_when_user_defined_schema_and_excludeKeys_non_empty() { + when(codecContext.getExcludeKeys()).thenReturn(keys); + + ParquetOutputCodec objectUnderTest = createObjectUnderTest(); + assertThrows(InvalidPluginConfigurationException.class, () -> objectUnderTest.validateAgainstCodecContext(codecContext)); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isNull() { + when(codecContext.getIncludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isEmpty() { + when(codecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isNull() { + when(codecContext.getExcludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isEmpty() { + when(codecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + } + + @Nested + class ValidateWithAutoSchema { + private OutputCodecContext codecContext; + private List keys; + + @BeforeEach + void setUp() { + config.setAutoSchema(true); + codecContext = mock(OutputCodecContext.class, withSettings().lenient()); + keys = List.of(UUID.randomUUID().toString()); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_non_empty() { + when(codecContext.getIncludeKeys()).thenReturn(keys); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_non_empty() { + when(codecContext.getExcludeKeys()).thenReturn(keys); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isNull() { + when(codecContext.getIncludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isEmpty() { + when(codecContext.getIncludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isNull() { + when(codecContext.getExcludeKeys()).thenReturn(null); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + + @Test + void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isEmpty() { + when(codecContext.getExcludeKeys()).thenReturn(Collections.emptyList()); + + createObjectUnderTest().validateAgainstCodecContext(codecContext); + } + } + private static Event createEventRecord(final Map eventData) { return JacksonLog.builder().withData(eventData).build(); } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 75ae2dde1c..553f96d2fb 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -15,6 +16,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; @@ -28,14 +30,19 @@ import java.util.ArrayList; import java.util.Collection; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class S3SinkTest { - public static final int MAX_EVENTS = 100; public static final int MAX_RETRIES = 5; public static final String BUCKET_NAME = "dataprepper"; @@ -51,6 +58,7 @@ class S3SinkTest { private PluginFactory pluginFactory; private AwsCredentialsSupplier awsCredentialsSupplier; private SinkContext sinkContext; + private OutputCodec codec; @BeforeEach void setUp() { @@ -59,7 +67,7 @@ void setUp() { sinkContext = mock(SinkContext.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); - OutputCodec codec = mock(OutputCodec.class); + codec = mock(OutputCodec.class); ObjectKeyOptions objectKeyOptions = mock(ObjectKeyOptions.class); pluginSetting = mock(PluginSetting.class); PluginModel pluginModel = mock(PluginModel.class); @@ -111,4 +119,26 @@ void test_doOutput_with_empty_records() { Collection> records = new ArrayList<>(); s3Sink.doOutput(records); } + + @Test + void constructor_should_call_codec_validateAgainstCodecContext_with_context() { + createObjectUnderTest(); + + ArgumentCaptor outputCodecContextArgumentCaptor = ArgumentCaptor.forClass(OutputCodecContext.class); + verify(codec).validateAgainstCodecContext(outputCodecContextArgumentCaptor.capture()); + + OutputCodecContext actualCodecContext = outputCodecContextArgumentCaptor.getValue(); + + assertThat(actualCodecContext, instanceOf(S3OutputCodecContext.class)); + } + + @Test + void constructor_should_throw_if_codec_validateAgainstCodecContext_throws() { + RuntimeException codecException = mock(RuntimeException.class); + + doThrow(codecException).when(codec).validateAgainstCodecContext(any()); + + RuntimeException actualException = assertThrows(RuntimeException.class, () -> createObjectUnderTest()); + assertThat(actualException, sameInstance(codecException)); + } }