diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index 598fbbf218..f8ad3479a8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -71,8 +71,10 @@ public void write(T record, int timeoutInMillis) throws TimeoutException { try { doWrite(record, timeoutInMillis); - recordsWrittenCounter.increment(); - recordsInBuffer.incrementAndGet(); + if (!isByteBuffer()) { + recordsWrittenCounter.increment(); + recordsInBuffer.incrementAndGet(); + } postProcess(recordsInBuffer.get()); } catch (TimeoutException e) { recordsWriteFailed.increment(); @@ -98,8 +100,11 @@ public void writeAll(Collection records, int timeoutInMillis) throws Exceptio final int size = records.size(); try { doWriteAll(records, timeoutInMillis); - recordsWrittenCounter.increment(size); - recordsInBuffer.addAndGet(size); + // we do not know how many records when the buffer is bytebuffer + if (!isByteBuffer()) { + recordsWrittenCounter.increment(size); + recordsInBuffer.addAndGet(size); + } postProcess(recordsInBuffer.get()); } catch (Exception e) { recordsWriteFailed.increment(size); @@ -127,9 +132,12 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis @Override public Map.Entry, CheckpointState> read(int timeoutInMillis) { final Map.Entry, CheckpointState> readResult = readTimer.record(() -> doRead(timeoutInMillis)); - recordsReadCounter.increment(readResult.getKey().size() * 1.0); - recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked()); - recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked()); + // we do not know how many records when the buffer is bytebuffer + if (!isByteBuffer()) { + recordsReadCounter.increment(readResult.getKey().size() * 1.0); + recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked()); + recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked()); + } postProcess(recordsInBuffer.get()); return readResult; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index ef86b558d8..9e1ce4b9b7 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -117,7 +117,7 @@ public void doWriteAll(Collection> records, int timeoutInMillis) t @Override public Map.Entry>, CheckpointState> doRead(int timeoutInMillis) { - return innerBuffer.doRead(timeoutInMillis); + return innerBuffer.read(timeoutInMillis); } @Override diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 7b7f910bdb..d805512cd4 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -205,7 +205,7 @@ void test_kafkaBuffer_basicFunctionality() throws TimeoutException { kafkaBuffer.doWrite(record, 10000); kafkaBuffer.doRead(10000); verify(producer).produceRecords(record); - verify(blockingBuffer).doRead(anyInt()); + verify(blockingBuffer).read(anyInt()); } @Test