From eff31fe714f951e45384b82b2db6e681fa963a21 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 6 Sep 2023 10:25:31 -0500 Subject: [PATCH] Add metrics for the opensearch source (#3304) Add metrics for the opensearch source Signed-off-by: Taylor Gray --- .../opensearch-source/build.gradle | 1 + .../source/opensearch/OpenSearchService.java | 17 +++-- .../source/opensearch/OpenSearchSource.java | 13 +++- .../OpenSearchSourcePluginMetrics.java | 50 ++++++++++++++ .../worker/NoSearchContextWorker.java | 14 +++- .../source/opensearch/worker/PitWorker.java | 16 ++++- .../opensearch/worker/ScrollWorker.java | 16 ++++- .../opensearch/OpenSearchServiceTest.java | 6 +- .../opensearch/OpenSearchSourceTest.java | 14 +++- .../worker/NoSearchContextWorkerTest.java | 51 +++++++++++++- .../opensearch/worker/PitWorkerTest.java | 66 +++++++++++++++++-- .../opensearch/worker/ScrollWorkerTest.java | 60 ++++++++++++++++- 12 files changed, 297 insertions(+), 27 deletions(-) create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle index eacbd762c4..1ffb49541b 100644 --- a/data-prepper-plugins/opensearch-source/build.gradle +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:aws-plugin-api') implementation 'software.amazon.awssdk:apache-client' + implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' implementation 'software.amazon.awssdk:s3' diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java index 63f6a48f28..4c124afd92 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker; import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; @@ -42,6 +43,7 @@ public class OpenSearchService { private final ScheduledExecutorService scheduledExecutorService; private final BufferAccumulator> bufferAccumulator; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; private SearchWorker searchWorker; private ScheduledFuture searchWorkerFuture; @@ -50,11 +52,12 @@ public static OpenSearchService createOpenSearchService(final SearchAccessor sea final SourceCoordinator sourceCoordinator, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer> buffer, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { return new OpenSearchService( searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor(), BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT), - acknowledgementSetManager); + acknowledgementSetManager, openSearchSourcePluginMetrics); } private OpenSearchService(final SearchAccessor searchAccessor, @@ -63,7 +66,8 @@ private OpenSearchService(final SearchAccessor searchAccessor, final Buffer> buffer, final ScheduledExecutorService scheduledExecutorService, final BufferAccumulator> bufferAccumulator, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.buffer = buffer; @@ -73,18 +77,19 @@ private OpenSearchService(final SearchAccessor searchAccessor, this.scheduledExecutorService = scheduledExecutorService; this.bufferAccumulator = bufferAccumulator; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } public void start() { switch(searchAccessor.getSearchContextType()) { case POINT_IN_TIME: - searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case SCROLL: - searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case NONE: - searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; default: throw new IllegalArgumentException( diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index 430f5aa618..681e22b075 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -14,6 +15,7 @@ import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; @@ -24,6 +26,7 @@ public class OpenSearchSource implements Source>, UsesSourceCoordi private final AwsCredentialsSupplier awsCredentialsSupplier; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final AcknowledgementSetManager acknowledgementSetManager; + private final PluginMetrics pluginMetrics; private SourceCoordinator sourceCoordinator; private OpenSearchService openSearchService; @@ -31,10 +34,12 @@ public class OpenSearchSource implements Source>, UsesSourceCoordi @DataPrepperPluginConstructor public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final AwsCredentialsSupplier awsCredentialsSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final PluginMetrics pluginMetrics) { this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.awsCredentialsSupplier = awsCredentialsSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.pluginMetrics = pluginMetrics; openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword(); } @@ -47,14 +52,16 @@ public void start(final Buffer> buffer) { startProcess(openSearchSourceConfiguration, buffer); } - private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer> buffer) { + private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, + final Buffer> buffer) { final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier); + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics); final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory); final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor(); - openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager); + openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics); openSearchService.start(); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java new file mode 100644 index 0000000000..80bc2d22a8 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.metrics; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +public class OpenSearchSourcePluginMetrics { + + static final String DOCUMENTS_PROCESSED = "documentsProcessed"; + static final String INDICES_PROCESSED = "indicesProcessed"; + static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime"; + static final String PROCESSING_ERRORS = "processingErrors"; + + private final Counter documentsProcessedCounter; + private final Counter indicesProcessedCounter; + private final Counter processingErrorsCounter; + private final Timer indexProcessingTimeTimer; + + public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) { + return new OpenSearchSourcePluginMetrics(pluginMetrics); + } + + private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) { + documentsProcessedCounter = pluginMetrics.counter(DOCUMENTS_PROCESSED); + indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED); + processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS); + indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED); + } + + public Counter getDocumentsProcessedCounter() { + return documentsProcessedCounter; + } + + public Counter getIndicesProcessedCounter() { + return indicesProcessedCounter; + } + + public Counter getProcessingErrorsCounter() { + return processingErrorsCounter; + } + + public Timer getIndexProcessingTimeTimer() { + return indexProcessingTimeTimer; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index cacd49c921..62edea9441 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; @@ -48,19 +49,22 @@ public class NoSearchContextWorker implements SearchWorker, Runnable { private final BufferAccumulator> bufferAccumulator; private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public NoSearchContextWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -88,11 +92,12 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("The search_after worker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -102,8 +107,10 @@ public void run() { } catch (final Exception e) { LOG.error("Unknown exception while processing index '{}', moving on to another index:", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Received an exception while trying to get index to process with search_after, backing off and retrying", e); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); @@ -143,7 +150,9 @@ private void processIndex(final SourcePartition op acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); @@ -157,6 +166,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 4162fedd01..0904a018ec 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -63,19 +64,22 @@ public class PitWorker implements SearchWorker, Runnable { private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public PitWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -102,10 +106,12 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -113,6 +119,7 @@ public void run() { LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage()); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis()); } catch (final InterruptedException ex) { @@ -124,9 +131,11 @@ public void run() { } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { LOG.error("Received an exception while trying to get index to process with PIT, backing off and retrying", e); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); } catch (final InterruptedException ex) { @@ -180,7 +189,9 @@ private void processIndex(final SourcePartition op acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); @@ -195,6 +206,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 0b53b7e006..ecd878220b 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -55,19 +56,22 @@ public class ScrollWorker implements SearchWorker { private final BufferAccumulator> bufferAccumulator; private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public ScrollWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.sourceCoordinator = sourceCoordinator; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -95,10 +99,12 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("ScrollWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -106,6 +112,7 @@ public void run() { LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", indexPartition.get().getPartitionKey(), BACKOFF_ON_SCROLL_LIMIT_REACHED.getSeconds(), e.getMessage()); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(BACKOFF_ON_SCROLL_LIMIT_REACHED.toMillis()); } catch (final InterruptedException ex) { @@ -117,9 +124,11 @@ public void run() { } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { LOG.error("Received an exception while trying to get index to process with scroll, backing off and retrying", e); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); } catch (final InterruptedException ex) { @@ -168,6 +177,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } } @@ -180,7 +190,9 @@ private void writeDocumentsToBuffer(final List documents, acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java index 0e0566e2ba..731ee07e50 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker; import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; @@ -65,6 +66,9 @@ public class OpenSearchServiceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + @Mock private SourceCoordinator sourceCoordinator; @@ -93,7 +97,7 @@ private OpenSearchService createObjectUnderTest() { })) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService); bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager); + return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java index c48f552bf7..4ed020a173 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java @@ -11,11 +11,13 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; @@ -52,11 +54,17 @@ public class OpenSearchSourceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + @Mock private SourceCoordinator sourceCoordinator; private OpenSearchSource createObjectUnderTest() { - return new OpenSearchSource(openSearchSourceConfiguration, awsCredentialsSupplier, acknowledgementSetManager); + return new OpenSearchSource(openSearchSourceConfiguration, awsCredentialsSupplier, acknowledgementSetManager, pluginMetrics); } @Test @@ -75,11 +83,13 @@ void start_with_non_null_buffer_does_not_throw() { try (final MockedStatic searchAccessorStrategyMockedStatic = mockStatic(SearchAccessorStrategy.class); final MockedStatic openSearchClientFactoryMockedStatic = mockStatic(OpenSearchClientFactory.class); + final MockedStatic openSearchSourcePluginMetricsMockedStatic = mockStatic(OpenSearchSourcePluginMetrics.class); final MockedStatic openSearchServiceMockedStatic = mockStatic(OpenSearchService.class)) { openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory); searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory)).thenReturn(searchAccessorStrategy); + openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics); - openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager)) + openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics)) .thenReturn(openSearchService); objectUnderTest.start(buffer); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index 0b51ae2e0f..12eb0223d2 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; @@ -53,6 +56,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS; @@ -77,16 +81,35 @@ public class NoSearchContextWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private NoSearchContextWorker createObjectUnderTest() { - return new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -104,7 +127,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup } @Test - void run_when_search_without_search_context_throws_index_not_found_exception_completes_the_partition() throws InterruptedException { + void run_when_search_without_search_context_throws_index_not_found_exception_completes_the_partition() throws Exception { + mockTimerCallable(); final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); @@ -131,10 +155,16 @@ void run_when_search_without_search_context_throws_index_not_found_exception_com verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -188,10 +218,16 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha assertThat(noSearchContextSearchRequests.get(1).getIndex(), equalTo(partitionKey)); assertThat(noSearchContextSearchRequests.get(1).getPaginationSize(), equalTo(2)); assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); doAnswer(a -> { @@ -261,5 +297,16 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index eac270e3d2..718dec6bb5 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -84,16 +87,35 @@ public class PitWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private PitWorker createObjectUnderTest() { - return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -112,6 +134,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup @Test void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -188,10 +212,15 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); verifyNoInteractions(acknowledgementSetManager); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_partition() throws Exception { + mockTimerCallable(); final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); @@ -283,10 +312,16 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create_another_point_in_time() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -342,10 +377,16 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test - void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws InterruptedException { + void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -353,7 +394,7 @@ void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitE when(searchAccessor.createPit(any(CreatePointInTimeRequest.class))).thenThrow(SearchContextLimitException.class); - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -367,10 +408,16 @@ void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitE verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); verify(sourceCoordinator).giveUpPartitions(); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verify(processingErrorsCounter).increment(); } @Test - void run_completes_partitions_when_createPit_throws_IndexNotFoundException() throws InterruptedException { + void run_completes_partitions_when_createPit_throws_IndexNotFoundException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -393,5 +440,16 @@ void run_completes_partitions_when_createPit_throws_IndexNotFoundException() thr verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index b28c140af6..f97838b187 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -56,6 +59,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker.SCROLL_TIME_PER_BATCH; @@ -82,16 +86,35 @@ public class ScrollWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private ScrollWorker createObjectUnderTest() { - return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -110,6 +133,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup @Test void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scroll_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -181,10 +206,16 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro final DeleteScrollRequest deleteScrollRequest = deleteRequestArgumentCaptor.getValue(); assertThat(deleteScrollRequest, notNullValue()); assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); + + verify(documentsProcessedCounter, times(5)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); doAnswer(a -> { @@ -272,10 +303,16 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(5)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test - void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws InterruptedException { + void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -300,11 +337,17 @@ void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLim verifyNoMoreInteractions(searchAccessor); verify(sourceCoordinator).giveUpPartitions(); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verify(processingErrorsCounter).increment(); } @Test - void run_completes_partitions_createScroll_throws_IndexNotFoundException() throws InterruptedException { + void run_completes_partitions_createScroll_throws_IndexNotFoundException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -330,6 +373,17 @@ void run_completes_partitions_createScroll_throws_IndexNotFoundException() throw verifyNoMoreInteractions(searchAccessor); verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } }