From bdbb1747ae9aed5f224d05ec7572b4ed0e6fb65b Mon Sep 17 00:00:00 2001 From: Uday Chintala Date: Fri, 4 Aug 2023 23:28:24 +0530 Subject: [PATCH] Sns Sink Plugin with junit test cases (#2995) Sns Sink Plugin with junit test cases --------- Signed-off-by: Uday Kumar Chintala Signed-off-by: Uday Chintala --- data-prepper-plugins/sns-sink/README.md | 79 ++++++ data-prepper-plugins/sns-sink/build.gradle | 64 +++++ .../plugins/sink/sns/SnsSinkServiceIT.java | 229 +++++++++++++++++ .../plugins/sink/sns/SnsClientFactory.java | 43 ++++ .../dataprepper/plugins/sink/sns/SnsSink.java | 107 ++++++++ .../plugins/sink/sns/SnsSinkConfig.java | 135 ++++++++++ .../plugins/sink/sns/SnsSinkService.java | 233 ++++++++++++++++++ .../AwsAuthenticationOptions.java | 38 +++ .../plugins/sink/sns/dlq/DlqPushHandler.java | 136 ++++++++++ .../sink/sns/dlq/SnsSinkFailedDlqData.java | 55 +++++ .../sink/sns/SnsClientFactoryTest.java | 98 ++++++++ .../plugins/sink/sns/SnsSinkConfigTest.java | 81 ++++++ .../plugins/sink/sns/SnsSinkServiceTest.java | 184 ++++++++++++++ .../plugins/sink/sns/SnsSinkTest.java | 89 +++++++ .../sink/sns/dlq/DlqPushHandlerTest.java | 98 ++++++++ .../org.mockito.plugins.MockMaker | 3 + settings.gradle | 1 + 17 files changed, 1673 insertions(+) create mode 100644 data-prepper-plugins/sns-sink/README.md create mode 100644 data-prepper-plugins/sns-sink/build.gradle create mode 100644 data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java create mode 100644 data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java create mode 100644 data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java create mode 100644 data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java create mode 100644 data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java create mode 100644 data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java create mode 100644 data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java create mode 100644 data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/data-prepper-plugins/sns-sink/README.md b/data-prepper-plugins/sns-sink/README.md new file mode 100644 index 0000000000..f84d5d8566 --- /dev/null +++ b/data-prepper-plugins/sns-sink/README.md @@ -0,0 +1,79 @@ +# SNS Sink + +This is the Data Prepper SNS Sink plugin that sends records to an SNS Topic. + +## Usages + +The SNS sink should be configured as part of Data Prepper pipeline yaml file. + +## Configuration Options + +``` +pipeline: + ... + sink: + - sns: + topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic + message_group_id: /type + message_deduplication_id: /id + batch_size: 10 + aws: + region: ap-south-1 + sts_role_arn: arn:aws:iam::524239988922:role/app-test + dlq: + s3: + bucket: test-bucket + key_path_prefix: dlq/ + codec: + ndjson: + max_retries: 5 +``` + +## SNS Pipeline Configuration + +- `topic_arn` (Optional) : The SNS Topic Arn of the Topic to push events. + +- `batch_size` (Optional) : An integer value indicates the maximum number of events required to ingest into sns topic. Defaults to 10. + +- `message_group_id` (optional): A string of message group identifier which is used as `message_group_id` for the message group when it is stored in the sns topic. Default to Auto generated Random key. + +- `message_deduplication_id` (Optional) : A string of message deduplication identifier which is used as `message_deduplication_id` for the message deduplication when it is stored in the sns topic. Default to Auto generated Random key. + +- `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null. + If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown. + +- `dlq` (optional): DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown. + +- `codec` : This plugin is integrated with sink codec + +- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. If this option is present, `aws_` options are not expected to be present. If any of `aws_` options are present along with this, error is thrown. + +- `max_retries` (Optional) : An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon SNS and S3. Defaults to `5`. + +### AWS Configuration + +* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). +* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SNS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). + + +### Counters + +* `snsSinkObjectsEventsSucceeded` - The number of events that the SNS sink has successfully sent to Topic. +* `snsSinkObjectsEventsFailed` - The number of events that the SNS sink has successfully sent to Topic. + +## Developer Guide + +This plugin is compatible with Java 11. See below + +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) + +The integration tests for this plugin do not run as part of the Data Prepper build. + +The following command runs the integration tests: + +Note: Subscribe sns topic to sqs queues to run the integration tests. + +``` +./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<> -Dtests.sns.sink.sts.role.arn=<> -Dtests.sns.sink.standard.topic=<> -Dtests.sns.sink.fifo.topic=<> -Dtests.sns.sink.dlq.file.path=<> -Dtests.sns.sink.standard.sqs.queue.url=<> -Dtests.sns.sink.fifo.sqs.queue.url=<> +``` diff --git a/data-prepper-plugins/sns-sink/build.gradle b/data-prepper-plugins/sns-sink/build.gradle new file mode 100644 index 0000000000..49d2918b00 --- /dev/null +++ b/data-prepper-plugins/sns-sink/build.gradle @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation project(':data-prepper-api') + implementation project(path: ':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:sns' + implementation 'software.amazon.awssdk:sts' + testImplementation 'software.amazon.awssdk:sqs' + implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21' + implementation 'org.apache.commons:commons-lang3:3.12.0' + implementation project(':data-prepper-plugins:failures-common') + testImplementation project(':data-prepper-test-common') + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.sns.sink.region', System.getProperty('tests.sns.sink.region') + systemProperty 'tests.sns.sink.dlq.file.path', System.getProperty('tests.sns.sink.dlq.file.path') + systemProperty 'tests.sns.sink.sts.role.arn', System.getProperty('tests.sns.sink.sts.role.arn') + systemProperty 'tests.sns.sink.standard.topic', System.getProperty('tests.sns.sink.standard.topic') + systemProperty 'tests.sns.sink.fifo.topic', System.getProperty('tests.sns.sink.fifo.topic') + systemProperty 'tests.sns.sink.standard.sqs.queue.url', System.getProperty('tests.sns.sink.standard.sqs.queue.url') + systemProperty 'tests.sns.sink.fifo.sqs.queue.url', System.getProperty('tests.sns.sink.fifo.sqs.queue.url') + + filter { + includeTestsMatching '*IT' + } +} + diff --git a/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java new file mode 100644 index 0000000000..b3c235dab1 --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java @@ -0,0 +1,229 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.MessageFormat; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED; +import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS; + +public class SnsSinkServiceIT { + + private SnsClient snsClient; + + private PluginMetrics pluginMetrics; + + private PluginFactory pluginFactory; + + private PluginSetting pluginSetting; + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + + private static final String SNS_SINK_CONFIG_YAML = " topic_arn: {0}\n" + + " batch_size: 10\n" + + " aws:\n" + + " region: {1}\n" + + " sts_role_arn: {2}\n" + + " dlq_file: {3}\n" + + " codec:\n" + + " ndjson:\n" + + " max_retries: 5"; + + private String standardTopic; + + private String fifoTopic; + + private String region; + + private String stsRoleArn; + + private String dlqFilePath; + + private Counter snsSinkObjectsEventsSucceeded; + + private Counter numberOfRecordsFailedCounter; + + private String standardSqsQueue; + + private SqsClient sqsClient; + + private String fifoSqsQueue; + + private DlqPushHandler dlqPushHandler; + + @BeforeEach + public void setup() { + this.standardTopic = System.getProperty("tests.sns.sink.standard.topic"); + this.fifoTopic = System.getProperty("tests.sns.sink.fifo.topic"); + this.region = System.getProperty("tests.sns.sink.region"); + this.stsRoleArn = System.getProperty("tests.sns.sink.sts.role.arn"); + this.dlqFilePath = System.getProperty("tests.sns.sink.dlq.file.path"); + this.standardSqsQueue = System.getProperty("tests.sns.sink.standard.sqs.queue.url"); + this.fifoSqsQueue = System.getProperty("tests.sns.sink.fifo.sqs.queue.url"); + + this.dlqPushHandler = mock(DlqPushHandler.class); + this.pluginMetrics = mock(PluginMetrics.class); + this.pluginFactory = mock(PluginFactory.class); + this.pluginSetting = mock(PluginSetting.class); + this.snsSinkObjectsEventsSucceeded = mock(Counter.class); + this.numberOfRecordsFailedCounter = mock(Counter.class); + this.snsClient = SnsClient.builder() + .region(Region.of(region)) + .build(); + this.sqsClient = SqsClient.builder().region(Region.of(region)).build(); + when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(snsSinkObjectsEventsSucceeded); + when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter); + } + + private Collection> setEventQueue(final int records) { + final Collection> jsonObjects = new LinkedList<>(); + for (int i = 0; i < records; i++) + jsonObjects.add(createRecord()); + return jsonObjects; + } + + private static Record createRecord() { + final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build(); + event.setEventHandle(mock(EventHandle.class)); + return new Record<>(event); + } + + public SnsSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException { + String[] values = { topicName,region,stsRoleArn,dlqFilePath }; + final String configYaml = MessageFormat.format(SNS_SINK_CONFIG_YAML, values); + final SnsSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SnsSinkConfig.class); + return new SnsSinkService(snsSinkConfig,snsClient,pluginMetrics,pluginFactory,pluginSetting,mock(ExpressionEvaluator.class)); + } + + @ParameterizedTest + @ValueSource(ints = {5,9,10}) + public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException { + final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic); + final Collection> records = setEventQueue(recordCount); + final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList()); + objectUnderTest.output(records); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + List topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue); + assertThat(inputRecords, is(topicData)); + assertThat(inputRecords.size(), equalTo(topicData.size())); + verify(snsSinkObjectsEventsSucceeded).increment(recordCount); + } + + @Test + public void sns_sink_service_test_with_standard_queue_with_multiple_batch() throws JsonProcessingException, InterruptedException { + final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic); + final Collection> records = setEventQueue(11); + final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList()); + objectUnderTest.output(records); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + List topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue); + assertThat(inputRecords, is(topicData)); + assertThat(inputRecords.size(), equalTo(topicData.size())); + verify(snsSinkObjectsEventsSucceeded,times(2)).increment(anyDouble()); + } + + private List readMessagesFromSNSTopicQueue(List inputRecords, final String sqsQueue) { + final List messages = new ArrayList<>(); + long startTime = System.currentTimeMillis(); + long endTime = startTime + 60000; + do { + messages.addAll(sqsClient.receiveMessage(builder -> builder.queueUrl(sqsQueue)).messages()); + if(messages.size() >= inputRecords.size()){ + break; + } + } while (System.currentTimeMillis() < endTime); + + List topicData = messages.stream().map(Message::body).map(obj-> { + try { + Map map = objectMapper.readValue(obj,Map.class); + return map.get("Message"); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + return topicData; + } + + private void deleteSqsMessages(String sqsQueue, List messages) throws InterruptedException { + for (Message message : messages) { + sqsClient.deleteMessage(builder -> builder.queueUrl(sqsQueue).receiptHandle(message.receiptHandle())); + Thread.sleep(Duration.ofSeconds(2).toMillis()); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 5, 10}) + public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException { + final SnsSinkService objectUnderTest = createObjectUnderTest(fifoTopic); + final Collection> records = setEventQueue(recordCount); + final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList()); + objectUnderTest.output(records); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + List topicData = readMessagesFromSNSTopicQueue(inputRecords,fifoSqsQueue); + assertThat(inputRecords, is(topicData)); + assertThat(inputRecords.size(), equalTo(topicData.size())); + verify(snsSinkObjectsEventsSucceeded).increment(recordCount); + } + + + + @ParameterizedTest + @ValueSource(ints = {1,5,9}) + public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException, InterruptedException { + final ObjectMapper mapper = new ObjectMapper(); + final String topic = "test"; + Files.deleteIfExists(Path.of(dlqFilePath)); + final SnsSinkService objectUnderTest = createObjectUnderTest(topic); + final Collection> records = setEventQueue(recordCount); + objectUnderTest.output(records); + Thread.sleep(Duration.ofSeconds(10).toMillis()); + verify(numberOfRecordsFailedCounter).increment(recordCount); + final Map map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class); + assertThat(map.get("topic"),equalTo(topic)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java new file mode 100644 index 0000000000..b8fac5e4cb --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.sns; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.sns.SnsClient; + +public class SnsClientFactory { + + public static SnsClient createSNSClient(final SnsSinkConfig snsSinkConfig, + final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(snsSinkConfig.getAwsAuthenticationOptions()); + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); + + return SnsClient.builder() + .region(snsSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(createOverrideConfiguration(snsSinkConfig)).build(); + } + + private static ClientOverrideConfiguration createOverrideConfiguration(final SnsSinkConfig snsSinkConfig) { + final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(snsSinkConfig.getMaxConnectionRetries()).build(); + return ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy) + .build(); + } + + private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) { + return AwsCredentialsOptions.builder() + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .build(); + } +} diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java new file mode 100644 index 0000000000..9c4e40e769 --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sns.SnsClient; + +import java.util.Collection; + +/** + * Implementation class of sns-sink plugin. It is responsible for receive the collection of + * {@link Event} and upload to amazon sns based on thresholds configured. + */ +@DataPrepperPlugin(name = "sns", pluginType = Sink.class, pluginConfigurationType = SnsSinkConfig.class) +public class SnsSink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(SnsSink.class); + + private volatile boolean sinkInitialized; + + private final SnsSinkService snsSinkService; + + /** + * @param pluginSetting dp plugin settings. + * @param snsSinkConfig sns sink configurations. + * @param pluginFactory dp plugin factory. + */ + @DataPrepperPluginConstructor + public SnsSink(final PluginSetting pluginSetting, + final SnsSinkConfig snsSinkConfig, + final PluginFactory pluginFactory, + final SinkContext sinkContext, + final ExpressionEvaluator expressionEvaluator, + final AwsCredentialsSupplier awsCredentialsSupplier) { + super(pluginSetting); + final PluginModel codecConfiguration = snsSinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), + codecConfiguration.getPluginSettings()); + // TODO: Sink codec changes are pending + // codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + sinkInitialized = Boolean.FALSE; + final SnsClient snsClient = SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier); + + + + snsSinkService = new SnsSinkService(snsSinkConfig, + snsClient, + pluginMetrics, + pluginFactory, + pluginSetting, + expressionEvaluator); + } + + @Override + public boolean isReady() { + return sinkInitialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Invalid plugin configuration, Hence failed to initialize sns-sink plugin."); + this.shutdown(); + throw e; + } catch (Exception e) { + LOG.error("Failed to initialize sns-sink plugin."); + this.shutdown(); + throw e; + } + } + + /** + * Initialize {@link SnsSinkService} + */ + private void doInitializeInternal() { + sinkInitialized = Boolean.TRUE; + } + + /** + * @param records Records to be output + */ + @Override + public void doOutput(final Collection> records) { + if (records.isEmpty()) { + return; + } + snsSinkService.output(records); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java new file mode 100644 index 0000000000..2e14e4718b --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions; + +import java.util.Map; +import java.util.Objects; + +/** + * sns sink configuration class contains properties, used to read yaml configuration. + */ +public class SnsSinkConfig { + + private static final int DEFAULT_CONNECTION_RETRIES = 5; + + private static final int DEFAULT_BATCH_SIZE = 10; + + private static final int DEFAULT_UPLOAD_RETRIES = 5; + + public static final String STS_REGION = "region"; + + public static final String STS_ROLE_ARN = "sts_role_arn"; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("topic_arn") + @NotNull + private String topicArn; + + @JsonProperty("message_group_id") + private String messageGroupId; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("dlq") + private PluginModel dlq; + + @JsonProperty("dlq_file") + private String dlqFile; + + @JsonProperty("batch_size") + private int batchSize = DEFAULT_BATCH_SIZE; + + @JsonProperty("message_deduplication_id") + private String messageDeduplicationId; + + private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; + + @JsonProperty("max_retries") + private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES; + + public String getMessageDeduplicationId() { + return messageDeduplicationId; + } + + public int getBatchSize() { + return batchSize; + } + + public PluginModel getDlq() { + return dlq; + } + + public String getDlqFile() { + return dlqFile; + } + + /** + * Aws Authentication configuration Options. + * @return aws authentication options. + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public String getTopicArn() { + return topicArn; + } + + public String getMessageGroupId() { + return messageGroupId; + } + + /** + * Sink codec configuration Options. + * @return codec plugin model. + */ + public PluginModel getCodec() { + return codec; + } + + /** + * SNS client connection retries configuration Options. + * @return max connection retries value. + */ + public int getMaxConnectionRetries() { + return maxConnectionRetries; + } + + /** + * SNS object upload retries configuration Options. + * @return maximum upload retries value. + */ + public int getMaxUploadRetries() { + return maxUploadRetries; + } + + public String getDlqStsRoleARN(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ? + String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) : + awsAuthenticationOptions.getAwsStsRoleArn(); + } + + public String getDlqStsRegion(){ + return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ? + String.valueOf(getDlqPluginSetting().get(STS_REGION)) : + awsAuthenticationOptions.getAwsRegion().toString(); + } + + public Map getDlqPluginSetting(){ + return dlq != null ? dlq.getPluginSettings() : Map.of(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java new file mode 100644 index 0000000000..ed42d3ab4f --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java @@ -0,0 +1,233 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import io.micrometer.core.instrument.Counter; + +import org.opensearch.dataprepper.expression.ExpressionEvaluationException; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler; +import org.opensearch.dataprepper.plugins.sink.sns.dlq.SnsSinkFailedDlqData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.model.PublishBatchRequest; +import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry; +import software.amazon.awssdk.services.sns.model.PublishBatchResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Class responsible for create {@link SnsClient} object, check thresholds, + * get new buffer and write records into buffer. + */ +public class SnsSinkService { + + private static final Logger LOG = LoggerFactory.getLogger(SnsSinkService.class); + + private static final String BUCKET = "bucket"; + + private static final String KEY_PATH = "key_path_prefix"; + + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS = "snsSinkObjectsEventsSucceeded"; + + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED = "snsSinkObjectsEventsFailed"; + + public static final String FIFO = ".fifo"; + + private final SnsSinkConfig snsSinkConfig; + + private final Lock reentrantLock; + + private final Collection bufferedEventHandles; + + private final SnsClient snsClient; + + private final DlqPushHandler dlqPushHandler; + + private final PluginSetting pluginSetting; + + private final List processRecordsList; + + private final String topicName; + + private final int maxRetries; + + private final Counter numberOfRecordsSuccessCounter; + + private final Counter numberOfRecordsFailedCounter; + + private final boolean isDocumentIdAnExpression; + + private final ExpressionEvaluator expressionEvaluator; + + /** + * @param snsSinkConfig sns sink related configuration. + * @param snsClient + * @param pluginMetrics metrics. + * @param pluginFactory + * @param pluginSetting + */ + public SnsSinkService(final SnsSinkConfig snsSinkConfig, + final SnsClient snsClient, + final PluginMetrics pluginMetrics, + final PluginFactory pluginFactory, + final PluginSetting pluginSetting, + final ExpressionEvaluator expressionEvaluator) { + this.snsSinkConfig = snsSinkConfig; + this.snsClient = snsClient; + this.reentrantLock = new ReentrantLock(); + this.bufferedEventHandles = new LinkedList<>(); + this.topicName = snsSinkConfig.getTopicArn(); + this.maxRetries = snsSinkConfig.getMaxUploadRetries(); + this.pluginSetting = pluginSetting; + this.expressionEvaluator = expressionEvaluator; + this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(snsSinkConfig.getMessageGroupId()); + this.processRecordsList = new ArrayList(); + this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED); + + this.dlqPushHandler = new DlqPushHandler(snsSinkConfig.getDlqFile(), pluginFactory, + String.valueOf(snsSinkConfig.getDlqPluginSetting().get(BUCKET)), + snsSinkConfig.getDlqStsRoleARN() + ,snsSinkConfig.getDlqStsRegion(), + String.valueOf(snsSinkConfig.getDlqPluginSetting().get(KEY_PATH))); + } + + /** + * @param records received records and add into buffer. + */ + void output(Collection> records) { + reentrantLock.lock(); + try { + for (Record record : records) { + final Event event = record.getData(); + processRecordsList.add(event); + if (event.getEventHandle() != null) { + bufferedEventHandles.add(event.getEventHandle()); + } + if (snsSinkConfig.getBatchSize() == processRecordsList.size()) { + processRecords(); + processRecordsList.clear(); + } + } + // This block will process the last set of events below batch size + if(!processRecordsList.isEmpty()) { + processRecords(); + processRecordsList.clear(); + } + } catch (InterruptedException e) { + LOG.error("Exception while write event into buffer :", e); + } + reentrantLock.unlock(); + } + + private void processRecords() throws InterruptedException { + final AtomicReference errorMsgObj = new AtomicReference<>(); + final boolean isFlushToSNS = retryFlushToSNS(processRecordsList, topicName,errorMsgObj); + if (isFlushToSNS) { + numberOfRecordsSuccessCounter.increment(processRecordsList.size()); + } else { + numberOfRecordsFailedCounter.increment(processRecordsList.size()); + dlqPushHandler.perform(pluginSetting,new SnsSinkFailedDlqData(topicName,errorMsgObj.get(),0)); + } + releaseEventHandles(true); + } + + private void releaseEventHandles(final boolean result) { + for (EventHandle eventHandle : bufferedEventHandles) { + eventHandle.release(result); + } + bufferedEventHandles.clear(); + } + + /** + * perform retry in-case any issue occurred, based on max_upload_retries configuration. + * + * @param processRecordsList records to be process + * @param errorMsgObj pass the error message + * @return boolean based on object upload status. + * @throws InterruptedException interruption during sleep. + */ + protected boolean retryFlushToSNS(final List processRecordsList, + final String topicName, + final AtomicReference errorMsgObj) throws InterruptedException { + boolean isUploadedToSNS = Boolean.FALSE; + int retryCount = maxRetries; + do { + try { + publishToTopic(snsClient, topicName,processRecordsList); + isUploadedToSNS = Boolean.TRUE; + } catch (AwsServiceException | SdkClientException e) { + errorMsgObj.set(e.getMessage()); + LOG.error("Exception occurred while uploading records to sns. Retry countdown : {} | exception:", + retryCount, e); + --retryCount; + if (retryCount == 0) { + return isUploadedToSNS; + } + Thread.sleep(5000); + } + } while (!isUploadedToSNS); + return isUploadedToSNS; + } + + public void publishToTopic(SnsClient snsClient, String topicName,final List processRecordsList) { + LOG.debug("Trying to Push Msg to SNS: {}",topicName); + try { + final PublishBatchRequest request = createPublicRequestByTopic(topicName, processRecordsList); + final PublishBatchResponse publishBatchResponse = snsClient.publishBatch(request); + LOG.info(" Message sent. Status is " + publishBatchResponse.sdkHttpResponse().statusCode()); + }catch (Exception e) { + LOG.error("Error while pushing messages to topic ",e); + throw e; + } + } + + private PublishBatchRequest createPublicRequestByTopic(final String topicName, final List processRecordsList) { + final PublishBatchRequest.Builder requestBatch = PublishBatchRequest.builder().topicArn(topicName); + final List batchRequestEntries = new ArrayList(); + final String defaultRandomGroupId = UUID.randomUUID().toString(); + for (Event messageEvent : processRecordsList) { + final PublishBatchRequestEntry.Builder entry = PublishBatchRequestEntry.builder().id(String.valueOf(new Random().nextInt())).message(messageEvent.toJsonString()); + if(topicName.endsWith(FIFO)) + batchRequestEntries.add(entry + .messageGroupId(snsSinkConfig.getMessageGroupId() != null ? getMessageGroupId(messageEvent,snsSinkConfig.getMessageGroupId()) : defaultRandomGroupId) + .messageDeduplicationId(snsSinkConfig.getMessageDeduplicationId() != null ? getMessageGroupId(messageEvent,snsSinkConfig.getMessageDeduplicationId()) : UUID.randomUUID().toString()).build()); + else + batchRequestEntries.add(entry.build()); + } + requestBatch.publishBatchRequestEntries(batchRequestEntries); + return requestBatch.build(); + } + + private String getMessageGroupId(Event event,final String fieldName) { + if (isDocumentIdAnExpression) { + try { + return (String) expressionEvaluator.evaluate(fieldName, event); + } catch (final ExpressionEvaluationException e) { + LOG.error("Unable to construct message_group_id from expression {}, the message_group_id will be generated by Sns Sink", snsSinkConfig.getMessageGroupId()); + } + } + return event.get(fieldName, String.class); + } + +} diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..1c244de73b --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.sns.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.regions.Region; + +import java.util.Map; + +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java new file mode 100644 index 0000000000..a3b6dd4995 --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns.dlq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import io.micrometer.core.instrument.util.StringUtils; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.StringJoiner; + +import static java.util.UUID.randomUUID; + + +/** + * * An Handler class which helps log failed data to AWS S3 bucket or file based on configuration. + */ + +public class DlqPushHandler { + + private static final Logger LOG = LoggerFactory.getLogger(DlqPushHandler.class); + + private static final String BUCKET = "bucket"; + + private static final String ROLE_ARN = "sts_role_arn"; + + private static final String REGION = "region"; + + private static final String S3_PLUGIN_NAME = "s3"; + + private static final String KEY_PATH_PREFIX = "key_path_prefix"; + + private String dlqFile; + + private String keyPathPrefix; + + private DlqProvider dlqProvider; + + private ObjectWriter objectWriter; + + public DlqPushHandler(final String dlqFile, + final PluginFactory pluginFactory, + final String bucket, + final String stsRoleArn, + final String awsRegion, + final String dlqPathPrefix) { + if(dlqFile != null) { + this.dlqFile = dlqFile; + this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter(); + }else{ + this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix); + } + } + + public void perform(final PluginSetting pluginSetting, + final Object failedData) { + if(dlqFile != null) + writeToFile(failedData); + else + pushToS3(pluginSetting, failedData); + } + + private void writeToFile(Object failedData) { + try(BufferedWriter dlqFileWriter = Files.newBufferedWriter(Paths.get(dlqFile), + StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { + dlqFileWriter.write(objectWriter.writeValueAsString(failedData)+"\n"); + } catch (IOException e) { + LOG.error("Exception while writing failed data to DLQ file Exception: ",e); + } + } + + private void pushToS3(PluginSetting pluginSetting, Object failedData) { + DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName()); + try { + String pluginId = randomUUID().toString(); + DlqObject dlqObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginSetting.getName()) + .withPipelineName(pluginSetting.getPipelineName()) + .withFailedData(failedData) + .build(); + final List dlqObjects = Arrays.asList(dlqObject); + dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginId); + LOG.info("wrote {} events to DLQ",dlqObjects.size()); + } catch (final IOException e) { + LOG.error("Exception while writing failed data to DLQ, Exception : ", e); + } + } + + private DlqWriter getDlqWriter(final String writerPipelineName) { + Optional potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER) + .add(writerPipelineName).toString()); + DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null; + return dlqWriter; + } + + private DlqProvider getDlqProvider(final PluginFactory pluginFactory, + final String bucket, + final String stsRoleArn, + final String awsRegion, + final String dlqPathPrefix) { + final Map props = new HashMap<>(); + props.put(BUCKET, bucket); + props.put(ROLE_ARN, stsRoleArn); + props.put(REGION, awsRegion); + this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix); + props.put(KEY_PATH_PREFIX, dlqPathPrefix); + final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting); + return dlqProvider; + } + + private String enforceDefaultDelimiterOnKeyPathPrefix(final String keyPathPrefix) { + return (keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/') ? keyPathPrefix : keyPathPrefix.concat("/"); + } +} + diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java new file mode 100644 index 0000000000..9ec14d336d --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns.dlq; + +public class SnsSinkFailedDlqData { + + private String topic; + + private String message; + + private int status; + + public SnsSinkFailedDlqData(String topic, String message, int status) { + this.topic = topic; + this.message = message; + this.status = status; + } + + public String getTopic() { + return topic; + } + + public SnsSinkFailedDlqData setTopic(String topic) { + this.topic = topic; + return this; + } + + public String getMessage() { + return message; + } + + public SnsSinkFailedDlqData setMessage(String message) { + this.message = message; + return this; + } + + public int getStatus() { + return status; + } + + public SnsSinkFailedDlqData setStatus(int status) { + this.status = status; + return this; + } + + @Override + public String toString() { + return "failedData\n" + + "topic \"" + topic + "\"\n" + + "message \"" + message + "\"\n" + + "status \"" + status +"\n"; + } +} diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java new file mode 100644 index 0000000000..14499bc5fe --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.SnsClientBuilder; + +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SnsClientFactoryTest { + @Mock + private SnsSinkConfig snsSinkConfig; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + + @BeforeEach + void setUp() { + when(snsSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + } + + @Test + void createSnsClient_with_real_SnsClient() { + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + final SnsClient s3Client = SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier); + + assertThat(s3Client, notNullValue()); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void createSNSClient_provides_correct_inputs(final String regionString) { + final Region region = Region.of(regionString); + final String stsRoleArn = UUID.randomUUID().toString(); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + + final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); + + final SnsClientBuilder snsClientBuilder = mock(SnsClientBuilder.class); + when(snsClientBuilder.region(region)).thenReturn(snsClientBuilder); + when(snsClientBuilder.credentialsProvider(any())).thenReturn(snsClientBuilder); + when(snsClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(snsClientBuilder); + try(final MockedStatic s3ClientMockedStatic = mockStatic(SnsClient.class)) { + s3ClientMockedStatic.when(SnsClient::builder) + .thenReturn(snsClientBuilder); + SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier); + } + + final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); + verify(snsClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); + + final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); + + assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); + + final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); + assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); + assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java new file mode 100644 index 0000000000..2289e99ccd --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.regions.Region; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +class SnsSinkConfigTest { + public static final int DEFAULT_MAX_RETRIES = 5; + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + void sns_sink_default_id_test(){ + assertThat(new SnsSinkConfig().getMessageGroupId(),nullValue()); + } + + @Test + void sns_sink_default_topic_test() { + assertThat(new SnsSinkConfig().getTopicArn(),nullValue()); + } + + @Test + void sns_sink_default_codec_test(){ + assertThat(new SnsSinkConfig().getCodec(),nullValue()); + } + + @Test + void sns_sink_default_max_retries_test(){ + assertThat(new SnsSinkConfig().getMaxUploadRetries(),equalTo(DEFAULT_MAX_RETRIES)); + } + + @Test + void sns_sink_default_max_upload_retries_test(){ + assertThat(new SnsSinkConfig().getMaxUploadRetries(),equalTo(DEFAULT_MAX_RETRIES)); + } + + @Test + void sns_sink_default_max_connection_retries_test(){ + assertThat(new SnsSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES)); + } + + + @Test + void sns_sink_pipeline_config_test() throws JsonProcessingException { + final String config = " topic_arn: arn:aws:sns:ap-south-1:524239988912:my-topic\n" + + " message_group_id: test\n" + + " aws:\n" + + " region: ap-south-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " codec:\n" + + " ndjson:\n" + + " max_retries: 10\n" + + " dlq_file: /test/dlq-file.log\n" + + " dlq:\n" + + " s3:\n" + + " bucket: test\n" + + " key_path_prefix: test\n" + + " region: ap-south-1\n" + + " sts_role_arn: test-role-arn\n"; + final SnsSinkConfig snsSinkConfig = objectMapper.readValue(config, SnsSinkConfig.class); + assertThat(snsSinkConfig.getMaxUploadRetries(),equalTo(10)); + assertThat(snsSinkConfig.getTopicArn(),equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic")); + assertThat(snsSinkConfig.getMessageGroupId(),equalTo("test")); + assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1)); + assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test")); + assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test")); + assertThat(snsSinkConfig.getDlqStsRegion(),equalTo("ap-south-1")); + assertThat(snsSinkConfig.getDlqStsRoleARN(),equalTo("test-role-arn")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java new file mode 100644 index 0000000000..658105ad89 --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.model.PublishBatchRequest; +import software.amazon.awssdk.services.sns.model.PublishBatchResponse; +import software.amazon.awssdk.services.sns.model.SnsException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; + +public class SnsSinkServiceTest { + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + private SnsSinkConfig snsSinkConfig; + + private SnsClient snsClient; + + private PluginMetrics pluginMetrics; + + private Counter numberOfRecordsSuccessCounter; + + private Counter numberOfRecordsFailedCounter; + + private static final String config = " topic_arn: arn:aws:sns:ap-south-1:524239988912:my-topic\n" + + " message_group_id: test\n" + + " aws:\n" + + " region: ap-south-1\n" + + " sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" + + " sts_header_overrides: {\"test\":\"test\"}\n" + + " codec:\n" + + " ndjson:\n" + + " max_retries: 10\n" + + " dlq_file: C:\\Work\\dlq.txt\n"; + private PublishBatchResponse publishBatchResponse; + + private SdkHttpResponse sdkHttpResponse; + + @BeforeEach + public void setup() throws JsonProcessingException { + this.snsSinkConfig = objectMapper.readValue(config, SnsSinkConfig.class); + this.snsClient = mock(SnsClient.class); + this.pluginMetrics = mock(PluginMetrics.class); + this.numberOfRecordsSuccessCounter = mock(Counter.class); + this.numberOfRecordsFailedCounter = mock(Counter.class); + this.publishBatchResponse = mock(PublishBatchResponse.class); + this.sdkHttpResponse = mock(SdkHttpResponse.class); + when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(numberOfRecordsSuccessCounter); + when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter); + when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenReturn(publishBatchResponse); + when(publishBatchResponse.sdkHttpResponse()).thenReturn(sdkHttpResponse); + when(publishBatchResponse.sdkHttpResponse().statusCode()).thenReturn(new Random().nextInt()); + when(sdkHttpResponse.statusCode()).thenReturn(200); + } + + + private SnsSinkService createObjectUnderTest(){ + return new SnsSinkService(snsSinkConfig, + snsClient, + pluginMetrics, mock(PluginFactory.class), + mock(PluginSetting.class), + mock(ExpressionEvaluator.class)); + } + + @Test + public void sns_sink_test_with_empty_collection_records(){ + numberOfRecordsSuccessCounter = mock(Counter.class); + SnsSinkService snsSinkService = createObjectUnderTest(); + snsSinkService.output(List.of()); + verifyNoInteractions(snsClient); + verifyNoInteractions(numberOfRecordsSuccessCounter); + verifyNoInteractions(numberOfRecordsFailedCounter); + } + + @Test + public void sns_sink_test_with_single_collection_record_success_push_to_sns(){ + when(sdkHttpResponse.statusCode()).thenReturn(200); + when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenReturn(publishBatchResponse); + SnsSinkService snsSinkService = createObjectUnderTest(); + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = List.of(eventRecord); + final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class); + + snsSinkService.output(records); + + verify(snsClient).publishBatch(publishBatchRequestCaptor.capture()); + final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue(); + assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic")); + assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1)); + assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString())); + verify(numberOfRecordsSuccessCounter).increment(records.size()); + } + + @Test + public void sns_sink_test_with_multiple_collection_record_success_push_to_sns(){ + SnsSinkService snsSinkService = createObjectUnderTest(); + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = new ArrayList<>(); + for(int recordSize = 0; recordSize <= 11 ; recordSize++) { + records.add(eventRecord); + } + final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class); + + snsSinkService.output(records); + + verify(snsClient,times(2)).publishBatch(publishBatchRequestCaptor.capture()); + final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue(); + assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic")); + assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(2)); + assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString())); + } + + @Test + public void sns_sink_test_with_single_collection_record_failed_to_push_to_sns() throws IOException { + final Counter snsSinkObjectsEventsFailedCounter = mock(Counter.class); + when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(snsSinkObjectsEventsFailedCounter); + final SnsException snsException = (SnsException)SnsException.builder().message("internal server error").awsErrorDetails(AwsErrorDetails.builder().errorMessage("internal server error").build()).build(); + when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenThrow(snsException); + SnsSinkService snsSinkService = createObjectUnderTest(); + final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + Collection> records = List.of(eventRecord); + final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class); + snsSinkService.output(records); + verify(snsClient,times(10)).publishBatch(publishBatchRequestCaptor.capture()); + final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue(); + assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic")); + assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1)); + assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString())); + verify(snsSinkObjectsEventsFailedCounter).increment(records.size()); + } + + @Test + void sns_sink_service_test_output_with_single_record_ack_release() { + final SnsSinkService snsSinkService = createObjectUnderTest(); + final Event event = mock(Event.class); + given(event.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"); + given(event.getEventHandle()).willReturn(mock(EventHandle.class)); + final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class); + + snsSinkService.output(List.of(new Record<>(event))); + + verify(snsClient,times(1)).publishBatch(publishBatchRequestCaptor.capture()); + final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue(); + assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic")); + assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1)); + assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}")); + verify(numberOfRecordsSuccessCounter).increment(1); + } +} diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java new file mode 100644 index 0000000000..9294c6172c --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.sns; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.regions.Region; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class SnsSinkTest { + + public static final int MAX_EVENTS = 100; + public static final int MAX_RETRIES = 5; + public static final String BUCKET_NAME = "dataprepper"; + public static final String S3_REGION = "us-east-1"; + public static final String MAXIMUM_SIZE = "1kb"; + public static final String OBJECT_KEY_NAME_PATTERN = "my-elb-%{yyyy-MM-dd'T'hh-mm-ss}"; + public static final String CODEC_PLUGIN_NAME = "json"; + public static final String SINK_PLUGIN_NAME = "sns"; + public static final String SINK_PIPELINE_NAME = "sns-sink-pipeline"; + private SnsSinkConfig snsSinkConfig; + private SnsSink snsSink; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private AwsCredentialsSupplier awsCredentialsSupplier; + private SinkContext sinkContext; + private PluginModel pluginModel; + + @BeforeEach + void setUp() { + snsSinkConfig = mock(SnsSinkConfig.class); + sinkContext = mock(SinkContext.class); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + pluginSetting = mock(PluginSetting.class); + PluginModel pluginModel = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + Map dlqMap = mock(HashMap.class); + + when(snsSinkConfig.getMessageGroupId()).thenReturn("/message"); + when(snsSinkConfig.getMessageDeduplicationId()).thenReturn("/message"); + when(snsSinkConfig.getDlq()).thenReturn(pluginModel); + when(pluginModel.getPluginSettings()).thenReturn(dlqMap); + when(snsSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); + when(snsSinkConfig.getCodec()).thenReturn(pluginModel); + when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME); + when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); + when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); + } + + private SnsSink createObjectUnderTest() { + return new SnsSink(pluginSetting, snsSinkConfig, pluginFactory, sinkContext,mock(ExpressionEvaluator.class), awsCredentialsSupplier); + } + + @Test + void test_sns_sink_plugin_isReady_positive() { + snsSink = createObjectUnderTest(); + Assertions.assertNotNull(snsSink); + Assertions.assertNotNull(snsSinkConfig); + snsSink.doInitialize(); + assertTrue(snsSink.isReady(), "sns sink is not initialized and not ready to work"); + } + + @Test + void test_sns_sink_plugin_isReady_negative() { + snsSink = createObjectUnderTest(); + Assertions.assertNotNull(snsSink); + assertFalse(snsSink.isReady(), "sns sink is initialized and ready to work"); + } +} diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java new file mode 100644 index 0000000000..5af66430d1 --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.sns.dlq; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.plugins.dlq.DlqProvider; +import org.opensearch.dataprepper.plugins.dlq.DlqWriter; +import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class DlqPushHandlerTest { + + private static final String BUCKET = "bucket"; + private static final String BUCKET_VALUE = "test"; + private static final String ROLE = "arn:aws:iam::524239988122:role/app-test"; + + private static final String REGION = "ap-south-1"; + private static final String S3_PLUGIN_NAME = "s3"; + private static final String KEY_PATH_PREFIX = "key_path_prefix"; + + private static final String KEY_PATH_PREFIX_VALUE = "dlq/"; + + private static final String PIPELINE_NAME = "log-pipeline"; + + private static final String DLQ_FILE = "local_dlq_file"; + + private PluginModel pluginModel; + + private DlqPushHandler dlqPushHandler; + private PluginFactory pluginFactory; + + private AwsAuthenticationOptions awsAuthenticationOptions; + + private DlqProvider dlqProvider; + + private DlqWriter dlqWriter; + + + @BeforeEach + public void setUp(){ + this.pluginFactory = mock(PluginFactory.class); + this.pluginModel = mock(PluginModel.class); + this.awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + this.dlqProvider = mock(DlqProvider.class); + this.dlqWriter = mock(DlqWriter.class); + } + + @Test + void perform_for_dlq_s3_success() throws IOException { + Map props = new HashMap<>(); + props.put(BUCKET,BUCKET_VALUE); + props.put(KEY_PATH_PREFIX,KEY_PATH_PREFIX_VALUE); + + when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider); + + when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter)); + doNothing().when(dlqWriter).write(anyList(), anyString(), anyString()); + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(null,pluginFactory, BUCKET_VALUE, ROLE, REGION,KEY_PATH_PREFIX_VALUE); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + Assertions.assertNotNull(pluginFactory); + verify(dlqWriter).write(anyList(), anyString(), anyString()); + } + + + @Test + void perform_for_dlq_local_file_success(){ + + SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0); + dlqPushHandler = new DlqPushHandler(DLQ_FILE,pluginFactory,null, ROLE, REGION,null); + + PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, null); + pluginSetting.setPipelineName(PIPELINE_NAME); + dlqPushHandler.perform(pluginSetting, failedDlqData); + } +} diff --git a/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline diff --git a/settings.gradle b/settings.gradle index 02125825fd..ba5f619261 100644 --- a/settings.gradle +++ b/settings.gradle @@ -128,4 +128,5 @@ include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:sqs-source' include 'data-prepper-plugins:cloudwatch-logs' include 'data-prepper-plugins:http-sink' +include 'data-prepper-plugins:sns-sink' include 'data-prepper-plugins:prometheus-sink'