Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Inprocess memory leak #11406

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.grpc.Status;
import io.grpc.internal.testing.TestClientStreamTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.internal.testing.TestStreamTracer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -249,6 +250,11 @@ protected long fakeCurrentTimeNanos() {
throw new UnsupportedOperationException();
}

protected void assertInProcessTransportAssumedMessageSize(
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) {
// implemented by SizesReportedInProcessTransportTest
}

// TODO(ejona):
// multiple streams on same transport
// multiple client transports to same server
Expand Down Expand Up @@ -867,6 +873,7 @@ public void basicStream() throws Exception {
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");

assertInProcessTransportAssumedMessageSize(clientStreamTracer1, serverStreamTracer1);
Metadata serverHeaders = new Metadata();
serverHeaders.put(asciiKey, "server");
serverHeaders.put(asciiKey, "dupvalue");
Expand Down Expand Up @@ -903,6 +910,7 @@ public void basicStream() throws Exception {
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertInProcessTransportAssumedMessageSize(serverStreamTracer1, clientStreamTracer1);

message.close();
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
Expand Down Expand Up @@ -1266,6 +1274,7 @@ public void onReady() {
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertInProcessTransportAssumedMessageSize(serverStreamTracer1, clientStreamTracer1);
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(status, clientStreamTracer1.getStatus());
// There is a race between client cancelling and server closing. The final status seen by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final int maxInboundMetadataSize;
private boolean closed;
private final boolean includeCauseWithStatus;
long assumedMessageSize = -1;
private final long assumedMessageSize = -1;
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved

private InProcessClientTransportFactory(
@Nullable ScheduledExecutorService scheduledExecutorService,
Expand Down Expand Up @@ -288,9 +288,5 @@ public void close() {
public Collection<Class<? extends SocketAddress>> getSupportedSocketAddressTypes() {
return Arrays.asList(InProcessSocketAddress.class, AnonymousInProcessSocketAddress.class);
}

public void assumedMessageSize(int bytes) {
this.assumedMessageSize = bytes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static InProcessServer findServer(SocketAddress addr) {
*/
private ScheduledExecutorService scheduler;

private long assumedMessageSize = -1;
private final long assumedMessageSize;

InProcessServer(
InProcessServerBuilder builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ void setStatsEnabled(boolean value) {
this.serverImplBuilder.setStatsEnabled(value);
}

public InProcessServerBuilder assumedMessageSize(long bytes) {
this.assumedMessageSize = bytes;
public InProcessServerBuilder assumedMessageSize(long assumedMessageSize) {
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
this.assumedMessageSize = assumedMessageSize;
return this;
}
}
45 changes: 21 additions & 24 deletions inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private ServerTransportListener serverTransportListener;
private Attributes serverStreamAttributes;
private ManagedClientTransport.Listener clientTransportListener;
// These sizes are assumed from the sender' side.
private long serverAssumedMessageSize = -1;
private long clientAssumedMessageSize = -1;
// The size is assumed from the sender's side.
private long assumedMessageSize = -1;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
Expand Down Expand Up @@ -172,7 +171,7 @@ public InProcessTransport(
Attributes eagAttrs, boolean includeCauseWithStatus, long assumedMessageSize) {
this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent(), includeCauseWithStatus);
this.clientAssumedMessageSize = assumedMessageSize;
this.assumedMessageSize = assumedMessageSize;
}

InProcessTransport(
Expand Down Expand Up @@ -201,9 +200,9 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
serverSchedulerPool = server.getScheduledExecutorServicePool();
serverScheduler = serverSchedulerPool.getObject();
serverStreamTracerFactories = server.getStreamTracerFactories();
assumedMessageSize = server.getAssumedMessageSize();
// Must be semi-initialized; past this point, can begin receiving requests
serverTransportListener = server.register(this);
serverAssumedMessageSize = server.getAssumedMessageSize();
}
}
if (serverTransportListener == null) {
Expand Down Expand Up @@ -430,7 +429,7 @@ private void streamClosed() {
}
}

private long getMessageLength(InputStream inputStream) {
private long getKnownLength(InputStream inputStream) {
try {
return inputStream.available();
} catch (IOException | RuntimeException e) {
Expand Down Expand Up @@ -543,22 +542,21 @@ public void writeMessage(InputStream message) {
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);

/*long messageLength = serverAssumedMessageSize == -1
? getMessageLength(message) : serverAssumedMessageSize;*/
long messageLength = -1;
long messageLength;
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
try {
if (!(message instanceof KnownLength) && !(message instanceof ByteArrayInputStream)) {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = getKnownLength(message);
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
else {
messageLength = getMessageLength(message);
}
} catch (IOException ex) {
throw new RuntimeException(ex);
} catch (IOException e) {
throw new RuntimeException("Error processing the message length", e);
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
}

statsTraceCtx.outboundUncompressedSize(messageLength);
Expand Down Expand Up @@ -838,22 +836,21 @@ public void writeMessage(InputStream message) {
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);

/*long messageLength = clientAssumedMessageSize == -1
? getMessageLength(message) : clientAssumedMessageSize;*/
long messageLength = -1;
long messageLength;
try {
if (!(message instanceof KnownLength) && !(message instanceof ByteArrayInputStream)) {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = getKnownLength(message);
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
else {
messageLength = getMessageLength(message);
}
} catch (IOException ex) {
throw new RuntimeException(ex);
} catch (IOException e) {
throw new RuntimeException("Error processing the message length", e);
}
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ protected InternalServer newServer(
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
address, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
testAuthority(server), USER_AGENT, eagAttrs(), false, -1);
testAuthority(server), USER_AGENT, eagAttrs(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected String testAuthority(InternalServer server) {
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
new InProcessSocketAddress(TRANSPORT_NAME), GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
testAuthority(server), USER_AGENT, eagAttrs(), false, -1);
testAuthority(server), USER_AGENT, eagAttrs(), false);
}

@Test
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't a need to have a completely separate class. Just add tests to the regular InProcessTransportTest where you check that sizes are correctly reported for various scenarios: Specified an assumedSize that is different from the actual size; didn't specify the assumedSize; sent several messages.

Copy link
Member Author

@shivaspeaks shivaspeaks Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might not be able do that because we need to have an instance of InProcessTransport specified with TEST_MESSAGE_LENGTH for this test. So if we do that within InProcessTransportTest itself then we are expecting other tests like AnonymousInProcessTransportTest with the expected TEST_MESSAGE_LENGTH.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You just need to do the full channel creation in the specific tests instead of relying on the shared setup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to take reference from basicStream() and creating a new test function in InProcessTransportTest naming it basicStreamInProcess(). All the lines before line 857, I'm almost taking every line for the setup except the Assertion statements and verification statements.

And at last I'll just be Asserting with what we need to test

assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());

So by doing this, a lot of variables and inner private classes of AbstractTransportTest comes into picture. I made all of them as protected or public wherever necessary. Otherwise we will be ending up re-writing all the classes again provided in AbstractTransportTest. Adding all this will pollute the class even more and that too just for 1 test! Also, I'm running into more unseen errors by full channel creation within test. Maybe we should fallback to the current approach itself of having a different class and we cans specify in the javadoc that this test specifically runs for assumed size in InProcess transport? Its getting complex here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @larry-safran, we are waiting for your reply. We don't want to copy the private classes TestServerStreamTracer and TestClienttreamTracer in AbstractTransportTest, and moving them out to separate classes defeats the purpose of avoiding creating new test files unnecessarily.

Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.inprocess;

import static com.google.common.truth.Truth.assertThat;

import io.grpc.ServerStreamTracer;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.testing.TestStreamTracer;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class SizesReportedInProcessTransportTest extends InProcessTransportTest {
private static final long TEST_MESSAGE_LENGTH = 100;
private AnonymousInProcessSocketAddress address = new AnonymousInProcessSocketAddress();

@After
@Override
public void tearDown() throws InterruptedException {
super.tearDown();
assertThat(address.getServer()).isNull();
}

@Override
protected InternalServer newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
InProcessServerBuilder builder = InProcessServerBuilder.forAddress(address)
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE)
.assumedMessageSize(TEST_MESSAGE_LENGTH);
return new InProcessServer(builder, streamTracerFactories);
}

@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
address, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
testAuthority(server), USER_AGENT, eagAttrs(), false, TEST_MESSAGE_LENGTH);
}

@Override
public void assertInProcessTransportAssumedMessageSize(
TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) {
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerSender.getOutboundUncompressedSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundWireSize());
Assert.assertEquals(TEST_MESSAGE_LENGTH, streamTracerReceiver.getInboundUncompressedSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand All @@ -53,6 +54,7 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -107,7 +109,7 @@ public class OpenTelemetryMetricsModuleTest {
{ 0L, 1024L, 2048L, 4096L, 16384L, 65536L, 262144L, 1048576L, 4194304L, 16777216L,
67108864L, 268435456L, 1073741824L, 4294967296L };

private static final class StringInputStream extends InputStream {
private static final class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -118,6 +120,11 @@ private static final class StringInputStream extends InputStream {
public int read() {
throw new UnsupportedOperationException("should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
Loading