diff --git a/data-prepper-plugins/sns-sink/README.md b/data-prepper-plugins/sns-sink/README.md
new file mode 100644
index 0000000000..f84d5d8566
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/README.md
@@ -0,0 +1,79 @@
+# SNS Sink
+
+This is the Data Prepper SNS Sink plugin that sends records to an SNS Topic.
+
+## Usages
+
+The SNS sink should be configured as part of Data Prepper pipeline yaml file.
+
+## Configuration Options
+
+```
+pipeline:
+ ...
+ sink:
+ - sns:
+ topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic
+ message_group_id: /type
+ message_deduplication_id: /id
+ batch_size: 10
+ aws:
+ region: ap-south-1
+ sts_role_arn: arn:aws:iam::524239988922:role/app-test
+ dlq:
+ s3:
+ bucket: test-bucket
+ key_path_prefix: dlq/
+ codec:
+ ndjson:
+ max_retries: 5
+```
+
+## SNS Pipeline Configuration
+
+- `topic_arn` (Optional) : The SNS Topic Arn of the Topic to push events.
+
+- `batch_size` (Optional) : An integer specifying the maximum number of events to publish to the SNS topic in a single batch request. Defaults to 10.
+
+- `message_group_id` (Optional) : A string identifier used as the `message_group_id` for messages published to the SNS topic. Defaults to an auto-generated random key.
+
+- `message_deduplication_id` (Optional) : A string identifier used as the `message_deduplication_id` for message deduplication on the SNS topic. Defaults to an auto-generated random key.
+
+- `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null.
+ If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown.
+
+- `dlq` (optional): DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown.
+
+- `codec` : The codec plugin configuration used to serialize events before they are sent to the SNS topic.
+
+- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. If this option is present, `aws_` options are not expected to be present. If any of `aws_` options are present along with this, error is thrown.
+
+- `max_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried when ingesting data to Amazon SNS and S3. Defaults to `5`.
+
+### AWS Configuration
+
+* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
+* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SNS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
+
+
+### Counters
+
+* `snsSinkObjectsEventsSucceeded` - The number of events that the SNS sink has successfully sent to Topic.
+* `snsSinkObjectsEventsFailed` - The number of events that the SNS sink failed to send to the Topic.
+
+## Developer Guide
+
+This plugin is compatible with Java 11. See below
+
+- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
+- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
+
+The integration tests for this plugin do not run as part of the Data Prepper build.
+
+The following command runs the integration tests:
+
+Note: Subscribe SQS queues to the SNS topics before running the integration tests.
+
+```
+./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<> -Dtests.sns.sink.sts.role.arn=<> -Dtests.sns.sink.standard.topic=<> -Dtests.sns.sink.fifo.topic=<> -Dtests.sns.sink.dlq.file.path=<> -Dtests.sns.sink.standard.sqs.queue.url=<> -Dtests.sns.sink.fifo.sqs.queue.url=<>
+```
diff --git a/data-prepper-plugins/sns-sink/build.gradle b/data-prepper-plugins/sns-sink/build.gradle
new file mode 100644
index 0000000000..49d2918b00
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/build.gradle
@@ -0,0 +1,64 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+// Compile and test dependencies for the SNS sink plugin.
+dependencies {
+ implementation project(':data-prepper-api')
+ implementation project(path: ':data-prepper-plugins:common')
+ implementation project(':data-prepper-plugins:aws-plugin-api')
+ implementation 'io.micrometer:micrometer-core'
+ implementation 'com.fasterxml.jackson.core:jackson-core'
+ implementation 'com.fasterxml.jackson.core:jackson-databind'
+ implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
+ implementation 'software.amazon.awssdk:sns'
+ implementation 'software.amazon.awssdk:sts'
+ testImplementation 'software.amazon.awssdk:sqs'
+ implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21'
+ implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
+ implementation 'org.apache.commons:commons-lang3:3.12.0'
+ implementation project(':data-prepper-plugins:failures-common')
+ testImplementation project(':data-prepper-test-common')
+ testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
+}
+
+// Unit tests run on the JUnit 5 platform.
+test {
+ useJUnitPlatform()
+}
+
+// Separate source set for integration tests (src/integrationTest/java); these are
+// not part of the regular build and run only via the integrationTest task below.
+sourceSets {
+ integrationTest {
+ java {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ srcDir file('src/integrationTest/java')
+ }
+ resources.srcDir file('src/integrationTest/resources')
+ }
+}
+
+// Integration tests reuse the unit-test dependency configuration.
+configurations {
+ integrationTestImplementation.extendsFrom testImplementation
+ integrationTestRuntime.extendsFrom testRuntime
+}
+
+// Runs classes matching *IT; AWS targets (topics, queues, role, region) are passed
+// through as system properties from the gradle invocation (see README).
+task integrationTest(type: Test) {
+ group = 'verification'
+ testClassesDirs = sourceSets.integrationTest.output.classesDirs
+
+ useJUnitPlatform()
+
+ classpath = sourceSets.integrationTest.runtimeClasspath
+ systemProperty 'tests.sns.sink.region', System.getProperty('tests.sns.sink.region')
+ systemProperty 'tests.sns.sink.dlq.file.path', System.getProperty('tests.sns.sink.dlq.file.path')
+ systemProperty 'tests.sns.sink.sts.role.arn', System.getProperty('tests.sns.sink.sts.role.arn')
+ systemProperty 'tests.sns.sink.standard.topic', System.getProperty('tests.sns.sink.standard.topic')
+ systemProperty 'tests.sns.sink.fifo.topic', System.getProperty('tests.sns.sink.fifo.topic')
+ systemProperty 'tests.sns.sink.standard.sqs.queue.url', System.getProperty('tests.sns.sink.standard.sqs.queue.url')
+ systemProperty 'tests.sns.sink.fifo.sqs.queue.url', System.getProperty('tests.sns.sink.fifo.sqs.queue.url')
+
+ filter {
+ includeTestsMatching '*IT'
+ }
+}
+
diff --git a/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java
new file mode 100644
index 0000000000..b3c235dab1
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
+import io.micrometer.core.instrument.Counter;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventHandle;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.log.JacksonLog;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.MessageFormat;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED;
+import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS;
+
+public class SnsSinkServiceIT {
+
+ private SnsClient snsClient;
+
+ private PluginMetrics pluginMetrics;
+
+ private PluginFactory pluginFactory;
+
+ private PluginSetting pluginSetting;
+
+ private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));
+
+
+ private static final String SNS_SINK_CONFIG_YAML = " topic_arn: {0}\n" +
+ " batch_size: 10\n" +
+ " aws:\n" +
+ " region: {1}\n" +
+ " sts_role_arn: {2}\n" +
+ " dlq_file: {3}\n" +
+ " codec:\n" +
+ " ndjson:\n" +
+ " max_retries: 5";
+
+ private String standardTopic;
+
+ private String fifoTopic;
+
+ private String region;
+
+ private String stsRoleArn;
+
+ private String dlqFilePath;
+
+ private Counter snsSinkObjectsEventsSucceeded;
+
+ private Counter numberOfRecordsFailedCounter;
+
+ private String standardSqsQueue;
+
+ private SqsClient sqsClient;
+
+ private String fifoSqsQueue;
+
+ private DlqPushHandler dlqPushHandler;
+
+ @BeforeEach
+ public void setup() {
+ this.standardTopic = System.getProperty("tests.sns.sink.standard.topic");
+ this.fifoTopic = System.getProperty("tests.sns.sink.fifo.topic");
+ this.region = System.getProperty("tests.sns.sink.region");
+ this.stsRoleArn = System.getProperty("tests.sns.sink.sts.role.arn");
+ this.dlqFilePath = System.getProperty("tests.sns.sink.dlq.file.path");
+ this.standardSqsQueue = System.getProperty("tests.sns.sink.standard.sqs.queue.url");
+ this.fifoSqsQueue = System.getProperty("tests.sns.sink.fifo.sqs.queue.url");
+
+ this.dlqPushHandler = mock(DlqPushHandler.class);
+ this.pluginMetrics = mock(PluginMetrics.class);
+ this.pluginFactory = mock(PluginFactory.class);
+ this.pluginSetting = mock(PluginSetting.class);
+ this.snsSinkObjectsEventsSucceeded = mock(Counter.class);
+ this.numberOfRecordsFailedCounter = mock(Counter.class);
+ this.snsClient = SnsClient.builder()
+ .region(Region.of(region))
+ .build();
+ this.sqsClient = SqsClient.builder().region(Region.of(region)).build();
+ when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(snsSinkObjectsEventsSucceeded);
+ when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter);
+ }
+
+ private Collection> setEventQueue(final int records) {
+ final Collection> jsonObjects = new LinkedList<>();
+ for (int i = 0; i < records; i++)
+ jsonObjects.add(createRecord());
+ return jsonObjects;
+ }
+
+ private static Record createRecord() {
+ final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
+ event.setEventHandle(mock(EventHandle.class));
+ return new Record<>(event);
+ }
+
+ public SnsSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException {
+ String[] values = { topicName,region,stsRoleArn,dlqFilePath };
+ final String configYaml = MessageFormat.format(SNS_SINK_CONFIG_YAML, values);
+ final SnsSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SnsSinkConfig.class);
+ return new SnsSinkService(snsSinkConfig,snsClient,pluginMetrics,pluginFactory,pluginSetting,mock(ExpressionEvaluator.class));
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {5,9,10})
+ public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
+ final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
+ final Collection> records = setEventQueue(recordCount);
+ final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
+ objectUnderTest.output(records);
+ Thread.sleep(Duration.ofSeconds(10).toMillis());
+ List topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
+ assertThat(inputRecords, is(topicData));
+ assertThat(inputRecords.size(), equalTo(topicData.size()));
+ verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
+ }
+
+ @Test
+ public void sns_sink_service_test_with_standard_queue_with_multiple_batch() throws JsonProcessingException, InterruptedException {
+ final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
+ final Collection> records = setEventQueue(11);
+ final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
+ objectUnderTest.output(records);
+ Thread.sleep(Duration.ofSeconds(10).toMillis());
+ List topicData = readMessagesFromSNSTopicQueue(inputRecords,standardSqsQueue);
+ assertThat(inputRecords, is(topicData));
+ assertThat(inputRecords.size(), equalTo(topicData.size()));
+ verify(snsSinkObjectsEventsSucceeded,times(2)).increment(anyDouble());
+ }
+
+ private List readMessagesFromSNSTopicQueue(List inputRecords, final String sqsQueue) {
+ final List messages = new ArrayList<>();
+ long startTime = System.currentTimeMillis();
+ long endTime = startTime + 60000;
+ do {
+ messages.addAll(sqsClient.receiveMessage(builder -> builder.queueUrl(sqsQueue)).messages());
+ if(messages.size() >= inputRecords.size()){
+ break;
+ }
+ } while (System.currentTimeMillis() < endTime);
+
+ List topicData = messages.stream().map(Message::body).map(obj-> {
+ try {
+ Map map = objectMapper.readValue(obj,Map.class);
+ return map.get("Message");
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ return topicData;
+ }
+
+ private void deleteSqsMessages(String sqsQueue, List messages) throws InterruptedException {
+ for (Message message : messages) {
+ sqsClient.deleteMessage(builder -> builder.queueUrl(sqsQueue).receiptHandle(message.receiptHandle()));
+ Thread.sleep(Duration.ofSeconds(2).toMillis());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 5, 10})
+ public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
+ final SnsSinkService objectUnderTest = createObjectUnderTest(fifoTopic);
+ final Collection> records = setEventQueue(recordCount);
+ final List inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
+ objectUnderTest.output(records);
+ Thread.sleep(Duration.ofSeconds(10).toMillis());
+ List topicData = readMessagesFromSNSTopicQueue(inputRecords,fifoSqsQueue);
+ assertThat(inputRecords, is(topicData));
+ assertThat(inputRecords.size(), equalTo(topicData.size()));
+ verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
+ }
+
+
+
+ @ParameterizedTest
+ @ValueSource(ints = {1,5,9})
+ public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException, InterruptedException {
+ final ObjectMapper mapper = new ObjectMapper();
+ final String topic = "test";
+ Files.deleteIfExists(Path.of(dlqFilePath));
+ final SnsSinkService objectUnderTest = createObjectUnderTest(topic);
+ final Collection> records = setEventQueue(recordCount);
+ objectUnderTest.output(records);
+ Thread.sleep(Duration.ofSeconds(10).toMillis());
+ verify(numberOfRecordsFailedCounter).increment(recordCount);
+ final Map map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class);
+ assertThat(map.get("topic"),equalTo(topic));
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java
new file mode 100644
index 0000000000..b8fac5e4cb
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.services.sns.SnsClient;
+
+public class SnsClientFactory {
+
+ public static SnsClient createSNSClient(final SnsSinkConfig snsSinkConfig,
+ final AwsCredentialsSupplier awsCredentialsSupplier) {
+ final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(snsSinkConfig.getAwsAuthenticationOptions());
+ final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions);
+
+ return SnsClient.builder()
+ .region(snsSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
+ .credentialsProvider(awsCredentialsProvider)
+ .overrideConfiguration(createOverrideConfiguration(snsSinkConfig)).build();
+ }
+
+ private static ClientOverrideConfiguration createOverrideConfiguration(final SnsSinkConfig snsSinkConfig) {
+ final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(snsSinkConfig.getMaxConnectionRetries()).build();
+ return ClientOverrideConfiguration.builder()
+ .retryPolicy(retryPolicy)
+ .build();
+ }
+
+ private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) {
+ return AwsCredentialsOptions.builder()
+ .withRegion(awsAuthenticationOptions.getAwsRegion())
+ .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
+ .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
+ .build();
+ }
+}
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java
new file mode 100644
index 0000000000..9c4e40e769
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSink.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.sink.AbstractSink;
+import org.opensearch.dataprepper.model.sink.Sink;
+import org.opensearch.dataprepper.model.sink.SinkContext;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sns.SnsClient;
+
+import java.util.Collection;
+
+/**
+ * Implementation class of sns-sink plugin. It is responsible for receive the collection of
+ * {@link Event} and upload to amazon sns based on thresholds configured.
+ */
+@DataPrepperPlugin(name = "sns", pluginType = Sink.class, pluginConfigurationType = SnsSinkConfig.class)
+public class SnsSink extends AbstractSink> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnsSink.class);
+
+ private volatile boolean sinkInitialized;
+
+ private final SnsSinkService snsSinkService;
+
+ /**
+ * @param pluginSetting dp plugin settings.
+ * @param snsSinkConfig sns sink configurations.
+ * @param pluginFactory dp plugin factory.
+ */
+ @DataPrepperPluginConstructor
+ public SnsSink(final PluginSetting pluginSetting,
+ final SnsSinkConfig snsSinkConfig,
+ final PluginFactory pluginFactory,
+ final SinkContext sinkContext,
+ final ExpressionEvaluator expressionEvaluator,
+ final AwsCredentialsSupplier awsCredentialsSupplier) {
+ super(pluginSetting);
+ final PluginModel codecConfiguration = snsSinkConfig.getCodec();
+ final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
+ codecConfiguration.getPluginSettings());
+ // TODO: Sink codec changes are pending
+ // codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
+ sinkInitialized = Boolean.FALSE;
+ final SnsClient snsClient = SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier);
+
+
+
+ snsSinkService = new SnsSinkService(snsSinkConfig,
+ snsClient,
+ pluginMetrics,
+ pluginFactory,
+ pluginSetting,
+ expressionEvaluator);
+ }
+
+ @Override
+ public boolean isReady() {
+ return sinkInitialized;
+ }
+
+ @Override
+ public void doInitialize() {
+ try {
+ doInitializeInternal();
+ } catch (InvalidPluginConfigurationException e) {
+ LOG.error("Invalid plugin configuration, Hence failed to initialize sns-sink plugin.");
+ this.shutdown();
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to initialize sns-sink plugin.");
+ this.shutdown();
+ throw e;
+ }
+ }
+
+ /**
+ * Initialize {@link SnsSinkService}
+ */
+ private void doInitializeInternal() {
+ sinkInitialized = Boolean.TRUE;
+ }
+
+ /**
+ * @param records Records to be output
+ */
+ @Override
+ public void doOutput(final Collection> records) {
+ if (records.isEmpty()) {
+ return;
+ }
+ snsSinkService.output(records);
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java
new file mode 100644
index 0000000000..2e14e4718b
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfig.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * sns sink configuration class contains properties, used to read yaml configuration.
+ */
+public class SnsSinkConfig {
+
+ private static final int DEFAULT_CONNECTION_RETRIES = 5;
+
+ private static final int DEFAULT_BATCH_SIZE = 10;
+
+ private static final int DEFAULT_UPLOAD_RETRIES = 5;
+
+ public static final String STS_REGION = "region";
+
+ public static final String STS_ROLE_ARN = "sts_role_arn";
+
+ @JsonProperty("aws")
+ @NotNull
+ @Valid
+ private AwsAuthenticationOptions awsAuthenticationOptions;
+
+ @JsonProperty("topic_arn")
+ @NotNull
+ private String topicArn;
+
+ @JsonProperty("message_group_id")
+ private String messageGroupId;
+
+ @JsonProperty("codec")
+ @NotNull
+ private PluginModel codec;
+
+ @JsonProperty("dlq")
+ private PluginModel dlq;
+
+ @JsonProperty("dlq_file")
+ private String dlqFile;
+
+ @JsonProperty("batch_size")
+ private int batchSize = DEFAULT_BATCH_SIZE;
+
+ @JsonProperty("message_deduplication_id")
+ private String messageDeduplicationId;
+
+ private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
+
+ @JsonProperty("max_retries")
+ private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;
+
+ public String getMessageDeduplicationId() {
+ return messageDeduplicationId;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public PluginModel getDlq() {
+ return dlq;
+ }
+
+ public String getDlqFile() {
+ return dlqFile;
+ }
+
+ /**
+ * Aws Authentication configuration Options.
+ * @return aws authentication options.
+ */
+ public AwsAuthenticationOptions getAwsAuthenticationOptions() {
+ return awsAuthenticationOptions;
+ }
+
+ public String getTopicArn() {
+ return topicArn;
+ }
+
+ public String getMessageGroupId() {
+ return messageGroupId;
+ }
+
+ /**
+ * Sink codec configuration Options.
+ * @return codec plugin model.
+ */
+ public PluginModel getCodec() {
+ return codec;
+ }
+
+ /**
+ * SNS client connection retries configuration Options.
+ * @return max connection retries value.
+ */
+ public int getMaxConnectionRetries() {
+ return maxConnectionRetries;
+ }
+
+ /**
+ * SNS object upload retries configuration Options.
+ * @return maximum upload retries value.
+ */
+ public int getMaxUploadRetries() {
+ return maxUploadRetries;
+ }
+
+ public String getDlqStsRoleARN(){
+ return Objects.nonNull(getDlqPluginSetting().get(STS_ROLE_ARN)) ?
+ String.valueOf(getDlqPluginSetting().get(STS_ROLE_ARN)) :
+ awsAuthenticationOptions.getAwsStsRoleArn();
+ }
+
+ public String getDlqStsRegion(){
+ return Objects.nonNull(getDlqPluginSetting().get(STS_REGION)) ?
+ String.valueOf(getDlqPluginSetting().get(STS_REGION)) :
+ awsAuthenticationOptions.getAwsRegion().toString();
+ }
+
+ public Map getDlqPluginSetting(){
+ return dlq != null ? dlq.getPluginSettings() : Map.of();
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java
new file mode 100644
index 0000000000..ed42d3ab4f
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkService.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import io.micrometer.core.instrument.Counter;
+
+import org.opensearch.dataprepper.expression.ExpressionEvaluationException;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventHandle;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler;
+import org.opensearch.dataprepper.plugins.sink.sns.dlq.SnsSinkFailedDlqData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
+import software.amazon.awssdk.services.sns.model.PublishBatchResponse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Class responsible for create {@link SnsClient} object, check thresholds,
+ * get new buffer and write records into buffer.
+ */
+public class SnsSinkService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnsSinkService.class);
+
+ private static final String BUCKET = "bucket";
+
+ private static final String KEY_PATH = "key_path_prefix";
+
+ public static final String NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS = "snsSinkObjectsEventsSucceeded";
+
+ public static final String NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED = "snsSinkObjectsEventsFailed";
+
+ public static final String FIFO = ".fifo";
+
+ private final SnsSinkConfig snsSinkConfig;
+
+ private final Lock reentrantLock;
+
+ private final Collection bufferedEventHandles;
+
+ private final SnsClient snsClient;
+
+ private final DlqPushHandler dlqPushHandler;
+
+ private final PluginSetting pluginSetting;
+
+ private final List processRecordsList;
+
+ private final String topicName;
+
+ private final int maxRetries;
+
+ private final Counter numberOfRecordsSuccessCounter;
+
+ private final Counter numberOfRecordsFailedCounter;
+
+ private final boolean isDocumentIdAnExpression;
+
+ private final ExpressionEvaluator expressionEvaluator;
+
+ /**
+ * @param snsSinkConfig sns sink related configuration.
+ * @param snsClient
+ * @param pluginMetrics metrics.
+ * @param pluginFactory
+ * @param pluginSetting
+ */
+ public SnsSinkService(final SnsSinkConfig snsSinkConfig,
+ final SnsClient snsClient,
+ final PluginMetrics pluginMetrics,
+ final PluginFactory pluginFactory,
+ final PluginSetting pluginSetting,
+ final ExpressionEvaluator expressionEvaluator) {
+ this.snsSinkConfig = snsSinkConfig;
+ this.snsClient = snsClient;
+ this.reentrantLock = new ReentrantLock();
+ this.bufferedEventHandles = new LinkedList<>();
+ this.topicName = snsSinkConfig.getTopicArn();
+ this.maxRetries = snsSinkConfig.getMaxUploadRetries();
+ this.pluginSetting = pluginSetting;
+ this.expressionEvaluator = expressionEvaluator;
+ this.isDocumentIdAnExpression = expressionEvaluator.isValidExpressionStatement(snsSinkConfig.getMessageGroupId());
+ this.processRecordsList = new ArrayList();
+ this.numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS);
+ this.numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED);
+
+ this.dlqPushHandler = new DlqPushHandler(snsSinkConfig.getDlqFile(), pluginFactory,
+ String.valueOf(snsSinkConfig.getDlqPluginSetting().get(BUCKET)),
+ snsSinkConfig.getDlqStsRoleARN()
+ ,snsSinkConfig.getDlqStsRegion(),
+ String.valueOf(snsSinkConfig.getDlqPluginSetting().get(KEY_PATH)));
+ }
+
+ /**
+ * @param records received records and add into buffer.
+ */
+ void output(Collection> records) {
+ reentrantLock.lock();
+ try {
+ for (Record record : records) {
+ final Event event = record.getData();
+ processRecordsList.add(event);
+ if (event.getEventHandle() != null) {
+ bufferedEventHandles.add(event.getEventHandle());
+ }
+ if (snsSinkConfig.getBatchSize() == processRecordsList.size()) {
+ processRecords();
+ processRecordsList.clear();
+ }
+ }
+ // This block will process the last set of events below batch size
+ if(!processRecordsList.isEmpty()) {
+ processRecords();
+ processRecordsList.clear();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Exception while write event into buffer :", e);
+ }
+ reentrantLock.unlock();
+ }
+
    /**
     * Publishes the currently buffered batch to SNS (with retries). On success
     * the success counter is incremented; on failure the failure counter is
     * incremented and the error is written to the DLQ. Buffered event handles
     * are released afterwards either way.
     *
     * @throws InterruptedException interruption during the retry back-off sleep.
     */
    private void processRecords() throws InterruptedException {
        final AtomicReference errorMsgObj = new AtomicReference<>();
        final boolean isFlushToSNS = retryFlushToSNS(processRecordsList, topicName,errorMsgObj);
        if (isFlushToSNS) {
            numberOfRecordsSuccessCounter.increment(processRecordsList.size());
        } else {
            numberOfRecordsFailedCounter.increment(processRecordsList.size());
            // Status is hard-coded to 0 in the DLQ record; no HTTP status is available here.
            dlqPushHandler.perform(pluginSetting,new SnsSinkFailedDlqData(topicName,errorMsgObj.get(),0));
        }
        // Handles are released with result=true even when the flush failed —
        // presumably because a DLQ write counts as the event being handled;
        // NOTE(review): confirm this is the intended acknowledgement semantics.
        releaseEventHandles(true);
    }
+
+ private void releaseEventHandles(final boolean result) {
+ for (EventHandle eventHandle : bufferedEventHandles) {
+ eventHandle.release(result);
+ }
+ bufferedEventHandles.clear();
+ }
+
+ /**
+ * perform retry in-case any issue occurred, based on max_upload_retries configuration.
+ *
+ * @param processRecordsList records to be process
+ * @param errorMsgObj pass the error message
+ * @return boolean based on object upload status.
+ * @throws InterruptedException interruption during sleep.
+ */
+ protected boolean retryFlushToSNS(final List processRecordsList,
+ final String topicName,
+ final AtomicReference errorMsgObj) throws InterruptedException {
+ boolean isUploadedToSNS = Boolean.FALSE;
+ int retryCount = maxRetries;
+ do {
+ try {
+ publishToTopic(snsClient, topicName,processRecordsList);
+ isUploadedToSNS = Boolean.TRUE;
+ } catch (AwsServiceException | SdkClientException e) {
+ errorMsgObj.set(e.getMessage());
+ LOG.error("Exception occurred while uploading records to sns. Retry countdown : {} | exception:",
+ retryCount, e);
+ --retryCount;
+ if (retryCount == 0) {
+ return isUploadedToSNS;
+ }
+ Thread.sleep(5000);
+ }
+ } while (!isUploadedToSNS);
+ return isUploadedToSNS;
+ }
+
+ public void publishToTopic(SnsClient snsClient, String topicName,final List processRecordsList) {
+ LOG.debug("Trying to Push Msg to SNS: {}",topicName);
+ try {
+ final PublishBatchRequest request = createPublicRequestByTopic(topicName, processRecordsList);
+ final PublishBatchResponse publishBatchResponse = snsClient.publishBatch(request);
+ LOG.info(" Message sent. Status is " + publishBatchResponse.sdkHttpResponse().statusCode());
+ }catch (Exception e) {
+ LOG.error("Error while pushing messages to topic ",e);
+ throw e;
+ }
+ }
+
+ private PublishBatchRequest createPublicRequestByTopic(final String topicName, final List processRecordsList) {
+ final PublishBatchRequest.Builder requestBatch = PublishBatchRequest.builder().topicArn(topicName);
+ final List batchRequestEntries = new ArrayList();
+ final String defaultRandomGroupId = UUID.randomUUID().toString();
+ for (Event messageEvent : processRecordsList) {
+ final PublishBatchRequestEntry.Builder entry = PublishBatchRequestEntry.builder().id(String.valueOf(new Random().nextInt())).message(messageEvent.toJsonString());
+ if(topicName.endsWith(FIFO))
+ batchRequestEntries.add(entry
+ .messageGroupId(snsSinkConfig.getMessageGroupId() != null ? getMessageGroupId(messageEvent,snsSinkConfig.getMessageGroupId()) : defaultRandomGroupId)
+ .messageDeduplicationId(snsSinkConfig.getMessageDeduplicationId() != null ? getMessageGroupId(messageEvent,snsSinkConfig.getMessageDeduplicationId()) : UUID.randomUUID().toString()).build());
+ else
+ batchRequestEntries.add(entry.build());
+ }
+ requestBatch.publishBatchRequestEntries(batchRequestEntries);
+ return requestBatch.build();
+ }
+
    /**
     * Resolves the value used as the FIFO message group id — and, via
     * createPublicRequestByTopic, also the deduplication id — for an event.
     * When the configured message_group_id is a valid expression, the given
     * field name is evaluated as an expression; otherwise (or when evaluation
     * fails) it is read from the event as a plain key.
     *
     * NOTE(review): isDocumentIdAnExpression is derived solely from
     * message_group_id, yet this method is also invoked with
     * message_deduplication_id — confirm that shared gating is intentional.
     *
     * @param event event to resolve the value from.
     * @param fieldName configured event key or expression.
     */
    private String getMessageGroupId(Event event,final String fieldName) {
        if (isDocumentIdAnExpression) {
            try {
                return (String) expressionEvaluator.evaluate(fieldName, event);
            } catch (final ExpressionEvaluationException e) {
                // Fall through to the plain key lookup below on evaluation failure.
                LOG.error("Unable to construct message_group_id from expression {}, the message_group_id will be generated by Sns Sink", snsSinkConfig.getMessageGroupId());
            }
        }
        return event.get(fieldName, String.class);
    }
+
+}
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java
new file mode 100644
index 0000000000..1c244de73b
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/configuration/AwsAuthenticationOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.sns.configuration;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.validation.constraints.Size;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.Map;
+
+public class AwsAuthenticationOptions {
+ @JsonProperty("region")
+ @Size(min = 1, message = "Region cannot be empty string")
+ private String awsRegion;
+
+ @JsonProperty("sts_role_arn")
+ @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
+ private String awsStsRoleArn;
+
+ @JsonProperty("sts_header_overrides")
+ @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
+ private Map awsStsHeaderOverrides;
+
+ public Region getAwsRegion() {
+ return awsRegion != null ? Region.of(awsRegion) : null;
+ }
+
+ public String getAwsStsRoleArn() {
+ return awsStsRoleArn;
+ }
+
+ public Map getAwsStsHeaderOverrides() {
+ return awsStsHeaderOverrides;
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java
new file mode 100644
index 0000000000..a3b6dd4995
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandler.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns.dlq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import io.micrometer.core.instrument.util.StringUtils;
+import org.opensearch.dataprepper.metrics.MetricNames;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.failures.DlqObject;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
+import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import static java.util.UUID.randomUUID;
+
+
+/**
+ * * An Handler class which helps log failed data to AWS S3 bucket or file based on configuration.
+ */
+
+public class DlqPushHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DlqPushHandler.class);
+
+ private static final String BUCKET = "bucket";
+
+ private static final String ROLE_ARN = "sts_role_arn";
+
+ private static final String REGION = "region";
+
+ private static final String S3_PLUGIN_NAME = "s3";
+
+ private static final String KEY_PATH_PREFIX = "key_path_prefix";
+
+ private String dlqFile;
+
+ private String keyPathPrefix;
+
+ private DlqProvider dlqProvider;
+
+ private ObjectWriter objectWriter;
+
+ public DlqPushHandler(final String dlqFile,
+ final PluginFactory pluginFactory,
+ final String bucket,
+ final String stsRoleArn,
+ final String awsRegion,
+ final String dlqPathPrefix) {
+ if(dlqFile != null) {
+ this.dlqFile = dlqFile;
+ this.objectWriter = new ObjectMapper().writer().withDefaultPrettyPrinter();
+ }else{
+ this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix);
+ }
+ }
+
+ public void perform(final PluginSetting pluginSetting,
+ final Object failedData) {
+ if(dlqFile != null)
+ writeToFile(failedData);
+ else
+ pushToS3(pluginSetting, failedData);
+ }
+
+ private void writeToFile(Object failedData) {
+ try(BufferedWriter dlqFileWriter = Files.newBufferedWriter(Paths.get(dlqFile),
+ StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
+ dlqFileWriter.write(objectWriter.writeValueAsString(failedData)+"\n");
+ } catch (IOException e) {
+ LOG.error("Exception while writing failed data to DLQ file Exception: ",e);
+ }
+ }
+
+ private void pushToS3(PluginSetting pluginSetting, Object failedData) {
+ DlqWriter dlqWriter = getDlqWriter(pluginSetting.getPipelineName());
+ try {
+ String pluginId = randomUUID().toString();
+ DlqObject dlqObject = DlqObject.builder()
+ .withPluginId(pluginId)
+ .withPluginName(pluginSetting.getName())
+ .withPipelineName(pluginSetting.getPipelineName())
+ .withFailedData(failedData)
+ .build();
+ final List dlqObjects = Arrays.asList(dlqObject);
+ dlqWriter.write(dlqObjects, pluginSetting.getPipelineName(), pluginId);
+ LOG.info("wrote {} events to DLQ",dlqObjects.size());
+ } catch (final IOException e) {
+ LOG.error("Exception while writing failed data to DLQ, Exception : ", e);
+ }
+ }
+
+ private DlqWriter getDlqWriter(final String writerPipelineName) {
+ Optional potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
+ .add(writerPipelineName).toString());
+ DlqWriter dlqWriter = potentialDlq.isPresent() ? potentialDlq.get() : null;
+ return dlqWriter;
+ }
+
+ private DlqProvider getDlqProvider(final PluginFactory pluginFactory,
+ final String bucket,
+ final String stsRoleArn,
+ final String awsRegion,
+ final String dlqPathPrefix) {
+ final Map props = new HashMap<>();
+ props.put(BUCKET, bucket);
+ props.put(ROLE_ARN, stsRoleArn);
+ props.put(REGION, awsRegion);
+ this.keyPathPrefix = StringUtils.isEmpty(dlqPathPrefix) ? dlqPathPrefix : enforceDefaultDelimiterOnKeyPathPrefix(dlqPathPrefix);
+ props.put(KEY_PATH_PREFIX, dlqPathPrefix);
+ final PluginSetting dlqPluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
+ DlqProvider dlqProvider = pluginFactory.loadPlugin(DlqProvider.class, dlqPluginSetting);
+ return dlqProvider;
+ }
+
+ private String enforceDefaultDelimiterOnKeyPathPrefix(final String keyPathPrefix) {
+ return (keyPathPrefix.charAt(keyPathPrefix.length() - 1) == '/') ? keyPathPrefix : keyPathPrefix.concat("/");
+ }
+}
+
diff --git a/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java
new file mode 100644
index 0000000000..9ec14d336d
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/SnsSinkFailedDlqData.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns.dlq;
+
/**
 * Payload describing a batch that failed to publish to SNS, written to the
 * DLQ: the destination topic ARN, the error message, and a status code.
 * Setters are fluent (return this) to allow chained updates.
 */
