Skip to content

Commit

Permalink
Add validations on route expressions, clean up assertTrue validation …
Browse files Browse the repository at this point in the history
…messages (#5060)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Oct 15, 2024
1 parent 729c66f commit 5b6221c
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.parser;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
Expand Down Expand Up @@ -53,6 +54,9 @@
@SuppressWarnings("rawtypes")
public class PipelineTransformer {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTransformer.class);

static final String CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT = "Route %s contains an invalid conditional expression '%s'. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.";
private static final String PIPELINE_TYPE = "pipeline";
private static final String ATTRIBUTE_NAME = "name";
private final PipelinesDataFlowModel pipelinesDataFlowModel;
Expand All @@ -68,6 +72,8 @@ public class PipelineTransformer {
private final PluginErrorCollector pluginErrorCollector;
private final PluginErrorsHandler pluginErrorsHandler;

private final ExpressionEvaluator expressionEvaluator;

public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
final PeerForwarderProvider peerForwarderProvider,
Expand All @@ -78,7 +84,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsHandler pluginErrorsHandler) {
final PluginErrorsHandler pluginErrorsHandler,
final ExpressionEvaluator expressionEvaluator) {
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
Expand All @@ -90,6 +97,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.pluginErrorCollector = pluginErrorCollector;
this.pluginErrorsHandler = pluginErrorsHandler;
this.expressionEvaluator = expressionEvaluator;
}

public Map<String, Pipeline> transformConfiguration() {
Expand Down Expand Up @@ -168,7 +176,19 @@ private void buildPipelineFromConfiguration(
final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());
if (!subPipelinePluginErrors.isEmpty()) {

final List<PluginError> invalidRouteExpressions = pipelineConfiguration.getRoutes()
.stream().filter(route -> !expressionEvaluator.isValidExpressionStatement(route.getCondition()))
.map(route -> PluginError.builder()
.componentType(PipelineModel.ROUTE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.exception(new InvalidPluginConfigurationException(
String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, route.getName(), route.getCondition())))
.build())
.collect(Collectors.toList());

if (!subPipelinePluginErrors.isEmpty() || !invalidRouteExpressions.isEmpty()) {
subPipelinePluginErrors.addAll(invalidRouteExpressions);
pluginErrorsHandler.handleErrors(subPipelinePluginErrors);
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.parser.config;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.event.EventFactory;
Expand All @@ -32,7 +33,8 @@
@Configuration
@ComponentScan(basePackages = {
"org.opensearch.dataprepper.pipeline.parser",
"org.opensearch.dataprepper.plugin"
"org.opensearch.dataprepper.plugin",
"org.opensearch.dataprepper.expression"
})
public class PipelineParserConfiguration {

Expand All @@ -48,7 +50,8 @@ public PipelineTransformer pipelineParser(
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsHandler pluginErrorsHandler
final PluginErrorsHandler pluginErrorsHandler,
final ExpressionEvaluator expressionEvaluator
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
Expand All @@ -60,7 +63,8 @@ public PipelineTransformer pipelineParser(
acknowledgementSetManager,
sourceCoordinatorFactory,
pluginErrorCollector,
pluginErrorsHandler);
pluginErrorsHandler,
expressionEvaluator);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
Expand Down Expand Up @@ -64,12 +66,15 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.parser.PipelineTransformer.CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT;

@ExtendWith(MockitoExtension.class)
class PipelineTransformerTests {
Expand All @@ -93,6 +98,9 @@ class PipelineTransformerTests {
private CircuitBreakerManager circuitBreakerManager;
@Mock
private PluginErrorsHandler pluginErrorsHandler;

@Mock
private ExpressionEvaluator expressionEvaluator;
@Captor
private ArgumentCaptor<Collection<PluginError>> pluginErrorsArgumentCaptor;

Expand Down Expand Up @@ -143,7 +151,7 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider,
routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector,
pluginErrorsHandler);
pluginErrorsHandler, expressionEvaluator);
}

@Test
Expand Down Expand Up @@ -345,6 +353,7 @@ void testMultipleProcessors() {
@Test
void parseConfiguration_with_routes_creates_correct_pipeline() {
mockDataPrepperConfigurationAccesses();
when(expressionEvaluator.isValidExpressionStatement(anyString())).thenReturn(true);
final PipelineTransformer pipelineTransformer =
createObjectUnderTest("src/test/resources/valid_multiple_sinks_with_routes.yml");
final Map<String, Pipeline> pipelineMap = pipelineTransformer.transformConfiguration();
Expand All @@ -358,6 +367,34 @@ void parseConfiguration_with_routes_creates_correct_pipeline() {
verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void parseConfiguration_with_invalid_route_expressions_handles_errors_and_returns_empty_pipeline_map() {
when(expressionEvaluator.isValidExpressionStatement("/value == raw")).thenReturn(true);
when(expressionEvaluator.isValidExpressionStatement("/value == service")).thenReturn(false);

final ArgumentCaptor<Collection<PluginError>> pluginErrorArgumentCaptor = ArgumentCaptor.forClass(Collection.class);
doNothing().when(pluginErrorsHandler).handleErrors(pluginErrorArgumentCaptor.capture());
final PipelineTransformer pipelineTransformer =
createObjectUnderTest("src/test/resources/valid_multiple_sinks_with_routes.yml");
final Map<String, Pipeline> pipelineMap = pipelineTransformer.transformConfiguration();
assertThat(pipelineMap.keySet().isEmpty(), equalTo(true));

final Collection<PluginError> pluginErrorCollection = pluginErrorArgumentCaptor.getValue();
assertThat(pluginErrorCollection, notNullValue());
assertThat(pluginErrorCollection.size(), equalTo(1));

final PluginError pluginError = pluginErrorCollection.stream().findAny().orElseThrow();
final String expectedErrorMessage = String.format(CONDITIONAL_ROUTE_INVALID_EXPRESSION_FORMAT, "service", "/value == service");
assertThat(pluginError.getPluginName(), equalTo(null));
assertThat(pluginError.getPipelineName(), equalTo("entry-pipeline"));
assertThat(pluginError.getComponentType(), equalTo(PipelineModel.ROUTE_PLUGIN_TYPE));
assertThat(pluginError.getException(), notNullValue());
assertThat(pluginError.getException() instanceof InvalidPluginConfigurationException, equalTo(true));
assertThat(pluginError.getException().getMessage(), equalTo(expectedErrorMessage));

verify(dataPrepperConfiguration).getPipelineExtensions();
}

@Test
void getPeerForwarderDrainDuration_peerForwarderConfigurationNotSet() {
when(dataPrepperConfiguration.getPeerForwarderConfiguration()).thenReturn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.parser.config;

import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineTransformer;
Expand Down Expand Up @@ -63,12 +64,15 @@ class PipelineParserConfigurationTest {
@Mock
private PluginErrorsHandler pluginErrorsHandler;

@Mock
private ExpressionEvaluator expressionEvaluator;

@Test
void pipelineParser() {
final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser(
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory,
dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager,
sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsHandler);
sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsHandler, expressionEvaluator);

assertThat(pipelineTransformer, is(notNullValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class PluginError {
static final String CAUSED_BY_DELIMITER = " caused by: ";
private final String pipelineName;
private final String componentType;
@NonNull
private final String pluginName;

@NonNull
private final Exception exception;

Expand All @@ -27,8 +27,11 @@ public String getErrorMessage() {
message.append(componentType);
message.append(PIPELINE_DELIMITER);
}
message.append(pluginName);
message.append(PATH_TO_CAUSE_DELIMITER);

if (pluginName != null) {
message.append(pluginName);
message.append(PATH_TO_CAUSE_DELIMITER);
}
message.append(getFlattenedExceptionMessage(CAUSED_BY_DELIMITER));
return message.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.Validator;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
Expand Down Expand Up @@ -69,7 +70,7 @@ public Object convert(final Class<?> pluginConfigurationType, final PluginSettin

if (!constraintViolations.isEmpty()) {
final String violationsString = constraintViolations.stream()
.map(v -> v.getPropertyPath().toString() + " " + v.getMessage())
.map(this::constructConstrainViolationMessage)
.collect(Collectors.joining(". "));

final String exceptionMessage = String.format("Plugin %s in pipeline %s is configured incorrectly: %s",
Expand All @@ -92,4 +93,12 @@ private Object convertSettings(final Class<?> pluginConfigurationType, final Plu
}
}

private String constructConstrainViolationMessage(final ConstraintViolation<Object> constraintViolation) {
if (constraintViolation.getConstraintDescriptor().getAnnotation().annotationType().equals(AssertTrue.class)) {
return constraintViolation.getMessage();
}

return constraintViolation.getPropertyPath().toString() + " " + constraintViolation.getMessage();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public RuntimeException handleException(final PluginSetting pluginSetting, final
return new InvalidPluginConfigurationException(
String.format(GENERIC_PLUGIN_EXCEPTION_FORMAT, pluginSetting.getName(), e.getMessage()));
}

private RuntimeException handleJsonMappingException(final JsonMappingException e, final PluginSetting pluginSetting) {
final String parameterPath = getParameterPath(e.getPath());

Expand Down
Loading

0 comments on commit 5b6221c

Please sign in to comment.