Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Inprocess memory leak #11406

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,6 @@ protected abstract InternalServer newServer(
/**
* Returns true (which is default) if the transport reports message sizes to StreamTracers.
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
*/
protected boolean sizesReported() {
return true;
}

protected final Attributes eagAttrs() {
return EAG_ATTRS;
Expand All @@ -163,9 +160,9 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
* tests in an indeterminate state.
*/
protected InternalServer server;
private ServerTransport serverTransport;
private ManagedClientTransport client;
private MethodDescriptor<String, String> methodDescriptor =
protected ServerTransport serverTransport;
protected ManagedClientTransport client;
protected MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setFullMethodName("service/method")
Expand All @@ -182,20 +179,20 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
"tracer-key", Metadata.ASCII_STRING_MARSHALLER);
private final String tracerKeyValue = "tracer-key-value";

private ManagedClientTransport.Listener mockClientTransportListener
protected ManagedClientTransport.Listener mockClientTransportListener
= mock(ManagedClientTransport.Listener.class);
private MockServerListener serverListener = new MockServerListener();
protected MockServerListener serverListener = new MockServerListener();
private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
private final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
protected final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
private final TestClientStreamTracer clientStreamTracer2 = new TestHeaderClientStreamTracer();
private final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
clientStreamTracer1, clientStreamTracer2
};
private final ClientStreamTracer[] noopTracers = new ClientStreamTracer[] {
new ClientStreamTracer() {}
};

private final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
protected final TestServerStreamTracer serverStreamTracer1 = new TestServerStreamTracer();
private final TestServerStreamTracer serverStreamTracer2 = new TestServerStreamTracer();
private final ServerStreamTracer.Factory serverStreamTracerFactory = mock(
ServerStreamTracer.Factory.class,
Expand Down Expand Up @@ -857,26 +854,16 @@ public void basicStream() throws Exception {
message.close();
assertThat(clientStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());

clientStream.halfClose();
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));