public class SnsSinkFailedDlqData {

    private String topic;

    private String message;

    private int status;

    public SnsSinkFailedDlqData(String topic, String message, int status) {
        this.topic = topic;
        this.message = message;
        this.status = status;
    }

    public String getTopic() {
        return topic;
    }

    public SnsSinkFailedDlqData setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public String getMessage() {
        return message;
    }

    public SnsSinkFailedDlqData setMessage(String message) {
        this.message = message;
        return this;
    }

    public int getStatus() {
        return status;
    }

    public SnsSinkFailedDlqData setStatus(int status) {
        this.status = status;
        return this;
    }

    @Override
    public String toString() {
        // BUG FIX: the status value previously had an opening quote but no
        // closing one; all three fields now render consistently as key "value".
        return "failedData\n" +
                "topic \"" + topic + "\"\n" +
                "message \"" + message + "\"\n" +
                "status \"" + status + "\"\n";
    }
}
diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java
new file mode 100644
index 0000000000..14499bc5fe
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsClientFactoryTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.SnsClientBuilder;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
@ExtendWith(MockitoExtension.class)
// Unit tests for SnsClientFactory: verifies a client can be constructed and
// that region / STS role / header overrides are forwarded to the credentials
// supplier and the SnsClient builder.
class SnsClientFactoryTest {
    @Mock
    private SnsSinkConfig snsSinkConfig;
    @Mock
    private AwsCredentialsSupplier awsCredentialsSupplier;

