From f09db1a0f6fc41dcbee17451b927b5dc61553ac8 Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Thu, 13 Jul 2023 08:20:50 -0700 Subject: [PATCH 1/5] Updated Kafka security configuration (#2994) * Add Kafka Security Configurations Signed-off-by: Krishna Kondaka * Modified kafka security config. Added new fields to AwsConfig Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka * Modified AwsConfig to have msk option that can take multiple options Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../kafka-plugins/build.gradle | 1 + .../kafka/configuration/AuthConfig.java | 68 +++++++++++++++---- .../kafka/configuration/AwsConfig.java | 47 +++++++++++-- .../kafka/configuration/AwsIamAuthConfig.java | 34 ++++++++++ .../kafka/configuration/EncryptionType.java | 34 ++++++++++ .../configuration/KafkaSourceConfig.java | 13 ++++ .../MskBrokerConnectionType.java | 35 ++++++++++ .../plugins/kafka/source/KafkaSource.java | 59 ++++++++++++++-- .../kafka/configuration/AuthConfigTest.java | 62 +++++++++++++++-- .../kafka/configuration/AwsConfigTest.java | 18 +++-- .../configuration/AwsIamAuthConfigTest.java | 21 ++++++ .../configuration/EncryptionTypeTest.java | 21 ++++++ .../configuration/KafkaSourceConfigTest.java | 5 ++ .../MskBrokerConnectionTypeTest.java | 21 ++++++ .../kafka/configuration/OAuthConfigTest.java | 2 +- .../PlainTextAuthConfigTest.java | 4 +- .../src/test/resources/sample-pipelines.yaml | 35 +++++----- 17 files changed, 423 insertions(+), 57 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfig.java create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionType.java create mode 100644 data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionType.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfigTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionTypeTest.java create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionTypeTest.java diff --git a/data-prepper-plugins/kafka-plugins/build.gradle b/data-prepper-plugins/kafka-plugins/build.gradle index 70c635bd08..848e9d9252 100644 --- a/data-prepper-plugins/kafka-plugins/build.gradle +++ b/data-prepper-plugins/kafka-plugins/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation 'io.confluent:kafka-avro-serializer:7.3.3' implementation 'io.confluent:kafka-schema-registry-client:7.3.3' implementation 'io.confluent:kafka-schema-registry:7.3.3:tests' + implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6' testImplementation 'org.mockito:mockito-inline:4.1.0' testImplementation 'org.yaml:snakeyaml:2.0' testImplementation testLibs.spring.test diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java index 0885d9b5f5..5cc768cc99 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfig.java @@ -5,33 +5,73 @@ package org.opensearch.dataprepper.plugins.kafka.configuration; - import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.Valid; + +import java.util.stream.Stream; /** - * * A helper class that helps to read auth related configuration values from + * A helper class that helps to read auth related configuration values from * pipelines.yaml */ public class AuthConfig { - @JsonProperty("sasl_plaintext") - private PlainTextAuthConfig plainTextAuthConfig; - @JsonProperty("sasl_oauth") - private OAuthConfig oAuthConfig; + public static class SaslAuthConfig { + @JsonProperty("plaintext") + private PlainTextAuthConfig plainTextAuthConfig; + + @JsonProperty("oauth") + private OAuthConfig oAuthConfig; + + @JsonProperty("aws_iam") + private AwsIamAuthConfig awsIamAuthConfig; + + public AwsIamAuthConfig getAwsIamAuthConfig() { + return awsIamAuthConfig; + } + + public PlainTextAuthConfig getPlainTextAuthConfig() { + return plainTextAuthConfig; + } + + public OAuthConfig getOAuthConfig() { + return oAuthConfig; + } + + @AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified") + public boolean hasOnlyOneConfig() { + return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1; + } - public OAuthConfig getoAuthConfig() { - return oAuthConfig; } - public void setoAuthConfig(OAuthConfig oAuthConfig) { - this.oAuthConfig = oAuthConfig; + public static class SslAuthConfig { + // TODO Add Support for SSL authentication types like + // one-way or two-way authentication + + public SslAuthConfig() { + } } - public void setPlainTextAuthConfig(PlainTextAuthConfig plainTextAuthConfig) { - this.plainTextAuthConfig = plainTextAuthConfig; + @JsonProperty("ssl") + private SslAuthConfig sslAuthConfig; + + @Valid + @JsonProperty("sasl") + private SaslAuthConfig saslAuthConfig; + + public SslAuthConfig getSslAuthConfig() { + return sslAuthConfig; } - public PlainTextAuthConfig getPlainTextAuthConfig() { - return plainTextAuthConfig; + public SaslAuthConfig getSaslAuthConfig() { + return saslAuthConfig; } + + @AssertTrue(message = "Only one of SSL or SASL auth config must be specified") + public boolean hasSaslOrSslConfig() { + return Stream.of(sslAuthConfig, saslAuthConfig).filter(n -> n!=null).count() == 1; + } + } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java index 4f4fa91cb5..0cd56d39bf 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfig.java @@ -7,13 +7,50 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Size; +import jakarta.validation.Valid; public class AwsConfig { - @JsonProperty("msk_arn") - @Size(min = 20, max = 2048, message = "mskArn length should be between 20 and 2048 characters") - private String awsMskArn; - public String getAwsMskArn() { - return awsMskArn; + public static class AwsMskConfig { + @Valid + @Size(min = 20, max = 2048, message = "msk_arn length should be between 20 and 2048 characters") + @JsonProperty("arn") + private String arn; + + @JsonProperty("broker_connection_type") + private MskBrokerConnectionType brokerConnectionType; + + public String getArn() { + return arn; + } + + public MskBrokerConnectionType getBrokerConnectionType() { + return brokerConnectionType; + } + } + + @JsonProperty("msk") + private AwsMskConfig awsMskConfig; + + @Valid + @Size(min = 1, message = "Region cannot be empty string") + @JsonProperty("region") + private String region; + + @Valid + @Size(min = 20, max = 2048, message = "sts_role_arn length should be between 20 and 2048 characters") + @JsonProperty("sts_role_arn") + private String stsRoleArn; + + public AwsMskConfig getAwsMskConfig() { + return awsMskConfig; + } + + public String getRegion() { + return region; + } + + public String getStsRoleArn() { + return stsRoleArn; } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfig.java new file mode 100644 index 0000000000..9427ea6dcb --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfig.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.util.Map; +import java.util.Arrays; +import java.util.stream.Collectors; + +public enum AwsIamAuthConfig { + ROLE("role"), + DEFAULT("default"); + //TODO add "PROFILE" option + + private static final Map OPTIONS_MAP = Arrays.stream(AwsIamAuthConfig.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + + AwsIamAuthConfig(final String option) { + this.option = option; + } + + @JsonCreator + static AwsIamAuthConfig fromOptionValue(final String option) { + return OPTIONS_MAP.get(option.toLowerCase()); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionType.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionType.java new file mode 100644 index 0000000000..8142d88bc6 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionType.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Map; +import java.util.Arrays; +import java.util.stream.Collectors; + +public enum EncryptionType { + SSL("ssl"), + PLAINTEXT("plaintext"); + + private static final Map OPTIONS_MAP = Arrays.stream(EncryptionType.values()) + .collect(Collectors.toMap( + value -> value.type, + value -> value + )); + + private final String type; + + EncryptionType(final String type) { + this.type = type; + } + + @JsonCreator + static EncryptionType fromTypeValue(final String type) { + return OPTIONS_MAP.get(type.toLowerCase()); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java index bf8645c200..c6e0a560ee 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfig.java @@ -35,10 +35,15 @@ public class KafkaSourceConfig { @Valid private SchemaConfig schemaConfig; + @Valid @JsonProperty("authentication") private AuthConfig authConfig; + @JsonProperty("encryption") + private EncryptionType encryptionType = EncryptionType.SSL; + @JsonProperty("aws") + @Valid private AwsConfig awsConfig; @JsonProperty("acknowledgments") @@ -83,6 +88,14 @@ public AuthConfig getAuthConfig() { return authConfig; } + public EncryptionType getEncryptionType() { + return encryptionType; + } + + public AwsConfig getAwsConfig() { + return awsConfig; + } + public void setAuthConfig(AuthConfig authConfig) { this.authConfig = authConfig; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionType.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionType.java new file mode 100644 index 0000000000..65d086e8ec --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionType.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Map; +import java.util.Arrays; +import java.util.stream.Collectors; + +public enum MskBrokerConnectionType { + PUBLIC("public"), + SINGLE_VPC("single_vpc"), + MULTI_VPC("multi_vpc"); + + private static final Map OPTIONS_MAP = Arrays.stream(MskBrokerConnectionType.values()) + .collect(Collectors.toMap( + value -> value.type, + value -> value + )); + + private final String type; + + MskBrokerConnectionType(final String type) { + this.type = type; + } + + @JsonCreator + static MskBrokerConnectionType fromTypeValue(final String type) { + return OPTIONS_MAP.get(type.toLowerCase()); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 6c69c259c2..f83bd663fd 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -24,6 +24,10 @@ import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType; +import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaSourceCustomConsumer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSourceJsonDeserializer; @@ -68,6 +72,7 @@ public class KafkaSource implements Source> { private String schemaType = MessageFormat.PLAINTEXT.toString(); private static final String SCHEMA_TYPE= "schemaType"; private final AcknowledgementSetManager acknowledgementSetManager; + private final EncryptionType encryptionType; @DataPrepperPluginConstructor public KafkaSource(final KafkaSourceConfig sourceConfig, @@ -80,6 +85,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, this.pipelineName = pipelineDescription.getPipelineName(); this.kafkaWorkerThreadProcessingErrors = pluginMetrics.counter(KAFKA_WORKER_THREAD_PROCESSING_ERRORS); shutdownInProgress = new AtomicBoolean(false); + this.encryptionType = sourceConfig.getEncryptionType(); } @Override @@ -162,13 +168,26 @@ private Properties getConsumerProperties(TopicConfig topicConfig) { properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId()); if (sourceConfig.getSchemaConfig() != null) { schemaType = getSchemaType(sourceConfig.getSchemaConfig().getRegistryURL(), topicConfig.getName(), sourceConfig.getSchemaConfig().getVersion()); - } + } if (schemaType.isEmpty()) { schemaType = MessageFormat.PLAINTEXT.toString(); } setPropertiesForSchemaType(properties, schemaType); - if (sourceConfig.getAuthConfig() != null && sourceConfig.getAuthConfig().getPlainTextAuthConfig() != null) { - setPropertiesForAuth(properties); + if (sourceConfig.getAuthConfig() != null) { + AuthConfig.SaslAuthConfig saslAuthConfig = sourceConfig.getAuthConfig().getSaslAuthConfig(); + if (saslAuthConfig != null) { + if (saslAuthConfig.getPlainTextAuthConfig() != null) { + setPlainTextAuthProperties(properties); + } else if (saslAuthConfig.getAwsIamAuthConfig() != null) { + if (encryptionType == EncryptionType.PLAINTEXT) { + throw new RuntimeException("Encryption Config must be SSL to use IAM authentication mechanism"); + } + setAwsIamAuthProperties(properties, saslAuthConfig.getAwsIamAuthConfig(), sourceConfig.getAwsConfig()); + } else if (saslAuthConfig.getOAuthConfig() != null) { + } else { + throw new RuntimeException("No SASL auth config specified"); + } + } } LOG.info("Starting consumer with the properties : {}", properties); return properties; @@ -210,9 +229,28 @@ private String getSchemaRegistryUrl() { return sourceConfig.getSchemaConfig().getRegistryURL(); } - private void setPropertiesForAuth(Properties properties) { - String username = sourceConfig.getAuthConfig().getPlainTextAuthConfig().getUsername(); - String password = sourceConfig.getAuthConfig().getPlainTextAuthConfig().getPassword(); + private void setAwsIamAuthProperties(Properties properties, AwsIamAuthConfig awsIamAuthConfig, AwsConfig awsConfig) { + if (awsConfig == null) { + throw new RuntimeException("AWS Config is not specified"); + } + properties.put("security.protocol", "SASL_SSL"); + properties.put("sasl.mechanism", "AWS_MSK_IAM"); + properties.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler"); + if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) { + properties.put("sasl.jaas.config", + "software.amazon.msk.auth.iam.IAMLoginModule required " + + "awsRoleArn=\"" + awsConfig.getStsRoleArn()+ + "\" awsStsRegion=\""+ awsConfig.getRegion()+"\";"); + } else if (awsIamAuthConfig == AwsIamAuthConfig.DEFAULT) { + properties.put("sasl.jaas.config", + "software.amazon.msk.auth.iam.IAMLoginModule required;"); + } + } + + private void setPlainTextAuthProperties(Properties properties) { + + String username = sourceConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig().getUsername(); + String password = sourceConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig().getPassword(); properties.put("sasl.mechanism", "PLAIN"); properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"); properties.put("security.protocol", "SASL_PLAINTEXT"); @@ -238,6 +276,8 @@ private static String getSchemaType(final String registryUrl, final String topic Object json = mapper.readValue(response.toString(), Object.class); String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json); JsonNode rootNode = mapper.readTree(indented); + // If the entry exists but schema type doesn't exist then + // the schemaType defaults to AVRO if (rootNode.has(SCHEMA_TYPE)) { JsonNode node = rootNode.findValue(SCHEMA_TYPE); schemaType = node.textValue(); @@ -247,7 +287,12 @@ private static String getSchemaType(final String registryUrl, final String topic } else { InputStream errorStream = connection.getErrorStream(); String errorMessage = readErrorMessage(errorStream); - LOG.error("GET request failed while fetching the schema registry details : {}", errorMessage); + // Plaintext is not a valid schematype in schema registry + // So, if it doesn't exist in schema regitry, default + // the schemaType to PLAINTEXT + LOG.error("GET request failed while fetching the schema registry. Defaulting to schema type PLAINTEXT"); + return MessageFormat.PLAINTEXT.toString(); + } } catch (IOException e) { LOG.error("An error while fetching the schema registry details : ", e); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java index 1a610cedd4..8e5d0546b7 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AuthConfigTest.java @@ -1,10 +1,16 @@ package org.opensearch.dataprepper.plugins.kafka.configuration; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.hamcrest.Matchers.equalTo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; import org.yaml.snakeyaml.Yaml; import java.io.FileReader; @@ -18,11 +24,24 @@ import static org.hamcrest.Matchers.notNullValue; +@ExtendWith(MockitoExtension.class) class AuthConfigTest { @Mock AuthConfig authConfig; + @Mock + AuthConfig.SaslAuthConfig saslAuthConfig; + + @Mock + AwsIamAuthConfig testAwsIamAuthConfig; + + @Mock + OAuthConfig testOAuthConfig; + + @Mock + PlainTextAuthConfig testPlainTextConfig; + @BeforeEach void setUp() throws IOException { authConfig = new AuthConfig(); @@ -44,12 +63,43 @@ void setUp() throws IOException { } @Test - void testConfig() { + void testPlainTextAuthConfig() { assertThat(authConfig, notNullValue()); - assertThat(authConfig.getPlainTextAuthConfig(), notNullValue()); - assertThat(authConfig.getPlainTextAuthConfig(), hasProperty("username")); - assertThat(authConfig.getPlainTextAuthConfig(), hasProperty("password")); - assertThat(authConfig.getoAuthConfig(), notNullValue()); + assertThat(authConfig.getSaslAuthConfig(), notNullValue()); + assertThat(authConfig.getSaslAuthConfig().getPlainTextAuthConfig(), notNullValue()); + assertThat(authConfig.getSaslAuthConfig().getPlainTextAuthConfig(), hasProperty("username")); + assertThat(authConfig.getSaslAuthConfig().getPlainTextAuthConfig(), hasProperty("password")); + assertThat(authConfig.getSaslAuthConfig().getOAuthConfig(), notNullValue()); + } + + @Test + void testSaslAuthConfigWithPlainText() throws NoSuchFieldException, IllegalAccessException { + saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); + testPlainTextConfig = mock(PlainTextAuthConfig.class); + when(saslAuthConfig.getPlainTextAuthConfig()).thenReturn(testPlainTextConfig); + setField(AuthConfig.class, authConfig, "saslAuthConfig", saslAuthConfig); + assertThat(authConfig.getSaslAuthConfig(), equalTo(saslAuthConfig)); + assertThat(authConfig.getSaslAuthConfig().getPlainTextAuthConfig(), equalTo(testPlainTextConfig)); + } + + @Test + void testSaslAuthConfigWithMskIam() throws NoSuchFieldException, IllegalAccessException { + saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); + AwsIamAuthConfig awsIamAuthConfig = AwsIamAuthConfig.ROLE; + when(saslAuthConfig.getAwsIamAuthConfig()).thenReturn(awsIamAuthConfig); + setField(AuthConfig.class, authConfig, "saslAuthConfig", saslAuthConfig); + assertThat(authConfig.getSaslAuthConfig(), equalTo(saslAuthConfig)); + assertThat(authConfig.getSaslAuthConfig().getAwsIamAuthConfig(), equalTo(awsIamAuthConfig)); + } + + @Test + void testSaslAuthConfigWithOAuth() throws NoSuchFieldException, IllegalAccessException { + saslAuthConfig = mock(AuthConfig.SaslAuthConfig.class); + testOAuthConfig = mock(OAuthConfig.class); + when(saslAuthConfig.getOAuthConfig()).thenReturn(testOAuthConfig); + setField(AuthConfig.class, authConfig, "saslAuthConfig", saslAuthConfig); + assertThat(authConfig.getSaslAuthConfig(), equalTo(saslAuthConfig)); + assertThat(authConfig.getSaslAuthConfig().getOAuthConfig(), equalTo(testOAuthConfig)); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfigTest.java index d3facb361f..b82694f4f6 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsConfigTest.java @@ -9,10 +9,10 @@ import org.junit.jupiter.api.Test; import java.lang.reflect.Field; -import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import org.apache.commons.lang3.RandomStringUtils; class AwsConfigTest { @@ -24,10 +24,18 @@ void setUp() { } @Test - void getMskArn_notNull() throws NoSuchFieldException, IllegalAccessException { - final String testArn = UUID.randomUUID().toString(); - reflectivelySetField(awsConfig, "awsMskArn", testArn); - assertThat(awsConfig.getAwsMskArn(), equalTo(testArn)); + void TestConfigOptions_notNull() throws NoSuchFieldException, IllegalAccessException { + final AwsConfig.AwsMskConfig testMskConfig = new AwsConfig.AwsMskConfig(); + reflectivelySetField(awsConfig, "awsMskConfig", testMskConfig); + assertThat(awsConfig.getAwsMskConfig(), equalTo(testMskConfig)); + + final String testStsRoleArn = RandomStringUtils.randomAlphabetic(10); + reflectivelySetField(awsConfig, "stsRoleArn", testStsRoleArn); + assertThat(awsConfig.getStsRoleArn(), equalTo(testStsRoleArn)); + + final String testRegion = RandomStringUtils.randomAlphabetic(8); + reflectivelySetField(awsConfig, "region", testRegion); + assertThat(awsConfig.getRegion(), equalTo(testRegion)); } private void reflectivelySetField(final AwsConfig awsConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfigTest.java new file mode 100644 index 0000000000..739f1ef07e --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsIamAuthConfigTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class AwsIamAuthConfigTest { + @ParameterizedTest + @EnumSource(AwsIamAuthConfig.class) + void fromTypeValue(final AwsIamAuthConfig type) { + assertThat(AwsIamAuthConfig.fromOptionValue(type.name()), is(type)); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionTypeTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionTypeTest.java new file mode 100644 index 0000000000..7f40ed3322 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/EncryptionTypeTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class EncryptionTypeTest { + @ParameterizedTest + @EnumSource(EncryptionType.class) + void fromTypeValue(final EncryptionType type) { + assertThat(EncryptionType.fromTypeValue(type.name()), is(type)); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java index 552baed388..5b1744faaf 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSourceConfigTest.java @@ -91,5 +91,10 @@ void test_setters() throws NoSuchFieldException, IllegalAccessException { setField(KafkaSourceConfig.class, kafkaSourceConfig, "acknowledgementsTimeout", testTimeout); assertEquals(true, kafkaSourceConfig.getAcknowledgementsEnabled()); assertEquals(testTimeout, kafkaSourceConfig.getAcknowledgementsTimeout()); + assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionType()); + setField(KafkaSourceConfig.class, kafkaSourceConfig, "encryptionType", EncryptionType.PLAINTEXT); + assertEquals(EncryptionType.PLAINTEXT, kafkaSourceConfig.getEncryptionType()); + setField(KafkaSourceConfig.class, kafkaSourceConfig, "encryptionType", EncryptionType.SSL); + assertEquals(EncryptionType.SSL, kafkaSourceConfig.getEncryptionType()); } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionTypeTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionTypeTest.java new file mode 100644 index 0000000000..6beef5b47c --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/MskBrokerConnectionTypeTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafka.configuration; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class MskBrokerConnectionTypeTest { + @ParameterizedTest + @EnumSource(MskBrokerConnectionType.class) + void fromTypeValue(final MskBrokerConnectionType type) { + assertThat(MskBrokerConnectionType.fromTypeValue(type.name()), is(type)); + } +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java index fd5dbd3a60..25228da2ba 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/OAuthConfigTest.java @@ -51,7 +51,7 @@ void setUp() throws IOException { String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); KafkaSourceConfig kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - oAuthConfig = kafkaSourceConfig.getAuthConfig().getoAuthConfig(); + oAuthConfig = kafkaSourceConfig.getAuthConfig().getSaslAuthConfig().getOAuthConfig(); oauthClientId= oAuthConfig.getOauthClientId(); oauthClientSecret = oAuthConfig.getOauthClientSecret(); oauthLoginServer= oAuthConfig.getOauthLoginServer(); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java index 6afbfd4e4a..560f0042eb 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/PlainTextAuthConfigTest.java @@ -37,7 +37,7 @@ void setUp() throws IOException { String json = mapper.writeValueAsString(kafkaConfigMap); Reader reader = new StringReader(json); KafkaSourceConfig kafkaSourceConfig = mapper.readValue(reader, KafkaSourceConfig.class); - plainTextAuthConfig = kafkaSourceConfig.getAuthConfig().getPlainTextAuthConfig(); + plainTextAuthConfig = kafkaSourceConfig.getAuthConfig().getSaslAuthConfig().getPlainTextAuthConfig(); username = plainTextAuthConfig.getUsername(); password = plainTextAuthConfig.getPassword(); } @@ -48,4 +48,4 @@ void testConfigValues() { assertEquals(plainTextAuthConfig.getUsername(), username); assertEquals(plainTextAuthConfig.getPassword(), password); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml index efd860a8a9..6037ad4839 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml +++ b/data-prepper-plugins/kafka-plugins/src/test/resources/sample-pipelines.yaml @@ -30,22 +30,23 @@ log-pipeline: registry_url: http://localhost:8081/ version: 1 authentication: - sasl_plaintext: - username: admin - password: admin-secret - sasl_oauth: - oauth_client_id: 0oa9wc21447Pc5vsV5d7 - oauth_client_secret: aGmOfHqIEvBJGDxXAOOcatiE9PvsPgoEePx8IPPa - oauth_login_server: https://dev-13650048.okta.com - oauth_login_endpoint: /oauth2/default/v1/token - oauth_login_grant_type: refresh_token - oauth_login_scope: kafka - oauth_introspect_server: https://dev-13650048.okta.com - oauth_introspect_endpoint: /oauth2/default/v1/introspect - oauth_token_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/token - oauth_sasl_mechanism: OAUTHBEARER - oauth_security_protocol: SASL_PLAINTEXT - oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler - oauth_jwks_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/keys + sasl: + plaintext: + username: admin + password: admin-secret + oauth: + oauth_client_id: 0oa9wc21447Pc5vsV5d7 + oauth_client_secret: aGmOfHqIEvBJGDxXAOOcatiE9PvsPgoEePx8IPPa + oauth_login_server: https://dev-13650048.okta.com + oauth_login_endpoint: /oauth2/default/v1/token + oauth_login_grant_type: refresh_token + oauth_login_scope: kafka + oauth_introspect_server: https://dev-13650048.okta.com + oauth_introspect_endpoint: /oauth2/default/v1/introspect + oauth_token_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/token + oauth_sasl_mechanism: OAUTHBEARER + oauth_security_protocol: SASL_PLAINTEXT + oauth_sasl_login_callback_handler_class: org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler + oauth_jwks_endpoint_url: https://dev-13650048.okta.com/oauth2/default/v1/keys sink: - stdout: From b7d8af104f932a82c3174f40013a6344d3f8aac3 Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 13 Jul 2023 12:02:45 -0500 Subject: [PATCH 2/5] Adds the Data Prepper 2.3.2 change log. (#3024) Signed-off-by: David Venable --- .../data-prepper.change-log-2.3.2.md | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 release/release-notes/data-prepper.change-log-2.3.2.md diff --git a/release/release-notes/data-prepper.change-log-2.3.2.md b/release/release-notes/data-prepper.change-log-2.3.2.md new file mode 100644 index 0000000000..d06876e332 --- /dev/null +++ b/release/release-notes/data-prepper.change-log-2.3.2.md @@ -0,0 +1,198 @@ + +* __Added 2.3.1 change log (#2872) (#2905)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Wed, 12 Jul 2023 11:39:01 -0500 + + * Added 2.3.1 change log + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + * Updated change log + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + --------- + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + (cherry picked from commit 0e1aa457de25ee2de712db6f0c7c7316587b92b7) + Co-authored-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + +* __Generated THIRD-PARTY file for 8fb9d79 (#3014)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Wed, 12 Jul 2023 09:46:13 -0500 + + + Signed-off-by: GitHub <noreply@github.com> + Co-authored-by: dlvenable + <dlvenable@users.noreply.github.com> + +* __Updated to Data Prepper 2.3.2 for release. (#3013)__ + + [David Venable](mailto:dlv@amazon.com) - Wed, 12 Jul 2023 09:45:57 -0500 + + + Signed-off-by: David Venable <dlv@amazon.com> + +* __Fix bucket ownership validation. Resolves #3005 (#3009) (#3011)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Wed, 12 Jul 2023 09:13:29 -0500 + + + Signed-off-by: David Venable <dlv@amazon.com> + (cherry picked from commit decccb9a82e01cbaa059ce32d781d8614c494111) + Co-authored-by: David Venable <dlv@amazon.com> + +* __Remove validation that made keys starting or ending with . - or _ inv… (#3007)__ + + [Asif Sohail Mohammed](mailto:nsifmoh@amazon.com) - Wed, 12 Jul 2023 09:12:49 -0500 + + + * Remove validation that made keys starting or ending with . - or _ invalid, + catch all exceptions in the parse json processor (#2945) + Remove validation that made keys starting or ending with . - or _ invalid, + catch all exceptions in the parse json processor + Signed-off-by: Taylor Gray <tylgry@amazon.com> + (cherry picked from commit 05d229a06ceddb21cd9dedcaf49b1d455272fe6f) + + * Removed readme + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + --------- + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + Co-authored-by: + Taylor Gray <tylgry@amazon.com> + +* __Fix race condition in SqsWorker when acknowledgements are enabled (#3001) (#3010)__ + + [kkondaka](mailto:41027584+kkondaka@users.noreply.github.com) - Tue, 11 Jul 2023 22:11:10 -0700 + + + * Fix race condition in SqsWorker when acknowledgements are enabled + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Modified to do the synchronization in the acknowledgement set framework + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Fixed failing tests + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Removed unused variable + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Addressed review comment and fixed failing tests + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Addressed review comments + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Fixed failing tests + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + * Fixed checkStyle failure + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + + --------- + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + Co-authored-by: + Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + (cherry picked from commit 515cf6114f5270f5f4fc94eba6bdd62e45659944) + +* __Retry s3 reads on socket exceptions. (#2992) (#3008)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Tue, 11 Jul 2023 16:55:29 -0500 + + + * Retry s3 reads on socket exceptions. + S3 will reset the conenction on their end frequently. To not lose data, + data + prepper should retry all socket exceptions by attempting to re-open + the + stream. + Signed-off-by: Adi Suresh <adsuresh@amazon.com> + + * Bubble up parquet exceptions. + Signed-off-by: Adi Suresh <adsuresh@amazon.com> + + --------- + Signed-off-by: Adi Suresh <adsuresh@amazon.com> + (cherry picked from commit 9f78542533dd24ed21e29a12950938c0c4b23636) + Co-authored-by: Adi Suresh <adsuresh@amazon.com> + +* __Fix S3 errors around end of file behavior. (#2983) (#3006)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Tue, 11 Jul 2023 15:36:47 -0500 + + + Signed-off-by: Adi Suresh <adsuresh@amazon.com> + (cherry picked from commit 75fa735289ecf8d335d5681fa63a512e8a4ee03e) + Co-authored-by: Adi Suresh <adsuresh@amazon.com> + +* __Fix SqsWorker error messages (#2991) (#3002)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Tue, 11 Jul 2023 12:57:30 -0700 + + + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + Co-authored-by: + Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + (cherry picked from commit 45b6e554fdad117396b5cc3bbcf52b7b99b42a5a) + Co-authored-by: kkondaka <41027584+kkondaka@users.noreply.github.com> + +* __Fix CVE-2023-35165, CVE-2023-34455, CVE-2023-34453, CVE-2023-34454, C… (#2948) (#2952)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Tue, 11 Jul 2023 14:33:01 -0500 + + + * Fix CVE-2023-35165, CVE-2023-34455, CVE-2023-34453, CVE-2023-34454, + CVE-2023-2976 + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + * Updated snappy version in build.gradle files + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + --------- + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + (cherry picked from commit 8e2145cc4c00fb2a93b97a3fcdb689609e23ff63) + Co-authored-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + +* __Fix DLQ writer writing empty list (#2931) (#2998)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Mon, 10 Jul 2023 09:46:24 -0700 + + + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + Co-authored-by: + Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + (cherry picked from commit 1dd8bd385fd61fbb269e2eca17bca431189060bf) + Co-authored-by: kkondaka <41027584+kkondaka@users.noreply.github.com> + +* __Fix addTags API in EventMetadata (#2926) (#2996)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Mon, 10 Jul 2023 09:41:29 -0700 + + + Signed-off-by: Krishna Kondaka + <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + Co-authored-by: + Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> + (cherry picked from commit 5565337ec07281f8b58f65e275185112811357c0) + Co-authored-by: kkondaka <41027584+kkondaka@users.noreply.github.com> + +* __Updated the release date (#2911) (#2912)__ + + [opensearch-trigger-bot[bot]](mailto:98922864+opensearch-trigger-bot[bot]@users.noreply.github.com) - Tue, 20 Jun 2023 13:45:59 -0500 + + + Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + (cherry picked from commit 7649059824b286dfb714ed78ab12c9c42e53ff92) + Co-authored-by: Asif Sohail Mohammed <nsifmoh@amazon.com> + + From ec37e23ccc3875d458eca336c2bfeb6a6e56e35e Mon Sep 17 00:00:00 2001 From: Vishal Boinapalli Date: Thu, 13 Jul 2023 22:18:06 -0700 Subject: [PATCH 3/5] Translate Plugin: Simplified Config. (#3022) * Translate Plugin: Simplified Config. Added functionality for multiple sources and multiple targets Signed-off-by: Vishal Boinapalli * Moved helper methods out of config file Signed-off-by: Vishal Boinapalli --------- Signed-off-by: Vishal Boinapalli --- .../translate/MappingsParameterConfig.java | 53 +++++ .../processor/translate/MappingsParser.java | 101 ++++++++ .../translate/TargetsParameterConfig.java | 137 +++++++++++ .../translate/TranslateProcessor.java | 180 +++++---------- .../translate/TranslateProcessorConfig.java | 104 +-------- .../MappingsParameterConfigTest.java | 58 +++++ .../translate/TargetsParameterConfigTest.java | 68 ++++++ .../TranslateProcessorConfigTest.java | 79 +------ .../translate/TranslateProcessorTest.java | 215 ++++++++++++------ 9 files changed, 632 insertions(+), 363 deletions(-) create mode 100644 data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java create mode 100644 data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParser.java create mode 100644 data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfig.java create mode 100644 data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java create mode 100644 data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfigTest.java diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java new file mode 100644 index 0000000000..25c225188e --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfig.java @@ -0,0 +1,53 @@ +package org.opensearch.dataprepper.plugins.processor.translate; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotNull; + +import java.util.List; + +public class MappingsParameterConfig { + + @JsonProperty("source") + @NotNull + private Object source; + + @JsonProperty("iterate_on") + private String iterateOn; + + @JsonProperty("targets") + @NotNull + private List targetsParameterConfigs; + + + public Object getSource() { + return source; + } + + public String getIterateOn() { + return iterateOn; + } + + public List getTargetsParameterConfigs() { + return targetsParameterConfigs; + } + + public void parseMappings(){ + for(TargetsParameterConfig targetsParameterConfig: targetsParameterConfigs){ + targetsParameterConfig.parseMappings(); + } + } + + @AssertTrue(message = "source field must be a string or list of strings") + public boolean isSourceFieldValid() { + if (source instanceof String) { + return true; + } + if (source instanceof List) { + List sourceList = (List) source; + return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String); + } + return false; + } + +} diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParser.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParser.java new file mode 100644 index 0000000000..4807e308bf --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParser.java @@ -0,0 +1,101 @@ +package org.opensearch.dataprepper.plugins.processor.translate; + +import org.apache.commons.lang3.Range; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; + +public class MappingsParser { + private final LinkedHashMap, Object> rangeMappings = new LinkedHashMap<>(); + private final Map individualMappings = new HashMap<>(); + private final Map compiledPatterns = new HashMap<>(); + public MappingsParser(TargetsParameterConfig targetConfig){ + RegexParameterConfiguration regexConfig = targetConfig.getRegexParameterConfiguration(); + if (Objects.nonNull(regexConfig)) { + compilePatterns(regexConfig.getPatterns()); + } + processMapField(targetConfig.getMap()); + checkOverlappingKeys(); + } + + public Map fetchIndividualMappings() { return individualMappings; } + + public LinkedHashMap, Object> fetchRangeMappings() { return rangeMappings; } + + public Map fetchCompiledPatterns() { return compiledPatterns; } + + private void compilePatterns(Map mappings) { + for (String pattern : mappings.keySet()) { + Pattern compiledPattern = Pattern.compile(pattern); + compiledPatterns.put(compiledPattern, mappings.get(pattern)); + } + } + + private void processMapField(Map map) { + if (Objects.nonNull(map)) { + for (Map.Entry mapEntry : map.entrySet()) { + parseIndividualKeys(mapEntry); + } + } + } + + private void parseIndividualKeys(Map.Entry mapEntry) { + String[] commaSeparatedKeys = mapEntry.getKey().split(","); + for (String individualKey : commaSeparatedKeys) { + if (individualKey.contains("-")) { + addRangeMapping(Map.entry(individualKey, mapEntry.getValue())); + } else { + addIndividualMapping(individualKey, mapEntry.getValue()); + } + } + } + + private void addRangeMapping(Map.Entry mapEntry) { + String[] rangeKeys = mapEntry.getKey().split("-"); + if (rangeKeys.length != 2 || !StringUtils.isNumericSpace(rangeKeys[0]) || !StringUtils.isNumericSpace(rangeKeys[1])) { + addIndividualMapping(mapEntry.getKey(), mapEntry.getValue()); + } else { + Float lowKey = Float.parseFloat(rangeKeys[0]); + Float highKey = Float.parseFloat(rangeKeys[1]); + Range rangeEntry = Range.between(lowKey, highKey); + if (isRangeOverlapping(rangeEntry)) { + String exceptionMsg = "map option contains key " + mapEntry.getKey() + " that overlaps with other range entries"; + throw new InvalidPluginConfigurationException(exceptionMsg); + } else { + rangeMappings.put(Range.between(lowKey, highKey), mapEntry.getValue()); + } + } + } + + private void addIndividualMapping(final String key, final Object value) { + if (individualMappings.containsKey(key)) { + String exceptionMsg = "map option contains duplicate entries of " + key; + throw new InvalidPluginConfigurationException(exceptionMsg); + } else { + individualMappings.put(key.strip(), value); + } + } + + private boolean isRangeOverlapping(Range rangeEntry) { + return rangeMappings.keySet().stream().anyMatch(range -> range.isOverlappedBy(rangeEntry)); + } + + private void checkOverlappingKeys() { + for (String individualKey : individualMappings.keySet()) { + if (NumberUtils.isParsable(individualKey)) { + Float floatKey = Float.parseFloat(individualKey); + Range range = Range.between(floatKey, floatKey); + if (isRangeOverlapping(range)) { + String exceptionMsg = "map option contains key " + individualKey + " that overlaps with other range entries"; + throw new InvalidPluginConfigurationException(exceptionMsg); + } + } + } + } +} diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfig.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfig.java new file mode 100644 index 0000000000..523ae1e606 --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfig.java @@ -0,0 +1,137 @@ +package org.opensearch.dataprepper.plugins.processor.translate; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.apache.commons.lang3.Range; +import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; +import org.opensearch.dataprepper.typeconverter.TypeConverter; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +public class TargetsParameterConfig { + private final TypeConverter converter; + private final LinkedHashMap, Object> rangeMappings = new LinkedHashMap<>(); + private final Map individualMappings = new HashMap<>(); + private final Map compiledPatterns = new HashMap<>(); + @JsonProperty("target") + @NotNull + @NotEmpty + private String target; + @JsonProperty("map") + private Map map; + @JsonProperty("translate_when") + private String translateWhen; + @JsonProperty("regex") + private RegexParameterConfiguration regexParameterConfig; + @JsonProperty("default") + private String defaultValue; + @JsonProperty("target_type") + private TargetType targetType = TargetType.STRING; + + public TargetsParameterConfig(Map map, String target, RegexParameterConfiguration regexParameterConfig, String translateWhen, String defaultValue, TargetType targetType) { + this.targetType = Optional + .ofNullable(targetType) + .orElse(TargetType.STRING); + this.target = target; + this.map = map; + this.defaultValue = defaultValue; + this.regexParameterConfig = regexParameterConfig; + this.converter = this.targetType.getTargetConverter(); + this.translateWhen = translateWhen; + parseMappings(); + } + + public String getTarget() { + return target; + } + + public Map getMap() { + return map; + } + + public String getDefaultValue() { + return defaultValue; + } + + public String getTranslateWhen() { + return translateWhen; + } + + public TargetType getTargetType() { + return targetType; + } + + public RegexParameterConfiguration getRegexParameterConfiguration() { + return regexParameterConfig; + } + + public Map fetchIndividualMappings() { + return individualMappings; + } + + public LinkedHashMap, Object> fetchRangeMappings() { + return rangeMappings; + } + + public Map fetchCompiledPatterns() { + return compiledPatterns; + } + + public TypeConverter getConverter() { + return converter; + } + + + @AssertTrue(message = "pattern option is mandatory while configuring regex option") + public boolean isPatternPresent() { + return regexParameterConfig == null || regexParameterConfig.getPatterns() != null; + } + + @AssertTrue(message = "Either map or patterns option needs to be configured under targets.") + public boolean hasMappings() { + return Stream.of(map, regexParameterConfig).filter(n -> n != null).count() != 0; + } + + @AssertTrue(message = "The mapped values do not match the target type provided") + public boolean isMapTypeValid() { + return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key))); + } + + @AssertTrue(message = "The pattern values do not match the target type provided") + public boolean isPatternTypeValid() { + if (Objects.isNull(regexParameterConfig) || Objects.isNull(regexParameterConfig.getPatterns())) { + return true; + } + Map patterns = regexParameterConfig.getPatterns(); + return patterns.keySet().stream().allMatch(key -> checkTargetValueType(patterns.get(key))); + } + + private boolean checkTargetValueType(Object val) throws NumberFormatException { + if (Objects.isNull(targetType)) { + return true; + } + try { + final TypeConverter converter = targetType.getTargetConverter(); + converter.convert(val); + } catch (Exception ex) { + return false; + } + return true; + } + + public void parseMappings() { + MappingsParser parser = new MappingsParser(this); + individualMappings.putAll(parser.fetchIndividualMappings()); + rangeMappings.putAll(parser.fetchRangeMappings()); + compiledPatterns.putAll(parser.fetchCompiledPatterns()); + } + +} diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java index 476bfa807b..bda5df91d4 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessor.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.processor.translate; import org.apache.commons.lang3.Range; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -23,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -38,106 +36,17 @@ @DataPrepperPlugin(name = "translate", pluginType = Processor.class, pluginConfigurationType = TranslateProcessorConfig.class) public class TranslateProcessor extends AbstractProcessor, Record> { - private static final Logger LOG = LoggerFactory.getLogger(TranslateProcessor.class); private final ExpressionEvaluator expressionEvaluator; - private final TranslateProcessorConfig translateProcessorConfig; - private final LinkedHashMap, Object> rangeMappings; - private final Map individualMappings; - private final Map compiledPatterns; - private final TypeConverter converter; + private final List mappingsConfig; @DataPrepperPluginConstructor public TranslateProcessor(PluginMetrics pluginMetrics, final TranslateProcessorConfig translateProcessorConfig, final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); - this.translateProcessorConfig = translateProcessorConfig; this.expressionEvaluator = expressionEvaluator; - this.converter = translateProcessorConfig.getTargetType().getTargetConverter(); - individualMappings = new HashMap<>(); - rangeMappings = new LinkedHashMap<>(); - compiledPatterns = new HashMap<>(); - if (Objects.nonNull(this.translateProcessorConfig.getRegexParameterConfiguration())) { - compilePatterns(translateProcessorConfig - .getRegexParameterConfiguration() - .getPatterns()); - } - processMapField(translateProcessorConfig.getMap()); + mappingsConfig = translateProcessorConfig.getMappingsParameterConfigs(); + mappingsConfig.forEach(MappingsParameterConfig::parseMappings); parseFile(translateProcessorConfig.getFilePath()); - checkOverlappingKeys(); - } - - private void compilePatterns(Map mappings) { - for (String pattern : mappings.keySet()) { - Pattern compiledPattern = Pattern.compile(pattern); - compiledPatterns.put(compiledPattern, mappings.get(pattern)); - } - } - - private void processMapField(Map map) { - if (Objects.nonNull(map)) { - for (Map.Entry mapEntry : map.entrySet()) { - parseIndividualKeys(mapEntry); - } - } - } - - private void parseIndividualKeys(Map.Entry mapEntry){ - String[] commaSeparatedKeys = mapEntry.getKey().split(","); - for(String individualKey : commaSeparatedKeys){ - if(individualKey.contains("-")){ - addRangeMapping(Map.entry(individualKey, mapEntry.getValue())); - } else { - addIndividualMapping(individualKey, mapEntry.getValue()); - } - } - } - - private void addRangeMapping(Map.Entry mapEntry){ - String[] rangeKeys = mapEntry.getKey().split("-"); - if(rangeKeys.length!=2 || !StringUtils.isNumericSpace(rangeKeys[0]) || !StringUtils.isNumericSpace(rangeKeys[1])){ - addIndividualMapping(mapEntry.getKey(), mapEntry.getValue()); - } else { - Float lowKey = Float.parseFloat(rangeKeys[0]); - Float highKey = Float.parseFloat(rangeKeys[1]); - Range rangeEntry = Range.between(lowKey, highKey); - if (isRangeOverlapping(rangeEntry)) { - String exceptionMsg = "map option contains key "+mapEntry.getKey()+" that overlaps with other range entries"; - throw new InvalidPluginConfigurationException(exceptionMsg); - } else { - rangeMappings.put(Range.between(lowKey, highKey), mapEntry.getValue()); - } - } - } - - private void addIndividualMapping(final String key, final Object value){ - if(individualMappings.containsKey(key)){ - String exceptionMsg = "map option contains duplicate entries of "+key; - throw new InvalidPluginConfigurationException(exceptionMsg); - } else { - individualMappings.put(key.strip(), value); - } - } - - private boolean isRangeOverlapping(Range rangeEntry) { - for (Range range : rangeMappings.keySet()) { - if (range.isOverlappedBy(rangeEntry)) { - return true; - } - } - return false; - } - - private void checkOverlappingKeys() { - for (String individualKey : individualMappings.keySet()) { - if (NumberUtils.isParsable(individualKey)) { - Float floatKey = Float.parseFloat(individualKey); - Range range = Range.between(floatKey, floatKey); - if (isRangeOverlapping(range)) { - String exceptionMsg = "map option contains key " + individualKey + " that overlaps with other range entries"; - throw new InvalidPluginConfigurationException(exceptionMsg); - } - } - } } private void parseFile(String filePath){ @@ -148,23 +57,28 @@ private void parseFile(String filePath){ public Collection> doExecute(Collection> records) { for (final Record record : records) { final Event recordEvent = record.getData(); - if (Objects.nonNull(translateProcessorConfig.getTranslateWhen()) && !expressionEvaluator.evaluateConditional(translateProcessorConfig.getTranslateWhen(), recordEvent)) { - continue; - } - try { - String iterateOn = translateProcessorConfig.getIterateOn(); - if (Objects.nonNull(iterateOn)) { - List> objectsToIterate = recordEvent.get(iterateOn, List.class); - for (Map recordObject : objectsToIterate) { - performMappings(recordObject); + for (MappingsParameterConfig mappingConfig : mappingsConfig) { + try { + String iterateOn = mappingConfig.getIterateOn(); + List targetsConfig = mappingConfig.getTargetsParameterConfigs(); + for (TargetsParameterConfig targetConfig : targetsConfig) { + String translateWhen = targetConfig.getTranslateWhen(); + Object sourceObject = mappingConfig.getSource(); + if (Objects.nonNull(translateWhen) && !expressionEvaluator.evaluateConditional(translateWhen, recordEvent)) { + continue; + } + if (Objects.nonNull(iterateOn)) { + List> objectsToIterate = recordEvent.get(iterateOn, List.class); + objectsToIterate.forEach(recordObject -> performMappings(recordObject, sourceObject, targetConfig)); + recordEvent.put(iterateOn, objectsToIterate); + } else { + performMappings(recordEvent, sourceObject, targetConfig); + } } - recordEvent.put(iterateOn, objectsToIterate); - } else { - performMappings(recordEvent); + } catch (Exception ex) { + LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", mappingConfig.getSource(), + record.getData(), ex); } - } catch (Exception ex) { - LOG.error(EVENT, "Error mapping the source [{}] of entry [{}]", translateProcessorConfig.getSource(), - record.getData(), ex); } } return records; @@ -178,16 +92,19 @@ private String getSourceValue(Object recordObject, String sourceKey) { } } - private Object getTargetValue(Object sourceObject, List targetValues){ + private Object getTargetValue(Object sourceObject, List targetValues, TargetsParameterConfig targetConfig) { + TypeConverter converter = targetConfig.getConverter(); if(sourceObject instanceof String) { return converter.convert(targetValues.get(0)); } - return targetValues.stream().map(converter::convert).collect(Collectors.toList()); + return targetValues + .stream() + .map(converter::convert) + .collect(Collectors.toList()); } - private void performMappings(Object recordObject) { + private void performMappings(Object recordObject, Object sourceObject, TargetsParameterConfig targetConfig) { List targetValues = new ArrayList<>(); - Object sourceObject = translateProcessorConfig.getSource(); List sourceKeys; if (sourceObject instanceof List) { sourceKeys = (ArrayList) sourceObject; @@ -199,34 +116,38 @@ private void performMappings(Object recordObject) { } for (String sourceKey : sourceKeys) { String sourceValue = getSourceValue(recordObject, sourceKey); - Optional targetValue = getTargetValueForSource(sourceValue); - targetValue.ifPresent(targetValues::add); + if(sourceValue!=null){ + Optional targetValue = getTargetValueForSource(sourceValue, targetConfig); + targetValue.ifPresent(targetValues::add); + } } - addTargetToRecords(sourceObject, targetValues, recordObject); + addTargetToRecords(sourceObject, targetValues, recordObject, targetConfig); } - private Optional getTargetValueForSource(final String sourceValue) { + private Optional getTargetValueForSource(final String sourceValue, TargetsParameterConfig targetConfig) { Optional targetValue = Optional.empty(); targetValue = targetValue - .or(() -> matchesIndividualEntry(sourceValue)) - .or(() -> matchesRangeEntry(sourceValue)) - .or(() -> matchesPatternEntry(sourceValue)) - .or(() -> Optional.ofNullable(translateProcessorConfig.getDefaultValue())); + .or(() -> matchesIndividualEntry(sourceValue, targetConfig)) + .or(() -> matchesRangeEntry(sourceValue, targetConfig)) + .or(() -> matchesPatternEntry(sourceValue, targetConfig)) + .or(() -> Optional.ofNullable(targetConfig.getDefaultValue())); return targetValue; } - private Optional matchesIndividualEntry(final String sourceValue) { + private Optional matchesIndividualEntry(final String sourceValue, TargetsParameterConfig targetConfig) { + Map individualMappings = targetConfig.fetchIndividualMappings(); if (individualMappings.containsKey(sourceValue)) { return Optional.of(individualMappings.get(sourceValue)); } return Optional.empty(); } - private Optional matchesRangeEntry(final String sourceValue) { + private Optional matchesRangeEntry(final String sourceValue, TargetsParameterConfig targetConfig) { if (!NumberUtils.isParsable(sourceValue)) { return Optional.empty(); } Float floatKey = Float.parseFloat(sourceValue); + LinkedHashMap, Object> rangeMappings = targetConfig.fetchRangeMappings(); for (Map.Entry, Object> rangeEntry : rangeMappings.entrySet()) { Range range = rangeEntry.getKey(); if (range.contains(floatKey)) { @@ -236,11 +157,12 @@ private Optional matchesRangeEntry(final String sourceValue) { return Optional.empty(); } - private Optional matchesPatternEntry(final String sourceValue) { + private Optional matchesPatternEntry(final String sourceValue, TargetsParameterConfig targetConfig) { + Map compiledPatterns = targetConfig.fetchCompiledPatterns(); if (compiledPatterns.isEmpty()) { return Optional.empty(); } - final boolean exact = translateProcessorConfig.getRegexParameterConfiguration().getExact(); + final boolean exact = targetConfig.getRegexParameterConfiguration().getExact(); for (Pattern pattern : compiledPatterns.keySet()) { Matcher matcher = pattern.matcher(sourceValue); if (matcher.matches() || (!exact && matcher.find())) { @@ -250,17 +172,17 @@ private Optional matchesPatternEntry(final String sourceValue) { return Optional.empty(); } - private void addTargetToRecords(Object sourceObject, List targetValues, Object recordObject) { + private void addTargetToRecords(Object sourceObject, List targetValues, Object recordObject, TargetsParameterConfig targetMappings) { if (targetValues.isEmpty()) { return; } - final String targetField = translateProcessorConfig.getTarget(); + final String targetField = targetMappings.getTarget(); if (recordObject instanceof Map) { Map recordMap = (Map) recordObject; - recordMap.put(targetField, getTargetValue(sourceObject, targetValues)); + recordMap.put(targetField, getTargetValue(sourceObject, targetValues, targetMappings)); } else if (recordObject instanceof Event) { Event event = (Event) recordObject; - event.put(targetField, getTargetValue(sourceObject, targetValues)); + event.put(targetField, getTargetValue(sourceObject, targetValues, targetMappings)); } } diff --git a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java index 845442bc40..1bae063a6d 100644 --- a/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java +++ b/data-prepper-plugins/translate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfig.java @@ -8,116 +8,32 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.AssertTrue; -import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; -import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; -import org.opensearch.dataprepper.typeconverter.TypeConverter; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.stream.Stream; public class TranslateProcessorConfig { - - @JsonProperty("source") - @NotNull - private Object source; - - @JsonProperty("target") - @NotNull - @NotEmpty - private String target; - - @JsonProperty("map") - private Map map; - @JsonProperty("file_path") private String filePath; - @JsonProperty("default") - private String defaultValue; - - @JsonProperty("translate_when") - private String translateWhen; - - @JsonProperty("iterate_on") - private String iterateOn; - - @JsonProperty("regex") - private RegexParameterConfiguration regexParameterConfiguration; - - @JsonProperty("target_type") - private TargetType targetType = TargetType.STRING; - - - public Object getSource() { return source; } - - public String getTarget() { return target; } - - public Map getMap() { return map; } - - public String getDefaultValue() { return defaultValue; } - - public String getFilePath() { return filePath; } - - public String getTranslateWhen() { return translateWhen; } - - public String getIterateOn() { return iterateOn; } - - public TargetType getTargetType() { return targetType; } - - public RegexParameterConfiguration getRegexParameterConfiguration(){ return regexParameterConfiguration; } - - - @AssertTrue(message = "source field must be a string or list of strings") - public boolean isSourceFieldValid(){ - if(source instanceof String){ - return true; - } - if(source instanceof List){ - List sourceList = (List) source; - return sourceList.stream().allMatch(sourceItem -> sourceItem instanceof String); - } - return false; - } - - @AssertTrue(message = "Either of map or patterns or file_path options need to be configured.") - public boolean hasMappings() { - return Stream.of(map, filePath, regexParameterConfiguration).filter(n -> n!=null).count() != 0; - } + @NotNull + @JsonProperty("mappings") + private List mappingsParameterConfigs; - @AssertTrue(message = "pattern option is mandatory while configuring regex option") - public boolean isPatternPresent(){ - return regexParameterConfiguration == null || regexParameterConfiguration.getPatterns() != null; + public String getFilePath() { + return filePath; } - @AssertTrue(message = "The mapped values do not match the target type provided") - public boolean isMapTypeValid() { - return map.keySet().stream().allMatch(key -> checkTargetValueType(map.get(key))); + public List getMappingsParameterConfigs() { + return mappingsParameterConfigs; } - @AssertTrue(message = "The pattern values do not match the target type provided") - public boolean isPatternTypeValid() { - if (Objects.isNull(regexParameterConfiguration) || Objects.isNull(regexParameterConfiguration.getPatterns())) { - return true; - } - Map patterns = regexParameterConfiguration.getPatterns(); - return patterns.keySet().stream().allMatch(key -> checkTargetValueType(patterns.get(key))); + @AssertTrue(message = "Either mappings or file_path option needs to be configured.") + public boolean hasMappings() { + return Stream.of(mappingsParameterConfigs, filePath).filter(n -> n != null).count() != 0; } - private boolean checkTargetValueType(Object val) throws NumberFormatException { - if (Objects.isNull(targetType)) { - return true; - } - try { - final TypeConverter converter = targetType.getTargetConverter(); - converter.convert(val); - } catch (Exception ex) { - return false; - } - return true; - } } diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java new file mode 100644 index 0000000000..68fdeb9e41 --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/MappingsParameterConfigTest.java @@ -0,0 +1,58 @@ +package org.opensearch.dataprepper.plugins.processor.translate; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.core.Is.is; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.hamcrest.MatcherAssert.assertThat; +import java.util.List; + +class MappingsParameterConfigTest { + + private MappingsParameterConfig mappingsParameterConfig; + + @BeforeEach + void setup() throws NoSuchFieldException, IllegalAccessException{ + mappingsParameterConfig = new MappingsParameterConfig(); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "sourceKey"); + } + + @Test + void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{ + assertNull(mappingsParameterConfig.getIterateOn()); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "iterateOn", "iteratorField"); + assertThat(mappingsParameterConfig.getIterateOn(),is("iteratorField")); + } + + @Test + void test_get_source() { + assertThat(mappingsParameterConfig.getSource(),is("sourceKey")); + } + + @Test + void test_source_field_valid_types() throws NoSuchFieldException, IllegalAccessException{ + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", "key1"); + assertTrue(mappingsParameterConfig.isSourceFieldValid()); + assertThat(mappingsParameterConfig.getSource(), is("key1")); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", List.of("key1", "key2", "key3")); + assertTrue(mappingsParameterConfig.isSourceFieldValid()); + assertThat(mappingsParameterConfig.getSource(), is(List.of("key1", "key2", "key3"))); + } + + @Test + void test_source_field_invalid_types() throws NoSuchFieldException, IllegalAccessException{ + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", 200); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", false); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", 20.1); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + setField(MappingsParameterConfig.class, mappingsParameterConfig, "source", List.of("key1", 200)); + assertFalse(mappingsParameterConfig.isSourceFieldValid()); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfigTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfigTest.java new file mode 100644 index 0000000000..efc209c3c4 --- /dev/null +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TargetsParameterConfigTest.java @@ -0,0 +1,68 @@ +package org.opensearch.dataprepper.plugins.processor.translate; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; + +import java.util.Collections; + +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +import static org.hamcrest.MatcherAssert.assertThat; + + +class TargetsParameterConfigTest { + private TargetsParameterConfig targetsParameterConfig; + private RegexParameterConfiguration regexParameterConfiguration; + private TargetsParameterConfig createObjectUnderTest() { + return new TargetsParameterConfig(null, null, null, null, null,null); + } + + @BeforeEach + void setup() throws NoSuchFieldException, IllegalAccessException{ + targetsParameterConfig = createObjectUnderTest(); + setField(TargetsParameterConfig.class, targetsParameterConfig, "target", "targetKey"); + } + + @Test + void test_no_map_patterns_filepath_options_present(){ + assertFalse(targetsParameterConfig.hasMappings()); + } + + @Test + void test_only_map_option_present() throws NoSuchFieldException, IllegalAccessException{ + setField(TargetsParameterConfig.class, targetsParameterConfig, "map", Collections.singletonMap("key1", "val1")); + assertTrue(targetsParameterConfig.hasMappings()); + } + + @Test + void test_no_patterns_under_regex() throws NoSuchFieldException, IllegalAccessException{ + regexParameterConfiguration = new RegexParameterConfiguration(); + setField(RegexParameterConfiguration.class, regexParameterConfiguration, "exact", true); + setField(TargetsParameterConfig.class, targetsParameterConfig, "map", Collections.singletonMap("key1", "val1")); + setField(TargetsParameterConfig.class, targetsParameterConfig, "regexParameterConfig", regexParameterConfiguration); + assertFalse(targetsParameterConfig.isPatternPresent()); + } + + @Test + void test_get_default() throws NoSuchFieldException, IllegalAccessException{ + assertNull(targetsParameterConfig.getDefaultValue()); + setField(TargetsParameterConfig.class, targetsParameterConfig, "defaultValue", "No match"); + assertThat(targetsParameterConfig.getDefaultValue(),is("No match")); + } + + @Test + void test_target_type_default(){ + assertThat(targetsParameterConfig.getTargetType(), is(TargetType.STRING)); + } + + @Test + void test_get_target_type() throws NoSuchFieldException, IllegalAccessException{ + setField(TargetsParameterConfig.class, targetsParameterConfig, "targetType", TargetType.INTEGER); + assertThat(targetsParameterConfig.getTargetType(), is(TargetType.INTEGER)); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java index 6af0f325eb..f8b5326392 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorConfigTest.java @@ -2,13 +2,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; -import java.util.Collections; import java.util.List; import static org.hamcrest.core.Is.is; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.hamcrest.MatcherAssert.assertThat; @@ -17,26 +14,23 @@ class TranslateProcessorConfigTest { private TranslateProcessorConfig translateProcessorConfig; - private RegexParameterConfiguration regexParameterConfiguration; private TranslateProcessorConfig createObjectUnderTest() { return new TranslateProcessorConfig(); } @BeforeEach - void setup() throws NoSuchFieldException, IllegalAccessException{ + void setup(){ translateProcessorConfig = createObjectUnderTest(); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", "sourceKey"); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "target", "targetKey"); } @Test - void test_no_map_patterns_filepath_options_present(){ + void test_no_mappings_filepath_options_present(){ assertFalse(translateProcessorConfig.hasMappings()); } @Test - void test_only_map_option_present() throws NoSuchFieldException, IllegalAccessException{ - setField(TranslateProcessorConfig.class, translateProcessorConfig, "map", Collections.singletonMap("key1", "val1")); + void test_only_mappings_option_present() throws NoSuchFieldException, IllegalAccessException{ + setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", List.of(new MappingsParameterConfig())); assertTrue(translateProcessorConfig.hasMappings()); } @@ -47,66 +41,17 @@ void test_only_filepath_option_present() throws NoSuchFieldException, IllegalAcc } @Test - void test_only_patterns_option_present() throws NoSuchFieldException, IllegalAccessException{ - regexParameterConfiguration = new RegexParameterConfiguration(); - setField(RegexParameterConfiguration.class, regexParameterConfiguration, "patterns", Collections.singletonMap("patternKey1", "patternVal1")); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "regexParameterConfiguration", regexParameterConfiguration); - assertTrue(translateProcessorConfig.hasMappings()); - } - - @Test - void test_no_patterns_under_regex() throws NoSuchFieldException, IllegalAccessException{ - regexParameterConfiguration = new RegexParameterConfiguration(); - setField(RegexParameterConfiguration.class, regexParameterConfiguration, "exact", true); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "map", Collections.singletonMap("key1", "val1")); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "regexParameterConfiguration", regexParameterConfiguration); - assertFalse(translateProcessorConfig.isPatternPresent()); - } - - @Test - void test_source_field_valid_types() throws NoSuchFieldException, IllegalAccessException{ - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", "key1"); - assertTrue(translateProcessorConfig.isSourceFieldValid()); - assertThat(translateProcessorConfig.getSource(), is("key1")); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", List.of("key1", "key2", "key3")); - assertTrue(translateProcessorConfig.isSourceFieldValid()); - assertThat(translateProcessorConfig.getSource(), is(List.of("key1", "key2", "key3"))); - } - - @Test - void test_source_field_invalid_types() throws NoSuchFieldException, IllegalAccessException{ - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", 200); - assertFalse(translateProcessorConfig.isSourceFieldValid()); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", false); - assertFalse(translateProcessorConfig.isSourceFieldValid()); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", 20.1); - assertFalse(translateProcessorConfig.isSourceFieldValid()); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "source", List.of("key1", 200)); - assertFalse(translateProcessorConfig.isSourceFieldValid()); + void test_get_file_path() throws NoSuchFieldException, IllegalAccessException{ + String filePath = "/path/to/file.yaml"; + setField(TranslateProcessorConfig.class, translateProcessorConfig, "filePath", filePath); + assertThat(translateProcessorConfig.getFilePath(), is(filePath)); } @Test - void test_get_default() throws NoSuchFieldException, IllegalAccessException{ - assertNull(translateProcessorConfig.getDefaultValue()); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "defaultValue", "No match"); - assertThat(translateProcessorConfig.getDefaultValue(),is("No match")); + void test_get_mappings() throws NoSuchFieldException, IllegalAccessException{ + List mappingsParameterConfigs = List.of(new MappingsParameterConfig()); + setField(TranslateProcessorConfig.class, translateProcessorConfig, "mappingsParameterConfigs", mappingsParameterConfigs); + assertThat(translateProcessorConfig.getMappingsParameterConfigs(), is(mappingsParameterConfigs)); } - @Test - void test_get_iterate_on() throws NoSuchFieldException, IllegalAccessException{ - assertNull(translateProcessorConfig.getIterateOn()); - setField(TranslateProcessorConfig.class, translateProcessorConfig, "iterateOn", "iteratorField"); - assertThat(translateProcessorConfig.getIterateOn(),is("iteratorField")); - } - - @Test - void test_target_type_default(){ - assertThat(translateProcessorConfig.getTargetType(), is(TargetType.STRING)); - } - - @Test - void test_get_target_type() throws NoSuchFieldException, IllegalAccessException{ - setField(TranslateProcessorConfig.class, translateProcessorConfig, "targetType", TargetType.INTEGER); - assertThat(translateProcessorConfig.getTargetType(), is(TargetType.INTEGER)); - } } \ No newline at end of file diff --git a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java index 394cb11ad8..823f542849 100644 --- a/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java +++ b/data-prepper-plugins/translate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/translate/TranslateProcessorTest.java @@ -21,15 +21,14 @@ import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.is; - @ExtendWith(MockitoExtension.class) class TranslateProcessorTest { @@ -44,17 +43,33 @@ class TranslateProcessorTest { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private TargetsParameterConfig targetsParameterConfig; + + @Mock + private MappingsParameterConfig mappingsParameterConfig; + @BeforeEach void setup() { - lenient().when(mockConfig.getSource()).thenReturn("sourceField"); - lenient().when(mockConfig.getTarget()).thenReturn("targetField"); - lenient().when(mockConfig.getTargetType()).thenReturn(TargetType.STRING); - lenient().when(mockRegexConfig.getExact()).thenReturn(mockRegexConfig.DEFAULT_EXACT); + lenient() + .when(mappingsParameterConfig.getSource()) + .thenReturn("sourceField"); + lenient() + .when(targetsParameterConfig.getTargetType()) + .thenReturn(TargetType.STRING); + lenient() + .when(mockRegexConfig.getExact()) + .thenReturn(mockRegexConfig.DEFAULT_EXACT); + lenient() + .when(mockConfig.getMappingsParameterConfigs()) + .thenReturn(List.of(mappingsParameterConfig)); } @Test - void test_string_keys_in_map(){ - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1","mappedValue1"))); + void test_string_keys_in_map() { + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("key1", "mappedValue1")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -65,7 +80,9 @@ void test_string_keys_in_map(){ @Test void test_integer_keys_in_map() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("123", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("123", "mappedValue1")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("123"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -76,10 +93,13 @@ void test_integer_keys_in_map() { @Test void test_integer_range_keys_in_map() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("1-10", "mappedValue1")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("5"); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); @@ -88,70 +108,89 @@ void test_integer_range_keys_in_map() { @Test void test_comma_separated_keys_in_map() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1,key2, key3", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("key1,key2, key3", "mappedValue1")), "targetField", null, null, null, + null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); for (String key : Arrays.asList("key1", "key2", "key3")) { final Record record = getEvent(key); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); } final Record failureRecord = getEvent("key4"); - final List> failingTranslatedRecords = (List>) processor.doExecute(Collections.singletonList(failureRecord)); + final List> failingTranslatedRecords = (List>) processor.doExecute( + Collections.singletonList(failureRecord)); assertFalse(failingTranslatedRecords.get(0).getData().containsKey("targetField")); } @Test void test_comma_separated_range_keys_in_map() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10,11-20, 21-30", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("1-10,11-20, 21-30", "mappedValue1")), "targetField", null, null, null, + null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); for (String key : Arrays.asList("5", "15", "25")) { final Record record = getEvent(key); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); } final Record failureRecord = getEvent("35"); - final List> failingTranslatedRecords = (List>) processor.doExecute(Collections.singletonList(failureRecord)); + final List> failingTranslatedRecords = (List>) processor.doExecute( + Collections.singletonList(failureRecord)); assertFalse(failingTranslatedRecords.get(0).getData().containsKey("targetField")); } @Test void test_float_source() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10,11-20, 21-30", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("1-10,11-20, 21-30", "mappedValue1")), "targetField", null, null, null, + null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("11.1"); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); final Record failureRecord = getEvent("20.5"); - final List> failingTranslatedRecords = (List>) processor.doExecute(Collections.singletonList(failureRecord)); + final List> failingTranslatedRecords = (List>) processor.doExecute( + Collections.singletonList(failureRecord)); assertFalse(failingTranslatedRecords.get(0).getData().containsKey("targetField")); } @Test void test_comma_separated_integer_ranges_and_string_keys() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10,key1", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("1-10,key1", "mappedValue1")), "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("5.2"); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); final Record recordStringKey = getEvent("key1"); - final List> translatedStringKeyRecords = (List>) processor.doExecute(Collections.singletonList(recordStringKey)); + final List> translatedStringKeyRecords = (List>) processor.doExecute( + Collections.singletonList(recordStringKey)); assertTrue(translatedStringKeyRecords.get(0).getData().containsKey("targetField")); assertThat(translatedStringKeyRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); @@ -159,15 +198,19 @@ void test_comma_separated_integer_ranges_and_string_keys() { @Test void test_multiple_dashes_in_keys_should_be_treated_as_string_literal() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10-20", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("1-10-20", "mappedValue1")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record failureRecord = getEvent("1-10-20"); - final List> failingTranslatedRecords = (List>) processor.doExecute(Collections.singletonList(failureRecord)); + final List> failingTranslatedRecords = (List>) processor.doExecute( + Collections.singletonList(failureRecord)); assertTrue(failingTranslatedRecords.get(0).getData().containsKey("targetField")); final Record record = getEvent("10"); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertFalse(translatedRecords.get(0).getData().containsKey("targetField")); @@ -175,26 +218,28 @@ void test_multiple_dashes_in_keys_should_be_treated_as_string_literal() { @Test void test_overlapping_ranges_should_fail_when_overlapping() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10", "mappedValue1"), createMapping("10-20", "mappedValue2"))); - - assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest()); + assertThrows(InvalidPluginConfigurationException.class, () -> new TargetsParameterConfig( + createMapEntries(createMapping("1-10", "mappedValue1"), createMapping("10-20", "mappedValue2")), + "targetField", null, null, null, null)); } @Test void test_overlapping_key_and_range_in_map_option() { - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("1-10", "mappedValue1"), createMapping("5.3", "mappedValue2"))); - - assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest()); + assertThrows(InvalidPluginConfigurationException.class, () -> new TargetsParameterConfig( + createMapEntries(createMapping("1-10", "mappedValue1"), createMapping("5.3", "mappedValue2")), + "targetField", null, null, null, null)); } @Test void test_string_literal_in_pattern_option() { - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key1", "mappedValue1"))); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); - final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); + final List> translatedRecords = (List>) processor.doExecute( + Collections.singletonList(record)); assertTrue(translatedRecords.get(0).getData().containsKey("targetField")); assertThat(translatedRecords.get(0).getData().get("targetField", String.class), is("mappedValue1")); @@ -207,8 +252,10 @@ void test_string_literal_in_pattern_option() { @Test void test_matching_of_regex_pattern_in_pattern_option() { - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); - when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("^(1[0-9]|20)$", "patternValue1"))); //Range between 10-20 + when(mockRegexConfig.getPatterns()).thenReturn( + createMapEntries(createMapping("^(1[0-9]|20)$", "patternValue1"))); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("15"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -224,9 +271,11 @@ void test_matching_of_regex_pattern_in_pattern_option() { @Test void test_pattern_matching_when_no_match_in_map() { - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2")))); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("patternKey1", "patternValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + (createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"))), + "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("patternKey1"); @@ -244,9 +293,11 @@ void test_pattern_matching_when_no_match_in_map() { @Test void test_map_matching_when_overlapping_ranges_in_map_and_pattern() { - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("400", "mappedValue1")))); - when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("^(400|404)$", "patternValue1"))); // Matches 400 or 404 + when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("^(400|404)$", "patternValue1"))); + targetsParameterConfig = new TargetsParameterConfig( + (createMapEntries(createMapping("400", "mappedValue1"), createMapping("key2", "mappedValue2"))), + "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("400"); @@ -264,8 +315,10 @@ void test_map_matching_when_overlapping_ranges_in_map_and_pattern() { @Test void test_source_array_single_key() { - when(mockConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField"))); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("400", "mappedValue1")))); + when(mappingsParameterConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField"))); + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("400", "mappedValue1")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("400"); @@ -277,8 +330,11 @@ void test_source_array_single_key() { @Test void test_source_array_multiple_keys() { - when(mockConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), createMapping("key3", "mappedValue3")))); + when(mappingsParameterConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), + createMapping("key3", "mappedValue3")), "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(Map.of("sourceField1", "key1", "sourceField2", "key3")); @@ -290,8 +346,11 @@ void test_source_array_multiple_keys() { @Test void test_source_array_with_partial_match_without_default() { - when(mockConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), createMapping("key3", "mappedValue3")))); + when(mappingsParameterConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), + createMapping("key3", "mappedValue3")), "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(Map.of("sourceField1", "key1", "sourceField2", "key4")); @@ -304,9 +363,11 @@ void test_source_array_with_partial_match_without_default() { @Test void test_source_array_with_partial_match_with_default() { final String defaultValue = "No Match Found"; - when(mockConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); - when(mockConfig.getDefaultValue()).thenReturn(defaultValue); - when(mockConfig.getMap()).thenReturn((createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), createMapping("key3", "mappedValue3")))); + when(mappingsParameterConfig.getSource()).thenReturn(new ArrayList(List.of("sourceField1", "sourceField2"))); + targetsParameterConfig = new TargetsParameterConfig( + createMapEntries(createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"), + createMapping("key3", "mappedValue3")), "targetField", null, null, defaultValue, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(Map.of("sourceField1", "key1", "sourceField2", "key4")); @@ -318,11 +379,12 @@ void test_source_array_with_partial_match_with_default() { @Test void test_non_exact_matching() { - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); + when(mockRegexConfig.getExact()).thenReturn(false); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries( createMapping("^(1[0-9]|20)$", "patternValue1"), createMapping("foo", "bar2"))); - when(mockRegexConfig.getExact()).thenReturn(false); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("footer"); @@ -354,12 +416,13 @@ void test_nested_records_with_default_value() { Map.of("sourceField", "key2", "targetField", "mappedValue2"), Map.of("sourceField", "key3", "targetField", "No Match")); - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries( createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"))); - when(mockConfig.getDefaultValue()).thenReturn("No Match"); - when(mockConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, "No Match", + null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(testJson); @@ -379,11 +442,12 @@ void test_nested_records_without_default_value() { Map.of("sourceField", "key2", "targetField", "mappedValue2"), Map.of("sourceField", "key3")); - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries( createMapping("key1", "mappedValue1"), createMapping("key2", "mappedValue2"))); - when(mockConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(testJson); @@ -403,9 +467,10 @@ void test_nested_records_no_match() { Map.of("sourceField", "key2"), Map.of("sourceField", "key3")); - when(mockConfig.getRegexParameterConfiguration()).thenReturn(mockRegexConfig); when(mockRegexConfig.getPatterns()).thenReturn(createMapEntries(createMapping("key4", "mappedValue1"))); - when(mockConfig.getIterateOn()).thenReturn("collection"); + when(mappingsParameterConfig.getIterateOn()).thenReturn("collection"); + targetsParameterConfig = new TargetsParameterConfig(null, "targetField", mockRegexConfig, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = buildRecordWithEvent(testJson); @@ -415,8 +480,10 @@ void test_nested_records_no_match() { } @Test - void test_target_type_default(){ - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200"))); + void test_target_type_default() { + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("key1", "200")), + "targetField", null, null, null, null); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -426,9 +493,10 @@ void test_target_type_default(){ } @Test - void test_target_type_integer(){ - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "200"))); - when(mockConfig.getTargetType()).thenReturn(TargetType.INTEGER); + void test_target_type_integer() { + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("key1", "200")), + "targetField", null, null, null, TargetType.INTEGER); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -438,9 +506,10 @@ void test_target_type_integer(){ } @Test - void test_target_type_boolean(){ - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "false"))); - when(mockConfig.getTargetType()).thenReturn(TargetType.BOOLEAN); + void test_target_type_boolean() { + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("key1", "false")), + "targetField", null, null, null, TargetType.BOOLEAN); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -450,9 +519,10 @@ void test_target_type_boolean(){ } @Test - void test_target_type_double(){ - when(mockConfig.getMap()).thenReturn(createMapEntries(createMapping("key1", "20.3"))); - when(mockConfig.getTargetType()).thenReturn(TargetType.DOUBLE); + void test_target_type_double() { + targetsParameterConfig = new TargetsParameterConfig(createMapEntries(createMapping("key1", "20.3")), + "targetField", null, null, null, TargetType.DOUBLE); + when(mappingsParameterConfig.getTargetsParameterConfigs()).thenReturn(List.of(targetsParameterConfig)); final TranslateProcessor processor = createObjectUnderTest(); final Record record = getEvent("key1"); final List> translatedRecords = (List>) processor.doExecute(Collections.singletonList(record)); @@ -461,7 +531,6 @@ void test_target_type_double(){ assertThat(translatedRecords.get(0).getData().get("targetField", Double.class), is(20.3)); } - private TranslateProcessor createObjectUnderTest() { return new TranslateProcessor(pluginMetrics, mockConfig, expressionEvaluator); } From b663de3050badeae599fac555dc2cadfc6b93063 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 14 Jul 2023 10:51:25 -0500 Subject: [PATCH 4/5] Add support for Data Prepper expressions in the document_id_field of the OpenSearch sink, add opensearch prefix to opensearch source metadata keys (#3025) Signed-off-by: Taylor Gray --- .../expression/ExpressionEvaluator.java | 2 ++ .../expression/ExpressionEvaluatorTest.java | 5 ++++ .../GenericExpressionEvaluator.java | 11 ++++++++ .../GenericExpressionEvaluatorTest.java | 26 +++++++++++++++++++ .../opensearch-source/README.md | 4 +-- .../client/model/MetadataKeyAttributes.java | 4 +-- data-prepper-plugins/opensearch/README.md | 4 ++- .../sink/opensearch/OpenSearchSinkIT.java | 16 ++++++++---- .../sink/opensearch/OpenSearchSink.java | 21 ++++++++++++++- 9 files changed, 82 insertions(+), 11 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java index a894e8f6c0..c006a31cae 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/expression/ExpressionEvaluator.java @@ -32,4 +32,6 @@ default Boolean evaluateConditional(final String statement, final Event context) throw new ClassCastException("Unexpected expression return type of " + result.getClass()); } } + + Boolean isValidExpressionStatement(final String statement); } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java index 3cc066cda0..36a60ac447 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/expression/ExpressionEvaluatorTest.java @@ -18,6 +18,11 @@ class TestExpressionEvaluator implements ExpressionEvaluator { public Object evaluate(final String statement, final Event event) { return event.get(statement, Object.class); } + + @Override + public Boolean isValidExpressionStatement(final String statement) { + return true; + } } @Test diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java index 836b2f0cc8..072b3a7393 100644 --- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java +++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator.java @@ -41,4 +41,15 @@ public Object evaluate(final String statement, final Event context) { throw new ExpressionEvaluationException("Unable to evaluate statement \"" + statement + "\"", exception); } } + + @Override + public Boolean isValidExpressionStatement(final String statement) { + try { + parser.parse(statement); + return true; + } + catch (final Exception exception) { + return false; + } + } } diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java index e0c973289f..d68808dc85 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluatorTest.java @@ -16,6 +16,7 @@ import java.util.UUID; import java.util.Random; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -93,5 +94,30 @@ void testGivenEvaluatorThrowsExceptionThenExceptionThrown() { verify(evaluator).evaluate(eq(parseTree), eq(event)); } + @Test + void isValidExpressionStatement_returns_true_when_parse_does_not_throw() { + final String statement = UUID.randomUUID().toString(); + final ParseTree parseTree = mock(ParseTree.class); + + doReturn(parseTree).when(parser).parse(eq(statement)); + + final boolean result = statementEvaluator.isValidExpressionStatement(statement); + + assertThat(result, equalTo(true)); + + verify(parser).parse(eq(statement)); + } + + @Test + void isValidExpressionStatement_returns_false_when_parse_throws() { + final String statement = UUID.randomUUID().toString(); + + doThrow(RuntimeException.class).when(parser).parse(eq(statement)); + + final boolean result = statementEvaluator.isValidExpressionStatement(statement); + + assertThat(result, equalTo(false)); + } + } diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md index b904d67378..d0f27e0176 100644 --- a/data-prepper-plugins/opensearch-source/README.md +++ b/data-prepper-plugins/opensearch-source/README.md @@ -73,8 +73,8 @@ opensearch-source-pipeline: ### Using Metadata When the OpenSearch source constructs Data Prepper Events from documents in the cluster, the -document index is stored in the `EventMetadata` with an `index` key, and the document_id is -stored in the `EventMetadata` with a `document_id` key. This allows conditional routing based on the index or document_id, +document index is stored in the `EventMetadata` with an `opensearch-index` key, and the document_id is +stored in the `EventMetadata` with a `opensearch-document_id` key. This allows conditional routing based on the index or document_id, among other things. For example, one could send to an OpenSearch sink and use the same index and document_id from the source cluster in the destination cluster. A full config example for this use case is below diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java index d7b88ddfa9..68fbc4677b 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java @@ -6,6 +6,6 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; public class MetadataKeyAttributes { - public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "document_id"; - public static final String INDEX_METADATA_ATTRIBUTE_NAME = "index"; + public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id"; + public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index"; } diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index ffe28dae54..9ec886f6e2 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -167,7 +167,9 @@ and is ignored unless `estimate_bulk_size_using_compression` is enabled. Default If this timeout expires before a bulk request has reached the bulk_size, the request will be flushed as-is. Set to -1 to disable the flush timeout and instead flush whatever is present at the end of each batch. Default is 60,000, or one minute. -- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id +- `document_id_field` (optional): A string of document identifier which is used as `id` for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the id for the document, if it is not present, a unique id is generated by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the document id. This field can also be a Data Prepper expression + that is evaluated to determine the document_id_field. For example, setting to `getMetadata(\"some_metadata_key\")` will use the value of the metadata key + as the document_id - `routing_field` (optional): A string of routing field which is used as hash for generating sharding id for the document when it is stored in the OpenSearch. Each incoming record is searched for this field and if it is present, it is used as the routing field for the document, if it is not present, default routing mechanism used by the OpenSearch when storing the document. Standard Data Prepper Json pointer syntax is used for retrieving the value. If the field has "/" in it then the incoming record is searched in the json sub-objects instead of just in the root of the json object. For example, if the field is specified as `info/id`, then the root of the event is searched for `info` and if it is found, then `id` is searched inside it. The value specified for `id` is used as the routing id diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index c6ac5752ec..de150c820b 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Measurement; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.http.util.EntityUtils; import org.hamcrest.MatcherAssert; import org.junit.Assert; @@ -32,6 +33,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -47,9 +49,6 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; -import org.apache.commons.lang3.RandomStringUtils; - -import static org.mockito.Mockito.when; import javax.ws.rs.HttpMethod; import java.io.BufferedReader; @@ -91,6 +90,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts; @@ -121,8 +121,10 @@ public class OpenSearchSinkIT { @Mock private AwsCredentialsSupplier awsCredentialsSupplier; + private ExpressionEvaluator expressionEvaluator; + public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier); if (doInitialize) { sink.doInitialize(); } @@ -133,7 +135,7 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS sinkContext = mock(SinkContext.class); testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier); if (doInitialize) { sink.doInitialize(); } @@ -142,6 +144,10 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS @BeforeEach public void setup() { + + expressionEvaluator = mock(ExpressionEvaluator.class); + when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); + eventHandle = mock(EventHandle.class); lenient().doAnswer(a -> { return null; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 10d7ea1ca6..ec5ebe4de1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -19,6 +19,8 @@ import org.opensearch.client.transport.TransportOptions; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluationException; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.MetricNames; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -108,6 +110,8 @@ public class OpenSearchSink extends AbstractSink> { private volatile boolean initialized; private PluginSetting pluginSetting; private final SinkContext sinkContext; + private final ExpressionEvaluator expressionEvaluator; + private final boolean isDocumentIdAnExpression; private FailedBulkOperationConverter failedBulkOperationConverter; @@ -119,10 +123,12 @@ public class OpenSearchSink extends AbstractSink> { public OpenSearchSink(final PluginSetting pluginSetting, final PluginFactory pluginFactory, final SinkContext sinkContext, + final ExpressionEvaluator expressionEvaluator, final AwsCredentialsSupplier awsCredentialsSupplier) { super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS); this.awsCredentialsSupplier = awsCredentialsSupplier; this.sinkContext = sinkContext; + this.expressionEvaluator = expressionEvaluator; bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); dynamicIndexDroppedEvents = pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS); @@ -133,6 +139,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.flushTimeout = openSearchSinkConfig.getIndexConfiguration().getFlushTimeout(); this.indexType = openSearchSinkConfig.getIndexConfiguration().getIndexType(); this.documentIdField = openSearchSinkConfig.getIndexConfiguration().getDocumentIdField(); + this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(documentIdField); this.routingField = openSearchSinkConfig.getIndexConfiguration().getRoutingField(); this.action = openSearchSinkConfig.getIndexConfiguration().getAction(); this.documentRootKey = openSearchSinkConfig.getIndexConfiguration().getDocumentRootKey(); @@ -317,7 +324,19 @@ public void doOutput(final Collection> records) { } private SerializedJson getDocument(final Event event) { - String docId = (documentIdField != null) ? event.get(documentIdField, String.class) : null; + + String docId = null; + + if (isDocumentIdAnExpression) { + try { + docId = (String) expressionEvaluator.evaluate(documentIdField, event); + } catch (final ExpressionEvaluationException e) { + LOG.error("Unable to construct document_id_field from expression {}, the document_id will be generated by OpenSearch", documentIdField); + } + } else if (Objects.nonNull(documentIdField)) { + docId = event.get(documentIdField, String.class); + } + String routing = (routingField != null) ? event.get(routingField, String.class) : null; final String document = DocumentBuilder.build(event, documentRootKey, Objects.nonNull(sinkContext)?sinkContext.getTagsTargetKey():null); From 0b804fd8d8572044dd32d8f3e4f1c24d9a9cdf8e Mon Sep 17 00:00:00 2001 From: venkataraopasyavula <126578319+venkataraopasyavula@users.noreply.github.com> Date: Sat, 15 Jul 2023 03:24:33 +0530 Subject: [PATCH 5/5] GitHub-issue#253 : Implemented GeoIP processor integration test (#2927) * GitHub-issue#253 : Implemented GeoIP processor integration test Signed-off-by: venkataraopasyavula * GitHub-issue#253 : Implemented GeoIP processor integration test Signed-off-by: venkataraopasyavula * GitHub-issue#253 : Implemented GeoIP processor integration test Signed-off-by: venkataraopasyavula * GitHub-issue#253 : Implemented GeoIP processor integration test Signed-off-by: venkataraopasyavula * GitHub-issue#253 : Implemented GeoIP processor integration test Signed-off-by: venkataraopasyavula --- .../geoip-processor/build.gradle | 1 + .../plugins/processor/GeoIPInputJson.java | 26 +++++ .../processor/GeoIPProcessorUrlServiceIT.java | 102 ++++++++++++++++++ .../dataprepper/plugins/processor/Peer.java | 26 +++++ 4 files changed, 155 insertions(+) create mode 100644 data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPInputJson.java create mode 100644 data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorUrlServiceIT.java create mode 100644 data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/Peer.java diff --git a/data-prepper-plugins/geoip-processor/build.gradle b/data-prepper-plugins/geoip-processor/build.gradle index 7eb4d41b63..60eca37e68 100644 --- a/data-prepper-plugins/geoip-processor/build.gradle +++ b/data-prepper-plugins/geoip-processor/build.gradle @@ -66,6 +66,7 @@ task integrationTest(type: Test) { useJUnitPlatform() classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.geoipProcessor.maxmindLicenseKey', System.getProperty('tests.geoipProcessor.maxmindLicenseKey') filter { includeTestsMatching '*IT' diff --git a/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPInputJson.java b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPInputJson.java new file mode 100644 index 0000000000..9e84788bea --- /dev/null +++ b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPInputJson.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor; + +public class GeoIPInputJson { + Peer PeerObject; + private String status; + + // Getter Methods + public Peer getPeer() { + return PeerObject; + } + public String getStatus() { + return status; + } + // Setter Methods + public void setPeer( Peer peerObject ) { + this.PeerObject = peerObject; + } + public void setStatus( String status ) { + this.status = status; + } +} diff --git a/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorUrlServiceIT.java b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorUrlServiceIT.java new file mode 100644 index 0000000000..01aa8ece51 --- /dev/null +++ b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/GeoIPProcessorUrlServiceIT.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck; + +import java.io.File; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +@ExtendWith(MockitoExtension.class) +public class GeoIPProcessorUrlServiceIT { + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + private String tempPath; + private GeoIPProcessorConfig geoIPProcessorConfig; + private String maxmindLicenseKey; + private GeoIPProcessorService geoIPProcessorService; + private GeoIPInputJson geoIPInputJson; + private String jsonInput; + private static final String TEMP_PATH_FOLDER = "GeoIP"; + public static final String DATABASE_1 = "first_database"; + public static final String URL_SUFFIX = "&suffix=tar.gz"; + + @BeforeEach + public void setUp() throws JsonProcessingException { + + maxmindLicenseKey = System.getProperty("tests.geoipProcessor.maxmindLicenseKey"); + + jsonInput = "{\"peer\": {\"ip\": \"8.8.8.8\", \"host\": \"example.org\" }, \"status\": \"success\"}"; + + geoIPInputJson = objectMapper.readValue(jsonInput, GeoIPInputJson.class); + tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER; + + String asnUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-ASN&license_key=" + maxmindLicenseKey + URL_SUFFIX; + String cityUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City&license_key=" + maxmindLicenseKey + URL_SUFFIX; + String countryUrl = "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-Country&license_key=" + maxmindLicenseKey + URL_SUFFIX; + + String pipelineConfig = " aws:\n" + + " region: us-east-2\n" + + " sts_role_arn: \"arn:aws:iam::123456789:role/data-prepper-execution-role\"\n" + + " keys:\n" + + " - key:\n" + + " source: \"/peer/ip\"\n" + + " target: \"target1\"\n" + + " - key:\n" + + " source: \"/peer/ip2\"\n" + + " target: \"target2\"\n" + + " attributes: [\"city_name\",\"country_name\"]\n" + + " service_type:\n" + + " maxmind:\n" + + " database_path:\n" + + " - url: " + asnUrl + "\n" + + " - url: " + cityUrl + "\n" + + " - url: " + countryUrl + "\n" + + " load_type: \"cache\"\n" + + " cache_size: 8192\n" + + " cache_refresh_schedule: PT3M"; + + objectMapper.registerModule(new JavaTimeModule()); + this.geoIPProcessorConfig = objectMapper.readValue(pipelineConfig, GeoIPProcessorConfig.class); + } + + public GeoIPProcessorService createObjectUnderTest() { + return new GeoIPProcessorService(geoIPProcessorConfig, tempPath); + } + + @Test + void verify_enrichment_of_data_from_maxmind_url() throws UnknownHostException { + + Map geoData = new HashMap<>(); + this.geoIPProcessorService = createObjectUnderTest(); + String ipAddress = geoIPInputJson.getPeer().getIp(); + if (IPValidationcheck.isPublicIpAddress(ipAddress)) { + InetAddress inetAddress = InetAddress.getByName(ipAddress); + //All attributes are considered by default with the null value + geoData = geoIPProcessorService.getGeoData(inetAddress, null); + + assertThat(geoData.get("country_iso_code"), equalTo("US")); + assertThat(geoData.get("ip"), equalTo("8.8.8.8")); + assertThat(geoData.get("country_name"), equalTo("United States")); + assertThat(geoData.get("organization_name"), equalTo("GOOGLE")); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/Peer.java b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/Peer.java new file mode 100644 index 0000000000..bd0c0da9b6 --- /dev/null +++ b/data-prepper-plugins/geoip-processor/src/integrationTest/java/org/opensearch/dataprepper/plugins/processor/Peer.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor; + +public class Peer { + private String ip; + private String host; + + // Getter Methods + public String getIp() { + return ip; + } + public String getHost() { + return host; + } + // Setter Methods + public void setIp( String ip ) { + this.ip = ip; + } + public void setHost( String host ) { + this.host = host; + } +} \ No newline at end of file