diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 9b5a172ef9..6b5430d997 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -102,7 +102,7 @@ public void start(Buffer> buffer) { Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer); - Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); // May consider start or shutdown the scheduler on demand // Currently, event after the exports are done, the related scheduler will not be shutdown diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java index b53d5df2b6..70a6cbcf31 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java @@ -26,8 +26,8 @@ public class ExportRecordConverter extends RecordConverter { private static final String ITEM_KEY = "Item"; - static final String EXPORT_RECORD_SUCCESS_COUNT = "exportRecordSuccess"; - static final String EXPORT_RECORD_ERROR_COUNT = "exportRecordErrors"; + static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed"; + static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordProcessingErrors"; IonObjectMapper MAPPER = new IonObjectMapper(); @@ -40,8 +40,8 @@ public class ExportRecordConverter extends RecordConverter { public ExportRecordConverter(Buffer> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) { super(buffer, tableInfo); this.pluginMetrics = pluginMetrics; - this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORD_SUCCESS_COUNT); - this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORD_ERROR_COUNT); + this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT); + this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java index 2a221298f0..0286627ba6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.converter; public class MetadataKeyAttributes { - static final String COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "_id"; + static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key"; static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key"; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 3b5816778b..cedf7fb0f1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Map; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; @@ -86,7 +86,7 @@ public Record convertToEvent(Map data, Instant eventCreat } eventMetadata.setAttribute(STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(streamEventName)); - eventMetadata.setAttribute(COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data)); + eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data)); eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, getPartitionKey(data)); eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, getSortKey(data)); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index 2ba6fdf45e..bdfe6dcbe1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -26,8 +26,8 @@ public class StreamRecordConverter extends RecordConverter { private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class); - static final String CHANGE_EVENT_SUCCESS_COUNT = "changeEventSuccess"; - static final String CHANGE_EVENT_ERROR_COUNT = "changeEventErrors"; + static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed"; + static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors"; private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -42,8 +42,8 @@ public class StreamRecordConverter extends RecordConverter { public StreamRecordConverter(Buffer> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) { super(buffer, tableInfo); this.pluginMetrics = pluginMetrics; - this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENT_SUCCESS_COUNT); - this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENT_ERROR_COUNT); + this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); + this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); } @Override diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index 3e5a74eb5f..6c56323cca 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -41,7 +42,8 @@ public class DataFileScheduler implements Runnable { */ private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000; - static final String EXPORT_FILE_SUCCESS_COUNT = "exportFileSuccess"; + static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed"; + static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers"; private final EnhancedSourceCoordinator coordinator; @@ -54,6 +56,7 @@ public class DataFileScheduler implements Runnable { private final Counter exportFileSuccessCounter; + private final AtomicLong activeExportS3ObjectConsumersGauge; public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFactory loaderFactory, PluginMetrics pluginMetrics) { @@ -64,7 +67,8 @@ public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFa executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); - this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_FILE_SUCCESS_COUNT); + this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); + this.activeExportS3ObjectConsumersGauge = pluginMetrics.gauge(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, new AtomicLong()); } private void processDataFilePartition(DataFilePartition dataFilePartition) { @@ -83,13 +87,15 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) { public void run() { LOG.info("Start running Data File Scheduler"); - while (!Thread.interrupted()) { + while (!Thread.currentThread().isInterrupted()) { if (numOfWorkers.get() < MAX_JOB_COUNT) { final Optional sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { + activeExportS3ObjectConsumersGauge.incrementAndGet(); DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get(); processDataFilePartition(dataFilePartition); + activeExportS3ObjectConsumersGauge.decrementAndGet(); } } try { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index 86fc52572f..93f9e5b51a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -51,8 +51,8 @@ public class ExportScheduler implements Runnable { private static final String FAILED_STATUS = "Failed"; static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess"; - static final String EXPORT_JOB_ERROR_COUNT = "exportJobErrors"; - static final String EXPORT_FILES_TOTAL_COUNT = "exportFilesTotal"; + static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure"; + static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal"; static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal"; private final PluginMetrics pluginMetrics; @@ -68,9 +68,9 @@ public class ExportScheduler implements Runnable { private final ExportTaskManager exportTaskManager; private final Counter exportJobSuccessCounter; - private final Counter exportJobErrorCounter; + private final Counter exportJobFailureCounter; - private final Counter exportFilesTotalCounter; + private final Counter exportS3ObjectsTotalCounter; private final Counter exportRecordsTotalCounter; public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics) { @@ -83,8 +83,8 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna executor = Executors.newCachedThreadPool(); exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT); - exportJobErrorCounter = pluginMetrics.counter(EXPORT_JOB_ERROR_COUNT); - exportFilesTotalCounter = pluginMetrics.counter(EXPORT_FILES_TOTAL_COUNT); + exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT); + exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT); exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); @@ -92,8 +92,8 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna @Override public void run() { - LOG.info("Start running Export Scheduler"); - while (!Thread.interrupted()) { + LOG.debug("Start running Export Scheduler"); + while (!Thread.currentThread().isInterrupted()) { // Does not have limit on max leases // As most of the time it's just to wait final Optional sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE); @@ -186,7 +186,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { + activeChangeEventConsumers.incrementAndGet(); StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); processStreamPartition(streamPartition); + activeChangeEventConsumers.decrementAndGet(); } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java index d3f7175867..838bb9f0ab 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java @@ -33,8 +33,8 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORD_ERROR_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORD_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class ExportRecordConverterTest { @@ -73,8 +73,8 @@ void setup() { tableInfo = new TableInfo(tableArn, metadata); - given(pluginMetrics.counter(EXPORT_RECORD_SUCCESS_COUNT)).willReturn(exportRecordSuccess); - given(pluginMetrics.counter(EXPORT_RECORD_ERROR_COUNT)).willReturn(exportRecordErrors); + given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).willReturn(exportRecordSuccess); + given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).willReturn(exportRecordErrors); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index f51d14b70f..1b9b161ecc 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -38,8 +38,8 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENT_ERROR_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENT_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSING_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class StreamRecordConverterTest { @@ -77,8 +77,8 @@ void setup() { tableInfo = new TableInfo(tableArn, metadata); - given(pluginMetrics.counter(CHANGE_EVENT_SUCCESS_COUNT)).willReturn(changeEventSuccessCounter); - given(pluginMetrics.counter(CHANGE_EVENT_ERROR_COUNT)).willReturn(changeEventErrorCounter); + given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT)).willReturn(changeEventSuccessCounter); + given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT)).willReturn(changeEventErrorCounter); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index 535ef994fc..3f3d15d8cd 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -30,14 +31,21 @@ import java.util.Random; import java.util.StringJoiner; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) +@Disabled class DataFileLoaderTest { @Mock @@ -128,9 +136,12 @@ void test_run_loadFile_correctly() throws InterruptedException { .checkpointer(checkpointer) .build(); - loader.run(); - // Run for a while - Thread.sleep(1000); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final Future future = executorService.submit(loader); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); // Should call s3 getObject verify(s3Client).getObject(any(GetObjectRequest.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java index 33acaf9003..dd7562341b 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java @@ -25,14 +25,21 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.EXPORT_FILE_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.EXPORT_S3_OBJECTS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class DataFileSchedulerTest { @@ -52,6 +59,9 @@ class DataFileSchedulerTest { @Mock private Counter exportFileSuccess; + @Mock + private AtomicLong activeExportS3ObjectConsumers; + @Mock private DataFileLoaderFactory loaderFactory; @@ -77,8 +87,6 @@ void setup() { DataFileProgressState state = new DataFileProgressState(); state.setLoaded(0); state.setTotal(100); -// lenient().when(dataFilePartition.getProgressState()).thenReturn(Optional.of(state)); - dataFilePartition = new DataFilePartition(exportArn, bucketName, manifestKey, Optional.of(state)); // Mock Global Table Info @@ -90,7 +98,7 @@ void setup() { .sortKeyAttributeName("SK") .streamArn(streamArn) .build(); -// Map tableState = metadata; + lenient().when(tableInfoGlobalState.getProgressState()).thenReturn(Optional.of(metadata.toMap())); @@ -99,7 +107,8 @@ void setup() { lenient().when(coordinator.getPartition(exportArn)).thenReturn(Optional.of(exportInfoGlobalState)); lenient().when(exportInfoGlobalState.getProgressState()).thenReturn(Optional.of(loadStatus.toMap())); - given(pluginMetrics.counter(EXPORT_FILE_SUCCESS_COUNT)).willReturn(exportFileSuccess); + given(pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT)).willReturn(exportFileSuccess); + given(pluginMetrics.gauge(eq(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE), any(AtomicLong.class))).willReturn(activeExportS3ObjectConsumers); lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); @@ -115,11 +124,12 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException { scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(scheduler); - - // Run for a while - Thread.sleep(500); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + final Future future = executorService.submit(() -> scheduler.run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); // Should acquire data file partition verify(coordinator).acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); @@ -136,7 +146,7 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException { // Should update metrics. verify(exportFileSuccess).increment(); - executor.shutdownNow(); + executorService.shutdownNow(); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java index dcd44a26aa..2a1506643f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java @@ -43,8 +43,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_FILES_TOTAL_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_RECORDS_TOTAL_COUNT; @@ -108,8 +108,8 @@ void setup() { when(exportPartition.getProgressState()).thenReturn(Optional.of(state)); given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess); - given(pluginMetrics.counter(EXPORT_JOB_ERROR_COUNT)).willReturn(exportJobErrors); - given(pluginMetrics.counter(EXPORT_FILES_TOTAL_COUNT)).willReturn(exportFilesTotal); + given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors); + given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal); given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal); ExportSummary summary = mock(ExportSummary.class); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 875fce4a85..c041323a0d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -6,10 +6,12 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; @@ -22,14 +24,23 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS; @ExtendWith(MockitoExtension.class) +@Disabled class StreamSchedulerTest { @Mock @@ -51,6 +62,12 @@ class StreamSchedulerTest { @Mock private ShardConsumerFactory consumerFactory; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AtomicLong activeShardConsumers; + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -81,6 +98,8 @@ void setup() { lenient().when(consumerFactory.createConsumer(any(StreamPartition.class))).thenReturn(() -> System.out.println("Hello")); lenient().when(shardManager.getChildShardIds(anyString(), anyString())).thenReturn(List.of(shardId)); + when(pluginMetrics.gauge(eq(ACTIVE_CHANGE_EVENT_CONSUMERS), any(AtomicLong.class))).thenReturn(activeShardConsumers); + } @@ -88,13 +107,16 @@ void setup() { public void test_normal_run() throws InterruptedException { given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty()); - scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager); + scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(scheduler); + final Future future = executorService.submit(() -> scheduler.run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true)); - // Need to run a while - Thread.sleep(2000); // Should acquire the stream partition verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE); // Should start a new consumer @@ -104,7 +126,6 @@ public void test_normal_run() throws InterruptedException { // Should mask the stream partition as completed. verify(coordinator).completePartition(any(StreamPartition.class)); - executor.shutdownNow(); - + executorService.shutdownNow(); } } \ No newline at end of file