    @Mock
    private AwsAuthenticationOptions awsAuthenticationOptions;

    @BeforeEach
    void setUp() {
        // Every test needs the sink config to expose the aws authentication block.
        when(snsSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
    }

    // Builds a real (non-mocked-builder) SnsClient and checks construction succeeds.
    @Test
    void createSnsClient_with_real_SnsClient() {
        when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1);
        // NOTE(review): variable named s3Client but it holds an SnsClient —
        // looks like a copy/paste from the S3 sink tests; rename for clarity.
        final SnsClient s3Client = SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier);

        assertThat(s3Client, notNullValue());
    }

    @ParameterizedTest
    @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"})
    void createSNSClient_provides_correct_inputs(final String regionString) {
        final Region region = Region.of(regionString);
        final String stsRoleArn = UUID.randomUUID().toString();
        final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region);
        when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
        when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides);

        final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class);
        when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider);

        final SnsClientBuilder snsClientBuilder = mock(SnsClientBuilder.class);
        when(snsClientBuilder.region(region)).thenReturn(snsClientBuilder);
        when(snsClientBuilder.credentialsProvider(any())).thenReturn(snsClientBuilder);
        when(snsClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(snsClientBuilder);
        // Static mock is scoped to this try block so SnsClient.builder() is only
        // intercepted while the factory runs.
        // NOTE(review): "s3ClientMockedStatic" is another S3 copy/paste name.
        try(final MockedStatic s3ClientMockedStatic = mockStatic(SnsClient.class)) {
            s3ClientMockedStatic.when(SnsClient::builder)
                    .thenReturn(snsClientBuilder);
            SnsClientFactory.createSNSClient(snsSinkConfig, awsCredentialsSupplier);
        }

        final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class);
        verify(snsClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture());

        final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue();

        // The provider handed to the builder must be exactly the one the supplier produced.
        assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider));

        final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class);
        verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture());

        // And the options passed to the supplier must carry the configured values.
        final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue();
        assertThat(actualCredentialsOptions.getRegion(), equalTo(region));
        assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
        assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides));
    }
}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java
new file mode 100644
index 0000000000..2289e99ccd
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkConfigTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.regions.Region;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
// Unit tests for SnsSinkConfig: default values plus a full pipeline-YAML
// deserialization round trip.
class SnsSinkConfigTest {
    public static final int DEFAULT_MAX_RETRIES = 5;
    private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

