Skip to content

Commit

Permalink
report uncompressed message size when it does not need compression (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang authored Oct 7, 2024
1 parent 1ded8af commit 2aae68e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/MessageDeframer.java
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ private void processBody() {
// There is no reliable way to get the uncompressed size per message when it's compressed,
// because the uncompressed bytes are provided through an InputStream whose total size is
// unknown until all bytes are read, and we don't know when it happens.
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize,
(compressedFlag || fullStreamDecompressor != null) ? -1 : inboundBodyWireSize);
inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame.touch();
Expand Down
33 changes: 19 additions & 14 deletions core/src/test/java/io/grpc/internal/MessageDeframerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void simplePayload() {
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 2, 2);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 2, 2);
}

@Test
Expand All @@ -148,7 +148,7 @@ public void smallCombinedPayloads() {
verify(listener, atLeastOnce()).bytesRead(anyInt());
assertEquals(Bytes.asList(new byte[]{14, 15}), bytes(streams.get(1).next()));
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1, 2, 2);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1, 2, 2);
}

@Test
Expand All @@ -162,7 +162,7 @@ public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}

@Test
Expand All @@ -177,7 +177,7 @@ public void endOfStreamShouldNotifyEndOfStream() {
}
verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock);
checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}

@Test
Expand All @@ -189,7 +189,7 @@ public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMe
verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock);
checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}

@Test
Expand All @@ -206,7 +206,7 @@ public void endOfStreamWithInvalidGzipBlockShouldNotifyDeframerClosedWithPartial
deframer.closeWhenComplete();
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock);
checkStats(tracer, transportTracer.getStats(), fakeClock, false);
}

@Test
Expand All @@ -228,10 +228,11 @@ public void payloadSplitBetweenBuffers() {
tracer,
transportTracer.getStats(),
fakeClock,
true,
7 /* msg size */ + 2 /* second buffer adds two bytes of overhead in deflate block */,
7);
} else {
checkStats(tracer, transportTracer.getStats(), fakeClock, 7, 7);
checkStats(tracer, transportTracer.getStats(), fakeClock, false, 7, 7);
}
}

Expand All @@ -248,7 +249,7 @@ public void frameHeaderSplitBetweenBuffers() {
assertEquals(Bytes.asList(new byte[]{3}), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}

@Test
Expand All @@ -259,7 +260,7 @@ public void emptyPayload() {
assertEquals(Bytes.asList(), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 0, 0);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 0, 0);
}

@Test
Expand All @@ -273,9 +274,10 @@ public void largerFrameSize() {
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
if (useGzipInflatingBuffer) {
checkStats(tracer, transportTracer.getStats(), fakeClock, 8 /* compressed size */, 1000);
checkStats(tracer, transportTracer.getStats(), fakeClock,true,
8 /* compressed size */, 1000);
} else {
checkStats(tracer, transportTracer.getStats(), fakeClock, 1000, 1000);
checkStats(tracer, transportTracer.getStats(), fakeClock, false, 1000, 1000);
}
}

Expand All @@ -292,7 +294,7 @@ public void endOfStreamCallbackShouldWaitForMessageDelivery() {
verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener);
checkStats(tracer, transportTracer.getStats(), fakeClock, 1, 1);
checkStats(tracer, transportTracer.getStats(), fakeClock, useGzipInflatingBuffer, 1, 1);
}

@Test
Expand All @@ -308,6 +310,7 @@ public void compressed() {
verify(listener).messagesAvailable(producer.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(producer.getValue().next()));
verify(listener, atLeastOnce()).bytesRead(anyInt());
checkStats(tracer, transportTracer.getStats(), fakeClock, true, 29, 1000);
verifyNoMoreInteractions(listener);
}

Expand Down Expand Up @@ -502,15 +505,17 @@ public void sizeEnforcingInputStream_markReset() throws IOException {
* @param sizes in the format {wire0, uncompressed0, wire1, uncompressed1, ...}
*/
private static void checkStats(
TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock, long... sizes) {
TestBaseStreamTracer tracer, TransportStats transportStats, FakeClock clock,
boolean compressed, long... sizes) {
assertEquals(0, sizes.length % 2);
int count = sizes.length / 2;
long expectedWireSize = 0;
long expectedUncompressedSize = 0;
for (int i = 0; i < count; i++) {
assertEquals("inboundMessage(" + i + ")", tracer.nextInboundEvent());
assertEquals(
String.format(Locale.US, "inboundMessageRead(%d, %d, -1)", i, sizes[i * 2]),
String.format(Locale.US, "inboundMessageRead(%d, %d, %d)", i, sizes[i * 2],
compressed ? -1 : sizes[i * 2 + 1]),
tracer.nextInboundEvent());
expectedWireSize += sizes[i * 2];
expectedUncompressedSize += sizes[i * 2 + 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ public void outboundMessageSent(
@Override
public void inboundMessageRead(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
//TODO(yifeizhuang): needs support from message deframer.
if (optionalWireSize != optionalUncompressedSize) {
recordInboundCompressedMessage(span, seqNo, optionalWireSize);
}
Expand Down

0 comments on commit 2aae68e

Please sign in to comment.