Skip to content

Commit

Permalink
Corrects the bufferUsage metric by making it equal to the difference …
Browse files Browse the repository at this point in the history
…between the bufferCapacity and the available permits in the semaphore. Adds a new capacityUsed metric which tracks the actual capacity used by the semaphore which blocks. Resolves opensearch-project#3936. (opensearch-project#3937)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Jan 10, 2024
1 parent fbc6fab commit d61b0c5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class AbstractBuffer<T extends Record<?>> implements Buffer<T> {
private final Timer readTimer;
private final Timer checkpointTimer;

public AbstractBuffer(final PluginSetting pluginSetting) {
AbstractBuffer(final PluginSetting pluginSetting) {
this(PluginMetrics.fromPluginSetting(pluginSetting), pluginSetting.getPipelineName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@

package org.opensearch.dataprepper.plugins.buffer.blockingbuffer;

import com.google.common.base.Stopwatch;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -50,9 +50,9 @@ public class BlockingBuffer<T extends Record<?>> extends AbstractBuffer<T> {
private static final String ATTRIBUTE_BATCH_SIZE = "batch_size";
private static final String BLOCKING_BUFFER = "BlockingBuffer";
private static final String BUFFER_USAGE_METRIC = "bufferUsage";
public static final String CAPACITY_USED_METRIC = "capacityUsed";
private final int bufferCapacity;
private final int batchSize;
private final AtomicDouble bufferUsage;
private final BlockingQueue<T> blockingQueue;
private final String pipelineName;

Expand All @@ -67,12 +67,16 @@ public class BlockingBuffer<T extends Record<?>> extends AbstractBuffer<T> {
*/
public BlockingBuffer(final int bufferCapacity, final int batchSize, final String pipelineName) {
super(BLOCKING_BUFFER, pipelineName);
bufferUsage = pluginMetrics.gauge(BUFFER_USAGE_METRIC, new AtomicDouble());
this.bufferCapacity = bufferCapacity;
this.batchSize = batchSize;
this.blockingQueue = new LinkedBlockingQueue<>(bufferCapacity);
this.capacitySemaphore = new Semaphore(bufferCapacity);
this.pipelineName = pipelineName;

PluginMetrics pluginMetrics = PluginMetrics.fromNames(BLOCKING_BUFFER, pipelineName);

pluginMetrics.gauge(CAPACITY_USED_METRIC, capacitySemaphore, capacity -> bufferCapacity - capacity.availablePermits());
pluginMetrics.gauge(BUFFER_USAGE_METRIC, capacitySemaphore, capacity -> ((double) bufferCapacity - capacity.availablePermits()) / bufferCapacity * 100);
}

/**
Expand Down Expand Up @@ -196,15 +200,6 @@ public static PluginSetting getDefaultPluginSettings() {
return new PluginSetting(PLUGIN_NAME, settings);
}

@Override
public void postProcess(final Long recordsInBuffer) {
// adding bounds to address race conditions and reporting negative buffer usage
final Double nonNegativeTotalRecords = recordsInBuffer.doubleValue() < 0 ? 0 : recordsInBuffer.doubleValue();
final Double boundedTotalRecords = nonNegativeTotalRecords > bufferCapacity ? bufferCapacity : nonNegativeTotalRecords;
final Double usage = boundedTotalRecords / bufferCapacity * 100;
bufferUsage.set(usage);
}

@Override
public void doCheckpoint(final CheckpointState checkpointState) {
final int numCheckedRecords = checkpointState.getNumRecordsToBeChecked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public void testBatchRead(final int readTimeout) throws Exception {
assertThat(record.getData(), equalTo("TEST" + i));
i++;
}
verifyBufferUsageMetric(38.46153846153847);
blockingBuffer.checkpoint(partialReadResult.getValue());
verifyBufferUsageMetric(15.384615384615385);
final Map.Entry<Collection<Record<String>>, CheckpointState> finalReadResult = blockingBuffer.read(readTimeout);
final Collection<Record<String>> finalBatch = finalReadResult.getKey();
Expand All @@ -227,6 +229,8 @@ public void testBatchRead(final int readTimeout) throws Exception {
assertThat(record.getData(), equalTo("TEST" + i));
i++;
}
verifyBufferUsageMetric(15.384615384615385);
blockingBuffer.checkpoint(finalReadResult.getValue());
verifyBufferUsageMetric(0.0);
}

Expand Down

0 comments on commit d61b0c5

Please sign in to comment.