if (sizesReported()) {
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");

Expand Down Expand Up @@ -907,25 +894,15 @@ public void basicStream() throws Exception {
assertNotNull("message expected", message);
assertThat(serverStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
assertThat(clientStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);

message.close();
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
Expand Down Expand Up @@ -1285,17 +1262,10 @@ public void onReady() {
serverStream.close(Status.OK, new Metadata());
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(status, clientStreamTracer1.getStatus());
// There is a race between client cancelling and server closing. The final status seen by the
Expand Down Expand Up @@ -2184,7 +2154,7 @@ private static void runIfNotNull(Runnable runnable) {
}
}

private static void startTransport(
protected static void startTransport(
ManagedClientTransport clientTransport,
ManagedClientTransport.Listener listener) {
runIfNotNull(clientTransport.start(listener));
Expand All @@ -2202,7 +2172,7 @@ public void streamCreated(Attributes transportAttrs, Metadata metadata) {
}
}

private static class MockServerListener implements ServerListener {
public static class MockServerListener implements ServerListener {
public final BlockingQueue<MockServerTransportListener> listeners
= new LinkedBlockingQueue<>();
private final SettableFuture<?> shutdown = SettableFuture.create();
Expand Down Expand Up @@ -2233,7 +2203,7 @@ public MockServerTransportListener takeListenerOrFail(long timeout, TimeUnit uni
}
}

private static class MockServerTransportListener implements ServerTransportListener {
public static class MockServerTransportListener implements ServerTransportListener {
public final ServerTransport transport;
public final BlockingQueue<StreamCreation> streams = new LinkedBlockingQueue<>();
private final SettableFuture<?> terminated = SettableFuture.create();
Expand Down Expand Up @@ -2281,8 +2251,8 @@ public StreamCreation takeStreamOrFail(long timeout, TimeUnit unit)
}
}

private static class ServerStreamListenerBase implements ServerStreamListener {
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
public static class ServerStreamListenerBase implements ServerStreamListener {
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
// Would have used Void instead of Object, but null elements are not allowed
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -2341,8 +2311,8 @@ public void closed(Status status) {
}
}

private static class ClientStreamListenerBase implements ClientStreamListener {
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
public static class ClientStreamListenerBase implements ClientStreamListener {
public final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<>();
// Would have used Void instead of Object, but null elements are not allowed
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<>();
private final SettableFuture<Metadata> headers = SettableFuture.create();
Expand Down Expand Up @@ -2399,7 +2369,7 @@ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
}
}

private static class StreamCreation {
public static class StreamCreation {
public final ServerStream stream;
public final String method;
public final Metadata headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
private boolean transportIncludeStatusCause = false;
private long assumedMessageSize = -1;

private InProcessChannelBuilder(@Nullable SocketAddress directAddress, @Nullable String target) {

Expand All @@ -117,10 +118,6 @@ public ClientTransportFactory buildClientTransportFactory() {
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);

// By default, In-process transport should not be retriable as that leaks memory. Since
// there is no wire, bytes aren't calculated so buffer limit isn't respected
managedChannelImplBuilder.disableRetry();
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
}

@Internal
Expand Down Expand Up @@ -225,9 +222,23 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
return this;
}

/**
* Sets whether to include the provided messageSize or not and is propagated
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
* forward to InProcessTransport. This was added to not calculate messageSize
* if already provided while calling builder.
*
* @param assumedMessageSize length of InProcess transport's messageSize
* @return this
*/
public InProcessChannelBuilder assumedMessageSize(long assumedMessageSize) {
checkArgument(assumedMessageSize >= 0, "assumedMessageSize must be >= 0");
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
this.assumedMessageSize = assumedMessageSize;
return this;
}

ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
return new InProcessClientTransportFactory(scheduledExecutorService,
maxInboundMetadataSize, transportIncludeStatusCause, assumedMessageSize);
}

void setStatsEnabled(boolean value) {
Expand All @@ -243,15 +254,17 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final int maxInboundMetadataSize;
private boolean closed;
private final boolean includeCauseWithStatus;
private long assumedMessageSize;

private InProcessClientTransportFactory(
@Nullable ScheduledExecutorService scheduledExecutorService,
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
int maxInboundMetadataSize, boolean includeCauseWithStatus, long assumedMessageSize) {
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.includeCauseWithStatus = includeCauseWithStatus;
this.assumedMessageSize = assumedMessageSize;
}

@Override
Expand All @@ -263,7 +276,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes(), includeCauseWithStatus);
options.getEagAttributes(), includeCauseWithStatus, assumedMessageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ static InProcessServer findServer(SocketAddress addr) {
*/
private ScheduledExecutorService scheduler;

private final long assumedMessageSize;

InProcessServer(
InProcessServerBuilder builder,
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
Expand All @@ -70,6 +72,7 @@ static InProcessServer findServer(SocketAddress addr) {
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
this.streamTracerFactories =
Collections.unmodifiableList(checkNotNull(streamTracerFactories, "streamTracerFactories"));
this.assumedMessageSize = builder.assumedMessageSize;
}

@Override
Expand Down Expand Up @@ -159,4 +162,8 @@ int getMaxInboundMetadataSize() {
List<ServerStreamTracer.Factory> getStreamTracerFactories() {
return streamTracerFactories;
}

long getAssumedMessageSize() {
return assumedMessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static String generateName() {
private final ServerImplBuilder serverImplBuilder;
final SocketAddress listenAddress;
int maxInboundMetadataSize = Integer.MAX_VALUE;
long assumedMessageSize = -1;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

Expand Down Expand Up @@ -210,4 +211,18 @@ public InProcessServerBuilder useTransportSecurity(File certChain, File privateK
void setStatsEnabled(boolean value) {
this.serverImplBuilder.setStatsEnabled(value);
}

/**
* Sets whether to include the provided messageSize or not and is propagated
* forward to InProcessTransport. This was added to not calculate messageSize
* if already provided while calling builder.
*
* @param assumedMessageSize length of InProcess transport's messageSize
* @return this
*/
public InProcessServerBuilder assumedMessageSize(long assumedMessageSize) {
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkArgument(assumedMessageSize >= 0, "assumedMessageSize must be >= 0");
this.assumedMessageSize = assumedMessageSize;
return this;
}
}
Loading
Loading