    @Test
    void sns_sink_default_id_test(){
        assertThat(new SnsSinkConfig().getMessageGroupId(),nullValue());
    }

    @Test
    void sns_sink_default_topic_test() {
        assertThat(new SnsSinkConfig().getTopicArn(),nullValue());
    }

    @Test
    void sns_sink_default_codec_test(){
        assertThat(new SnsSinkConfig().getCodec(),nullValue());
    }

    @Test
    void sns_sink_default_max_retries_test(){
        assertThat(new SnsSinkConfig().getMaxUploadRetries(),equalTo(DEFAULT_MAX_RETRIES));
    }

    // NOTE(review): this test is identical to sns_sink_default_max_retries_test
    // above — both assert getMaxUploadRetries(); presumably one of them was
    // meant to cover a different getter. Confirm and de-duplicate.
    @Test
    void sns_sink_default_max_upload_retries_test(){
        assertThat(new SnsSinkConfig().getMaxUploadRetries(),equalTo(DEFAULT_MAX_RETRIES));
    }

    @Test
    void sns_sink_default_max_connection_retries_test(){
        assertThat(new SnsSinkConfig().getMaxConnectionRetries(),equalTo(DEFAULT_MAX_RETRIES));
    }


    // Full YAML round trip covering topic, aws block, codec, retries and DLQ settings.
    // NOTE(review): this config sets both dlq_file and dlq, which the README says
    // should raise an error — confirm the config class tolerates both here.
    @Test
    void sns_sink_pipeline_config_test() throws JsonProcessingException {
        final String config = "        topic_arn: arn:aws:sns:ap-south-1:524239988912:my-topic\n" +
                "        message_group_id: test\n" +
                "        aws:\n" +
                "          region: ap-south-1\n" +
                "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
                "          sts_header_overrides: {\"test\":\"test\"}\n" +
                "        codec:\n" +
                "          ndjson:\n" +
                "        max_retries: 10\n" +
                "        dlq_file: /test/dlq-file.log\n" +
                "        dlq:\n" +
                "          s3:\n" +
                "            bucket: test\n" +
                "            key_path_prefix: test\n" +
                "            region: ap-south-1\n" +
                "            sts_role_arn: test-role-arn\n";
        final SnsSinkConfig snsSinkConfig = objectMapper.readValue(config, SnsSinkConfig.class);
        assertThat(snsSinkConfig.getMaxUploadRetries(),equalTo(10));
        assertThat(snsSinkConfig.getTopicArn(),equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic"));
        assertThat(snsSinkConfig.getMessageGroupId(),equalTo("test"));
        assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsRegion(),equalTo(Region.AP_SOUTH_1));
        assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsStsRoleArn(),equalTo("arn:aws:iam::524239988912:role/app-test"));
        assertThat(snsSinkConfig.getAwsAuthenticationOptions().getAwsStsHeaderOverrides().get("test"),equalTo("test"));
        assertThat(snsSinkConfig.getDlqStsRegion(),equalTo("ap-south-1"));
        assertThat(snsSinkConfig.getDlqStsRoleARN(),equalTo("test-role-arn"));
    }
}
\ No newline at end of file
diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
new file mode 100644
index 0000000000..658105ad89
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceTest.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
+import io.micrometer.core.instrument.Counter;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventHandle;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.record.Record;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
+import software.amazon.awssdk.services.sns.model.PublishBatchResponse;
+import software.amazon.awssdk.services.sns.model.SnsException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+
// Unit tests for SnsSinkService: batching, success/failure counters, retry
// behavior and event-handle release, with a fully mocked SnsClient.
public class SnsSinkServiceTest {

