Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
yidongnan authored Apr 12, 2024
2 parents 679c4e5 + cb029de commit 0a5324d
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ establishing a direct line of communication. Your feedback is highly appreciated

README: [English](README.md) | [中文](README-zh-CN.md)

**Documentation:** [English](https://yidongnan.github.io/grpc-spring-boot-starter/en/) | [中文](https://yidongnan.github.io/grpc-spring-boot-starter/zh-CN/)
**Documentation:** [English](https://grpc-ecosystem.github.io/grpc-spring/en/) | [中文](https://grpc-ecosystem.github.io/grpc-spring/zh-CN/)

## Features

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ buildscript {
protobufGradlePluginVersion = '0.9.4'

// https://github.com/spring-projects/spring-boot/releases
springBootVersion = '3.2.3'
springBootVersion = '3.2.4'
// https://github.com/spring-cloud/spring-cloud-release/releases
springCloudVersion = '2023.0.0'
// https://github.com/alibaba/spring-cloud-alibaba/releases
Expand Down
2 changes: 1 addition & 1 deletion docs/en/client/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ There are a number of supported schemes, that you can use to determine the targe
- `discovery` (Prio 6): \
(Optional) Uses spring-cloud's `DiscoveryClient` to lookup appropriate targets. The connections will be refreshed
automatically during `HeartbeatEvent`s. Uses the `gRPC_port` metadata to determine the port, otherwise uses the
service port. \
service port. Uses the `gRPC_service_config` metadata to determine [service config](https://grpc.github.io/grpc/core/md_doc_service_config.html). \
Example: `discovery:///service-name`
- `self` (Prio 0): \
The self address or scheme is a keyword that is available, if you also use `grpc-server-spring-boot-starter` and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.micrometer.core.instrument.Tags;
import net.devh.boot.grpc.common.util.Constants;

/**
* Provides factories for {@link io.grpc.StreamTracer} that records metrics.
Expand All @@ -47,6 +48,8 @@
final class MetricsClientStreamTracers {
private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = Stopwatch::createUnstarted;
private final Supplier<Stopwatch> stopwatchSupplier;
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";

MetricsClientStreamTracers() {
this(STOPWATCH_SUPPLIER);
Expand Down Expand Up @@ -127,7 +130,10 @@ public void streamClosed(Status status) {

void recordFinishedAttempt() {
Tags attemptMetricTags =
Tags.of("grpc.method", fullMethodName, "grpc.status", statusCode.toString());
Tags.of("grpc.method", fullMethodName,
"grpc.status", statusCode.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientAttemptDuration()
.withTags(attemptMetricTags)
.record(attemptNanos, TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -168,7 +174,9 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory

// Record here in case newClientStreamTracer() would never be called.
this.metricsClientMeters.getAttemptCounter()
.withTags(Tags.of("grpc.method", fullMethodName))
.withTags(Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION))
.increment();
}

Expand All @@ -188,7 +196,9 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
// attempt, as first attempt cannot be a transparent retry.
if (attemptsPerCall.get() > 0) {
this.metricsClientMeters.getAttemptCounter()
.withTags((Tags.of("grpc.method", fullMethodName)))
.withTags((Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION)))
.increment();
}
if (!info.isTransparentRetry()) {
Expand Down Expand Up @@ -248,7 +258,10 @@ void recordFinishedCall() {
}
callLatencyNanos = clientCallStopWatch.elapsed(TimeUnit.NANOSECONDS);
Tags clientCallMetricTags =
Tags.of("grpc.method", this.fullMethodName, "grpc.status", status.getCode().toString());
Tags.of("grpc.method", this.fullMethodName,
"grpc.status", status.getCode().toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientCallDuration()
.withTags(clientCallMetricTags)
.record(callLatencyNanos, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG;

import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -35,6 +36,8 @@
import org.springframework.util.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

import io.grpc.Attributes;
import io.grpc.Attributes.Builder;
Expand All @@ -58,13 +61,15 @@ public class DiscoveryClientNameResolver extends NameResolver {
@Deprecated
private static final String LEGACY_CLOUD_DISCOVERY_METADATA_PORT = "gRPC.port";
private static final List<ServiceInstance> KEEP_PREVIOUS = null;
private static final Gson GSON = new Gson();

private final String name;
private final DiscoveryClient client;
private final SynchronizationContext syncContext;
private final Consumer<DiscoveryClientNameResolver> shutdownHook;
private final SharedResourceHolder.Resource<Executor> executorResource;
private final boolean usingExecutorResource;
private final ServiceConfigParser serviceConfigParser;

// The field must be accessed from syncContext, although the methods on an Listener2 can be called
// from any thread.
Expand Down Expand Up @@ -93,6 +98,7 @@ public DiscoveryClientNameResolver(final String name, final DiscoveryClient clie
this.executor = args.getOffloadExecutor();
this.usingExecutorResource = this.executor == null;
this.executorResource = executorResource;
this.serviceConfigParser = args.getServiceConfigParser();
}

/**
Expand Down Expand Up @@ -187,6 +193,55 @@ protected int getGrpcPort(final ServiceInstance instance) {
}
}

/**
* Extracts and parse gRPC service config from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return Parsed gRPC service config or null.
*/
private ConfigOrError resolveServiceConfig(List<ServiceInstance> instances) {
final String serviceConfig = getServiceConfig(instances);
if (serviceConfig == null) {
return null;
}
log.debug("Found service config for {}", getName());
if (log.isTraceEnabled()) {
// This is to avoid blowing log into several lines if newlines present in service config string.
final String logStr = serviceConfig.replace("\r", "\\r").replace("\n", "\\n");
log.trace("Service config for {}: {}", getName(), logStr);
}
try {
@SuppressWarnings("unchecked")
Map<String, ?> parsedServiceConfig = GSON.fromJson(serviceConfig, Map.class);
return serviceConfigParser.parseServiceConfig(parsedServiceConfig);
} catch (JsonSyntaxException e) {
return ConfigOrError.fromError(
Status.UNKNOWN
.withDescription("Failed to parse grpc service config")
.withCause(e));
}
}

/**
* Extracts the gRPC service config string from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return The gRPC service config or null.
*/
protected String getServiceConfig(final List<ServiceInstance> instances) {
for (final ServiceInstance inst : instances) {
final Map<String, String> metadata = inst.getMetadata();
if (metadata == null || metadata.isEmpty()) {
continue;
}
final String metaValue = metadata.get(CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG);
if (metaValue != null && !metaValue.isEmpty()) {
return metaValue;
}
}
return null;
}

/**
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
* custom attributes.
Expand Down Expand Up @@ -318,6 +373,7 @@ private List<ServiceInstance> resolveInternal() {
log.debug("Ready to update server list for {}", getName());
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList))
.setServiceConfig(resolveServiceConfig(newInstanceList))
.build());
log.info("Done updating server list for {}", getName());
return newInstanceList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import net.devh.boot.grpc.client.metrics.MetricsClientStreamTracers.CallAttemptsTracerFactory;
import net.devh.boot.grpc.common.util.Constants;

/**
* Tests for {@link MetricsClientStreamTracers}.
Expand All @@ -61,6 +62,10 @@ class MetricsClientStreamTracersTest {
private static final String GRPC_METHOD_TAG_KEY = "grpc.method";
private static final String GRPC_STATUS_TAG_KEY = "grpc.status";
private static final String FULL_METHOD_NAME = "package1.service1/method1";
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_SOURCE_TAG_VALUE = Constants.LIBRARY_NAME;
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";
private static final String INSTRUMENTATION_VERSION_TAG_VALUE = Constants.VERSION;

private static class StringInputStream extends InputStream {
final String string;
Expand Down Expand Up @@ -125,6 +130,8 @@ void clientBasicMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Expand All @@ -146,11 +153,16 @@ void clientBasicMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Tags expectedTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.OK.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.OK.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

HistogramSnapshot attemptDurationSnapshot = meterRegistry.get(CLIENT_ATTEMPT_DURATION)
.tags(expectedTags)
Expand Down Expand Up @@ -200,6 +212,8 @@ void recordAttemptMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Expand All @@ -213,10 +227,15 @@ void recordAttemptMetrics() {
tracer.streamClosed(Status.UNAVAILABLE);

Tags expectedUnailableStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.UNAVAILABLE.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.UNAVAILABLE.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);
assertThat(meterRegistry.get(CLIENT_ATTEMPT_DURATION)
Expand Down Expand Up @@ -248,10 +267,15 @@ void recordAttemptMetrics() {
tracer.streamClosed(Status.NOT_FOUND);

Tags expectedNotFoundStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.NOT_FOUND.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.NOT_FOUND.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(2);

Expand Down Expand Up @@ -290,6 +314,8 @@ void recordAttemptMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(3);

Expand Down Expand Up @@ -342,10 +368,15 @@ void recordAttemptMetrics() {
callAttemptsTracerFactory.callEnded(Status.OK);

Tags expectedOKStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.OK.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.OK.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(4);
assertThat(meterRegistry.get(CLIENT_ATTEMPT_DURATION)
Expand Down Expand Up @@ -388,8 +419,10 @@ void clientStreamNeverCreatedStillRecordMetrics() {
callAttemptsTracerFactory.callEnded(status);

Tags expectedDeadlineExceededStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY,
Status.Code.DEADLINE_EXCEEDED.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.DEADLINE_EXCEEDED.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

HistogramSnapshot attemptDurationSnapshot = meterRegistry.get(CLIENT_ATTEMPT_DURATION)
.tags(expectedDeadlineExceededStatusTags)
Expand Down
Loading

0 comments on commit 0a5324d

Please sign in to comment.