Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Inprocess memory leak #11406

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.grpc.Status;
import io.grpc.internal.testing.TestClientStreamTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.internal.testing.TestStreamTracer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -139,9 +140,6 @@ protected abstract InternalServer newServer(
/**
* Returns true (which is default) if the transport reports message sizes to StreamTracers.
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
*/
protected boolean sizesReported() {
return true;
}

protected final Attributes eagAttrs() {
return EAG_ATTRS;
Expand Down Expand Up @@ -252,6 +250,11 @@ protected long fakeCurrentTimeNanos() {
throw new UnsupportedOperationException();
}

protected void assertInProcessTransportAssumedMessageSize(
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
TestStreamTracer streamTracerSender, TestStreamTracer streamTracerReceiver) {
// implemented by SizesReportedInProcessTransportTest
}

// TODO(ejona):
// multiple streams on same transport
// multiple client transports to same server
Expand Down Expand Up @@ -857,29 +860,20 @@ public void basicStream() throws Exception {
message.close();
assertThat(clientStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertNull("no additional message expected", serverStreamListener.messageQueue.poll());

clientStream.halfClose();
assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS));

if (sizesReported()) {
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");

assertInProcessTransportAssumedMessageSize(clientStreamTracer1, serverStreamTracer1);
Metadata serverHeaders = new Metadata();
serverHeaders.put(asciiKey, "server");
serverHeaders.put(asciiKey, "dupvalue");
Expand Down Expand Up @@ -907,25 +901,16 @@ public void basicStream() throws Exception {
assertNotNull("message expected", message);
assertThat(serverStreamTracer1.nextOutboundEvent())
.matches("outboundMessageSent\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertTrue(clientStreamTracer1.getInboundHeaders());
assertThat(clientStreamTracer1.nextInboundEvent()).isEqualTo("inboundMessage(0)");
assertEquals("Hi. Who are you?", methodDescriptor.parseResponse(message));
assertThat(clientStreamTracer1.nextInboundEvent())
.matches("inboundMessageRead\\(0, -?[0-9]+, -?[0-9]+\\)");
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertInProcessTransportAssumedMessageSize(serverStreamTracer1, clientStreamTracer1);

message.close();
assertNull("no additional message expected", clientStreamListener.messageQueue.poll());
Expand Down Expand Up @@ -1285,17 +1270,11 @@ public void onReady() {
serverStream.close(Status.OK, new Metadata());
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
if (sizesReported()) {
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
} else {
assertThat(clientStreamTracer1.getInboundWireSize()).isEqualTo(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isEqualTo(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isEqualTo(0L);
}
assertThat(clientStreamTracer1.getInboundWireSize()).isGreaterThan(0L);
assertThat(clientStreamTracer1.getInboundUncompressedSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundWireSize()).isGreaterThan(0L);
assertThat(serverStreamTracer1.getOutboundUncompressedSize()).isGreaterThan(0L);
assertInProcessTransportAssumedMessageSize(serverStreamTracer1, clientStreamTracer1);
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(status, clientStreamTracer1.getStatus());
// There is a race between client cancelling and server closing. The final status seen by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ public ClientTransportFactory buildClientTransportFactory() {
managedChannelImplBuilder.setStatsRecordStartedRpcs(false);
managedChannelImplBuilder.setStatsRecordFinishedRpcs(false);
managedChannelImplBuilder.setStatsRecordRetryMetrics(false);

// By default, In-process transport should not be retriable as that leaks memory. Since
// there is no wire, bytes aren't calculated so buffer limit isn't respected
managedChannelImplBuilder.disableRetry();
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
}

@Internal
Expand Down Expand Up @@ -243,6 +239,7 @@ static final class InProcessClientTransportFactory implements ClientTransportFac
private final int maxInboundMetadataSize;
private boolean closed;
private final boolean includeCauseWithStatus;
private final long assumedMessageSize = -1;
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved

private InProcessClientTransportFactory(
@Nullable ScheduledExecutorService scheduledExecutorService,
Expand All @@ -263,7 +260,7 @@ public ConnectionClientTransport newClientTransport(
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
options.getEagAttributes(), includeCauseWithStatus);
options.getEagAttributes(), includeCauseWithStatus, assumedMessageSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ static InProcessServer findServer(SocketAddress addr) {
*/
private ScheduledExecutorService scheduler;

private final long assumedMessageSize;

InProcessServer(
InProcessServerBuilder builder,
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
Expand All @@ -70,6 +72,7 @@ static InProcessServer findServer(SocketAddress addr) {
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
this.streamTracerFactories =
Collections.unmodifiableList(checkNotNull(streamTracerFactories, "streamTracerFactories"));
this.assumedMessageSize = builder.assumedMessageSize;
}

@Override
Expand Down Expand Up @@ -159,4 +162,8 @@ int getMaxInboundMetadataSize() {
List<ServerStreamTracer.Factory> getStreamTracerFactories() {
return streamTracerFactories;
}

long getAssumedMessageSize() {
return assumedMessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static String generateName() {
private final ServerImplBuilder serverImplBuilder;
final SocketAddress listenAddress;
int maxInboundMetadataSize = Integer.MAX_VALUE;
long assumedMessageSize = -1;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

Expand Down Expand Up @@ -210,4 +211,9 @@ public InProcessServerBuilder useTransportSecurity(File certChain, File privateK
void setStatsEnabled(boolean value) {
this.serverImplBuilder.setStatsEnabled(value);
}

public InProcessServerBuilder assumedMessageSize(long assumedMessageSize) {
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
this.assumedMessageSize = assumedMessageSize;
return this;
}
}
71 changes: 71 additions & 0 deletions inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
Expand All @@ -35,6 +36,7 @@
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId;
import io.grpc.InternalMetadata;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
Expand All @@ -59,6 +61,8 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -95,6 +99,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private ServerTransportListener serverTransportListener;
private Attributes serverStreamAttributes;
private ManagedClientTransport.Listener clientTransportListener;
// The size is assumed from the sender's side.
private long assumedMessageSize = -1;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
Expand Down Expand Up @@ -160,6 +166,14 @@ public InProcessTransport(
Optional.<ServerListener>absent(), includeCauseWithStatus);
}

public InProcessTransport(
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, boolean includeCauseWithStatus, long assumedMessageSize) {
this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent(), includeCauseWithStatus);
this.assumedMessageSize = assumedMessageSize;
}

InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
Expand All @@ -186,6 +200,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
serverSchedulerPool = server.getScheduledExecutorServicePool();
serverScheduler = serverSchedulerPool.getObject();
serverStreamTracerFactories = server.getStreamTracerFactories();
assumedMessageSize = server.getAssumedMessageSize();
// Must be semi-initialized; past this point, can begin receiving requests
serverTransportListener = server.register(this);
}
Expand Down Expand Up @@ -414,6 +429,17 @@ private void streamClosed() {
}
}

private long getKnownLength(InputStream inputStream) {
try {
return inputStream.available();
} catch (IOException | RuntimeException e) {
throw Status.INTERNAL
.withDescription("Failed to calculate size")
.withCause(e)
.asRuntimeException();
}
}

private class InProcessServerStream implements ServerStream {
final StatsTraceContext statsTraceCtx;
// All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
Expand Down Expand Up @@ -515,6 +541,29 @@ public void writeMessage(InputStream message) {
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);

long messageLength;
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = getKnownLength(message);
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
} catch (IOException e) {
throw new RuntimeException("Error processing the message length", e);
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
}

statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
clientStream.statsTraceCtx.inboundUncompressedSize(messageLength);
clientStream.statsTraceCtx.inboundWireSize(messageLength);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (clientRequested > 0) {
Expand Down Expand Up @@ -786,6 +835,28 @@ public void writeMessage(InputStream message) {
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);

long messageLength;
try {
if (assumedMessageSize != -1) {
messageLength = assumedMessageSize;
} else if (message instanceof KnownLength || message instanceof ByteArrayInputStream) {
messageLength = getKnownLength(message);
} else {
InputStream oldMessage = message;
byte[] payload = ByteStreams.toByteArray(message);
messageLength = payload.length;
message = new ByteArrayInputStream(payload);
oldMessage.close();
}
} catch (IOException e) {
throw new RuntimeException("Error processing the message length", e);
}
statsTraceCtx.outboundUncompressedSize(messageLength);
statsTraceCtx.outboundWireSize(messageLength);
// messageLength should be same at receiver's end as no actual wire is involved.
serverStream.statsTraceCtx.inboundUncompressedSize(messageLength);
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
serverStream.statsTraceCtx.inboundWireSize(messageLength);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (serverRequested > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ protected ManagedClientTransport newClientTransport(InternalServer server) {
testAuthority(server), USER_AGENT, eagAttrs(), false);
}

@Override
protected boolean sizesReported() {
// TODO(zhangkun83): InProcessTransport doesn't record metrics for now
// (https://github.com/grpc/grpc-java/issues/2284)
return false;
}

@Test
@Ignore
@Override
Expand Down
Loading
Loading