Sns Sink Plugin with junit test cases (#2995)

Signed-off-by: Uday Kumar Chintala <[email protected]>
Signed-off-by: Uday Chintala <[email protected]>
udaych20 authored Aug 4, 2023
1 parent fbfb82e commit bdbb174
Showing 17 changed files with 1,673 additions and 0 deletions.
79 changes: 79 additions & 0 deletions data-prepper-plugins/sns-sink/README.md
@@ -0,0 +1,79 @@
# SNS Sink

This is the Data Prepper SNS sink plugin, which sends records to an Amazon SNS topic.

## Usage

The SNS sink should be configured as part of the Data Prepper pipeline YAML file.

## Configuration Options

```
pipeline:
  ...
  sink:
    - sns:
        topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic
        message_group_id: /type
        message_deduplication_id: /id
        batch_size: 10
        aws:
          region: ap-south-1
          sts_role_arn: arn:aws:iam::524239988922:role/app-test
        dlq:
          s3:
            bucket: test-bucket
            key_path_prefix: dlq/
        codec:
          ndjson:
        max_retries: 5
```

## SNS Pipeline Configuration

- `topic_arn` (Optional) : The Amazon Resource Name (ARN) of the SNS topic to which events are pushed.

- `batch_size` (Optional) : The maximum number of events to batch into a single publish request to the SNS topic. Defaults to `10`.

- `message_group_id` (Optional) : The value used as the `message_group_id` for messages published to the topic; the example above reads it from the event field `/type`. Defaults to an auto-generated random key. SNS requires a message group ID for FIFO topics; see the FIFO example after this list.

- `message_deduplication_id` (Optional) : The value used as the `message_deduplication_id` for message deduplication when messages are published to the topic; the example above reads it from the event field `/id`. Defaults to an auto-generated random key.

- `dlq_file` (Optional) : The absolute file path to which failed records are written as a dead-letter queue (DLQ). Defaults to null. If not provided, failed records are written to the default Data Prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this option, an error is thrown.

- `dlq` (Optional) : DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this option, an error is thrown.

- `codec` : The sink codec used to serialize outgoing records, for example `ndjson`.

- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. If this option is present, the `aws_` options are not expected to be present; if any of them are, an error is thrown.

- `max_retries` (Optional) : The maximum number of times a single request is retried in order to ingest data to Amazon SNS and S3. Defaults to `5`.
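
For FIFO topics, `message_group_id` and `message_deduplication_id` typically need to come from event fields rather than the auto-generated defaults. A minimal sketch, assuming a hypothetical FIFO topic `my-topic.fifo` and events carrying `/type` and `/id` fields:

```
pipeline:
  ...
  sink:
    - sns:
        topic_arn: arn:aws:sns:ap-south-1:524239988922:my-topic.fifo
        message_group_id: /type
        message_deduplication_id: /id
        aws:
          region: ap-south-1
```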

### <a name="aws_configuration">AWS Configuration</a>

* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SNS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).


### Counters

* `snsSinkObjectsEventsSucceeded` - The number of events that the SNS sink has successfully sent to the topic.
* `snsSinkObjectsEventsFailed` - The number of events that the SNS sink failed to send to the topic.
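
As a hedged sketch of how these counters map onto the code: the integration test below resolves them through `PluginMetrics` using the `SnsSinkService` constants (`NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS` / `NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED`); the literal metric names here are assumptions matching the list above:

```
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

// Hedged sketch: assumes the SnsSinkService constants resolve to the
// counter names documented above.
class SnsSinkMetricsSketch {
    private final Counter succeeded;
    private final Counter failed;

    SnsSinkMetricsSketch(final PluginMetrics pluginMetrics) {
        // PluginMetrics.counter(name) registers a Micrometer Counter scoped to the plugin.
        this.succeeded = pluginMetrics.counter("snsSinkObjectsEventsSucceeded");
        this.failed = pluginMetrics.counter("snsSinkObjectsEventsFailed");
    }

    void recordFlush(final int recordCount, final boolean success) {
        // The integration test verifies increment(recordCount) calls like this one.
        (success ? succeeded : failed).increment(recordCount);
    }
}
```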

## Developer Guide

This plugin is compatible with Java 11. See the following:

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

Note: The SQS queues must be subscribed to the SNS topics before running the integration tests; a setup sketch follows the command below.

```
./gradlew :data-prepper-plugins:sns-sink:integrationTest -Dtests.sns.sink.region=<<aws-region>> -Dtests.sns.sink.sts.role.arn=<<aws-sts-role-arn>> -Dtests.sns.sink.standard.topic=<<standard-topic-arn>> -Dtests.sns.sink.fifo.topic=<<fifo-topic-arn>> -Dtests.sns.sink.dlq.file.path=<<dlq-file-path>> -Dtests.sns.sink.standard.sqs.queue.url=<<sqs-standard-queue>> -Dtests.sns.sink.fifo.sqs.queue.url=<<sqs-fifo-queue>>
```
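
A minimal setup sketch for the required subscriptions, using the AWS SDK v2 `sns` and `sqs` clients the plugin already depends on; the class name and argument handling are illustrative assumptions:

```
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

public class SubscribeQueueForIT {
    public static void main(final String[] args) {
        final String topicArn = args[0]; // e.g. the tests.sns.sink.standard.topic value
        final String queueUrl = args[1]; // e.g. the tests.sns.sink.standard.sqs.queue.url value

        try (SqsClient sqsClient = SqsClient.create(); SnsClient snsClient = SnsClient.create()) {
            // Resolve the queue ARN, which SNS needs as the subscription endpoint.
            final String queueArn = sqsClient.getQueueAttributes(b -> b
                            .queueUrl(queueUrl)
                            .attributeNames(QueueAttributeName.QUEUE_ARN))
                    .attributes()
                    .get(QueueAttributeName.QUEUE_ARN);

            // Subscribe the queue to the topic; the queue's access policy must
            // also allow sns.amazonaws.com to send messages to it.
            snsClient.subscribe(b -> b.topicArn(topicArn).protocol("sqs").endpoint(queueArn));
        }
    }
}
```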
64 changes: 64 additions & 0 deletions data-prepper-plugins/sns-sink/build.gradle
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
    implementation project(':data-prepper-api')
    implementation project(path: ':data-prepper-plugins:common')
    implementation project(':data-prepper-plugins:aws-plugin-api')
    implementation 'io.micrometer:micrometer-core'
    implementation 'com.fasterxml.jackson.core:jackson-core'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
    implementation 'software.amazon.awssdk:sns'
    implementation 'software.amazon.awssdk:sts'
    testImplementation 'software.amazon.awssdk:sqs'
    implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.8.21'
    implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
    implementation 'org.apache.commons:commons-lang3:3.12.0'
    implementation project(':data-prepper-plugins:failures-common')
    testImplementation project(':data-prepper-test-common')
    testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
    useJUnitPlatform()
}

sourceSets {
    integrationTest {
        java {
            compileClasspath += main.output + test.output
            runtimeClasspath += main.output + test.output
            srcDir file('src/integrationTest/java')
        }
        resources.srcDir file('src/integrationTest/resources')
    }
}

configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntime.extendsFrom testRuntime
}

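// Integration tests run only via this task (not as part of the regular build);
// AWS resources are supplied through the -Dtests.sns.sink.* system properties
// documented in the README.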
task integrationTest(type: Test) {
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs

    useJUnitPlatform()

    classpath = sourceSets.integrationTest.runtimeClasspath
    systemProperty 'tests.sns.sink.region', System.getProperty('tests.sns.sink.region')
    systemProperty 'tests.sns.sink.dlq.file.path', System.getProperty('tests.sns.sink.dlq.file.path')
    systemProperty 'tests.sns.sink.sts.role.arn', System.getProperty('tests.sns.sink.sts.role.arn')
    systemProperty 'tests.sns.sink.standard.topic', System.getProperty('tests.sns.sink.standard.topic')
    systemProperty 'tests.sns.sink.fifo.topic', System.getProperty('tests.sns.sink.fifo.topic')
    systemProperty 'tests.sns.sink.standard.sqs.queue.url', System.getProperty('tests.sns.sink.standard.sqs.queue.url')
    systemProperty 'tests.sns.sink.fifo.sqs.queue.url', System.getProperty('tests.sns.sink.fifo.sqs.queue.url')

    filter {
        includeTestsMatching '*IT'
    }
}

229 changes: 229 additions & 0 deletions data-prepper-plugins/sns-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sns/SnsSinkServiceIT.java
@@ -0,0 +1,229 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.sns;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.sink.sns.dlq.DlqPushHandler;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED;
import static org.opensearch.dataprepper.plugins.sink.sns.SnsSinkService.NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS;

public class SnsSinkServiceIT {

    private SnsClient snsClient;

    private PluginMetrics pluginMetrics;

    private PluginFactory pluginFactory;

    private PluginSetting pluginSetting;

    private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));


    private static final String SNS_SINK_CONFIG_YAML = "topic_arn: {0}\n" +
            "batch_size: 10\n" +
            "aws:\n" +
            "  region: {1}\n" +
            "  sts_role_arn: {2}\n" +
            "dlq_file: {3}\n" +
            "codec:\n" +
            "  ndjson:\n" +
            "max_retries: 5";

    private String standardTopic;

    private String fifoTopic;

    private String region;

    private String stsRoleArn;

    private String dlqFilePath;

    private Counter snsSinkObjectsEventsSucceeded;

    private Counter numberOfRecordsFailedCounter;

    private String standardSqsQueue;

    private SqsClient sqsClient;

    private String fifoSqsQueue;

    private DlqPushHandler dlqPushHandler;

    @BeforeEach
    public void setup() {
        this.standardTopic = System.getProperty("tests.sns.sink.standard.topic");
        this.fifoTopic = System.getProperty("tests.sns.sink.fifo.topic");
        this.region = System.getProperty("tests.sns.sink.region");
        this.stsRoleArn = System.getProperty("tests.sns.sink.sts.role.arn");
        this.dlqFilePath = System.getProperty("tests.sns.sink.dlq.file.path");
        this.standardSqsQueue = System.getProperty("tests.sns.sink.standard.sqs.queue.url");
        this.fifoSqsQueue = System.getProperty("tests.sns.sink.fifo.sqs.queue.url");

        this.dlqPushHandler = mock(DlqPushHandler.class);
        this.pluginMetrics = mock(PluginMetrics.class);
        this.pluginFactory = mock(PluginFactory.class);
        this.pluginSetting = mock(PluginSetting.class);
        this.snsSinkObjectsEventsSucceeded = mock(Counter.class);
        this.numberOfRecordsFailedCounter = mock(Counter.class);
        this.snsClient = SnsClient.builder()
                .region(Region.of(region))
                .build();
        this.sqsClient = SqsClient.builder().region(Region.of(region)).build();
        when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_SUCCESS)).thenReturn(snsSinkObjectsEventsSucceeded);
        when(pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_SNS_FAILED)).thenReturn(numberOfRecordsFailedCounter);
    }

    private Collection<Record<Event>> setEventQueue(final int records) {
        final Collection<Record<Event>> jsonObjects = new LinkedList<>();
        for (int i = 0; i < records; i++) {
            jsonObjects.add(createRecord());
        }
        return jsonObjects;
    }

    private static Record<Event> createRecord() {
        final JacksonEvent event = JacksonLog.builder().withData("[{\"name\":\"test\"}]").build();
        event.setEventHandle(mock(EventHandle.class));
        return new Record<>(event);
    }

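    // Renders the YAML template with the topic/region/role/DLQ values and
    // deserializes it into SnsSinkConfig, mirroring how the pipeline
    // configuration would be parsed.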
    public SnsSinkService createObjectUnderTest(final String topicName) throws JsonProcessingException {
        final String[] values = { topicName, region, stsRoleArn, dlqFilePath };
        final String configYaml = MessageFormat.format(SNS_SINK_CONFIG_YAML, values);
        final SnsSinkConfig snsSinkConfig = objectMapper.readValue(configYaml, SnsSinkConfig.class);
        return new SnsSinkService(snsSinkConfig, snsClient, pluginMetrics, pluginFactory, pluginSetting, mock(ExpressionEvaluator.class));
    }

    @ParameterizedTest
    @ValueSource(ints = {5, 9, 10})
    public void sns_sink_service_test_with_standard_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
        final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
        final Collection<Record<Event>> records = setEventQueue(recordCount);
        final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
        objectUnderTest.output(records);
        Thread.sleep(Duration.ofSeconds(10).toMillis());
        final List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords, standardSqsQueue);
        assertThat(inputRecords, is(topicData));
        assertThat(inputRecords.size(), equalTo(topicData.size()));
        verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
    }

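    // With batch_size 10 (from the YAML template above), eleven records should
    // flush as two batches, hence two increments of the success counter.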
    @Test
    public void sns_sink_service_test_with_standard_queue_with_multiple_batch() throws JsonProcessingException, InterruptedException {
        final SnsSinkService objectUnderTest = createObjectUnderTest(standardTopic);
        final Collection<Record<Event>> records = setEventQueue(11);
        final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
        objectUnderTest.output(records);
        Thread.sleep(Duration.ofSeconds(10).toMillis());
        final List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords, standardSqsQueue);
        assertThat(inputRecords, is(topicData));
        assertThat(inputRecords.size(), equalTo(topicData.size()));
        verify(snsSinkObjectsEventsSucceeded, times(2)).increment(anyDouble());
    }

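    // Polls the subscribed SQS queue until the expected number of messages
    // arrives or 60 seconds elapse, then unwraps the SNS envelope to return
    // just the published message bodies.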
    private List<String> readMessagesFromSNSTopicQueue(final List<String> inputRecords, final String sqsQueue) {
        final List<Message> messages = new ArrayList<>();
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + 60000;
        do {
            messages.addAll(sqsClient.receiveMessage(builder -> builder.queueUrl(sqsQueue)).messages());
            if (messages.size() >= inputRecords.size()) {
                break;
            }
        } while (System.currentTimeMillis() < endTime);

        return messages.stream().map(Message::body).map(obj -> {
            try {
                final Map<String, String> map = objectMapper.readValue(obj, Map.class);
                return map.get("Message");
            } catch (final JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

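    // Cleanup helper for draining the queue between runs; not invoked by the
    // tests in this file.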
    private void deleteSqsMessages(final String sqsQueue, final List<Message> messages) throws InterruptedException {
        for (final Message message : messages) {
            sqsClient.deleteMessage(builder -> builder.queueUrl(sqsQueue).receiptHandle(message.receiptHandle()));
            Thread.sleep(Duration.ofSeconds(2).toMillis());
        }
    }

    @ParameterizedTest
    @ValueSource(ints = {1, 5, 10})
    public void sns_sink_service_test_with_fifo_queue_with_multiple_records(final int recordCount) throws JsonProcessingException, InterruptedException {
        final SnsSinkService objectUnderTest = createObjectUnderTest(fifoTopic);
        final Collection<Record<Event>> records = setEventQueue(recordCount);
        final List<String> inputRecords = records.stream().map(Record::getData).map(Event::toJsonString).collect(Collectors.toList());
        objectUnderTest.output(records);
        Thread.sleep(Duration.ofSeconds(10).toMillis());
        final List<String> topicData = readMessagesFromSNSTopicQueue(inputRecords, fifoSqsQueue);
        assertThat(inputRecords, is(topicData));
        assertThat(inputRecords.size(), equalTo(topicData.size()));
        verify(snsSinkObjectsEventsSucceeded).increment(recordCount);
    }

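    // Publishing to an invalid topic ARN ("test") forces every record to fail,
    // which should increment the failure counter and write a DLQ entry.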
    @ParameterizedTest
    @ValueSource(ints = {1, 5, 9})
    public void sns_sink_service_test_fail_to_push(final int recordCount) throws IOException, InterruptedException {
        final ObjectMapper mapper = new ObjectMapper();
        final String topic = "test";
        Files.deleteIfExists(Path.of(dlqFilePath));
        final SnsSinkService objectUnderTest = createObjectUnderTest(topic);
        final Collection<Record<Event>> records = setEventQueue(recordCount);
        objectUnderTest.output(records);
        Thread.sleep(Duration.ofSeconds(10).toMillis());
        verify(numberOfRecordsFailedCounter).increment(recordCount);
        final Map<String, String> map = mapper.readValue(new String(Files.readAllBytes(Path.of(dlqFilePath))).replaceAll("(\\r|\\n)", ""), Map.class);
        assertThat(map.get("topic"), equalTo(topic));
    }
}