From ff1a6213996e52e40ea3571ca51a1651902b58b9 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Mon, 14 Oct 2024 12:54:49 -0700 Subject: [PATCH] Rename s3 sink object metadata config options (#5041) * Addressed review comments. Introduced a new config, will deprecate the old config Signed-off-by: Kondaka * Addressed review comments. Introduced a new config for metadata Signed-off-by: Kondaka * Addressed review comments. Created a separate class for object metadata Signed-off-by: Kondaka * Addressed review comments. Signed-off-by: Kondaka * Fixed indentation Signed-off-by: Kondaka --------- Signed-off-by: Kondaka (cherry picked from commit 8216fdcf849c96af0de8af46abaa6f00d5ac7512) --- .../plugins/sink/s3/ObjectMetadata.java | 43 ++++++++++++++ .../plugins/sink/s3/ObjectMetadataConfig.java | 18 ++++++ .../plugins/sink/s3/S3SinkConfig.java | 13 ++++- .../sink/s3/grouping/S3GroupIdentifier.java | 17 ++++-- .../s3/grouping/S3GroupIdentifierFactory.java | 2 +- .../sink/s3/ObjectMetadataConfigTest.java | 30 ++++++++++ .../plugins/sink/s3/ObjectMetadataTest.java | 58 +++++++++++++++++++ 7 files changed, 173 insertions(+), 8 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadata.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfig.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfigTest.java create mode 100644 data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataTest.java diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadata.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadata.java new file mode 100644 index 0000000000..55785b6ee7 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadata.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import java.util.HashMap; +import java.util.Map; + +public class ObjectMetadata { + private Map metadata; + private ObjectMetadataConfig objectMetadataConfig; + private PredefinedObjectMetadata predefinedObjectMetadata; + + public ObjectMetadata(final Object objectMetadataConfig) { + if (objectMetadataConfig instanceof ObjectMetadataConfig) { + this.objectMetadataConfig = (ObjectMetadataConfig)objectMetadataConfig; + } else { // instanceof PredefinedObjectMetadata + this.predefinedObjectMetadata = (PredefinedObjectMetadata)objectMetadataConfig; + } + this.metadata = new HashMap(); + } + + public void setEventCount(final int eventCount) { + String numberOfEventsKey = null; + if (objectMetadataConfig != null) { + numberOfEventsKey = objectMetadataConfig.getNumberOfEventsKey(); + } else if (predefinedObjectMetadata != null) { + numberOfEventsKey = predefinedObjectMetadata.getNumberOfObjects(); + } + if (numberOfEventsKey != null) { + metadata.put(numberOfEventsKey, Integer.toString(eventCount)); + } + } + + public Map get() { + return metadata; + } + +} + + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfig.java new file mode 100644 index 0000000000..bfb6d6004a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfig.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ObjectMetadataConfig { + @JsonProperty("number_of_events_key") + private String numberOfEventsKey; + + public String getNumberOfEventsKey() { + return numberOfEventsKey; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java index 9e690d739a..a33611e8a9 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkConfig.java @@ -46,6 +46,15 @@ public class S3SinkConfig { @JsonProperty("predefined_object_metadata") private PredefinedObjectMetadata predefinedObjectMetadata; + @JsonProperty("object_metadata") + private ObjectMetadataConfig objectMetadataConfig; + + @AssertTrue(message = "Only one of object_metadata and predefined_object_metadata can be used.") + private boolean isValidMetadataConfig() { + return (objectMetadataConfig != null && predefinedObjectMetadata == null) || + (objectMetadataConfig == null && predefinedObjectMetadata != null); + } + @AssertTrue(message = "You may not use both bucket and bucket_selector together in one S3 sink.") private boolean isValidBucketConfig() { return (bucketName != null && bucketSelector == null) || @@ -142,8 +151,8 @@ public ObjectKeyOptions getObjectKeyOptions() { return objectKeyOptions; } - public PredefinedObjectMetadata getPredefinedObjectMetadata() { - return predefinedObjectMetadata; + public ObjectMetadataConfig getObjectMetadataConfig() { + return objectMetadataConfig; } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java index 52fa2578fd..96c767b9ea 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifier.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.s3.grouping; -import org.opensearch.dataprepper.plugins.sink.s3.PredefinedObjectMetadata; +import org.opensearch.dataprepper.plugins.sink.s3.ObjectMetadata; import java.util.Map; import java.util.Objects; @@ -13,16 +13,16 @@ class S3GroupIdentifier { private final Map groupIdentifierHash; private final String groupIdentifierFullObjectKey; - private final PredefinedObjectMetadata predefinedObjectMetadata; + private final Object objectMetadataConfig; private final String fullBucketName; public S3GroupIdentifier(final Map groupIdentifierHash, final String groupIdentifierFullObjectKey, - final PredefinedObjectMetadata predefineObjectMetadata, + final Object objectMetadataConfig, final String fullBucketName) { this.groupIdentifierHash = groupIdentifierHash; this.groupIdentifierFullObjectKey = groupIdentifierFullObjectKey; - this.predefinedObjectMetadata = predefineObjectMetadata; + this.objectMetadataConfig = objectMetadataConfig; this.fullBucketName = fullBucketName; } @@ -43,6 +43,13 @@ public int hashCode() { public Map getGroupIdentifierHash() { return groupIdentifierHash; } - public Map getMetadata(int eventCount) { return predefinedObjectMetadata != null ? Map.of(predefinedObjectMetadata.getNumberOfObjects(), Integer.toString(eventCount)) : null; } + public Map getMetadata(int eventCount) { + if (objectMetadataConfig == null) { + return null; + } + ObjectMetadata objectMetadata = new ObjectMetadata(objectMetadataConfig); + objectMetadata.setEventCount(eventCount); + return objectMetadata.get(); + } public String getFullBucketName() { return fullBucketName; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java index 89315d95a1..4570898c7d 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupIdentifierFactory.java @@ -70,6 +70,6 @@ public S3GroupIdentifier getS3GroupIdentifierForEvent(final Event event) { } - return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, s3SinkConfig.getPredefinedObjectMetadata(), fullBucketName); + return new S3GroupIdentifier(groupIdentificationHash, fullObjectKey, s3SinkConfig.getObjectMetadataConfig(), fullBucketName); } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfigTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfigTest.java new file mode 100644 index 0000000000..e745944d6c --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataConfigTest.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import org.junit.jupiter.api.Test; + +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.UUID; + +public class ObjectMetadataConfigTest { + @Test + void test_default() { + assertThat(new ObjectMetadataConfig().getNumberOfEventsKey(), equalTo(null)); + } + + @Test + void test_number_of_events_key() throws Exception { + final String numberOfEventsKey = UUID.randomUUID().toString(); + final ObjectMetadataConfig objectUnderTest = new ObjectMetadataConfig(); + ReflectivelySetField.setField(ObjectMetadataConfig.class, objectUnderTest, "numberOfEventsKey", numberOfEventsKey); + assertThat(objectUnderTest.getNumberOfEventsKey(), equalTo(numberOfEventsKey)); + } +} + diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataTest.java new file mode 100644 index 0000000000..eebb1be325 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/ObjectMetadataTest.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3; + +import static org.hamcrest.CoreMatchers.equalTo; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; +import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Random; +import java.util.UUID; + +public class ObjectMetadataTest { + private ObjectMetadata objectMetadata; + @Mock + private ObjectMetadataConfig objectMetadataConfig; + @Mock + private PredefinedObjectMetadata predefinedObjectMetadata; + private String numberOfEventsKey; + + private ObjectMetadata createObjectUnderTest(Object metadataConfig) { + return new ObjectMetadata(metadataConfig); + } + + @BeforeEach + public void setup() { + objectMetadataConfig = mock(ObjectMetadataConfig.class); + predefinedObjectMetadata = mock(PredefinedObjectMetadata.class); + numberOfEventsKey = UUID.randomUUID().toString(); + when(objectMetadataConfig.getNumberOfEventsKey()).thenReturn(numberOfEventsKey); + when(predefinedObjectMetadata.getNumberOfObjects()).thenReturn(numberOfEventsKey); + } + + @Test + public void test_setEventCount_with_PredefinedObjectMetadata() { + objectMetadata = createObjectUnderTest(predefinedObjectMetadata); + Random random = new Random(); + Integer numEvents = Math.abs(random.nextInt()); + objectMetadata.setEventCount(numEvents); + assertThat(objectMetadata.get().get(numberOfEventsKey), equalTo(Integer.toString(numEvents))); + } + + @Test + void test_setEventCount_with_ObjectMetadata() { + objectMetadata = createObjectUnderTest(objectMetadataConfig); + Random random = new Random(); + Integer numEvents = Math.abs(random.nextInt()); + objectMetadata.setEventCount(numEvents); + assertThat(objectMetadata.get().get(numberOfEventsKey), equalTo(Integer.toString(numEvents))); + } + +}