diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java
index 77d659f816..21f359f361 100644
--- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java
+++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java
@@ -34,7 +34,7 @@ public class InMemorySinkAccessor {
     public List<Record<Event>> get(final String testingKey) {
         lock.lock();
         try {
-            return recordsMap.getOrDefault(testingKey, Collections.emptyList());
+            return new ArrayList<>(recordsMap.getOrDefault(testingKey, Collections.emptyList()));
         } finally {
             lock.unlock();
         }
@@ -49,7 +49,7 @@ public List<Record<Event>> get(final String testingKey) {
     public List<Record<Event>> getAndClear(final String testingKey) {
         lock.lock();
         try {
-            final List<Record<Event>> records = recordsMap.getOrDefault(testingKey, Collections.emptyList());
+            final List<Record<Event>> records = get(testingKey);
             recordsMap.remove(testingKey);
diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java
index 9b517d8ae0..f0e1fdb9f4 100644
--- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java
+++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeEvaluatorListenerTest.java
@@ -193,7 +193,7 @@ void testSimpleRelationalOperatorExpressionWithInValidLiteralType() {
     @Test
     void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
         final String testKey = "testKey";
-        final int testValue = random.nextInt(1000);
+        final int testValue = random.nextInt(1000) + 2;
         final Map<String, Object> data = Map.of(testKey, testValue);
         final Event testEvent = createTestEvent(data);
         final String greaterThanStatement = String.format(" /%s > %d", testKey, testValue - 1);
@@ -207,7 +207,7 @@ void testSimpleRelationalOperatorExpressionWithJsonPointerTypeValidValue() {
     }
 
     @Test
-    void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValue() {
+    void testSimpleRelationalOperatorExpressionWithJsonPointerTypeInValidValueWithPositiveInteger() {
         final String testKey = "testKey";
         final boolean testValue = true;
         final Map<String, Object> data = Map.of(testKey, testValue);
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
index 57872c7ecd..0eb33f979f 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessor.java
@@ -5,7 +5,7 @@
 
 package org.opensearch.dataprepper.plugins.processor.anomalydetector;
 
-import io.micrometer.core.instrument.Counter;
+
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
 import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
@@ -23,19 +23,20 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @DataPrepperPlugin(name = "anomaly_detector", pluginType = Processor.class, pluginConfigurationType = AnomalyDetectorProcessorConfig.class)
 public class AnomalyDetectorProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
     public static final String DEVIATION_KEY = "deviation_from_expected";
     public static final String GRADE_KEY = "grade";
-    static final String NUMBER_RCF_INSTANCES = "numberRCFInstances";
+    static final String NUMBER_RCF_INSTANCES = "RCFInstances";
 
     private final Boolean verbose;
     private final IdentificationKeysHasher identificationKeysHasher;
-    private final Counter numberRCFInstances;
     private final List<String> keys;
     private final PluginFactory pluginFactory;
     private final HashMap<Integer, AnomalyDetectorMode> forestMap;
+    private final AtomicInteger cardinality;
     private final AnomalyDetectorProcessorConfig anomalyDetectorProcessorConfig;
 
     @DataPrepperPluginConstructor
@@ -44,9 +45,9 @@ public AnomalyDetectorProcessor(final AnomalyDetectorProcessorConfig anomalyDete
         this.identificationKeysHasher = new IdentificationKeysHasher(anomalyDetectorProcessorConfig.getIdentificationKeys());
         this.anomalyDetectorProcessorConfig = anomalyDetectorProcessorConfig;
         this.pluginFactory = pluginFactory;
-        this.numberRCFInstances = pluginMetrics.counter(NUMBER_RCF_INSTANCES);
         this.keys = anomalyDetectorProcessorConfig.getKeys();
         this.verbose = anomalyDetectorProcessorConfig.getVerbose();
+        this.cardinality = pluginMetrics.gauge(NUMBER_RCF_INSTANCES, new AtomicInteger());
         forestMap = new HashMap<>();
     }
 
@@ -71,10 +72,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
                 forest = loadAnomalyDetectorMode(pluginFactory);
                 forest.initialize(keys, verbose);
                 forestMap.put(identificationKeysMap.hashCode(), forest);
-                this.numberRCFInstances.increment();
             }
             recordsOut.addAll(forestMap.get(identificationKeysMap.hashCode()).handleEvents(List.of(record)));
         }
+        cardinality.set(forestMap.size());
         return recordsOut;
     }
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
index c92fdb9000..7e796e660a 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorConfig.java
@@ -10,6 +10,7 @@
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
 
+import java.util.Collections;
 import java.util.List;
 
 public class AnomalyDetectorProcessorConfig {
@@ -22,7 +23,7 @@ public class AnomalyDetectorProcessorConfig {
     private List<String> keys;
 
     @JsonProperty("identification_keys")
-    private List<String> identificationKeys;
+    private List<String> identificationKeys = Collections.emptyList();
 
     @JsonProperty("verbose")
     private Boolean verbose = false;
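Note: the hunks above replace a monotonically increasing `Counter` with an `AtomicInteger`-backed gauge, so the metric reports the current number of RCF forests and can go down as well as up. A minimal sketch of the same pattern against a plain Micrometer registry (the `SimpleMeterRegistry` is an assumption for illustration; Data Prepper's `PluginMetrics.gauge()` delegates to an equivalent registry call):

```java
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;

public class GaugeSketch {
    public static void main(String[] args) {
        final SimpleMeterRegistry registry = new SimpleMeterRegistry();
        // gauge() samples the AtomicInteger on each scrape, so setting it to
        // forestMap.size() reflects shrinkage as well as growth.
        final AtomicInteger rcfInstances = registry.gauge("RCFInstances", new AtomicInteger());
        rcfInstances.set(5); // e.g. forestMap.size() after one batch
        rcfInstances.set(3); // a counter could never report this decrease
        System.out.println(registry.get("RCFInstances").gauge().value()); // 3.0
    }
}
```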
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
index 65c39518d2..302a692dd7 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/AnomalyDetectorProcessorTests.java
@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -48,7 +49,7 @@ public class AnomalyDetectorProcessorTests {
     @Mock
     private PluginMetrics pluginMetrics;
     @Mock
-    private Counter numberRCFInstances;
+    private AtomicInteger numberRCFInstances;
     @Mock
     private Counter recordsIn;
 
@@ -81,7 +82,7 @@ void setUp() {
         when(pluginFactory.loadPlugin(eq(AnomalyDetectorMode.class), any(PluginSetting.class)))
                 .thenAnswer(invocation -> new RandomCutForestMode(randomCutForestModeConfig));
 
-        when(pluginMetrics.counter(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES)).thenReturn(numberRCFInstances);
+        when(pluginMetrics.gauge(eq(AnomalyDetectorProcessor.NUMBER_RCF_INSTANCES), any())).thenReturn(numberRCFInstances);
         when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(recordsIn);
         when(pluginMetrics.counter(MetricNames.RECORDS_OUT)).thenReturn(recordsOut);
         when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed);
diff --git a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java
index c52788433f..818ab14d7b 100644
--- a/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java
+++ b/data-prepper-plugins/anomaly-detector-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/anomalydetector/modes/RandomCutForestModeTests.java
@@ -21,6 +21,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.greaterThan;
 
 import org.junit.jupiter.api.Test;
@@ -28,6 +29,8 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 import org.mockito.Mock;
 import static org.mockito.Mockito.when;
@@ -205,6 +208,7 @@ void testRandomCutForestModeVerboseFalse() {
         }
         final List<Record<Event>> anomalyRecords = randomCutForestMode.handleEvents(recordsWithAnomaly).stream().collect(toList());
-        assertThat(anomalyRecords.size(), equalTo(1));
+        // Due to inherent variance in the RCF algorithm, 1-3 anomalies will be detected after the level shift.
+        assertThat(anomalyRecords.size(), both(greaterThanOrEqualTo(1)).and(lessThanOrEqualTo(3)));
     }
 }
diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
index 8eb8e401fd..f21b109ab5 100644
--- a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
+++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfig.java
@@ -6,7 +6,7 @@
 package org.opensearch.dataprepper.plugins.dlq.s3;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.Size;
 import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -33,9 +33,11 @@ public class S3DlqWriterConfig {
     private static final String DEFAULT_AWS_REGION = "us-east-1";
     private static final String AWS_IAM_ROLE = "role";
     private static final String AWS_IAM = "iam";
+    private static final String S3_PREFIX = "s3://";
+
     @JsonProperty("bucket")
-    @NotNull
-    @Size(min = 3, max = 63, message = "bucket lengthy should be between 3 and 63 characters")
+    @NotEmpty
+    @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
     private String bucket;
 
     @JsonProperty("key_path_prefix")
@@ -55,6 +57,9 @@ public class S3DlqWriterConfig {
     private String stsExternalId;
 
     public String getBucket() {
+        if (bucket.startsWith(S3_PREFIX)) {
+            return bucket.substring(S3_PREFIX.length());
+        }
         return bucket;
     }
diff --git a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
index d1f61ae14e..0629256277 100644
--- a/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
+++ b/data-prepper-plugins/failures-common/src/test/java/org/opensearch/dataprepper/plugins/dlq/s3/S3DlqWriterConfigTest.java
@@ -7,6 +7,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.NullSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import software.amazon.awssdk.regions.Region;
@@ -42,6 +43,14 @@ public void getS3ClientWithInvalidStsRoleArnThrowException(final String stsRoleA
         assertThrows(IllegalArgumentException.class, config::getS3Client);
     }
 
+    @ParameterizedTest
+    @CsvSource({"bucket-name, bucket-name", "s3://bucket-name, bucket-name"})
+    public void getS3BucketNameShouldReturnCorrectBucketName(final String bucketName, final String expectedBucketName) throws NoSuchFieldException, IllegalAccessException {
+        final S3DlqWriterConfig config = new S3DlqWriterConfig();
+        reflectivelySetField(config, "bucket", bucketName);
+        assertThat(config.getBucket(), is(equalTo(expectedBucketName)));
+    }
+
     @ParameterizedTest
     @NullSource
     @ValueSource(strings = {"", "arn:aws:iam::123456789012:role/some-role"})
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
index 87f98c1956..d6f2406794 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConfig.java
@@ -9,10 +9,10 @@
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotNull;
 import jakarta.validation.constraints.Size;
+
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 
 import java.time.Duration;
-
 /**
  * * A helper class that helps to read consumer configuration values from
  * pipelines.yaml
@@ -22,8 +22,7 @@ public class TopicConfig {
     static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
     static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
     static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
-    static final String DEFAULT_AUTO_OFFSET_RESET = "latest";
-
+    static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
     static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
     static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
     static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
@@ -70,7 +69,7 @@ public class TopicConfig {
     private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;
 
     @JsonProperty("serde_format")
-    private MessageFormat serdeFormat = MessageFormat.PLAINTEXT;
+    private MessageFormat serdeFormat= MessageFormat.PLAINTEXT;
 
     @JsonProperty("auto_commit")
     private Boolean autoCommit = DEFAULT_AUTO_COMMIT;
@@ -140,7 +139,7 @@ public class TopicConfig {
     @JsonProperty("heart_beat_interval")
     @Valid
     @Size(min = 1)
-    private Duration heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL_DURATION;
+    private Duration heartBeatInterval= DEFAULT_HEART_BEAT_INTERVAL_DURATION;
 
     @JsonProperty("is_create")
     private Boolean isCreate=Boolean.FALSE;
@@ -303,7 +302,6 @@ public void setName(String name) {
         this.name = name;
     }
 
-
     public KafkaKeyMode getKafkaKeyMode() {
         return kafkaKeyMode;
     }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
index d083b4e98b..805cfb6497 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
@@ -162,7 +162,7 @@ public void consumeRecords() throws Exception {
                 Thread.sleep(10000);
             } catch (RecordDeserializationException e) {
                 LOG.warn("Deserialization error - topic {} partition {} offset {}, seeking past the error record",
-                    e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
+                    e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e);
                 topicMetrics.getNumberOfDeserializationErrors().increment();
                 consumer.seek(e.topicPartition(), e.offset()+1);
             }
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
index 17a177fd0f..8b4c63b96f 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducer.java
@@ -94,9 +94,9 @@ public void produceRecords(final Record<Event> record) {
         Event event = getEvent(record);
         final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator);
         try {
-            if (serdeFormat == MessageFormat.JSON.toString()) {
+            if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
                 publishJsonMessage(record, key);
-            } else if (serdeFormat == MessageFormat.AVRO.toString()) {
+            } else if (MessageFormat.AVRO.toString().equalsIgnoreCase(serdeFormat)) {
                 publishAvroMessage(record, key);
             } else {
                 publishPlaintextMessage(record, key);
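Note: the hunk above (with its removed and added lines transposed in the earlier copy) compares serde formats by value. `==` on strings tests reference identity, and `MessageFormat.JSON.toString()` is generally not the same object as a value read from configuration, so the JSON and Avro branches could silently never match. A short illustration:

```java
public class StringEqualitySketch {
    public static void main(String[] args) {
        final String configured = new String("json"); // e.g. parsed from pipeline YAML
        final String constant = "json";

        System.out.println(configured == constant);              // false: different objects
        System.out.println(configured.equalsIgnoreCase("JSON")); // true: value comparison
    }
}
```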
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
index 92cf2527f8..a388d3ee6e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java
@@ -48,8 +48,6 @@
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
 
 import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
-import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
-import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,6 +98,8 @@ public class KafkaSource implements Source<Record<Event>> {
     private static final String SCHEMA_TYPE = "schemaType";
     private final AcknowledgementSetManager acknowledgementSetManager;
     private static CachedSchemaRegistryClient schemaRegistryClient;
+    private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
+    private StringDeserializer stringDeserializer;
 
     @DataPrepperPluginConstructor
     public KafkaSource(final KafkaSourceConfig sourceConfig,
@@ -110,6 +110,7 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
         this.pluginMetrics = pluginMetrics;
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.pipelineName = pipelineDescription.getPipelineName();
+        this.stringDeserializer = new StringDeserializer();
         shutdownInProgress = new AtomicBoolean(false);
     }
 
@@ -135,7 +136,12 @@ public void start(Buffer<Record<Event>> buffer) {
                     break;
                 case PLAINTEXT:
                 default:
-                    kafkaConsumer = new KafkaConsumer(consumerProperties);
+                    glueDeserializer = KafkaSourceSecurityConfigurer.getGlueSerializer(sourceConfig);
+                    if (Objects.nonNull(glueDeserializer)) {
+                        kafkaConsumer = new KafkaConsumer(consumerProperties, stringDeserializer, glueDeserializer);
+                    } else {
+                        kafkaConsumer = new KafkaConsumer(consumerProperties);
+                    }
                     break;
             }
             consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
@@ -296,7 +302,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
         }
 
         if (schemaConfig.getType() == SchemaRegistryType.AWS_GLUE) {
-            setPropertiesForGlueSchemaRegistry(properties);
             return;
         }
 
@@ -309,13 +314,6 @@ private void setSchemaRegistryProperties(Properties properties, TopicConfig topi
         }
     }
 
-    private void setPropertiesForGlueSchemaRegistry(Properties properties) {
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
-        properties.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
-        properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
-    }
-
     private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
         MessageFormat dataFormat = topicConfig.getSerdeFormat();
         schemaType = dataFormat.toString();
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
index 4a6aaf30da..77fcd6e2fc 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaSourceSecurityConfigurer.java
@@ -8,9 +8,11 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
 import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import software.amazon.awssdk.services.kafka.KafkaClient;
@@ -25,9 +27,17 @@
 import software.amazon.awssdk.services.sts.StsClient;
 import software.amazon.awssdk.regions.Region;
 
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
 import org.slf4j.Logger;
 
 import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
@@ -73,6 +83,9 @@ public class KafkaSourceSecurityConfigurer {
 
     private static final int MAX_KAFKA_CLIENT_RETRIES = 360; // for one hour every 10 seconds
 
+    private static AwsCredentialsProvider credentialsProvider;
+    private static GlueSchemaRegistryKafkaDeserializer glueDeserializer;
+
     /*public static void setSaslPlainTextProperties(final KafkaSourceConfig kafkaSourConfig,
                                                   final Properties properties) {
@@ -173,7 +186,6 @@ public static void setAwsIamAuthProperties(Properties properties, final AwsIamAu
     }
 
     public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuthConfig, final AwsConfig awsConfig, final Logger LOG) {
-        AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
         if (awsIamAuthConfig == AwsIamAuthConfig.ROLE) {
             String sessionName = "data-prepper-kafka-session" + UUID.randomUUID();
             StsClient stsClient = StsClient.builder()
@@ -216,10 +228,11 @@ public static String getBootStrapServersForMsk(final AwsIamAuthConfig awsIamAuth
                 try {
                     Thread.sleep(10000);
                 } catch (InterruptedException exp) {}
+                retryable = true;
             } catch (Exception e) {
                 throw new RuntimeException("Failed to get bootstrap server information from MSK.", e);
             }
-        } while (numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
+        } while (retryable && numRetries++ < MAX_KAFKA_CLIENT_RETRIES);
         if (Objects.isNull(result)) {
             throw new RuntimeException("Failed to get bootstrap server information from MSK after trying multiple times with retryable exceptions.");
         }
@@ -240,6 +253,8 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon
         final KafkaSourceConfig.EncryptionConfig encryptionConfig = sourceConfig.getEncryptionConfig();
         final EncryptionType encryptionType = encryptionConfig.getType();
 
+        credentialsProvider = DefaultCredentialsProvider.create();
+
         String bootstrapServers = sourceConfig.getBootStrapServers();
         AwsIamAuthConfig awsIamAuthConfig = null;
         if (Objects.nonNull(authConfig)) {
@@ -279,5 +294,21 @@ public static void setAuthProperties(Properties properties, final KafkaSourceCon
         }
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     }
+
+    public static GlueSchemaRegistryKafkaDeserializer getGlueSerializer(final KafkaSourceConfig sourceConfig) {
+        SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
+        if (Objects.isNull(schemaConfig) || schemaConfig.getType() != SchemaRegistryType.AWS_GLUE) {
+            return null;
+        }
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AWSSchemaRegistryConstants.AWS_REGION, sourceConfig.getAwsConfig().getRegion());
+        configs.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+        configs.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, "86400000");
+        configs.put(AWSSchemaRegistryConstants.CACHE_SIZE, "10");
+        configs.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
+        glueDeserializer = new GlueSchemaRegistryKafkaDeserializer(credentialsProvider, configs);
+        return glueDeserializer;
+    }
+
 }
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
index be868b3e6f..05843ed1a9 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java
@@ -13,6 +13,8 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
+import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
+import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
 
 import org.junit.jupiter.api.Assertions;
@@ -25,8 +27,10 @@
 import org.mockito.quality.Strictness;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Objects;
 import java.time.Duration;
 
 @ExtendWith(MockitoExtension.class)
@@ -37,6 +41,9 @@ class KafkaSourceTest {
     @Mock
     private KafkaSourceConfig sourceConfig;
 
+    @Mock
+    private KafkaSourceConfig.EncryptionConfig encryptionConfig;
+
     @Mock
     private PluginMetrics pluginMetrics;
 
@@ -64,6 +71,7 @@ public KafkaSource createObjectUnderTest() {
     @BeforeEach
     void setUp() throws Exception {
         sourceConfig = mock(KafkaSourceConfig.class);
+        encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
         pipelineDescription = mock(PipelineDescription.class);
         pluginMetrics = mock(PluginMetrics.class);
         acknowledgementSetManager = mock(AcknowledgementSetManager.class);
@@ -79,12 +87,21 @@ void setUp() throws Exception {
         when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
         when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
         when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
+        when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
+        when(topic2.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
         when(topic1.getAutoCommit()).thenReturn(false);
+        when(topic1.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
+        when(topic2.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
         when(topic2.getAutoCommit()).thenReturn(false);
         when(topic1.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
         when(topic2.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(10));
         when(sourceConfig.getBootStrapServers()).thenReturn("http://localhost:1234");
         when(sourceConfig.getTopics()).thenReturn(Arrays.asList(topic1, topic2));
+        when(sourceConfig.getSchemaConfig()).thenReturn(null);
+        when(sourceConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
+        when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
     }
 
     /* @Test
@@ -108,4 +125,14 @@ void test_kafkaSource_start_execution_exception() {
         kafkaSource = createObjectUnderTest();
         Assertions.assertThrows(Exception.class, () -> kafkaSource.start(buffer));
     }
+
+    @Test
+    void test_kafkaSource_basicFunctionality() {
+        when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
+        kafkaSource = createObjectUnderTest();
+        assertTrue(Objects.nonNull(kafkaSource));
+        kafkaSource.start(buffer);
+        assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
+    }
 }
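Note: with `getGlueSerializer()` above, the source now passes deserializer instances directly to the consumer instead of setting deserializer class names in `Properties`. A hedged sketch of that wiring (the region value and the helper's placement are illustrative assumptions; the constructor and config keys are the ones used in the diff):

```java
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GlueConsumerSketch {
    public static KafkaConsumer<String, Object> buildConsumer(final Properties consumerProperties) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1"); // placeholder region
        final GlueSchemaRegistryKafkaDeserializer glueDeserializer =
                new GlueSchemaRegistryKafkaDeserializer(DefaultCredentialsProvider.create(), configs);
        // Deserializer instances passed here take precedence over any key/value
        // deserializer class names present in consumerProperties.
        return new KafkaConsumer<>(consumerProperties, new StringDeserializer(), glueDeserializer);
    }
}
```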
diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java
index eee926a698..5543e7c21e 100644
--- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java
+++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java
@@ -5,11 +5,6 @@
 
 package org.opensearch.dataprepper.plugins.source.otellogs;
 
-import com.linecorp.armeria.common.HttpHeaderNames;
-import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
-import org.opensearch.dataprepper.plugins.codec.CompressionOption;
-import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
-import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
@@ -18,11 +13,13 @@
 import com.linecorp.armeria.client.WebClient;
 import com.linecorp.armeria.common.AggregatedHttpResponse;
 import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpHeaderNames;
 import com.linecorp.armeria.common.HttpMethod;
 import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.common.MediaType;
 import com.linecorp.armeria.common.RequestHeaders;
 import com.linecorp.armeria.common.SessionProtocol;
+import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
 import com.linecorp.armeria.server.Server;
 import com.linecorp.armeria.server.ServerBuilder;
 import com.linecorp.armeria.server.grpc.GrpcService;
@@ -69,6 +66,9 @@
 import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
 import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
 import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
+import org.opensearch.dataprepper.plugins.codec.CompressionOption;
+import org.opensearch.dataprepper.plugins.health.HealthGrpcService;
+import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -89,25 +89,22 @@
 import java.util.stream.Stream;
 import java.util.zip.GZIPOutputStream;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_PORT;
-import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS;
-import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.contains;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doThrow;
@@ -117,6 +114,9 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_PORT;
+import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS;
+import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL;
 
 @ExtendWith(MockitoExtension.class)
 class OTelLogsSourceTest {
@@ -800,8 +800,9 @@ private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse res
                 .stream()
                 .map(Map.Entry::getKey)
                 .map(AsciiString::toString)
+                .map(String::toLowerCase)
                 .collect(Collectors.toList());
-        assertThat("Response Header Keys", headerKeys, not(contains("server")));
+        assertThat("Response Header Keys", headerKeys, not(hasItem("server")));
     }
 
     private byte[] createGZipCompressedPayload(final String payload) throws IOException {
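Note: the assertion fix above matters for two reasons. The removed import was Mockito's `contains`, an argument matcher rather than a Hamcrest collection matcher, and Hamcrest's own `contains` would require matching the entire list in order, while `hasItem` checks membership. Lowercasing first makes the check independent of header-name casing. A minimal sketch:

```java
import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;

public class HeaderAssertionSketch {
    public static void main(String[] args) {
        final List<String> headerKeys = List.of(":status", "Content-Type", "date");

        // hasItem checks membership anywhere in the collection; contains()
        // would instead assert the complete contents in order.
        final List<String> normalized = headerKeys.stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
        assertThat(normalized, not(hasItem("server")));
    }
}
```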
diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle
index 00bc6d0f11..5d74fd169d 100644
--- a/data-prepper-plugins/s3-sink/build.gradle
+++ b/data-prepper-plugins/s3-sink/build.gradle
@@ -23,8 +23,10 @@ dependencies {
     implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.8.21'
     implementation libs.commons.lang3
     testImplementation project(':data-prepper-test-common')
-    implementation project(':data-prepper-plugins:parquet-codecs')
-    implementation project(':data-prepper-plugins:parse-json-processor')
+    testImplementation project(':data-prepper-plugins:parquet-codecs')
+    testImplementation project(':data-prepper-plugins:parse-json-processor')
+    testImplementation project(':data-prepper-plugins:csv-processor')
+    testImplementation project(':data-prepper-plugins:avro-codecs')
 }
 
 test {
@@ -55,7 +57,7 @@ task integrationTest(type: Test) {
     classpath = sourceSets.integrationTest.runtimeClasspath
 
     systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
-    systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region')
+    systemProperty 'tests.s3sink.region', System.getProperty('tests.s3sink.region')
 
     filter {
         includeTestsMatching '*IT'
diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java
index 7134dc47fc..d679663f11 100644
--- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java
+++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java
@@ -5,10 +5,13 @@
 
 package org.opensearch.dataprepper.plugins.sink.s3;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.DistributionSummary;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
@@ -24,9 +27,9 @@
 import org.apache.parquet.io.RecordReader;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
-import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -43,13 +46,16 @@
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.sink.OutputCodecContext;
 import org.opensearch.dataprepper.model.types.ByteCount;
+import org.opensearch.dataprepper.plugins.codec.GZipDecompressionEngine;
 import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputCodec;
 import org.opensearch.dataprepper.plugins.codec.json.NdjsonOutputConfig;
 import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodec;
 import org.opensearch.dataprepper.plugins.codec.parquet.ParquetOutputCodecConfig;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
+import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;
+import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
@@ -66,6 +72,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.time.Duration;
@@ -129,7 +136,7 @@ class S3SinkServiceIT {
 
     @BeforeEach
     public void setUp() {
-        s3region = System.getProperty("tests.s3ink.region");
+        s3region = System.getProperty("tests.s3sink.region");
         s3Client = S3Client.builder().region(Region.of(s3region)).build();
         bucketName = System.getProperty("tests.s3sink.bucket");
@@ -168,19 +175,65 @@ void configureNewLineCodec() {
     }
 
     @Test
-    void verify_flushed_records_into_s3_bucketNewLine() {
+    void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException {
         configureNewLineCodec();
         S3SinkService s3SinkService = createObjectUnderTest();
         Collection<Record<Event>> recordsData = setEventQueue();
 
         s3SinkService.output(recordsData);
-        String objectData = getS3Object();
+        String objectData = new String(getS3Object());
 
+        final ObjectMapper objectMapperForDeserialization = new ObjectMapper();
         int count = 0;
-        String[] objectDataArr = objectData.split("\r\n");
+
+        String[] objectDataArr = objectData.split(System.lineSeparator());
+        assertThat(objectDataArr.length, equalTo(recordsData.size()));
+
         for (Record<Event> recordData : recordsData) {
-            String objectRecord = recordData.getData().toJsonString();
-            assertThat(objectDataArr[count], CoreMatchers.containsString(objectRecord));
+            final String actualLine = objectDataArr[count];
+            final Map<String, Object> actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class);
+
+            final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap());
+            expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags()));
+            assertThat(actualDeserializedJson, equalTo(expectedMap));
+
+            final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString();
+            assertThat(actualLine, equalTo(expectedJsonString));
+            count++;
+        }
+    }
+
+    @Test
+    void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException {
+        configureNewLineCodec();
+        bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine());
+        S3SinkService s3SinkService = createObjectUnderTest();
+        Collection<Record<Event>> recordsData = setEventQueue();
+
+        s3SinkService.output(recordsData);
+        byte[] s3ObjectBytes = getS3Object();
+
+        ByteArrayInputStream s3ObjectInputStream = new ByteArrayInputStream(s3ObjectBytes);
+        InputStream decompressingInputStream = new GZipDecompressionEngine().createInputStream(s3ObjectInputStream);
+
+        String objectData = IOUtils.toString(decompressingInputStream, Charset.defaultCharset());
+
+        final ObjectMapper objectMapperForDeserialization = new ObjectMapper();
+        int count = 0;
+
+        String[] objectDataArr = objectData.split(System.lineSeparator());
+        assertThat(objectDataArr.length, equalTo(recordsData.size()));
+
+        for (Record<Event> recordData : recordsData) {
+            final String actualLine = objectDataArr[count];
+            final Map<String, Object> actualDeserializedJson = objectMapperForDeserialization.readValue(actualLine, Map.class);
+
+            final Map<String, Object> expectedMap = new HashMap<>(recordData.getData().toMap());
+            expectedMap.put("Tag", new ArrayList<>(recordData.getData().getMetadata().getTags()));
+            assertThat(actualDeserializedJson, equalTo(expectedMap));
+
+            final String expectedJsonString = recordData.getData().jsonBuilder().includeTags("Tag").toJsonString();
+            assertThat(actualLine, equalTo(expectedJsonString));
             count++;
         }
     }
@@ -202,7 +255,7 @@ private int gets3ObjectCount() {
         return s3ObjectCount;
     }
 
-    private String getS3Object() {
+    private byte[] getS3Object() {
 
         ListObjectsRequest listObjects = ListObjectsRequest.builder()
                 .bucket(bucketName)
@@ -220,8 +273,7 @@ private byte[] getS3Object() {
                 .bucket(bucketName).build();
 
         ResponseBytes objectBytes = s3Client.getObjectAsBytes(objectRequest);
-        byte[] data = objectBytes.asByteArray();
-        return new String(data);
+        return objectBytes.asByteArray();
     }
 
     private String getPathPrefix() {
@@ -240,20 +292,19 @@ private static Record<Event> createRecord() {
         final EventMetadata defaultEventMetadata = DefaultEventMetadata.builder().
                 withEventType(EventType.LOG.toString()).
                 withTags(testTags).build();
-        Map<String, Object> json = generateJson(testTags);
+        Map<String, Object> json = generateJson();
         final JacksonEvent event = JacksonLog.builder().withData(json).withEventMetadata(defaultEventMetadata).build();
         event.setEventHandle(mock(EventHandle.class));
         return new Record<>(event);
     }
 
-    private static Map<String, Object> generateJson(Set<String> testTags) {
+    private static Map<String, Object> generateJson() {
         final Map<String, Object> jsonObject = new LinkedHashMap<>();
         for (int i = 0; i < 2; i++) {
             jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
         }
         jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString()));
-        jsonObject.put("Tag", testTags.toArray());
         return jsonObject;
     }
 
@@ -280,6 +331,7 @@ private static List generateRecords(int numberOfRecords) {
     }
 
     @Test
+    @Disabled
     void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
         configureParquetCodec();
         S3SinkService s3SinkService = createObjectUnderTest();
@@ -287,7 +339,7 @@ void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
 
         s3SinkService.output(recordsData);
 
-        List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object().getBytes()));
+        List<HashMap<String, Object>> actualRecords = createParquetRecordsList(new ByteArrayInputStream(getS3Object()));
         int index = 0;
         for (final HashMap actualMap : actualRecords) {
             assertThat(actualMap, notNullValue());
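Note: the new integration test above decompresses the fetched object before comparing lines. A self-contained round trip showing the same compression path the sink now uses (commons-compress for writing; the JDK's `GZIPInputStream` stands in for the test's `GZipDecompressionEngine`):

```java
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

public class GzipRoundTripSketch {
    public static void main(String[] args) throws IOException {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        // Compress the way GZipCompressionEngine does: wrap the buffer's
        // output stream with commons-compress GzipCompressorOutputStream.
        try (OutputStream gzipOut = new GzipCompressorOutputStream(sink)) {
            gzipOut.write("line-1\nline-2\n".getBytes(StandardCharsets.UTF_8));
        }

        // Decompress with the JDK to verify the object is valid gzip end to end.
        final GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(sink.toByteArray()));
        System.out.println(new String(gzipIn.readAllBytes(), StandardCharsets.UTF_8));
    }
}
```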
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
index 11aa67637d..c880a72464 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java
@@ -21,8 +21,10 @@
 import org.opensearch.dataprepper.model.sink.SinkContext;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
+import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.LocalFileBufferFactory;
+import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -64,11 +66,14 @@ public S3Sink(final PluginSetting pluginSetting,
         codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
         sinkInitialized = Boolean.FALSE;
 
+        final BufferFactory innerBufferFactory;
         if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
-            bufferFactory = new LocalFileBufferFactory();
+            innerBufferFactory = new LocalFileBufferFactory();
         } else {
-            bufferFactory = new InMemoryBufferFactory();
+            innerBufferFactory = new InMemoryBufferFactory();
         }
+        final CompressionEngine compressionEngine = s3SinkConfig.getCompression().getCompressionEngine();
+        bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine);
         final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
         s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, OutputCodecContext.fromSinkContext(sinkContext), s3Client, pluginMetrics);
     }
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
index 1b18994f66..6124f20538 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java
@@ -9,8 +9,10 @@
 import jakarta.validation.Valid;
 import jakarta.validation.constraints.NotEmpty;
 import jakarta.validation.constraints.NotNull;
+import jakarta.validation.constraints.Size;
 import org.opensearch.dataprepper.model.configuration.PluginModel;
 import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions;
+import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
 import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions;
@@ -19,6 +21,7 @@
  * s3 sink configuration class contains properties, used to read yaml configuration.
  */
 public class S3SinkConfig {
+    static final String S3_PREFIX = "s3://";
 
     private static final int DEFAULT_CONNECTION_RETRIES = 5;
     private static final int DEFAULT_UPLOAD_RETRIES = 5;
@@ -29,13 +32,16 @@ public class S3SinkConfig {
     private AwsAuthenticationOptions awsAuthenticationOptions;
 
     @JsonProperty("bucket")
-    @NotNull
     @NotEmpty
+    @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
     private String bucketName;
 
     @JsonProperty("object_key")
    private ObjectKeyOptions objectKeyOptions;
 
+    @JsonProperty("compression")
+    private CompressionOption compression = CompressionOption.NONE;
+
     @JsonProperty("threshold")
     @NotNull
     private ThresholdOptions thresholdOptions;
@@ -73,6 +79,9 @@ public ThresholdOptions getThresholdOptions() {
      * @return bucket name.
      */
     public String getBucketName() {
+        if (bucketName.startsWith(S3_PREFIX)) {
+            return bucketName.substring(S3_PREFIX.length());
+        }
         return bucketName;
     }
 
@@ -118,4 +127,8 @@ public int getMaxConnectionRetries() {
     public int getMaxUploadRetries() {
         return maxUploadRetries;
     }
+
+    public CompressionOption getCompression() {
+        return compression;
+    }
 }
\ No newline at end of file
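Note: `S3Sink` above now unconditionally decorates the chosen buffer factory; with `compression: none` the `NoneCompressionEngine` hands the stream back unchanged, so the wrapper is a no-op. A usage sketch built only from types introduced in this diff:

```java
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;

public class CompressionWiringSketch {
    public static Buffer gzipBuffer() {
        // Decorator pattern: the compressing factory delegates buffering to the
        // inner factory and only wraps the returned buffer's output stream.
        final BufferFactory inner = new InMemoryBufferFactory();
        final BufferFactory factory =
                new CompressionBufferFactory(inner, CompressionOption.GZIP.getCompressionEngine());
        return factory.getBuffer();
    }
}
```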
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java
index b90775ed47..afd695db2b 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/Buffer.java
@@ -6,7 +6,6 @@
 package org.opensearch.dataprepper.plugins.sink.s3.accumulator;
 
 import software.amazon.awssdk.services.s3.S3Client;
-import java.io.IOException;
 import java.io.OutputStream;
 
 /**
@@ -22,11 +21,9 @@ public interface Buffer {
     int getEventCount();
 
     long getDuration();
 
-    boolean isCodecStarted();
-    void setCodecStarted(boolean codecStarted);
     void flushToS3(S3Client s3Client, String bucket, String key) ;
 
-    void writeEvent(byte[] bytes) throws IOException;
+    OutputStream getOutputStream();
 
     void setEventCount(int eventCount);
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java
new file mode 100644
index 0000000000..440c030ac0
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBuffer.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.s3.accumulator;
+
+import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
+import software.amazon.awssdk.services.s3.S3Client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+class CompressionBuffer implements Buffer {
+    private final Buffer innerBuffer;
+    private final CompressionEngine compressionEngine;
+    private volatile OutputStream outputStream;
+
+    CompressionBuffer(final Buffer innerBuffer, final CompressionEngine compressionEngine) {
+        this.innerBuffer = Objects.requireNonNull(innerBuffer);
+        this.compressionEngine = Objects.requireNonNull(compressionEngine);
+    }
+
+    @Override
+    public long getSize() {
+        return innerBuffer.getSize();
+    }
+
+    @Override
+    public int getEventCount() {
+        return innerBuffer.getEventCount();
+    }
+
+    @Override
+    public long getDuration() {
+        return innerBuffer.getDuration();
+    }
+
+    @Override
+    public void flushToS3(final S3Client s3Client, final String bucket, final String key) {
+        innerBuffer.flushToS3(s3Client, bucket, key);
+    }
+
+    @Override
+    public OutputStream getOutputStream() {
+        if(outputStream == null) {
+            synchronized (this) {
+                if(outputStream == null) {
+                    final OutputStream innerBufferOutputStream = innerBuffer.getOutputStream();
+                    try {
+                        outputStream = compressionEngine.createOutputStream(innerBufferOutputStream);
+                    } catch (final IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+        return outputStream;
+    }
+
+    @Override
+    public void setEventCount(final int eventCount) {
+        innerBuffer.setEventCount(eventCount);
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java
new file mode 100644
index 0000000000..5dcb652f0f
--- /dev/null
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.s3.accumulator;
+
+import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
+
+import java.util.Objects;
+
+public class CompressionBufferFactory implements BufferFactory {
+    private final BufferFactory innerBufferFactory;
+    private final CompressionEngine compressionEngine;
+
+    public CompressionBufferFactory(final BufferFactory innerBufferFactory, final CompressionEngine compressionEngine) {
+        this.innerBufferFactory = Objects.requireNonNull(innerBufferFactory);
+        this.compressionEngine = Objects.requireNonNull(compressionEngine);
+    }
+
+    @Override
+    public Buffer getBuffer() {
+        return new CompressionBuffer(innerBufferFactory.getBuffer(), compressionEngine);
+    }
+}
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java
index ea1f3bc697..58121912d7 100644
--- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java
+++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/InMemoryBuffer.java
@@ -10,7 +10,6 @@
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.TimeUnit;
 
@@ -61,27 +60,6 @@ public void flushToS3(S3Client s3Client, String bucket, String key) {
                 RequestBody.fromBytes(byteArray));
     }
 
-    /**
-     * write byte array to output stream.
-     *
-     * @param bytes byte array.
-     * @throws IOException while writing to output stream fails.
-     */
-    @Override
-    public void writeEvent(byte[] bytes) throws IOException {
-        byteArrayOutputStream.write(bytes);
-        byteArrayOutputStream.write(System.lineSeparator().getBytes());
-        eventCount++;
-    }
-    @Override
-    public boolean isCodecStarted() {
-        return isCodecStarted;
-    }
-
-    @Override
-    public void setCodecStarted(boolean codecStarted) {
-        isCodecStarted = codecStarted;
-    }
     @Override
     public void setEventCount(int eventCount) {
         this.eventCount = eventCount;
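Note: the `Buffer` interface change above removes `writeEvent(byte[])`. Callers now obtain the output stream once, write through it (normally via the output codec), and maintain the event count themselves. A hypothetical helper showing the new calling convention; the newline framing mirrors the deleted `writeEvent` implementations and is not the sink's exact code:

```java
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class BufferWriteSketch {
    // Hypothetical replacement for Buffer.writeEvent(): write through the
    // stream, then update the count since the buffer no longer tracks it.
    static void writeOneEvent(final Buffer buffer, final String serializedEvent) throws IOException {
        final OutputStream outputStream = buffer.getOutputStream();
        outputStream.write(serializedEvent.getBytes(StandardCharsets.UTF_8));
        outputStream.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
        buffer.setEventCount(buffer.getEventCount() + 1);
    }
}
```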
- */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - byteArrayOutputStream.write(bytes); - byteArrayOutputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - @Override - public boolean isCodecStarted() { - return isCodecStarted; - } - - @Override - public void setCodecStarted(boolean codecStarted) { - isCodecStarted = codecStarted; - } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java index 733a2b86fa..52b6229d92 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/LocalFileBuffer.java @@ -77,18 +77,6 @@ public void flushToS3(S3Client s3Client, String bucket, String key) { removeTemporaryFile(); } - /** - * write byte array to output stream. - * @param bytes byte array. - * @throws IOException while writing to output stream fails. - */ - @Override - public void writeEvent(byte[] bytes) throws IOException { - outputStream.write(bytes); - outputStream.write(System.lineSeparator().getBytes()); - eventCount++; - } - /** * Flushing the buffered data into the output stream. */ @@ -113,15 +101,7 @@ protected void removeTemporaryFile() { } } } - @Override - public boolean isCodecStarted() { - return isCodecStarted; - } - @Override - public void setCodecStarted(boolean codecStarted) { - isCodecStarted = codecStarted; - } @Override public void setEventCount(int eventCount) { this.eventCount = eventCount; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java new file mode 100644 index 0000000000..46ffc503ad --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionEngine.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import java.io.IOException; +import java.io.OutputStream; + +public interface CompressionEngine { + OutputStream createOutputStream(OutputStream outputStream) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java new file mode 100644 index 0000000000..7e759909d5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public enum CompressionOption { + NONE("none", NoneCompressionEngine::new), + GZIP("gzip", 
GZipCompressionEngine::new); + + private static final Map OPTIONS_MAP = Arrays.stream(CompressionOption.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + private final Supplier compressionEngineSupplier; + + CompressionOption(final String option, final Supplier compressionEngineSupplier) { + this.option = option.toLowerCase(); + this.compressionEngineSupplier = compressionEngineSupplier; + } + + public CompressionEngine getCompressionEngine() { + return compressionEngineSupplier.get(); + } + + @JsonCreator + public static CompressionOption fromOptionValue(final String option) { + return OPTIONS_MAP.get(option.toLowerCase()); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java new file mode 100644 index 0000000000..f59956a8ed --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +public class GZipCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) throws IOException { + return new GzipCompressorOutputStream(outputStream); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java new file mode 100644 index 0000000000..9c852b4f85 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import java.io.OutputStream; + +public class NoneCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) { + return outputStream; + } +}
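CompressionOption is also the configuration entry point: the @JsonCreator factory maps the sink's compression setting onto an engine supplier. A short sketch of the expected lookup behavior (illustrative, not part of this diff):

    // Lookup is case-insensitive: both the map keys and the query are lower-cased.
    final CompressionOption option = CompressionOption.fromOptionValue("GZIP");  // CompressionOption.GZIP
    final CompressionEngine engine = option.getCompressionEngine();              // a fresh GZipCompressionEngine

    // An unrecognized value returns null from the map lookup rather than throwing,
    // so any validation has to happen downstream of deserialization.

NoneCompressionEngine keeps the write path uniform: callers always go through an engine, and the no-op case simply returns the stream it was given.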
diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java index 297a1ef818..d1660ebc63 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfigTest.java @@ -16,6 +16,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig.S3_PREFIX; class S3SinkConfigTest { @@ -45,6 +46,15 @@ void get_bucket_name_test() throws NoSuchFieldException, IllegalAccessException assertThat(objectUnderTest.getBucketName(), equalTo(bucketName)); } + @Test + void get_bucket_name_with_s3_prefix_test() throws NoSuchFieldException, IllegalAccessException { + final String bucketName = UUID.randomUUID().toString(); + final String bucketNameWithPrefix = S3_PREFIX + bucketName; + final S3SinkConfig objectUnderTest = new S3SinkConfig(); + ReflectivelySetField.setField(S3SinkConfig.class, objectUnderTest, "bucketName", bucketNameWithPrefix); + assertThat(objectUnderTest.getBucketName(), equalTo(bucketName)); + } + @Test void get_object_key_test() { assertThat("Object key is not an instance of ObjectKeyOptions", diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java index 61d27cecae..75ae2dde1c 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferTypeOptions; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption; import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.s3.configuration.ThresholdOptions; @@ -70,6 +71,7 @@ void setUp() { when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(MAX_EVENTS); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(MAXIMUM_SIZE)); when(s3SinkConfig.getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(MAX_RETRIES)); + when(s3SinkConfig.getCompression()).thenReturn(CompressionOption.NONE); when(objectKeyOptions.getNamePattern()).thenReturn(OBJECT_KEY_NAME_PATTERN); when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java new file mode 100644 index 0000000000..a27798f3df --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferFactoryTest.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class 
CompressionBufferFactoryTest { + @Mock + private BufferFactory innerBufferFactory; + + @Mock + private CompressionEngine compressionEngine; + + private CompressionBufferFactory createObjectUnderTest() { + return new CompressionBufferFactory(innerBufferFactory, compressionEngine); + } + + @Test + void constructor_throws_if_inner_BufferFactory_is_null() { + innerBufferFactory = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_CompressionEngine_is_null() { + compressionEngine = null; + + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class WithBuffer { + @Mock + private Buffer innerBuffer; + + @BeforeEach + void setUp() { + when(innerBufferFactory.getBuffer()).thenReturn(innerBuffer); + } + + @Test + void getBuffer_returns_CompressionBuffer() { + final Buffer buffer = createObjectUnderTest().getBuffer(); + assertThat(buffer, instanceOf(CompressionBuffer.class)); + } + + @Test + void getBuffer_returns_new_on_each_call() { + final CompressionBufferFactory objectUnderTest = createObjectUnderTest(); + final Buffer firstBuffer = objectUnderTest.getBuffer(); + + assertThat(objectUnderTest.getBuffer(), not(equalTo(firstBuffer))); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java new file mode 100644 index 0000000000..3a7055414b --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/CompressionBufferTest.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.accumulator; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine; +import software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CompressionBufferTest { + @Mock + private Buffer innerBuffer; + + @Mock + private CompressionEngine compressionEngine; + + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + } + + private CompressionBuffer createObjectUnderTest() { + return new CompressionBuffer(innerBuffer, compressionEngine); + } + + @Test + void constructor_throws_if_innerBuffer_is_null() { + innerBuffer = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_if_compressionEngine_is_null() { + compressionEngine = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void 
getSize_returns_inner_getSize() { + final long size = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getSize()).thenReturn(size); + + assertThat(objectUnderTest.getSize(), equalTo(size)); + } + + @Test + void getEventCount_returns_inner_getEventCount() { + final int eventCount = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getEventCount()).thenReturn(eventCount); + + assertThat(objectUnderTest.getEventCount(), equalTo(eventCount)); + } + + @Test + void getDuration_returns_inner_getDuration() { + final long duration = random.nextInt(10_000) + 1_000; + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + when(innerBuffer.getDuration()).thenReturn(duration); + + assertThat(objectUnderTest.getDuration(), equalTo(duration)); + } + + @Test + void flushToS3_calls_inner_flushToS3() { + final S3Client s3Client = mock(S3Client.class); + final String bucket = UUID.randomUUID().toString(); + final String key = UUID.randomUUID().toString(); + + createObjectUnderTest().flushToS3(s3Client, bucket, key); + + verify(innerBuffer).flushToS3(s3Client, bucket, key); + } + + @Test + void getOutputStream_returns_outputStream_via_CompressionEngine() throws IOException { + final OutputStream innerBufferOutputStream = mock(OutputStream.class); + when(innerBuffer.getOutputStream()).thenReturn(innerBufferOutputStream); + final OutputStream compressionEngineOutputStream = mock(OutputStream.class); + when(compressionEngine.createOutputStream(innerBufferOutputStream)).thenReturn(compressionEngineOutputStream); + + final OutputStream actualOutputStream = createObjectUnderTest().getOutputStream(); + + + assertThat(actualOutputStream, sameInstance(compressionEngineOutputStream)); + } + + @Test + void getOutputStream_wraps_OutputStream_only_once() throws IOException { + final OutputStream innerBufferOutputStream = mock(OutputStream.class); + when(innerBuffer.getOutputStream()).thenReturn(innerBufferOutputStream); + final OutputStream compressionEngineOutputStream = mock(OutputStream.class); + when(compressionEngine.createOutputStream(innerBufferOutputStream)).thenReturn(compressionEngineOutputStream); + + final CompressionBuffer objectUnderTest = createObjectUnderTest(); + final OutputStream outputStream = objectUnderTest.getOutputStream(); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + assertThat(objectUnderTest.getOutputStream(), sameInstance(outputStream)); + + verify(compressionEngine, times(1)).createOutputStream(any()); + } + + @Test + void setEventCount_calls_inner_setEventCount() { + final int eventCount = random.nextInt(10_000) + 1_000; + + createObjectUnderTest().setEventCount(eventCount); + + verify(innerBuffer).setEventCount(eventCount); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java new file mode 100644 index 0000000000..a92930e958 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngineTest.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +class GZipCompressionEngineTest { + + private GZipCompressionEngine createObjectUnderTest() { + return new GZipCompressionEngine(); + } + + @Test + void createOutputStream_should_return_GzipCompressorOutputStream() throws IOException { + final OutputStream innerOutputStream = mock(OutputStream.class); + final OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream); + + assertThat(outputStream, instanceOf(GzipCompressorOutputStream.class)); + } + + @Test + void createOutputStream_should_write_compressed_data() throws IOException { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + final OutputStream outputStream = createObjectUnderTest().createOutputStream(byteArrayOutputStream); + + final byte[] inputBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + + outputStream.write(inputBytes); + outputStream.close(); + + final byte[] writtenBytes = byteArrayOutputStream.toByteArray(); + + assertTrue(GzipCompressorInputStream.matches(writtenBytes, 2)); + + final ByteArrayInputStream verificationInputStream = new ByteArrayInputStream(writtenBytes); + + final GzipCompressorInputStream uncompressingInputStream = new GzipCompressorInputStream(verificationInputStream); + final byte[] uncompressedBytes = uncompressingInputStream.readAllBytes(); + assertThat(uncompressedBytes, equalTo(inputBytes)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java new file mode 100644 index 0000000000..17c581b0c7 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngineTest.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.OutputStream; + +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; + +class NoneCompressionEngineTest { + + private OutputStream innerOutputStream; + + @BeforeEach + void setUp() { + innerOutputStream = mock(OutputStream.class); + } + + private NoneCompressionEngine createObjectUnderTest() { + return new NoneCompressionEngine(); + } + + @Test + void createOutputStream_returns_innerOutputStream() { + OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream); + 
+ assertThat(outputStream, sameInstance(innerOutputStream)); + verifyNoInteractions(innerOutputStream); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java index cadcaf71e8..e1ebea9fa0 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java @@ -7,10 +7,14 @@ import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.utils.Pair; import java.time.Instant; @@ -18,6 +22,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -26,23 +31,38 @@ public class S3ScanPartitionCreationSupplier implements Function, List> { + private static final Logger LOG = LoggerFactory.getLogger(S3ScanPartitionCreationSupplier.class); + private static final String BUCKET_OBJECT_PARTITION_KEY_FORMAT = "%s|%s"; + static final String SCAN_COUNT = "SCAN_COUNT"; + static final String LAST_SCAN_TIME = "LAST_SCAN_TIME"; private final S3Client s3Client; private final BucketOwnerProvider bucketOwnerProvider; private final List scanOptionsList; + private final S3ScanSchedulingOptions schedulingOptions; public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, - final List scanOptionsList) { + final List scanOptionsList, + final S3ScanSchedulingOptions schedulingOptions) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; this.scanOptionsList = scanOptionsList; + this.schedulingOptions = schedulingOptions; } @Override public List apply(final Map globalStateMap) { + if (globalStateMap.isEmpty()) { + initializeGlobalStateMap(globalStateMap); + } + + if (shouldScanBeSkipped(globalStateMap)) { + return Collections.emptyList(); + } + final List objectsToProcess = new ArrayList<>(); for (final ScanOptions scanOptions : scanOptionsList) { @@ -60,27 +80,33 @@ public List apply(final Map globalStateMap) s3ScanKeyPathOption.getS3scanIncludePrefixOptions().forEach(includePath -> { listObjectsV2Request.prefix(includePath); objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime())); + scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); }); else objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, - 
scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime())); + scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime(), globalStateMap)); } + globalStateMap.put(SCAN_COUNT, (Integer) globalStateMap.get(SCAN_COUNT) + 1); + globalStateMap.put(LAST_SCAN_TIME, Instant.now().toEpochMilli()); + return objectsToProcess; } private List listFilteredS3ObjectsForBucket(final List excludeKeyPaths, - final ListObjectsV2Request.Builder listObjectsV2Request, - final String bucket, - final LocalDateTime startDateTime, - final LocalDateTime endDateTime) { + final ListObjectsV2Request.Builder listObjectsV2Request, + final String bucket, + final LocalDateTime startDateTime, + final LocalDateTime endDateTime, + final Map globalStateMap) { + Instant mostRecentLastModifiedTimestamp = globalStateMap.containsKey(bucket) ? Instant.parse((String) globalStateMap.get(bucket)) : null; final List allPartitionIdentifiers = new ArrayList<>(); ListObjectsV2Response listObjectsV2Response = null; do { listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build()); allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() + .filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(bucket, s3Object, globalStateMap)) .map(s3Object -> Pair.of(s3Object.key(), instantToLocalDateTime(s3Object.lastModified()))) .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) .filter(keyTimestampPair -> excludeKeyPaths.stream() @@ -89,8 +115,15 @@ private List listFilteredS3ObjectsForBucket(final List PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) .collect(Collectors.toList())); + + LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket); + + mostRecentLastModifiedTimestamp = getMostRecentLastModifiedTimestamp(listObjectsV2Response, mostRecentLastModifiedTimestamp); } while (listObjectsV2Response.isTruncated()); + globalStateMap.put(bucket, Objects.nonNull(mostRecentLastModifiedTimestamp) ? 
mostRecentLastModifiedTimestamp.toString() : null); + + LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket); return allPartitionIdentifiers; } @@ -106,9 +139,78 @@ private LocalDateTime instantToLocalDateTime(final Instant instant) { private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime, final LocalDateTime startDateTime, final LocalDateTime endDateTime){ - if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime)) { + if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime) || Objects.nonNull(schedulingOptions)) { return true; } return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime); } + + private void initializeGlobalStateMap(final Map globalStateMap) { + globalStateMap.put(SCAN_COUNT, 0); + } + + private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final String bucketName, + final S3Object s3Object, + final Map globalStateMap) { + if (!globalStateMap.containsKey(bucketName) || Objects.isNull(globalStateMap.get(bucketName))) { + return true; + } + + final Instant lastProcessedObjectTimestamp = Instant.parse((String) globalStateMap.get(bucketName)); + + return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp) > 0; + } + + private Instant getMostRecentLastModifiedTimestamp(final ListObjectsV2Response listObjectsV2Response, + Instant mostRecentLastModifiedTimestamp) { + + if (Objects.isNull(schedulingOptions)) { + return null; + } + + for (final S3Object s3Object : listObjectsV2Response.contents()) { + if (Objects.isNull(mostRecentLastModifiedTimestamp) || s3Object.lastModified().isAfter(mostRecentLastModifiedTimestamp)) { + mostRecentLastModifiedTimestamp = s3Object.lastModified(); + } + } + + return mostRecentLastModifiedTimestamp; + } + + private boolean shouldScanBeSkipped(final Map globalStateMap) { + if (Objects.isNull(schedulingOptions) && hasAlreadyBeenScanned(globalStateMap)) { + LOG.info("Skipping scan because the buckets have already been scanned once"); + return true; + } + + if (Objects.nonNull(schedulingOptions) && + (hasReachedMaxScanCount(globalStateMap) || !hasReachedScheduledScanTime(globalStateMap))) { + + if (hasReachedMaxScanCount(globalStateMap)) { + LOG.info("Skipping scan as the max scan count {} has been reached", schedulingOptions.getCount()); + } else { + LOG.info("Skipping scan as the interval of {} seconds has not been reached yet", schedulingOptions.getInterval().toSeconds()); + } + + return true; + } + + return false; + } + + private boolean hasAlreadyBeenScanned(final Map globalStateMap) { + return (Integer) globalStateMap.get(SCAN_COUNT) > 0; + } + + private boolean hasReachedMaxScanCount(final Map globalStateMap) { + return (Integer) globalStateMap.get(SCAN_COUNT) >= schedulingOptions.getCount(); + } + + private boolean hasReachedScheduledScanTime(final Map globalStateMap) { + if (!globalStateMap.containsKey(LAST_SCAN_TIME)) { + return true; + } + + return Instant.now().minus(schedulingOptions.getInterval()).isAfter(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME))); + } }
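The supplier is now a stateful generator over the source coordinator's global state map: SCAN_COUNT and LAST_SCAN_TIME gate whole scans, while a per-bucket entry records the newest lastModified timestamp seen (stored as an ISO-8601 string) so that later scans only emit newer objects. A lifecycle sketch under an assumed scheduling configuration of a 30-second interval and a count of 2 (hypothetical values, not from this diff):

    final Map<String, Object> globalState = new HashMap<>();
    final S3ScanPartitionCreationSupplier supplier = new S3ScanPartitionCreationSupplier(
            s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions);

    supplier.apply(globalState); // scan 1: lists everything, records each bucket's newest lastModified,
                                 // then sets SCAN_COUNT to 1 and stamps LAST_SCAN_TIME
    supplier.apply(globalState); // scan 2, once 30 seconds have elapsed: returns only objects whose
                                 // lastModified is strictly after the stored per-bucket timestamp
    supplier.apply(globalState); // SCAN_COUNT (2) >= count (2): the scan is skipped and an empty list returned

Note that when scheduling is configured, isKeyMatchedBetweenTimeRange short-circuits to true, so time-range filtering and scheduling are mutually exclusive in the listing logic as well as in the configuration validation further down.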
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java index 2b1d8388aa..39a251cddc 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java @@ -42,6 +42,7 @@ public class ScanObjectWorker implements Runnable{ private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class); private static final int STANDARD_BACKOFF_MILLIS = 30_000; + private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; @@ -93,13 +94,24 @@ public ScanObjectWorker(final S3Client s3Client, acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME); this.sourceCoordinator.initialize(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions); } @Override public void run() { while (!shouldStopProcessing) { - startProcessingObject(STANDARD_BACKOFF_MILLIS); + + try { + startProcessingObject(STANDARD_BACKOFF_MILLIS); + } catch (final Exception e) { + LOG.error("Received an exception while processing S3 objects, backing off and retrying", e); + try { + Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS); + } catch (InterruptedException ex) { + LOG.error("S3 Scan worker thread interrupted while backing off.", ex); + } + } + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java index c840b82907..b54dab4075 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java @@ -6,8 +6,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer; import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.Size; import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; import java.time.Duration; @@ -19,7 +20,11 @@ * Class consists of the bucket-related configuration properties.
*/ public class S3ScanBucketOption { + private static final String S3_PREFIX = "s3://"; + @JsonProperty("name") + @NotEmpty + @Size(min = 3, max = 500, message = "bucket name length should be between 3 and 500 characters") private String name; @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class) @@ -30,7 +35,6 @@ public class S3ScanBucketOption { @JsonProperty("end_time") private LocalDateTime endTime; - @JsonDeserialize(using = DurationDeserializer.class) @JsonProperty("range") private Duration range; @@ -43,6 +47,9 @@ public boolean hasValidTimeOptions() { } public String getName() { + if (name.startsWith(S3_PREFIX)) { + return name.substring(S3_PREFIX.length()); + } return name; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java index 6948597119..3db6abb179 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer; +import jakarta.validation.Valid; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; @@ -20,7 +20,7 @@ * Class consists of the scan options list bucket configuration properties. */ public class S3ScanScanOptions { - @JsonDeserialize(using = DurationDeserializer.class) + @JsonProperty("range") private Duration range; @@ -33,16 +33,23 @@ public class S3ScanScanOptions { private LocalDateTime endTime; @JsonProperty("buckets") + @Valid private List buckets; @JsonProperty("scheduling") - private S3ScanSchedulingOptions schedulingOptions = new S3ScanSchedulingOptions(); + @Valid + private S3ScanSchedulingOptions schedulingOptions; @AssertTrue(message = "At most two options from start_time, end_time and range can be specified at the same time") public boolean hasValidTimeOptions() { return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3; } + @AssertTrue(message = "start_time, end_time, and range are not valid options when using scheduling with s3 scan") + public boolean hasValidTimeOptionsWithScheduling() { + return !Objects.nonNull(schedulingOptions) || Stream.of(startTime, endTime, range).noneMatch(Objects::nonNull); + } + public Duration getRange() { return range; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java index c474675818..604b8debc2 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanSchedulingOptions.java @@ -7,16 +7,23 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import org.hibernate.validator.constraints.time.DurationMax;
+import org.hibernate.validator.constraints.time.DurationMin; import java.time.Duration; public class S3ScanSchedulingOptions { + @JsonProperty("interval") - private Duration interval = Duration.ofHours(8); + @NotNull + @DurationMin(seconds = 30L, message = "S3 scan interval must be at least 30 seconds") + @DurationMax(days = 365L, message = "S3 scan interval must be less than or equal to 365 days") + private Duration interval; - @Min(1) + @Min(2) @JsonProperty("count") - private int count = 1; + private int count = Integer.MAX_VALUE; public Duration getInterval() { return interval;
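With the defaults removed, a scheduling block must now state its interval explicitly, while count effectively defaults to unbounded. A config-parsing sketch in the style of the existing option tests (illustrative; it assumes the ObjectMapper registers Jackson's JavaTimeModule for Duration, which the removal of the explicit DurationDeserializer suggests):

    final String schedulingYaml = "interval: PT1H\ncount: 5";
    final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory()).registerModule(new JavaTimeModule());
    final S3ScanSchedulingOptions schedulingOptions = objectMapper.readValue(schedulingYaml, S3ScanSchedulingOptions.class);

    assertThat(schedulingOptions.getInterval(), equalTo(Duration.ofHours(1)));  // ISO-8601 duration
    assertThat(schedulingOptions.getCount(), equalTo(5));

The @DurationMin/@DurationMax bounds are only enforced when bean validation runs, so an out-of-range value such as PT1S parses fine here and is rejected at pipeline validation time.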
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java index d3c557aa06..40a9501766 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplierTest.java @@ -15,17 +15,20 @@ import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption; import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption; +import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions; import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; +import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +42,10 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opensearch.dataprepper.plugins.source.S3ScanPartitionCreationSupplier.LAST_SCAN_TIME; +import static org.opensearch.dataprepper.plugins.source.S3ScanPartitionCreationSupplier.SCAN_COUNT; @ExtendWith(MockitoExtension.class) public class S3ScanPartitionCreationSupplierTest { @@ -51,6 +58,8 @@ public class S3ScanPartitionCreationSupplierTest { private List scanOptionsList; + private S3ScanSchedulingOptions schedulingOptions; + @BeforeEach void setup() { scanOptionsList = new ArrayList<>(); @@ -58,11 +67,12 @@ private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions); } @Test - void getNextPartition_supplier_returns_expected_PartitionIdentifiers() { + void getNextPartition_supplier_without_scheduling_options_returns_expected_PartitionIdentifiers() { + schedulingOptions = null; final String firstBucket = UUID.randomUUID().toString(); final String secondBucket = UUID.randomUUID().toString(); @@ -130,11 +140,137 @@ void getNextPartition_supplier_returns_expected_PartitionIdentifiers() { final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); - final List resultingPartitions = partitionCreationSupplier.apply(new HashMap<>()); + final Map globalStateMap = new HashMap<>(); + final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); + + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); assertThat(resultingPartitions, notNullValue()); assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); } + + @Test + void getNextPartition_supplier_with_scheduling_options_returns_expected_PartitionIdentifiers() { + schedulingOptions = mock(S3ScanSchedulingOptions.class); + given(schedulingOptions.getInterval()).willReturn(Duration.ofMillis(0)); + given(schedulingOptions.getCount()).willReturn(2); + + final String firstBucket = "bucket-one"; + final String secondBucket = "bucket-two"; + + final ScanOptions firstBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption); + given(firstBucketScanBucketOption.getName()).willReturn(firstBucket); + given(firstBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(firstBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption firstBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(firstBucketScanBucketOption.getS3ScanFilter()).willReturn(firstBucketScanKeyPath); + given(firstBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(List.of(UUID.randomUUID().toString())); + given(firstBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(List.of(".invalid")); + scanOptionsList.add(firstBucketScanOptions); + + final ScanOptions secondBucketScanOptions = mock(ScanOptions.class); + final S3ScanBucketOption secondBucketScanBucketOption = mock(S3ScanBucketOption.class); + given(secondBucketScanOptions.getBucketOption()).willReturn(secondBucketScanBucketOption); + given(secondBucketScanBucketOption.getName()).willReturn(secondBucket); + given(secondBucketScanOptions.getUseStartDateTime()).willReturn(null); + given(secondBucketScanOptions.getUseEndDateTime()).willReturn(null); + final S3ScanKeyPathOption secondBucketScanKeyPath = mock(S3ScanKeyPathOption.class); + given(secondBucketScanBucketOption.getS3ScanFilter()).willReturn(secondBucketScanKeyPath); + given(secondBucketScanKeyPath.getS3scanIncludePrefixOptions()).willReturn(null); + given(secondBucketScanKeyPath.getS3ScanExcludeSuffixOptions()).willReturn(null); + scanOptionsList.add(secondBucketScanOptions); + + final Function, List> partitionCreationSupplier = createObjectUnderTest(); + + final List expectedPartitionIdentifiers = new ArrayList<>(); + + final ListObjectsV2Response listObjectsResponse = 
mock(ListObjectsV2Response.class); + final List s3ObjectsList = new ArrayList<>(); + + final S3Object invalidFolderObject = mock(S3Object.class); + given(invalidFolderObject.key()).willReturn("folder-key/"); + given(invalidFolderObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidFolderObject); + + final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class); + given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid"); + given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now()); + s3ObjectsList.add(invalidForFirstBucketSuffixObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build()); + + final Instant mostRecentFirstScan = Instant.now().plusSeconds(1); + final S3Object validObject = mock(S3Object.class); + given(validObject.key()).willReturn("valid"); + given(validObject.lastModified()).willReturn(mostRecentFirstScan); + s3ObjectsList.add(validObject); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build()); + expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build()); + + final S3Object secondScanObject = mock(S3Object.class); + final Instant mostRecentSecondScan = Instant.now().plusSeconds(10); + given(secondScanObject.key()).willReturn("second-scan"); + given(secondScanObject.lastModified()).willReturn(mostRecentSecondScan); + + final List expectedPartitionIdentifiersSecondScan = new ArrayList<>(); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build()); + expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build()); + + final List secondScanObjects = new ArrayList<>(s3ObjectsList); + secondScanObjects.add(secondScanObject); + given(listObjectsResponse.contents()) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(s3ObjectsList) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects) + .willReturn(secondScanObjects); + + final ArgumentCaptor listObjectsV2RequestArgumentCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); + given(s3Client.listObjectsV2(listObjectsV2RequestArgumentCaptor.capture())).willReturn(listObjectsResponse); + + final Map globalStateMap = new HashMap<>(); + final List resultingPartitions = partitionCreationSupplier.apply(globalStateMap); + + assertThat(resultingPartitions, notNullValue()); + assertThat(resultingPartitions.size(), equalTo(expectedPartitionIdentifiers.size())); + assertThat(resultingPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey) + .map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(globalStateMap.get(firstBucket), equalTo(mostRecentFirstScan.toString())); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + 
assertThat(globalStateMap.get(secondBucket), equalTo(mostRecentFirstScan.toString())); + + final List secondScanPartitions = partitionCreationSupplier.apply(globalStateMap); + assertThat(secondScanPartitions.size(), equalTo(expectedPartitionIdentifiersSecondScan.size())); + assertThat(secondScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()), + containsInAnyOrder(expectedPartitionIdentifiersSecondScan.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList()))); + + assertThat(globalStateMap, notNullValue()); + assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); + assertThat(globalStateMap.get(SCAN_COUNT), equalTo(2)); + assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); + assertThat(globalStateMap.get(firstBucket), equalTo(mostRecentSecondScan.toString())); + assertThat(globalStateMap.containsKey(secondBucket), equalTo(true)); + assertThat(globalStateMap.get(secondBucket), equalTo(mostRecentSecondScan.toString())); + assertThat(Instant.ofEpochMilli((Long) globalStateMap.get(LAST_SCAN_TIME)).isBefore(Instant.now()), equalTo(true)); + + assertThat(partitionCreationSupplier.apply(globalStateMap), equalTo(Collections.emptyList())); + + verify(listObjectsResponse, times(8)).contents(); + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java index e4d50ef9ba..804d6e1e52 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java @@ -42,12 +42,15 @@ public void s3scan_options_with_valid_global_time_range_build_success( @ParameterizedTest @MethodSource("invalidTimeRangeOptions") public void s3scan_options_with_invalid_global_time_range_throws_exception_when_build( - LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range) { + LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range) throws NoSuchFieldException, IllegalAccessException { + S3ScanBucketOption bucketOption = new S3ScanBucketOption(); + setField(S3ScanBucketOption.class, bucketOption, "name", "bucket_name"); + assertThrows(IllegalArgumentException.class, () -> ScanOptions.builder() .setStartDateTime(startDateTime) .setEndDateTime(endDateTime) .setRange(range) - .setBucketOption(new S3ScanBucketOption()) + .setBucketOption(bucketOption) .build()); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java index 69661e6efd..629c6726e7 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java @@ -41,4 +41,11 @@ public void s3scan_bucket_options_with_scan_buckets_yaml_configuration_test() th assertThat(s3ScanBucketOption.getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class)); assertThat(s3ScanBucketOption.getS3ScanFilter().getS3ScanExcludeSuffixOptions().get(1),equalTo(".png")); } + + @Test + public 
void s3scan_bucket_options_with_s3_prefix_test() throws JsonProcessingException { + final String bucketOptionsYaml = "---\nname: s3://test-s3-source-test-output"; + final S3ScanBucketOption s3ScanBucketOption = objectMapper.readValue(bucketOptionsYaml, S3ScanBucketOption.class); + assertThat(s3ScanBucketOption.getName(), equalTo("test-s3-source-test-output")); + } } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java index e0c7890520..f0e31de164 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java @@ -10,7 +10,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import org.junit.Test; -import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -25,7 +24,6 @@ public class S3ScanScanOptionsTest { @Test public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonProcessingException { final String scanYaml = " start_time: 2023-01-21T18:00:00\n" + - " range: P90DT3H4M\n" + " end_time: 2023-04-21T18:00:00\n" + " buckets:\n" + " - bucket:\n" + @@ -38,7 +36,6 @@ public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonP final S3ScanScanOptions s3ScanScanOptions = objectMapper.readValue(scanYaml, S3ScanScanOptions.class); assertThat(s3ScanScanOptions.getStartTime(),equalTo(LocalDateTime.parse("2023-01-21T18:00:00"))); assertThat(s3ScanScanOptions.getEndTime(),equalTo(LocalDateTime.parse("2023-04-21T18:00:00"))); - assertThat(s3ScanScanOptions.getRange(),equalTo(Duration.parse("P90DT3H4M"))); assertThat(s3ScanScanOptions.getBuckets(),instanceOf(List.class)); assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getName(),equalTo("test-s3-source-test-output")); assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class));