From 5b6221c26044a74a9b6f3af3ef71e0bff36036bf Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 15 Oct 2024 10:09:55 -0500 Subject: [PATCH] Add validations on route expressions, clean up assertTrue validation messages (#5060) Signed-off-by: Taylor Gray --- .../parser/PipelineTransformer.java | 24 +++++- .../config/PipelineParserConfiguration.java | 10 ++- .../parser/PipelineTransformerTests.java | 39 ++++++++- .../PipelineParserConfigurationTest.java | 6 +- .../dataprepper/validation/PluginError.java | 9 +- .../plugin/PluginConfigurationConverter.java | 11 ++- .../PluginConfigurationErrorHandler.java | 1 - .../PluginConfigurationConverterTest.java | 84 ++++++++++++++++++- .../processor/csv/CsvProcessorConfig.java | 3 + 9 files changed, 174 insertions(+), 13 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index c57c5eab61..c7f4d78c6b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.parser; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelineModel; @@ -53,6 +54,9 @@ @SuppressWarnings("rawtypes") public class PipelineTransformer { private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class); + + static final String CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT = "Route %s contains an invalid conditional expression '%s'. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax."; private static final String PIPELINE_TYPE = "pipeline"; private static final String ATTRIBUTE_NAME = "name"; private final PipelinesDataFlowModel pipelinesDataFlowModel; @@ -68,6 +72,8 @@ public class PipelineTransformer { private final PluginErrorCollector pluginErrorCollector; private final PluginErrorsHandler pluginErrorsHandler; + private final ExpressionEvaluator expressionEvaluator; + public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final PluginFactory pluginFactory, final PeerForwarderProvider peerForwarderProvider, @@ -78,7 +84,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final AcknowledgementSetManager acknowledgementSetManager, final SourceCoordinatorFactory sourceCoordinatorFactory, final PluginErrorCollector pluginErrorCollector, - final PluginErrorsHandler pluginErrorsHandler) { + final PluginErrorsHandler pluginErrorsHandler, + final ExpressionEvaluator expressionEvaluator) { this.pipelinesDataFlowModel = pipelinesDataFlowModel; this.pluginFactory = Objects.requireNonNull(pluginFactory); this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider); @@ -90,6 +97,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, this.sourceCoordinatorFactory = sourceCoordinatorFactory; this.pluginErrorCollector = pluginErrorCollector; this.pluginErrorsHandler = pluginErrorsHandler; + this.expressionEvaluator = expressionEvaluator; } public Map transformConfiguration() { @@ -168,7 +176,19 @@ private void buildPipelineFromConfiguration( final List subPipelinePluginErrors = pluginErrorCollector.getPluginErrors() .stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName())) .collect(Collectors.toList()); - if (!subPipelinePluginErrors.isEmpty()) { + + final List invalidRouteExpressions = pipelineConfiguration.getRoutes() + .stream().filter(route -> !expressionEvaluator.isValidExpressionStatement(route.getCondition())) + .map(route -> PluginError.builder() + .componentType(PipelineModel.ROUTE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .exception(new InvalidPluginConfigurationException( + String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, route.getName(), route.getCondition()))) + .build()) + .collect(Collectors.toList()); + + if (!subPipelinePluginErrors.isEmpty() || !invalidRouteExpressions.isEmpty()) { + subPipelinePluginErrors.addAll(invalidRouteExpressions); pluginErrorsHandler.handleErrors(subPipelinePluginErrors); throw new InvalidPluginConfigurationException( String.format("One or more plugins are not configured correctly in the pipeline: %s.\n", diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index 440e618f3b..27bae13415 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.parser.config; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.event.EventFactory; @@ -32,7 +33,8 @@ @Configuration @ComponentScan(basePackages = { "org.opensearch.dataprepper.pipeline.parser", - "org.opensearch.dataprepper.plugin" + "org.opensearch.dataprepper.plugin", + "org.opensearch.dataprepper.expression" }) public class PipelineParserConfiguration { @@ -48,7 +50,8 @@ public PipelineTransformer pipelineParser( final AcknowledgementSetManager acknowledgementSetManager, final SourceCoordinatorFactory sourceCoordinatorFactory, final PluginErrorCollector pluginErrorCollector, - final PluginErrorsHandler pluginErrorsHandler + final PluginErrorsHandler pluginErrorsHandler, + final ExpressionEvaluator expressionEvaluator ) { return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, @@ -60,7 +63,8 @@ public PipelineTransformer pipelineParser( acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector, - pluginErrorsHandler); + pluginErrorsHandler, + expressionEvaluator); } @Bean diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index bcddc49b94..31a49fa839 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -22,12 +22,14 @@ import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.breaker.CircuitBreaker; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration; @@ -64,12 +66,15 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.parser.PipelineTransformer.CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT; @ExtendWith(MockitoExtension.class) class PipelineTransformerTests { @@ -93,6 +98,9 @@ class PipelineTransformerTests { private CircuitBreakerManager circuitBreakerManager; @Mock private PluginErrorsHandler pluginErrorsHandler; + + @Mock + private ExpressionEvaluator expressionEvaluator; @Captor private ArgumentCaptor> pluginErrorsArgumentCaptor; @@ -143,7 +151,7 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector, - pluginErrorsHandler); + pluginErrorsHandler, expressionEvaluator); } @Test @@ -345,6 +353,7 @@ void testMultipleProcessors() { @Test void parseConfiguration_with_routes_creates_correct_pipeline() { mockDataPrepperConfigurationAccesses(); + when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true); final PipelineTransformer pipelineTransformer = createObjectUnderTest("src/test/resources/valid_multiple_sinks_with_routes.yml"); final Map pipelineMap = pipelineTransformer.transformConfiguration(); @@ -358,6 +367,34 @@ void parseConfiguration_with_routes_creates_correct_pipeline() { verify(dataPrepperConfiguration).getPipelineExtensions(); } + @Test + void parseConfiguration_with_invalid_route_expressions_handles_errors_and_returns_empty_pipeline_map() { + when(expressionEvaluator.isValidExpressionStatement("/value == raw")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("/value == service")).thenReturn(false); + + final ArgumentCaptor> pluginErrorArgumentCaptor = ArgumentCaptor.forClass(Collection.class); + doNothing().when(pluginErrorsHandler).handleErrors(pluginErrorArgumentCaptor.capture()); + final PipelineTransformer pipelineTransformer = + createObjectUnderTest("src/test/resources/valid_multiple_sinks_with_routes.yml"); + final Map pipelineMap = pipelineTransformer.transformConfiguration(); + assertThat(pipelineMap.keySet().isEmpty(), equalTo(true)); + + final Collection pluginErrorCollection = pluginErrorArgumentCaptor.getValue(); + assertThat(pluginErrorCollection, notNullValue()); + assertThat(pluginErrorCollection.size(), equalTo(1)); + + final PluginError pluginError = pluginErrorCollection.stream().findAny().orElseThrow(); + final String expectedErrorMessage = String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, "service", "/value == service"); + assertThat(pluginError.getPluginName(), equalTo(null)); + assertThat(pluginError.getPipelineName(), equalTo("entry-pipeline")); + assertThat(pluginError.getComponentType(), equalTo(PipelineModel.ROUTE_PLUGIN_TYPE)); + assertThat(pluginError.getException(), notNullValue()); + assertThat(pluginError.getException() instanceof InvalidPluginConfigurationException, equalTo(true)); + assertThat(pluginError.getException().getMessage(), equalTo(expectedErrorMessage)); + + verify(dataPrepperConfiguration).getPipelineExtensions(); + } + @Test void getPeerForwarderDrainDuration_peerForwarderConfigurationNotSet() { when(dataPrepperConfiguration.getPeerForwarderConfiguration()).thenReturn(null); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java index ab657dfc94..874836fdf1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.parser.config; import org.opensearch.dataprepper.breaker.CircuitBreakerManager; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.parser.PipelineTransformer; @@ -63,12 +64,15 @@ class PipelineParserConfigurationTest { @Mock private PluginErrorsHandler pluginErrorsHandler; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Test void pipelineParser() { final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser( pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, - sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsHandler); + sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsHandler, expressionEvaluator); assertThat(pipelineTransformer, is(notNullValue())); } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java index 4f7eb4b919..01634a6e7c 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java @@ -12,8 +12,8 @@ public class PluginError { static final String CAUSED_BY_DELIMITER = " caused by: "; private final String pipelineName; private final String componentType; - @NonNull private final String pluginName; + @NonNull private final Exception exception; @@ -27,8 +27,11 @@ public String getErrorMessage() { message.append(componentType); message.append(PIPELINE_DELIMITER); } - message.append(pluginName); - message.append(PATH_TO_CAUSE_DELIMITER); + + if (pluginName != null) { + message.append(pluginName); + message.append(PATH_TO_CAUSE_DELIMITER); + } message.append(getFlattenedExceptionMessage(CAUSED_BY_DELIMITER)); return message.toString(); } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java index ca4e5d0e9d..5af6d3dea9 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverter.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.validation.ConstraintViolation; import jakarta.validation.Validator; +import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; @@ -69,7 +70,7 @@ public Object convert(final Class pluginConfigurationType, final PluginSettin if (!constraintViolations.isEmpty()) { final String violationsString = constraintViolations.stream() - .map(v -> v.getPropertyPath().toString() + " " + v.getMessage()) + .map(this::constructConstrainViolationMessage) .collect(Collectors.joining(". ")); final String exceptionMessage = String.format("Plugin %s in pipeline %s is configured incorrectly: %s", @@ -92,4 +93,12 @@ private Object convertSettings(final Class pluginConfigurationType, final Plu } } + private String constructConstrainViolationMessage(final ConstraintViolation constraintViolation) { + if (constraintViolation.getConstraintDescriptor().getAnnotation().annotationType().equals(AssertTrue.class)) { + return constraintViolation.getMessage(); + } + + return constraintViolation.getPropertyPath().toString() + " " + constraintViolation.getMessage(); + } + } diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationErrorHandler.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationErrorHandler.java index 8ad79ee349..9ae5f2579c 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationErrorHandler.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/PluginConfigurationErrorHandler.java @@ -38,7 +38,6 @@ public RuntimeException handleException(final PluginSetting pluginSetting, final return new InvalidPluginConfigurationException( String.format(GENERIC_PLUGIN_EXCEPTION_FORMAT, pluginSetting.getName(), e.getMessage())); } - private RuntimeException handleJsonMappingException(final JsonMappingException e, final PluginSetting pluginSetting) { final String parameterPath = getParameterPath(e.getPath()); diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java index 4eb19853e4..bd00826277 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/PluginConfigurationConverterTest.java @@ -9,7 +9,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.validation.ConstraintViolation; import jakarta.validation.Path; +import jakarta.validation.Payload; import jakarta.validation.Validator; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.metadata.ConstraintDescriptor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -18,6 +21,9 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import javax.annotation.Nonnull; +import javax.annotation.meta.When; +import java.lang.annotation.Annotation; import java.util.Collections; import java.util.UUID; @@ -138,7 +144,7 @@ void convert_with_other_target_should_validate_configuration() { } @Test - void convert_with_other_target_should_throw_exception_when_there_are_constraint_violations() { + void convert_with_other_target_should_throw_exception_when_there_are_constraint_violations_with_for_non_assert_true() { final String value = UUID.randomUUID().toString(); given(pluginSetting.getSettings()) @@ -160,6 +166,23 @@ void convert_with_other_target_should_throw_exception_when_there_are_constraint_ given(propertyPath.toString()).willReturn(propertyPathString); given(constraintViolation.getPropertyPath()).willReturn(propertyPath); + final ConstraintDescriptor constraintDescriptor = mock(ConstraintDescriptor.class); + given(constraintViolation.getConstraintDescriptor()).willReturn(constraintDescriptor); + + Nonnull annotation = new Nonnull() { + @Override + public When when() { + return null; + } + + @Override + public Class annotationType() { + return Nonnull.class; + } + }; + + when(constraintDescriptor.getAnnotation()).thenReturn(annotation); + given(validator.validate(any())) .willReturn(Collections.singleton(constraintViolation)); @@ -174,6 +197,65 @@ void convert_with_other_target_should_throw_exception_when_there_are_constraint_ assertThat(actualException.getMessage(), containsString(errorMessage)); } + @Test + void convert_with_other_target_should_throw_exception_when_there_are_constraint_violations_with_assert_true_annotation() { + + final String value = UUID.randomUUID().toString(); + given(pluginSetting.getSettings()) + .willReturn(Collections.singletonMap("my_value", value)); + + final String pluginName = UUID.randomUUID().toString(); + given(pluginSetting.getName()) + .willReturn(pluginName); + + final String pipelineName = UUID.randomUUID().toString(); + given(pluginSetting.getPipelineName()) + .willReturn(pipelineName); + + @SuppressWarnings("unchecked") final ConstraintViolation constraintViolation = mock(ConstraintViolation.class); + final String errorMessage = UUID.randomUUID().toString(); + given(constraintViolation.getMessage()).willReturn(errorMessage); + + final ConstraintDescriptor constraintDescriptor = mock(ConstraintDescriptor.class); + given(constraintViolation.getConstraintDescriptor()).willReturn(constraintDescriptor); + + AssertTrue annotation = new AssertTrue() { + @Override + public String message() { + return null; + } + + @Override + public Class[] groups() { + return new Class[0]; + } + + @Override + public Class[] payload() { + return new Class[0]; + } + + @Override + public Class annotationType() { + return AssertTrue.class; + } + }; + + when(constraintDescriptor.getAnnotation()).thenReturn(annotation); + + given(validator.validate(any())) + .willReturn(Collections.singleton(constraintViolation)); + + final PluginConfigurationConverter objectUnderTest = createObjectUnderTest(); + + final InvalidPluginConfigurationException actualException = assertThrows(InvalidPluginConfigurationException.class, + () -> objectUnderTest.convert(TestConfiguration.class, pluginSetting)); + + assertThat(actualException.getMessage(), containsString(pluginName)); + assertThat(actualException.getMessage(), containsString(pipelineName)); + assertThat(actualException.getMessage(), containsString(errorMessage)); + } + @Test void convert_with_error_when_converting_with_object_mapper_calls_plugin_configuration_error_handler() { objectMapper = mock(ObjectMapper.class); diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java index 07cfe5798f..30537e91e9 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonClassDescription; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; import jakarta.validation.constraints.AssertTrue; import java.util.List; @@ -15,6 +16,8 @@ /** * Configuration class for {@link CsvProcessor}. */ + +@JsonPropertyOrder @JsonClassDescription("The csv processor parses comma-separated values (CSVs) strings into structured data.") public class CsvProcessorConfig { static final String DEFAULT_SOURCE = "message";