Skip to content

Commit

Permalink
Fix kafka buffer metrics (opensearch-project#3805)
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Dec 6, 2023
1 parent ad62f74 commit 824b72b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ public void write(T record, int timeoutInMillis) throws TimeoutException {

try {
doWrite(record, timeoutInMillis);
recordsWrittenCounter.increment();
recordsInBuffer.incrementAndGet();
if (!isByteBuffer()) {
recordsWrittenCounter.increment();
recordsInBuffer.incrementAndGet();
}
postProcess(recordsInBuffer.get());
} catch (TimeoutException e) {
recordsWriteFailed.increment();
Expand All @@ -98,8 +100,11 @@ public void writeAll(Collection<T> records, int timeoutInMillis) throws Exceptio
final int size = records.size();
try {
doWriteAll(records, timeoutInMillis);
recordsWrittenCounter.increment(size);
recordsInBuffer.addAndGet(size);
// we do not know how many records when the buffer is bytebuffer
if (!isByteBuffer()) {
recordsWrittenCounter.increment(size);
recordsInBuffer.addAndGet(size);
}
postProcess(recordsInBuffer.get());
} catch (Exception e) {
recordsWriteFailed.increment(size);
Expand Down Expand Up @@ -127,9 +132,12 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis
@Override
public Map.Entry<Collection<T>, CheckpointState> read(int timeoutInMillis) {
final Map.Entry<Collection<T>, CheckpointState> readResult = readTimer.record(() -> doRead(timeoutInMillis));
recordsReadCounter.increment(readResult.getKey().size() * 1.0);
recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked());
recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked());
// we do not know how many records when the buffer is bytebuffer
if (!isByteBuffer()) {
recordsReadCounter.increment(readResult.getKey().size() * 1.0);
recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked());
recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked());
}
postProcess(recordsInBuffer.get());
return readResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) t

@Override
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
return innerBuffer.doRead(timeoutInMillis);
return innerBuffer.read(timeoutInMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void test_kafkaBuffer_basicFunctionality() throws TimeoutException {
kafkaBuffer.doWrite(record, 10000);
kafkaBuffer.doRead(10000);
verify(producer).produceRecords(record);
verify(blockingBuffer).doRead(anyInt());
verify(blockingBuffer).read(anyInt());
}

@Test
Expand Down

0 comments on commit 824b72b

Please sign in to comment.