diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index f8ad3479a8..981b4ee384 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -35,7 +35,7 @@ public abstract class AbstractBuffer> implements Buffer { private final Timer readTimer; private final Timer checkpointTimer; - public AbstractBuffer(final PluginSetting pluginSetting) { + AbstractBuffer(final PluginSetting pluginSetting) { this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName()); } diff --git a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java index fb67ccf543..977a1298f0 100644 --- a/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java +++ b/data-prepper-plugins/blocking-buffer/src/main/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBuffer.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.buffer.blockingbuffer; +import com.google.common.base.Stopwatch; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.buffer.AbstractBuffer; @@ -12,8 +14,6 @@ import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.record.Record; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.AtomicDouble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +50,9 @@ public class BlockingBuffer> extends AbstractBuffer { private static final String ATTRIBUTE_BATCH_SIZE = "batch_size"; private static final String BLOCKING_BUFFER = "BlockingBuffer"; private static final String BUFFER_USAGE_METRIC = "bufferUsage"; + public static final String CAPACITY_USED_METRIC = "capacityUsed"; private final int bufferCapacity; private final int batchSize; - private final AtomicDouble bufferUsage; private final BlockingQueue blockingQueue; private final String pipelineName; @@ -67,12 +67,16 @@ public class BlockingBuffer> extends AbstractBuffer { */ public BlockingBuffer(final int bufferCapacity, final int batchSize, final String pipelineName) { super(BLOCKING_BUFFER, pipelineName); - bufferUsage = pluginMetrics.gauge(BUFFER_USAGE_METRIC, new AtomicDouble()); this.bufferCapacity = bufferCapacity; this.batchSize = batchSize; this.blockingQueue = new LinkedBlockingQueue<>(bufferCapacity); this.capacitySemaphore = new Semaphore(bufferCapacity); this.pipelineName = pipelineName; + + PluginMetrics pluginMetrics = PluginMetrics.fromNames(BLOCKING_BUFFER, pipelineName); + + pluginMetrics.gauge(CAPACITY_USED_METRIC, capacitySemaphore, capacity -> bufferCapacity - capacity.availablePermits()); + pluginMetrics.gauge(BUFFER_USAGE_METRIC, capacitySemaphore, capacity -> ((double) bufferCapacity - capacity.availablePermits()) / bufferCapacity * 100); } /** @@ -196,15 +200,6 @@ public static PluginSetting getDefaultPluginSettings() { return new PluginSetting(PLUGIN_NAME, settings); } - @Override - public void postProcess(final Long recordsInBuffer) { - // adding bounds to address race conditions and reporting negative buffer usage - final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue(); - final Double boundedTotalRecords = nonNegativeTotalRecords > bufferCapacity ? bufferCapacity : nonNegativeTotalRecords; - final Double usage = boundedTotalRecords / bufferCapacity * 100; - bufferUsage.set(usage); - } - @Override public void doCheckpoint(final CheckpointState checkpointState) { final int numCheckedRecords = checkpointState.getNumRecordsToBeChecked(); diff --git a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java index c26ade3420..194c810ec4 100644 --- a/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java +++ b/data-prepper-plugins/blocking-buffer/src/test/java/org/opensearch/dataprepper/plugins/buffer/blockingbuffer/BlockingBufferTests.java @@ -217,6 +217,8 @@ public void testBatchRead(final int readTimeout) throws Exception { assertThat(record.getData(), equalTo("TEST" + i)); i++; } + verifyBufferUsageMetric(38.46153846153847); + blockingBuffer.checkpoint(partialReadResult.getValue()); verifyBufferUsageMetric(15.384615384615385); final Map.Entry>, CheckpointState> finalReadResult = blockingBuffer.read(readTimeout); final Collection> finalBatch = finalReadResult.getKey(); @@ -227,6 +229,8 @@ public void testBatchRead(final int readTimeout) throws Exception { assertThat(record.getData(), equalTo("TEST" + i)); i++; } + verifyBufferUsageMetric(15.384615384615385); + blockingBuffer.checkpoint(finalReadResult.getValue()); verifyBufferUsageMetric(0.0); }