diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializer.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializer.java deleted file mode 100644 index c118e95cbc..0000000000 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializer.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ -package org.opensearch.dataprepper.plugins.source; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.format.DateTimeParseException; - -public class CustomLocalDateTimeDeserializer extends StdDeserializer { - private static final Logger LOG = LoggerFactory.getLogger(CustomLocalDateTimeDeserializer.class); - static final String CURRENT_LOCAL_DATE_TIME_STRING = "now"; - - public CustomLocalDateTimeDeserializer() { - this(null); - } - - public CustomLocalDateTimeDeserializer(Class vc) { - super(vc); - } - - @Override - public LocalDateTime deserialize(JsonParser parser, DeserializationContext context) throws IOException { - final String valueAsString = parser.getValueAsString(); - - if (valueAsString.equals(CURRENT_LOCAL_DATE_TIME_STRING)) { - return LocalDateTime.now(); - } else { - try { - return LocalDateTime.parse(valueAsString); - } catch (final DateTimeParseException e) { - LOG.error("Unable to parse {} to LocalDateTime.", valueAsString, e); - throw new IllegalArgumentException("Unable to obtain instance of LocalDateTime from " + valueAsString, e); - } - } - } -} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanOptions.java index 389c2150f8..eeba77d066 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanOptions.java @@ -18,17 +18,17 @@ */ public class ScanOptions { private static final Logger LOG = LoggerFactory.getLogger(ScanOptions.class); - private LocalDateTime startDateTime; + private final LocalDateTime startDateTime; - private Duration range; + private final Duration range; - private S3ScanBucketOption bucketOption; + private final S3ScanBucketOption bucketOption; - private LocalDateTime endDateTime; + private final LocalDateTime endDateTime; - private LocalDateTime useStartDateTime; + private final LocalDateTime useStartDateTime; - private LocalDateTime useEndDateTime; + private final LocalDateTime useEndDateTime; private ScanOptions(Builder builder){ this.startDateTime = builder.startDateTime; @@ -96,11 +96,7 @@ public Builder setBucketOption(S3ScanBucketOption bucketOption) { } public ScanOptions build() { - LocalDateTime bucketStartDateTime = Objects.isNull(bucketOption.getStartTime()) ? startDateTime : bucketOption.getStartTime(); - LocalDateTime bucketEndDateTime = Objects.isNull(bucketOption.getEndTime()) ? endDateTime : bucketOption.getEndTime(); - Duration bucketRange = Objects.isNull(bucketOption.getRange()) ? range : bucketOption.getRange(); - - long nonNullCount = Stream.of(bucketStartDateTime, bucketEndDateTime, bucketRange) + long globalLevelNonNullCount = Stream.of(startDateTime, endDateTime, range) .filter(Objects::nonNull) .count(); @@ -109,54 +105,35 @@ public ScanOptions build() { .filter(Objects::nonNull) .count(); - if (nonNullCount == 3) { - if (originalBucketLevelNonNullCount == 3) { - scanRangeDateValidationError(); - } else if (originalBucketLevelNonNullCount == 2) { - setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange()); - } else if (originalBucketLevelNonNullCount == 1) { - if (Objects.nonNull(bucketOption.getStartTime()) || Objects.nonNull(bucketOption.getEndTime())) { - setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange()); - } else { - LOG.warn("Scan is configured with start_time and end_time at global level and range at bucket level for the bucket with name {}. " + - "Unable to establish a time period with range alone at bucket level. " + - "Using start_time and end_time configured at global level and ignoring range.", bucketOption.getName()); - setDateTimeToUse(startDateTime, endDateTime, range); - } - } - } else { - setDateTimeToUse(bucketStartDateTime, bucketEndDateTime, bucketRange); + if (originalBucketLevelNonNullCount != 0) { + setDateTimeToUse(bucketOption.getStartTime(), bucketOption.getEndTime(), bucketOption.getRange()); + } else if (globalLevelNonNullCount != 0) { + setDateTimeToUse(startDateTime, endDateTime, range); } + return new ScanOptions(this); } private void setDateTimeToUse(LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange) { - if (Objects.nonNull(bucketStartDateTime) && Objects.nonNull(bucketEndDateTime)) { this.useStartDateTime = bucketStartDateTime; this.useEndDateTime = bucketEndDateTime; - } else if (Objects.nonNull(bucketStartDateTime) && Objects.nonNull(bucketRange)) { - this.useStartDateTime = bucketStartDateTime; - this.useEndDateTime = bucketStartDateTime.plus(bucketRange); - } else if (Objects.nonNull(bucketEndDateTime) && Objects.nonNull(bucketRange)) { - this.useStartDateTime = bucketEndDateTime.minus(bucketRange); - this.useEndDateTime = bucketEndDateTime; + LOG.info("Scanning objects modified from {} to {} from bucket: {}", useStartDateTime, useEndDateTime, bucketOption.getName()); } else if (Objects.nonNull(bucketStartDateTime)) { this.useStartDateTime = bucketStartDateTime; + LOG.info("Scanning objects modified after {} from bucket: {}", useStartDateTime, bucketOption.getName()); } else if (Objects.nonNull(bucketEndDateTime)) { this.useEndDateTime = bucketEndDateTime; + LOG.info("Scanning objects modified before {} from bucket: {}", useEndDateTime, bucketOption.getName()); } else if (Objects.nonNull(bucketRange)) { - LOG.warn("Scan is configured with just range for the bucket with name {}. Unable to establish a time period with range alone. " + - "Configure start_time or end_time, else all the objects in the bucket will be included", bucketOption.getName()); + this.useEndDateTime = LocalDateTime.now(); + this.useStartDateTime = this.useEndDateTime.minus(bucketRange); + LOG.info("Scanning objects modified from {} to {} from bucket: {}", useStartDateTime, useEndDateTime, bucketOption.getName()); + } else { + LOG.info("Scanning all objects from bucket: {}", bucketOption.getName()); } } - private void scanRangeDateValidationError() { - String message = "To set a time range for the bucket with name " + bucketOption.getName() + - ", specify any two configurations from start_time, end_time and range"; - throw new IllegalArgumentException(message); - } - @Override public String toString() { return "startDateTime=" + startDateTime + diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java index b54dab4075..58492d0cce 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOption.java @@ -6,10 +6,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import jakarta.validation.constraints.AssertFalse; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.Size; -import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; import java.time.Duration; import java.time.LocalDateTime; @@ -27,11 +28,11 @@ public class S3ScanBucketOption { @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters") private String name; - @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonProperty("start_time") private LocalDateTime startTime; - @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonProperty("end_time") private LocalDateTime endTime; @@ -46,6 +47,11 @@ public boolean hasValidTimeOptions() { return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3; } + @AssertFalse(message = "bucket start_time or end_time cannot be used along with range") + public boolean hasValidTimeAndRangeOptions() { + return (startTime != null || endTime != null) && range != null; + } + public String getName() { if (name.startsWith(S3_PREFIX)) { return name.substring(S3_PREFIX.length()); diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java index 3db6abb179..1c0df901d0 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptions.java @@ -6,9 +6,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertFalse; import jakarta.validation.constraints.AssertTrue; -import org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer; import java.time.Duration; import java.time.LocalDateTime; @@ -24,11 +25,11 @@ public class S3ScanScanOptions { @JsonProperty("range") private Duration range; - @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonProperty("start_time") private LocalDateTime startTime; - @JsonDeserialize(using = CustomLocalDateTimeDeserializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonProperty("end_time") private LocalDateTime endTime; @@ -45,6 +46,11 @@ public boolean hasValidTimeOptions() { return Stream.of(startTime, endTime, range).filter(Objects::nonNull).count() < 3; } + @AssertFalse(message = "start_time or end_time cannot be used along with range") + public boolean hasValidTimeAndRangeOptions() { + return (startTime != null || endTime != null) && range != null; + } + @AssertTrue(message = "start_time, end_time, and range are not valid options when using scheduling with s3 scan") public boolean hasValidTimeOptionsWithScheduling() { return !Objects.nonNull(schedulingOptions) || Stream.of(startTime, endTime, range).noneMatch(Objects::nonNull); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializerTest.java deleted file mode 100644 index e814da31f1..0000000000 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/CustomLocalDateTimeDeserializerTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.dataprepper.plugins.source; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertThrows; -import static org.opensearch.dataprepper.plugins.source.CustomLocalDateTimeDeserializer.CURRENT_LOCAL_DATE_TIME_STRING; - -class CustomLocalDateTimeDeserializerTest { - private ObjectMapper objectMapper; - - @BeforeEach - void setUp() { - objectMapper = new ObjectMapper(); - - final SimpleModule simpleModule = new SimpleModule(); - simpleModule.addDeserializer(LocalDateTime.class, new CustomLocalDateTimeDeserializer()); - objectMapper.registerModule(simpleModule); - } - - @ParameterizedTest - @ValueSource(strings = {"2023-01-2118:00:00", "2023-01-21T8:00:00"}) - void deserialize_with_invalid_values_throws(final String invalidDateTimeString) { - assertThrows(IllegalArgumentException.class, () -> objectMapper.convertValue(invalidDateTimeString, LocalDateTime.class)); - } - - @Test - void deserialize_with_predefined_custom_value_returns_current_local_datetime() { - final LocalDateTime expectedDateTime = objectMapper.convertValue(CURRENT_LOCAL_DATE_TIME_STRING, LocalDateTime.class); - assertThat(expectedDateTime, lessThan(LocalDateTime.now())); - assertThat(expectedDateTime, greaterThan(LocalDateTime.now().minus(Duration.of(5, ChronoUnit.SECONDS)))); - } - - @Test - void deserialize_with_iso_local_date_time_string_returns_correct_local_datetime() { - final String testLocalDateTimeString = "2023-01-21T18:30:45"; - final LocalDateTime expectedDateTime = objectMapper.convertValue(testLocalDateTimeString, LocalDateTime.class); - assertThat(expectedDateTime, equalTo(LocalDateTime.of(2023, 1, 21, 18, 30, 45))); - assertThat(expectedDateTime.getYear(), equalTo(2023)); - assertThat(expectedDateTime.getMonthValue(), equalTo(1)); - assertThat(expectedDateTime.getDayOfMonth(), equalTo(21)); - assertThat(expectedDateTime.getHour(), equalTo(18)); - assertThat(expectedDateTime.getMinute(), equalTo(30)); - assertThat(expectedDateTime.getSecond(), equalTo(45)); - } -} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java index 09807e24b8..de40f6d79d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3ScanServiceTest.java @@ -24,6 +24,8 @@ import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -57,12 +59,10 @@ void scan_service_test_and_verify_thread_invoking() { void scan_service_with_valid_s3_scan_configuration_test_and_verify() { final String bucketName="my-bucket-5"; final LocalDateTime startDateTime = LocalDateTime.parse("2023-03-07T10:00:00"); - final Duration range = Duration.parse("P2DT1H"); final List includeKeyPathList = List.of("file1.csv","file2.csv"); final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); final S3ScanScanOptions s3ScanScanOptions = mock(S3ScanScanOptions.class); when(s3ScanScanOptions.getStartTime()).thenReturn(startDateTime); - when(s3ScanScanOptions.getRange()).thenReturn(range); S3ScanBucketOptions bucket = mock(S3ScanBucketOptions.class); final S3ScanBucketOption s3ScanBucketOption = mock(S3ScanBucketOption.class); when(s3ScanBucketOption.getName()).thenReturn(bucketName); @@ -78,13 +78,12 @@ void scan_service_with_valid_s3_scan_configuration_test_and_verify() { assertThat(scanOptionsBuilder.get(0).getBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName)); assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(),equalTo(startDateTime)); - assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(startDateTime.plus(range))); + assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(null)); } @Test void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() { final String bucketName="my-bucket-5"; - final LocalDateTime startDateTime = LocalDateTime.parse("2023-03-07T10:00:00"); final Duration range = Duration.parse("P2DT1H"); final List includeKeyPathList = List.of("file1.csv","file2.csv"); final S3SourceConfig s3SourceConfig = mock(S3SourceConfig.class); @@ -94,7 +93,6 @@ void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() { when(s3ScanBucketOption.getName()).thenReturn(bucketName); S3ScanKeyPathOption s3ScanKeyPathOption = mock(S3ScanKeyPathOption.class); when(s3ScanKeyPathOption.getS3scanIncludePrefixOptions()).thenReturn(includeKeyPathList); - when(s3ScanBucketOption.getStartTime()).thenReturn(startDateTime); when(s3ScanBucketOption.getRange()).thenReturn(range); when(s3ScanBucketOption.getS3ScanFilter()).thenReturn(s3ScanKeyPathOption); when(bucket.getS3ScanBucketOption()).thenReturn(s3ScanBucketOption); @@ -104,8 +102,10 @@ void scan_service_with_valid_bucket_time_range_configuration_test_and_verify() { final List scanOptionsBuilder = service.getScanOptions(); assertThat(scanOptionsBuilder.get(0).getBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),sameInstance(includeKeyPathList)); assertThat(scanOptionsBuilder.get(0).getBucketOption().getName(),sameInstance(bucketName)); - assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(),equalTo(startDateTime)); - assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(),equalTo(startDateTime.plus(range))); + assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(), lessThanOrEqualTo(LocalDateTime.now().minus(range))); + assertThat(scanOptionsBuilder.get(0).getUseStartDateTime(), greaterThanOrEqualTo(LocalDateTime.now().minus(range).minus(Duration.parse("PT5S")))); + assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(), lessThanOrEqualTo(LocalDateTime.now())); + assertThat(scanOptionsBuilder.get(0).getUseEndDateTime(), greaterThanOrEqualTo(LocalDateTime.now().minus(Duration.parse("PT5S")))); } @Test diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java index e508307b2d..6ac0b2b3cf 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/ScanOptionsTest.java @@ -17,31 +17,34 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; class ScanOptionsTest { @ParameterizedTest @MethodSource("validGlobalTimeRangeOptions") - public void s3scan_options_with_valid_global_time_range_build_success( + void s3scan_options_with_valid_global_time_range_build_success( LocalDateTime startDateTime, LocalDateTime endDateTime, Duration range, - LocalDateTime useStartDateTime, LocalDateTime useEndDateTime) { + LocalDateTime useStartDateTime, LocalDateTime useEndDateTime) throws NoSuchFieldException, IllegalAccessException { + S3ScanBucketOption bucketOption = new S3ScanBucketOption(); + setField(S3ScanBucketOption.class, bucketOption, "name", "bucket_name"); final ScanOptions scanOptions = ScanOptions.builder() .setStartDateTime(startDateTime) .setEndDateTime(endDateTime) .setRange(range) - .setBucketOption(new S3ScanBucketOption()) + .setBucketOption(bucketOption) .build(); - assertThat(scanOptions.getUseStartDateTime(), equalTo(useStartDateTime)); - assertThat(scanOptions.getUseEndDateTime(), equalTo(useEndDateTime)); assertThat(scanOptions.getBucketOption(), instanceOf(S3ScanBucketOption.class)); + assertThat(scanOptions.getBucketOption().getName(), equalTo("bucket_name")); + validateStartAndEndTime(useStartDateTime, useEndDateTime, scanOptions); } @ParameterizedTest @MethodSource("validBucketTimeRangeOptions") - public void s3scan_options_with_valid_bucket_time_range_build_success( + void s3scan_options_with_valid_bucket_time_range_build_success( LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange, LocalDateTime useStartDateTime, LocalDateTime useEndDateTime) throws NoSuchFieldException, IllegalAccessException { S3ScanBucketOption bucketOption = new S3ScanBucketOption(); @@ -53,30 +56,14 @@ public void s3scan_options_with_valid_bucket_time_range_build_success( .setBucketOption(bucketOption) .build(); - assertThat(scanOptions.getUseStartDateTime(), equalTo(useStartDateTime)); - assertThat(scanOptions.getUseEndDateTime(), equalTo(useEndDateTime)); assertThat(scanOptions.getBucketOption(), instanceOf(S3ScanBucketOption.class)); assertThat(scanOptions.getBucketOption().getName(), equalTo("bucket_name")); - } - - @ParameterizedTest - @MethodSource("invalidTimeRangeOptions") - public void s3scan_options_with_invalid_bucket_time_range_throws_exception_when_build( - LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange - ) throws NoSuchFieldException, IllegalAccessException { - S3ScanBucketOption bucketOption = new S3ScanBucketOption(); - setField(S3ScanBucketOption.class, bucketOption, "name", "bucket_name"); - setField(S3ScanBucketOption.class, bucketOption, "startTime", bucketStartDateTime); - setField(S3ScanBucketOption.class, bucketOption, "endTime", bucketEndDateTime); - setField(S3ScanBucketOption.class, bucketOption, "range", bucketRange); - assertThrows(IllegalArgumentException.class, () -> ScanOptions.builder() - .setBucketOption(bucketOption) - .build()); + validateStartAndEndTime(useStartDateTime, useEndDateTime, scanOptions); } @ParameterizedTest @MethodSource("validCombinedTimeRangeOptions") - public void s3scan_options_with_valid_combined_time_range_build_success( + void s3scan_options_with_valid_combined_time_range_build_success( LocalDateTime globalStartDateTime, LocalDateTime globeEndDateTime, Duration globalRange, LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange, LocalDateTime useStartDateTime, LocalDateTime useEndDateTime) throws NoSuchFieldException, IllegalAccessException { @@ -92,62 +79,40 @@ public void s3scan_options_with_valid_combined_time_range_build_success( .setBucketOption(bucketOption) .build(); - assertThat(scanOptions.getUseStartDateTime(), equalTo(useStartDateTime)); - assertThat(scanOptions.getUseEndDateTime(), equalTo(useEndDateTime)); assertThat(scanOptions.getBucketOption(), instanceOf(S3ScanBucketOption.class)); assertThat(scanOptions.getBucketOption().getName(), equalTo("bucket_name")); + validateStartAndEndTime(useStartDateTime, useEndDateTime, scanOptions); } - @ParameterizedTest - @MethodSource("invalidCombinedTimeRangeOptions") - public void s3scan_options_with_invalid_combined_time_range_throws_exception_when_build( - LocalDateTime globalStartDateTime, LocalDateTime globeEndDateTime, Duration globalRange, - LocalDateTime bucketStartDateTime, LocalDateTime bucketEndDateTime, Duration bucketRange - ) throws NoSuchFieldException, IllegalAccessException { - S3ScanBucketOption bucketOption = new S3ScanBucketOption(); - setField(S3ScanBucketOption.class, bucketOption, "name", "bucket_name"); - setField(S3ScanBucketOption.class, bucketOption, "startTime", bucketStartDateTime); - setField(S3ScanBucketOption.class, bucketOption, "endTime", bucketEndDateTime); - setField(S3ScanBucketOption.class, bucketOption, "range", bucketRange); - - assertThrows(IllegalArgumentException.class, () -> ScanOptions.builder() - .setStartDateTime(globalStartDateTime) - .setEndDateTime(globeEndDateTime) - .setRange(globalRange) - .setBucketOption(bucketOption) - .build()); + private static void validateStartAndEndTime(final LocalDateTime useStartDateTime, + final LocalDateTime useEndDateTime, + final ScanOptions scanOptions) { + if (useStartDateTime != null) { + assertThat(scanOptions.getUseStartDateTime(), lessThanOrEqualTo(useStartDateTime.plus(Duration.parse("PT5S")))); + assertThat(scanOptions.getUseStartDateTime(), greaterThanOrEqualTo(useStartDateTime)); + } + if (useEndDateTime != null) { + assertThat(scanOptions.getUseEndDateTime(), lessThanOrEqualTo(useEndDateTime.plus(Duration.parse("PT5S")))); + assertThat(scanOptions.getUseEndDateTime(), greaterThanOrEqualTo(useEndDateTime)); + } } - + private static Stream validGlobalTimeRangeOptions() { return Stream.of( - Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), - Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, Duration.parse("P3D"), - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), - Arguments.of(null, LocalDateTime.parse("2023-01-24T18:00:00"), Duration.parse("P3D"), + Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), + Arguments.of(null, null, Duration.parse("P90D"), LocalDateTime.now().minus(Duration.parse("P90D")), LocalDateTime.now()), Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00"), null), Arguments.of(null, LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00")), Arguments.of(null, null, null, null, null) ); } - private static Stream invalidTimeRangeOptions() { - return Stream.of( - Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-04-21T18:00:00"), - Duration.parse("P90DT3H4M")) - ); - } - private static Stream validBucketTimeRangeOptions() { return Stream.of( Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), - Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, Duration.ofDays(3L), - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), - Arguments.of(null, LocalDateTime.parse("2023-01-24T18:00:00"), Duration.ofDays(3L), - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), + Arguments.of(null, null, Duration.parse("P90D"), LocalDateTime.now().minus(Duration.parse("P90D")), LocalDateTime.now()), Arguments.of(LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00"), null), Arguments.of(null, LocalDateTime.parse("2023-01-21T18:00:00"), null, null, LocalDateTime.parse("2023-01-21T18:00:00")), Arguments.of(null, null, null, null, null) @@ -157,49 +122,33 @@ private static Stream validBucketTimeRangeOptions() { private static Stream validCombinedTimeRangeOptions() { return Stream.of( Arguments.of( - LocalDateTime.parse("2023-05-21T18:00:00"), null, null, - null, LocalDateTime.parse("2023-05-24T18:00:00"), null, - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")), - Arguments.of( - null, LocalDateTime.parse("2023-05-24T18:00:00"), null, - LocalDateTime.parse("2023-05-21T18:00:00"), null, null, - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")), + LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), null, + LocalDateTime.parse("2023-08-21T18:00:00"), LocalDateTime.parse("2023-08-24T18:00:00"), null, + LocalDateTime.parse("2023-08-21T18:00:00"), LocalDateTime.parse("2023-08-24T18:00:00")), Arguments.of( - LocalDateTime.parse("2023-05-21T18:00:00"), null, null, - null, null, Duration.ofDays(3L), - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")), + LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), null, + LocalDateTime.parse("2023-08-21T18:00:00"), null, null, + LocalDateTime.parse("2023-08-21T18:00:00"), null), Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), null, - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")), + null, LocalDateTime.parse("2023-08-24T18:00:00"), null, + null, LocalDateTime.parse("2023-08-24T18:00:00")), Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, - LocalDateTime.parse("2023-05-21T18:00:00"), null, Duration.ofDays(3L), + LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), null, + null, null, null, LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00")), Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), null, - null, null, Duration.ofDays(3L), - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00")), - Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-12-24T18:00:00"), null, - LocalDateTime.parse("2023-05-21T18:00:00"), null, null, - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-12-24T18:00:00")), - Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), null, Duration.ofDays(3L), - null, LocalDateTime.parse("2023-05-21T18:00:00"), null, - null, LocalDateTime.parse("2023-05-21T18:00:00")), + LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), null, + null, null, Duration.parse("P90D"), + LocalDateTime.now().minus(Duration.parse("P90D")), LocalDateTime.now()), Arguments.of( - null, LocalDateTime.parse("2023-01-21T18:00:00"), Duration.ofDays(3L), - LocalDateTime.parse("2023-05-21T18:00:00"), null, null, - LocalDateTime.parse("2023-05-21T18:00:00"), null) - ); - } - - private static Stream invalidCombinedTimeRangeOptions() { - return Stream.of( + null, null, Duration.parse("P30D"), + null, null, Duration.parse("P90D"), + LocalDateTime.now().minus(Duration.parse("P90D")), LocalDateTime.now()), Arguments.of( - LocalDateTime.parse("2023-01-21T18:00:00"), LocalDateTime.parse("2023-01-24T18:00:00"), Duration.ofDays(3L), - LocalDateTime.parse("2023-05-21T18:00:00"), LocalDateTime.parse("2023-05-24T18:00:00"), Duration.ofDays(3L)) + null, null, Duration.parse("P30D"), + null, null, null, + LocalDateTime.now().minus(Duration.parse("P30D")), LocalDateTime.now()) ); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java index 629c6726e7..6e4a6df6fc 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanBucketOptionTest.java @@ -18,7 +18,7 @@ public class S3ScanBucketOptionTest { - private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @Test public void s3scan_bucket_options_with_scan_buckets_yaml_configuration_test() throws JsonProcessingException { final String bucketOptionsYaml = " name: test-s3-source-test-output\n" + diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java index f0e31de164..013d82aaeb 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/configuration/S3ScanScanOptionsTest.java @@ -19,7 +19,7 @@ public class S3ScanScanOptionsTest { - private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); @Test public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonProcessingException {