Skip to content

Commit

Permalink
Disallow the combination of a user-defined schema and include/exclude…
Browse files Browse the repository at this point in the history
… keys (#3254)

Disallow the combination of a user-defined schema and include/exclude keys in the Parquet/Avro sink codecs. Resolves #3253.

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Aug 25, 2023
1 parent 1bcf9f6 commit 01981a0
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ default boolean isCompressionInternal() {
return false;
}

/**
 * Validates this codec against the sink's output codec context before the sink starts writing.
 * <p>
 * The default implementation performs no validation and accepts any context.
 * Implementations may override this to reject sink configurations that are
 * incompatible with the codec configuration, signalling the conflict by
 * throwing an unchecked exception.
 *
 * @param outputCodecContext the sink's output codec context to validate against
 */
default void validateAgainstCodecContext(OutputCodecContext outputCodecContext) { }

default Event addTagsToEvent(Event event, String tagsTargetKey) throws JsonProcessingException {
String eventJsonString = event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
Map<String, Object> eventData = objectMapper.readValue(eventJsonString, new TypeReference<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;

public class OutputCodecTest {
@Test
Expand All @@ -32,6 +33,17 @@ void isCompressionInternal_returns_false() {
assertThat(objectUnderTest.isCompressionInternal(), equalTo(false));
}

@Test
void validateAgainstCodecContext_does_not_throw_or_interact_with_outputCodecContext() {
    // Partial mock that falls through to the interface's default implementation.
    final OutputCodec codec = mock(OutputCodec.class, InvocationOnMock::callRealMethod);
    final OutputCodecContext context = mock(OutputCodecContext.class);

    codec.validateAgainstCodecContext(context);

    // The default implementation must be a no-op: it never inspects the context.
    verifyNoInteractions(context);
}

@Test
public void testWriteMetrics() throws JsonProcessingException {
OutputCodec outputCodec = new OutputCodec() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,6 +108,17 @@ public String getExtension() {
return AVRO;
}

@Override
public void validateAgainstCodecContext(OutputCodecContext outputCodecContext) {
    // When the schema is inferred automatically, any sink key filtering is acceptable.
    if (config.isAutoSchema()) {
        return;
    }

    // With a user-defined schema, sink-level include/exclude keys are disallowed.
    final boolean hasIncludeKeys =
            outputCodecContext.getIncludeKeys() != null && !outputCodecContext.getIncludeKeys().isEmpty();
    final boolean hasExcludeKeys =
            outputCodecContext.getExcludeKeys() != null && !outputCodecContext.getExcludeKeys().isEmpty();

    if (hasIncludeKeys || hasExcludeKeys) {
        throw new InvalidPluginConfigurationException("Providing a user-defined schema and using sink include or exclude keys is not an allowed configuration.");
    }
}

Schema parseSchema(final String schemaString) {
try {
Objects.requireNonNull(schemaString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;

import java.io.ByteArrayInputStream;
Expand All @@ -41,6 +43,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

public class AvroOutputCodecTest {
private static final String EXPECTED_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":" +
Expand Down Expand Up @@ -274,6 +277,117 @@ public void testInlineSchemaBuilder() throws IOException {
assertThat(actualSchema, equalTo(expectedSchema));
}

@Nested
class ValidateWithSchema {
    private OutputCodecContext context;
    private List<String> sampleKeys;

    @BeforeEach
    void setUp() {
        // A user-defined schema makes include/exclude keys invalid.
        config.setSchema(createStandardSchemaNullable().toString());
        context = mock(OutputCodecContext.class);
        sampleKeys = List.of(UUID.randomUUID().toString());
    }

    @Test
    void validateAgainstCodecContext_throws_when_user_defined_schema_and_includeKeys_non_empty() {
        when(context.getIncludeKeys()).thenReturn(sampleKeys);

        final AvroOutputCodec objectUnderTest = createObjectUnderTest();

        assertThrows(InvalidPluginConfigurationException.class,
                () -> objectUnderTest.validateAgainstCodecContext(context));
    }

    @Test
    void validateAgainstCodecContext_throws_when_user_defined_schema_and_excludeKeys_non_empty() {
        when(context.getExcludeKeys()).thenReturn(sampleKeys);

        final AvroOutputCodec objectUnderTest = createObjectUnderTest();

        assertThrows(InvalidPluginConfigurationException.class,
                () -> objectUnderTest.validateAgainstCodecContext(context));
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isNull() {
        when(context.getIncludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isEmpty() {
        when(context.getIncludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isNull() {
        when(context.getExcludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isEmpty() {
        when(context.getExcludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }
}

@Nested
class ValidateWithAutoSchema {
    private OutputCodecContext context;
    private List<String> sampleKeys;

    @BeforeEach
    void setUp() {
        // With auto-schema, include/exclude keys are permitted, so the
        // stubs below may go unused; mark the mock lenient.
        config.setAutoSchema(true);
        context = mock(OutputCodecContext.class, withSettings().lenient());
        sampleKeys = List.of(UUID.randomUUID().toString());
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_non_empty() {
        when(context.getIncludeKeys()).thenReturn(sampleKeys);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_non_empty() {
        when(context.getExcludeKeys()).thenReturn(sampleKeys);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isNull() {
        when(context.getIncludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isEmpty() {
        when(context.getIncludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isNull() {
        when(context.getExcludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isEmpty() {
        when(context.getExcludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }
}

private static Event createEventRecord(final Map<String, Object> eventData) {
return JacksonLog.builder().withData(eventData).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext;
import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
Expand Down Expand Up @@ -123,6 +124,17 @@ public String getExtension() {
return PARQUET;
}

@Override
public void validateAgainstCodecContext(OutputCodecContext outputCodecContext) {
    // Auto-generated schemas adapt to the filtered events, so key filtering is fine.
    if (config.isAutoSchema()) {
        return;
    }

    // A user-defined schema may not be combined with sink include/exclude keys.
    final boolean hasIncludeKeys =
            outputCodecContext.getIncludeKeys() != null && !outputCodecContext.getIncludeKeys().isEmpty();
    final boolean hasExcludeKeys =
            outputCodecContext.getExcludeKeys() != null && !outputCodecContext.getExcludeKeys().isEmpty();

    if (hasIncludeKeys || hasExcludeKeys) {
        throw new InvalidPluginConfigurationException("Providing a user-defined schema and using sink include or exclude keys is not an allowed configuration.");
    }
}

static Schema parseSchema(final String schemaString) {
return new Schema.Parser().parse(schemaString);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public S3Sink(final PluginSetting pluginSetting,

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

codec.validateAgainstCodecContext(s3OutputCodecContext);

s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, s3OutputCodecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.parquet.schema.Type;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -32,6 +33,8 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.fs.LocalFilePositionOutputStream;
import org.opensearch.dataprepper.plugins.sink.s3.S3OutputCodecContext;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
Expand Down Expand Up @@ -62,6 +65,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

@ExtendWith(MockitoExtension.class)
public class ParquetOutputCodecTest {
Expand Down Expand Up @@ -351,6 +355,119 @@ void getSize_returns_non_zero_after_close_and_new_writes(int writeCount) throws
assertThat(actualSizeOptional.get(), lessThanOrEqualTo(roughMultiplierMax * writeCount));
}


@Nested
class ValidateWithSchema {
    private OutputCodecContext context;
    private List<String> sampleKeys;

    @BeforeEach
    void setUp() {
        // A user-defined schema makes include/exclude keys invalid.
        config.setSchema(createStandardSchemaNullable().toString());
        context = mock(OutputCodecContext.class);
        sampleKeys = List.of(UUID.randomUUID().toString());
    }

    @Test
    void validateAgainstCodecContext_throws_when_user_defined_schema_and_includeKeys_non_empty() {
        when(context.getIncludeKeys()).thenReturn(sampleKeys);

        final ParquetOutputCodec objectUnderTest = createObjectUnderTest();

        assertThrows(InvalidPluginConfigurationException.class,
                () -> objectUnderTest.validateAgainstCodecContext(context));
    }

    @Test
    void validateAgainstCodecContext_throws_when_user_defined_schema_and_excludeKeys_non_empty() {
        when(context.getExcludeKeys()).thenReturn(sampleKeys);

        final ParquetOutputCodec objectUnderTest = createObjectUnderTest();

        assertThrows(InvalidPluginConfigurationException.class,
                () -> objectUnderTest.validateAgainstCodecContext(context));
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isNull() {
        when(context.getIncludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_includeKeys_isEmpty() {
        when(context.getIncludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isNull() {
        when(context.getExcludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_user_defined_schema_and_excludeKeys_isEmpty() {
        when(context.getExcludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }
}

@Nested
class ValidateWithAutoSchema {
    private OutputCodecContext context;
    private List<String> sampleKeys;

    @BeforeEach
    void setUp() {
        // With auto-schema, include/exclude keys are permitted, so the
        // stubs below may go unused; mark the mock lenient.
        config.setAutoSchema(true);
        context = mock(OutputCodecContext.class, withSettings().lenient());
        sampleKeys = List.of(UUID.randomUUID().toString());
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_non_empty() {
        when(context.getIncludeKeys()).thenReturn(sampleKeys);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_non_empty() {
        when(context.getExcludeKeys()).thenReturn(sampleKeys);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isNull() {
        when(context.getIncludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_includeKeys_isEmpty() {
        when(context.getIncludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isNull() {
        when(context.getExcludeKeys()).thenReturn(null);

        createObjectUnderTest().validateAgainstCodecContext(context);
    }

    @Test
    void validateAgainstCodecContext_is_ok_when_auto_schema_and_excludeKeys_isEmpty() {
        when(context.getExcludeKeys()).thenReturn(Collections.emptyList());

        createObjectUnderTest().validateAgainstCodecContext(context);
    }
}

private static Event createEventRecord(final Map<String, Object> eventData) {
return JacksonLog.builder().withData(eventData).build();
}
Expand Down
Loading

0 comments on commit 01981a0

Please sign in to comment.