    private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

    private SnsSinkConfig snsSinkConfig;

    private SnsClient snsClient;

    private PluginMetrics pluginMetrics;

    private Counter numberOfRecordsSuccessCounter;

    private Counter numberOfRecordsFailedCounter;

    // Pipeline YAML used to build the sink config for every test.
    // NOTE(review): dlq_file is a Windows-specific absolute path; the failure
    // test below routes records to the DLQ and will try to write there —
    // replace with a temp-directory path so the suite is portable.
    private static final String config = "        topic_arn: arn:aws:sns:ap-south-1:524239988912:my-topic\n" +
            "        message_group_id: test\n" +
            "        aws:\n" +
            "          region: ap-south-1\n" +
            "          sts_role_arn: arn:aws:iam::524239988912:role/app-test\n" +
            "          sts_header_overrides: {\"test\":\"test\"}\n" +
            "        codec:\n" +
            "          ndjson:\n" +
            "        max_retries: 10\n" +
            "        dlq_file: C:\\Work\\dlq.txt\n";
    private PublishBatchResponse publishBatchResponse;

    private SdkHttpResponse sdkHttpResponse;

    @BeforeEach
    public void setup() throws JsonProcessingException {
        this.snsSinkConfig = objectMapper.readValue(config, SnsSinkConfig.class);
        this.snsClient = mock(SnsClient.class);
        this.pluginMetrics = mock(PluginMetrics.class);
        this.numberOfRecordsSuccessCounter = mock(Counter.class);
        this.numberOfRecordsFailedCounter = mock(Counter.class);
        this.publishBatchResponse = mock(PublishBatchResponse.class);
        this.sdkHttpResponse = mock(SdkHttpResponse.class);
        when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(numberOfRecordsSuccessCounter);
        when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter);
        when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenReturn(publishBatchResponse);
        when(publishBatchResponse.sdkHttpResponse()).thenReturn(sdkHttpResponse);
        // NOTE(review): this random-status stubbing is immediately overridden by
        // the fixed 200 on the next line — one of the two is redundant.
        when(publishBatchResponse.sdkHttpResponse().statusCode()).thenReturn(new Random().nextInt());
        when(sdkHttpResponse.statusCode()).thenReturn(200);
    }


    private SnsSinkService createObjectUnderTest(){
        return new SnsSinkService(snsSinkConfig,
                snsClient,
                pluginMetrics, mock(PluginFactory.class),
                mock(PluginSetting.class),
                mock(ExpressionEvaluator.class));
    }

    @Test
    public void sns_sink_test_with_empty_collection_records(){
        // NOTE(review): the field is replaced with a fresh mock here, but the
        // service (via pluginMetrics stubbed in setup) holds the ORIGINAL mock,
        // so verifyNoInteractions on the new one is vacuous for the success counter.
        numberOfRecordsSuccessCounter = mock(Counter.class);
        SnsSinkService snsSinkService = createObjectUnderTest();
        snsSinkService.output(List.of());
        verifyNoInteractions(snsClient);
        verifyNoInteractions(numberOfRecordsSuccessCounter);
        verifyNoInteractions(numberOfRecordsFailedCounter);
    }

    @Test
    public void sns_sink_test_with_single_collection_record_success_push_to_sns(){
        when(sdkHttpResponse.statusCode()).thenReturn(200);
        when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenReturn(publishBatchResponse);
        SnsSinkService snsSinkService = createObjectUnderTest();
        final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection> records = List.of(eventRecord);
        final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);

        snsSinkService.output(records);

        verify(snsClient).publishBatch(publishBatchRequestCaptor.capture());
        final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue();
        assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic"));
        assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1));
        assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString()));
        verify(numberOfRecordsSuccessCounter).increment(records.size());
    }

    @Test
    public void sns_sink_test_with_multiple_collection_record_success_push_to_sns(){
        SnsSinkService snsSinkService = createObjectUnderTest();
        final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection> records = new ArrayList<>();
        // 12 records with the default batch size of 10 -> one full batch of 10
        // plus a trailing partial batch of 2, hence two publishBatch calls.
        for(int recordSize = 0; recordSize <= 11 ; recordSize++) {
            records.add(eventRecord);
        }
        final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);

        snsSinkService.output(records);

        verify(snsClient,times(2)).publishBatch(publishBatchRequestCaptor.capture());
        // getValue() returns the LAST captured request: the 2-entry partial batch.
        final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue();
        assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic"));
        assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(2));
        assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString()));
    }

    @Test
    public void sns_sink_test_with_single_collection_record_failed_to_push_to_sns() throws IOException {
        final Counter snsSinkObjectsEventsFailedCounter = mock(Counter.class);
        when(pluginMetrics.counter(SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(snsSinkObjectsEventsFailedCounter);
        final SnsException snsException = (SnsException)SnsException.builder().message("internal server error").awsErrorDetails(AwsErrorDetails.builder().errorMessage("internal server error").build()).build();
        when(snsClient.publishBatch(any(PublishBatchRequest.class))).thenThrow(snsException);
        SnsSinkService snsSinkService = createObjectUnderTest();
        final Record eventRecord = new Record<>(JacksonEvent.fromMessage("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        Collection> records = List.of(eventRecord);
        final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
        snsSinkService.output(records);
        // times(10) matches max_retries: 10 in the YAML config above.
        verify(snsClient,times(10)).publishBatch(publishBatchRequestCaptor.capture());
        final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue();
        assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic"));
        assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1));
        assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo(eventRecord.getData().toJsonString()));
        verify(snsSinkObjectsEventsFailedCounter).increment(records.size());
    }

    @Test
    void sns_sink_service_test_output_with_single_record_ack_release() {
        final SnsSinkService snsSinkService = createObjectUnderTest();
        final Event event = mock(Event.class);
        given(event.toJsonString()).willReturn("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}");
        given(event.getEventHandle()).willReturn(mock(EventHandle.class));
        final ArgumentCaptor publishBatchRequestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);

        snsSinkService.output(List.of(new Record<>(event)));

        verify(snsClient,times(1)).publishBatch(publishBatchRequestCaptor.capture());
        final PublishBatchRequest actualRequest = publishBatchRequestCaptor.getValue();
        assertThat(actualRequest.topicArn(), equalTo("arn:aws:sns:ap-south-1:524239988912:my-topic"));
        assertThat(actualRequest.publishBatchRequestEntries().size(), equalTo(1));
        assertThat(actualRequest.publishBatchRequestEntries().get(0).message(),equalTo("{\"message\":\"c3f847eb-333a-49c3-a4cd-54715ad1b58a\"}"));
        verify(numberOfRecordsSuccessCounter).increment(1);
    }
}
diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java
new file mode 100644
index 0000000000..9294c6172c
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.sns;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.model.sink.SinkContext;
+import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
+import software.amazon.awssdk.regions.Region;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code SnsSink} lifecycle: construction, initialization,
+ * and the {@code isReady()} state before and after {@code doInitialize()}.
+ */
+class SnsSinkTest {
+
+ public static final int MAX_EVENTS = 100;
+ public static final int MAX_RETRIES = 5;
+ public static final String BUCKET_NAME = "dataprepper";
+ public static final String S3_REGION = "us-east-1";
+ public static final String MAXIMUM_SIZE = "1kb";
+ public static final String OBJECT_KEY_NAME_PATTERN = "my-elb-%{yyyy-MM-dd'T'hh-mm-ss}";
+ public static final String CODEC_PLUGIN_NAME = "json";
+ public static final String SINK_PLUGIN_NAME = "sns";
+ public static final String SINK_PIPELINE_NAME = "sns-sink-pipeline";
+ private SnsSinkConfig snsSinkConfig;
+ private SnsSink snsSink;
+ private PluginSetting pluginSetting;
+ private PluginFactory pluginFactory;
+ private AwsCredentialsSupplier awsCredentialsSupplier;
+ private SinkContext sinkContext;
+ private PluginModel pluginModel;
+
+ @BeforeEach
+ void setUp() {
+ snsSinkConfig = mock(SnsSinkConfig.class);
+ sinkContext = mock(SinkContext.class);
+ final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
+ pluginSetting = mock(PluginSetting.class);
+ // Assign the field; the original declared a shadowing local here,
+ // leaving the field permanently null.
+ pluginModel = mock(PluginModel.class);
+ pluginFactory = mock(PluginFactory.class);
+ awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
+ // A real empty map is sufficient for DLQ settings; no need to mock it.
+ final Map<String, Object> dlqMap = new HashMap<>();
+
+ when(snsSinkConfig.getMessageGroupId()).thenReturn("/message");
+ when(snsSinkConfig.getMessageDeduplicationId()).thenReturn("/message");
+ when(snsSinkConfig.getDlq()).thenReturn(pluginModel);
+ when(pluginModel.getPluginSettings()).thenReturn(dlqMap);
+ when(snsSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
+ when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION));
+ when(snsSinkConfig.getCodec()).thenReturn(pluginModel);
+ when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME);
+ when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME);
+ when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME);
+ }
+
+ private SnsSink createObjectUnderTest() {
+ return new SnsSink(pluginSetting, snsSinkConfig, pluginFactory, sinkContext, mock(ExpressionEvaluator.class), awsCredentialsSupplier);
+ }
+
+ @Test
+ void test_sns_sink_plugin_isReady_positive() {
+ // The sink reports ready only after doInitialize() completes.
+ snsSink = createObjectUnderTest();
+ Assertions.assertNotNull(snsSink);
+ Assertions.assertNotNull(snsSinkConfig);
+ snsSink.doInitialize();
+ assertTrue(snsSink.isReady(), "sns sink is not initialized and not ready to work");
+ }
+
+ @Test
+ void test_sns_sink_plugin_isReady_negative() {
+ // Without doInitialize(), the sink must not report ready.
+ snsSink = createObjectUnderTest();
+ Assertions.assertNotNull(snsSink);
+ assertFalse(snsSink.isReady(), "sns sink is initialized and ready to work");
+ }
+}
diff --git a/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java
new file mode 100644
index 0000000000..5af66430d1
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sns/dlq/DlqPushHandlerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.sns.dlq;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.model.configuration.PluginModel;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+import org.opensearch.dataprepper.model.plugin.PluginFactory;
+import org.opensearch.dataprepper.plugins.dlq.DlqProvider;
+import org.opensearch.dataprepper.plugins.dlq.DlqWriter;
+import org.opensearch.dataprepper.plugins.sink.sns.configuration.AwsAuthenticationOptions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code DlqPushHandler}: pushing failed SNS records either to
+ * an S3-backed DLQ (via the {@code s3} DLQ plugin) or to a local DLQ file.
+ */
+class DlqPushHandlerTest {
+
+ private static final String BUCKET = "bucket";
+ private static final String BUCKET_VALUE = "test";
+ private static final String ROLE = "arn:aws:iam::524239988122:role/app-test";
+
+ private static final String REGION = "ap-south-1";
+ private static final String S3_PLUGIN_NAME = "s3";
+ private static final String KEY_PATH_PREFIX = "key_path_prefix";
+
+ private static final String KEY_PATH_PREFIX_VALUE = "dlq/";
+
+ private static final String PIPELINE_NAME = "log-pipeline";
+
+ private static final String DLQ_FILE = "local_dlq_file";
+
+ private PluginModel pluginModel;
+
+ private DlqPushHandler dlqPushHandler;
+ private PluginFactory pluginFactory;
+
+ private AwsAuthenticationOptions awsAuthenticationOptions;
+
+ private DlqProvider dlqProvider;
+
+ private DlqWriter dlqWriter;
+
+
+ @BeforeEach
+ public void setUp(){
+ this.pluginFactory = mock(PluginFactory.class);
+ this.pluginModel = mock(PluginModel.class);
+ this.awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
+ this.dlqProvider = mock(DlqProvider.class);
+ this.dlqWriter = mock(DlqWriter.class);
+ }
+
+ @Test
+ void perform_for_dlq_s3_success() throws IOException {
+ // Parameterized map (was raw) holds the s3 DLQ plugin settings.
+ final Map<String, Object> props = new HashMap<>();
+ props.put(BUCKET, BUCKET_VALUE);
+ props.put(KEY_PATH_PREFIX, KEY_PATH_PREFIX_VALUE);
+
+ when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class))).thenReturn(dlqProvider);
+
+ when(dlqProvider.getDlqWriter(anyString())).thenReturn(Optional.of(dlqWriter));
+ doNothing().when(dlqWriter).write(anyList(), anyString(), anyString());
+ final SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
+ // No dlq_file: the handler must route the record through the DLQ plugin.
+ dlqPushHandler = new DlqPushHandler(null, pluginFactory, BUCKET_VALUE, ROLE, REGION, KEY_PATH_PREFIX_VALUE);
+
+ final PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, props);
+ pluginSetting.setPipelineName(PIPELINE_NAME);
+ dlqPushHandler.perform(pluginSetting, failedDlqData);
+ Assertions.assertNotNull(pluginFactory);
+ verify(dlqWriter).write(anyList(), anyString(), anyString());
+ }
+
+
+ @Test
+ void perform_for_dlq_local_file_success(){
+
+ final SnsSinkFailedDlqData failedDlqData = new SnsSinkFailedDlqData("topic","message",0);
+ // dlq_file supplied: the handler writes locally instead of using a plugin.
+ dlqPushHandler = new DlqPushHandler(DLQ_FILE, pluginFactory, null, ROLE, REGION, null);
+
+ final PluginSetting pluginSetting = new PluginSetting(S3_PLUGIN_NAME, null);
+ pluginSetting.setPipelineName(PIPELINE_NAME);
+ // Make the implicit success criterion explicit: writing to the local
+ // DLQ file must complete without throwing (the original had no assertion).
+ Assertions.assertDoesNotThrow(() -> dlqPushHandler.perform(pluginSetting, failedDlqData));
+ }
+}
diff --git a/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000..23c33feb6d
--- /dev/null
+++ b/data-prepper-plugins/sns-sink/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,3 @@
+# To enable mocking of final classes with vanilla Mockito
+# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods
+mock-maker-inline
diff --git a/settings.gradle b/settings.gradle
index 02125825fd..ba5f619261 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -128,4 +128,5 @@ include 'data-prepper-plugins:buffer-common'
include 'data-prepper-plugins:sqs-source'
include 'data-prepper-plugins:cloudwatch-logs'
include 'data-prepper-plugins:http-sink'
+include 'data-prepper-plugins:sns-sink'
include 'data-prepper-plugins:prometheus-sink'