From 4fc35c17ac26fc37c6879c6b8e8aaef77170e6b9 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Fri, 9 Aug 2024 22:26:52 -0400 Subject: [PATCH 01/12] saving --- api/transport/propagation.go | 12 +++++++++++- transport/grpc/handler.go | 6 +++--- transport/grpc/outbound.go | 31 +++++++++++++++---------------- transport/grpc/stream.go | 2 +- transport/http/outbound.go | 2 +- 5 files changed, 31 insertions(+), 22 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 7c27ba106..20962397a 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -37,6 +37,12 @@ type CreateOpenTracingSpan struct { ExtraTags opentracing.Tags } +const ( + _rpcYarpcStatusCode = "rpc.yarpc.status_code" + _rpcYarpcComponent = "component" + _yarpc = "yarpc" +) + // Do creates a new context that has a reference to the started span. // This should be called before a Outbound makes a call func (c *CreateOpenTracingSpan) Do( @@ -112,10 +118,14 @@ func (e *ExtractOpenTracingSpan) Do( // UpdateSpanWithErr sets the error tag on a span, if an error is given. // Returns the given error -func UpdateSpanWithErr(span opentracing.Span, err error) error { +// Please refer here for the numeric status code of the yarpc reques +// https://github.com/yarpc/yarpc-go/blob/dev/yarpcerrors/codes.go#L29 +func UpdateSpanWithErr(span opentracing.Span, err error, errCode int) error { if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) + span.SetTag(_rpcYarpcStatusCode, errCode) + span.SetTag(_rpcYarpcComponent, _yarpc) } return err } diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index 86ea56660..d2d025376 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -149,7 +149,7 @@ func (h *handler) handleStream( stream := newServerStream(ctx, &transport.StreamRequest{Meta: transportRequest.ToRequestMeta()}, serverStream) tServerStream, err := transport.NewServerStream(stream) if err != nil { - return err + return transport.UpdateSpanWithErr(span, err, 2) } apperr := transport.InvokeStreamHandler(transport.StreamInvokeRequest{ Stream: tServerStream, @@ -157,7 +157,7 @@ func (h *handler) handleStream( Logger: h.logger, }) apperr = handlerErrorToGRPCError(apperr, nil) - return transport.UpdateSpanWithErr(span, apperr) + return transport.UpdateSpanWithErr(span, apperr, 2) } func (h *handler) handleUnary( @@ -230,7 +230,7 @@ func (h *handler) handleUnaryBeforeErrorConversion( defer span.Finish() err := h.callUnary(ctx, transportRequest, handler, responseWriter) - return transport.UpdateSpanWithErr(span, err) + return transport.UpdateSpanWithErr(span, err, 2) } func (h *handler) callUnary(ctx context.Context, transportRequest *transport.Request, unaryHandler transport.UnaryHandler, responseWriter *responseWriter) error { diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index 7b585ecaa..cd9eaf2ca 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -161,18 +161,27 @@ func (o *Outbound) invoke( responseMD *metadata.MD, start time.Time, ) (retErr error) { + tracer := o.t.options.tracer + createOpenTracingSpan := &transport.CreateOpenTracingSpan{ + Tracer: tracer, + TransportName: TransportName, + StartTime: start, + ExtraTags: yarpc.OpentracingTags, + } + ctx, span := createOpenTracingSpan.Do(ctx, request) + defer span.Finish() md, err := transportRequestToMetadata(request) if err != nil { - return err + return transport.UpdateSpanWithErr(span, err, 1) } bytes, err := ioutil.ReadAll(request.Body) if err != nil { - return err + return transport.UpdateSpanWithErr(span, err, 1) } fullMethod, err := procedureNameToFullMethod(request.Procedure) if err != nil { - return err + return transport.UpdateSpanWithErr(span, err, 1) } var callOptions []grpc.CallOption if responseMD != nil { @@ -183,7 +192,7 @@ func (o *Outbound) invoke( } apiPeer, onFinish, err := o.peerChooser.Choose(ctx, request) if err != nil { - return err + return transport.UpdateSpanWithErr(span, err, 1) } defer func() { onFinish(retErr) }() grpcPeer, ok := apiPeer.(*grpcPeer) @@ -194,16 +203,6 @@ func (o *Outbound) invoke( } } - tracer := o.t.options.tracer - createOpenTracingSpan := &transport.CreateOpenTracingSpan{ - Tracer: tracer, - TransportName: TransportName, - StartTime: start, - ExtraTags: yarpc.OpentracingTags, - } - ctx, span := createOpenTracingSpan.Do(ctx, request) - defer span.Finish() - if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, mdReadWriter(md)); err != nil { return err } @@ -217,7 +216,7 @@ func (o *Outbound) invoke( responseBody, callOptions..., ), - ) + 1) if err != nil { return invokeErrorToYARPCError(err, *responseMD) } @@ -225,7 +224,7 @@ func (o *Outbound) invoke( if match, resSvcName := checkServiceMatch(request.Service, *responseMD); !match { // If service doesn't match => we got response => span must not be nil return transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName)) + "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), 1) } return nil } diff --git a/transport/grpc/stream.go b/transport/grpc/stream.go index aa92325f9..3c6fdead3 100644 --- a/transport/grpc/stream.go +++ b/transport/grpc/stream.go @@ -174,7 +174,7 @@ func (cs *clientStream) Headers() (transport.Headers, error) { func (cs *clientStream) closeWithErr(err error) error { if !cs.closed.Swap(true) { - err = transport.UpdateSpanWithErr(cs.span, err) + err = transport.UpdateSpanWithErr(cs.span, err, 2) cs.span.Finish() cs.release(err) } diff --git a/transport/http/outbound.go b/transport/http/outbound.go index bb34e04ac..f48a7a9f7 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -328,7 +328,7 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request) (*transpor } return nil, transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName)) + "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), 1) } tres := &transport.Response{ From e21ce04386bdf64dec525fd20494826802657328 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Mon, 12 Aug 2024 21:16:33 -0400 Subject: [PATCH 02/12] saving --- api/transport/propagation.go | 16 +++++-------- tracing.go | 6 +++++ transport/grpc/handler.go | 7 +++--- transport/grpc/outbound.go | 44 +++++++++++++++++++----------------- transport/grpc/stream.go | 2 +- transport/http/outbound.go | 2 +- 6 files changed, 41 insertions(+), 36 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 20962397a..1bcbffb79 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -22,11 +22,13 @@ package transport import ( "context" + opentracinglog "github.com/opentracing/opentracing-go/log" "time" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - opentracinglog "github.com/opentracing/opentracing-go/log" + "go.uber.org/yarpc" + "go.uber.org/yarpc/yarpcerrors" ) // CreateOpenTracingSpan creates a new context with a started span @@ -37,12 +39,6 @@ type CreateOpenTracingSpan struct { ExtraTags opentracing.Tags } -const ( - _rpcYarpcStatusCode = "rpc.yarpc.status_code" - _rpcYarpcComponent = "component" - _yarpc = "yarpc" -) - // Do creates a new context that has a reference to the started span. // This should be called before a Outbound makes a call func (c *CreateOpenTracingSpan) Do( @@ -120,12 +116,12 @@ func (e *ExtractOpenTracingSpan) Do( // Returns the given error // Please refer here for the numeric status code of the yarpc reques // https://github.com/yarpc/yarpc-go/blob/dev/yarpcerrors/codes.go#L29 -func UpdateSpanWithErr(span opentracing.Span, err error, errCode int) error { +func UpdateSpanWithErr(span opentracing.Span, err error, errCode yarpcerrors.Code) error { if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) - span.SetTag(_rpcYarpcStatusCode, errCode) - span.SetTag(_rpcYarpcComponent, _yarpc) + span.SetTag(yarpc.RpcYarpcStatusCode, errCode) + span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) } return err } diff --git a/tracing.go b/tracing.go index 5f9ee3e07..4dfecc6d2 100644 --- a/tracing.go +++ b/tracing.go @@ -26,6 +26,12 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) +const ( + RpcYarpcStatusCode = "rpc.yarpc.status_code" + RpcYarpcComponent = "component" + Yarpc = "yarpc" +) + // OpentracingTags are tags with YARPC metadata. var OpentracingTags = opentracing.Tags{ "yarpc.version": Version, diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index d2d025376..1212ff883 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -148,8 +148,9 @@ func (h *handler) handleStream( stream := newServerStream(ctx, &transport.StreamRequest{Meta: transportRequest.ToRequestMeta()}, serverStream) tServerStream, err := transport.NewServerStream(stream) + span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) if err != nil { - return transport.UpdateSpanWithErr(span, err, 2) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } apperr := transport.InvokeStreamHandler(transport.StreamInvokeRequest{ Stream: tServerStream, @@ -157,7 +158,7 @@ func (h *handler) handleStream( Logger: h.logger, }) apperr = handlerErrorToGRPCError(apperr, nil) - return transport.UpdateSpanWithErr(span, apperr, 2) + return transport.UpdateSpanWithErr(span, apperr, yarpcerrors.FromError(err).Code()) } func (h *handler) handleUnary( @@ -230,7 +231,7 @@ func (h *handler) handleUnaryBeforeErrorConversion( defer span.Finish() err := h.callUnary(ctx, transportRequest, handler, responseWriter) - return transport.UpdateSpanWithErr(span, err, 2) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } func (h *handler) callUnary(ctx context.Context, transportRequest *transport.Request, unaryHandler transport.UnaryHandler, responseWriter *responseWriter) error { diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index cd9eaf2ca..215c2992a 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -170,18 +170,19 @@ func (o *Outbound) invoke( } ctx, span := createOpenTracingSpan.Do(ctx, request) defer span.Finish() + span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) md, err := transportRequestToMetadata(request) if err != nil { - return transport.UpdateSpanWithErr(span, err, 1) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } bytes, err := ioutil.ReadAll(request.Body) if err != nil { - return transport.UpdateSpanWithErr(span, err, 1) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } fullMethod, err := procedureNameToFullMethod(request.Procedure) if err != nil { - return transport.UpdateSpanWithErr(span, err, 1) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } var callOptions []grpc.CallOption if responseMD != nil { @@ -192,7 +193,7 @@ func (o *Outbound) invoke( } apiPeer, onFinish, err := o.peerChooser.Choose(ctx, request) if err != nil { - return transport.UpdateSpanWithErr(span, err, 1) + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } defer func() { onFinish(retErr) }() grpcPeer, ok := apiPeer.(*grpcPeer) @@ -216,7 +217,7 @@ func (o *Outbound) invoke( responseBody, callOptions..., ), - 1) + yarpcerrors.FromError(err).Code()) if err != nil { return invokeErrorToYARPCError(err, *responseMD) } @@ -224,7 +225,7 @@ func (o *Outbound) invoke( if match, resSvcName := checkServiceMatch(request.Service, *responseMD); !match { // If service doesn't match => we got response => span must not be nil return transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), 1) + "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), yarpcerrors.FromError(err).Code()) } return nil } @@ -296,23 +297,33 @@ func (o *Outbound) stream( return nil, yarpcerrors.InvalidArgumentErrorf("stream request requires a request metadata") } treq := req.Meta.ToRequest() + tracer := o.t.options.tracer + createOpenTracingSpan := &transport.CreateOpenTracingSpan{ + Tracer: tracer, + TransportName: TransportName, + StartTime: start, + ExtraTags: yarpc.OpentracingTags, + } + _, span := createOpenTracingSpan.Do(ctx, treq) + defer span.Finish() + span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) if err := validateRequest(treq); err != nil { - return nil, err + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } md, err := transportRequestToMetadata(treq) if err != nil { - return nil, err + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } fullMethod, err := procedureNameToFullMethod(req.Meta.Procedure) if err != nil { - return nil, err + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } apiPeer, onFinish, err := o.peerChooser.Choose(ctx, treq) if err != nil { - return nil, err + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } grpcPeer, ok := apiPeer.(*grpcPeer) @@ -322,22 +333,13 @@ func (o *Outbound) stream( ExpectedType: "*grpcPeer", } onFinish(err) - return nil, err - } - - tracer := o.t.options.tracer - createOpenTracingSpan := &transport.CreateOpenTracingSpan{ - Tracer: tracer, - TransportName: TransportName, - StartTime: start, - ExtraTags: yarpc.OpentracingTags, + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } - _, span := createOpenTracingSpan.Do(ctx, treq) if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, mdReadWriter(md)); err != nil { span.Finish() onFinish(err) - return nil, err + return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } streamCtx := metadata.NewOutgoingContext(ctx, md) diff --git a/transport/grpc/stream.go b/transport/grpc/stream.go index 3c6fdead3..e5f7c4e41 100644 --- a/transport/grpc/stream.go +++ b/transport/grpc/stream.go @@ -174,7 +174,7 @@ func (cs *clientStream) Headers() (transport.Headers, error) { func (cs *clientStream) closeWithErr(err error) error { if !cs.closed.Swap(true) { - err = transport.UpdateSpanWithErr(cs.span, err, 2) + err = transport.UpdateSpanWithErr(cs.span, err, yarpcerrors.FromError(err).Code()) cs.span.Finish() cs.release(err) } diff --git a/transport/http/outbound.go b/transport/http/outbound.go index f48a7a9f7..6df8851d2 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -328,7 +328,7 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request) (*transpor } return nil, transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), 1) + "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), yarpcerrors.FromError(err).Code()) } tres := &transport.Response{ From 54090f606d547878b2090c33df5f4055abafc0e6 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Mon, 12 Aug 2024 21:23:18 -0400 Subject: [PATCH 03/12] saving --- api/transport/propagation.go | 1 - 1 file changed, 1 deletion(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 1bcbffb79..0902ef1a8 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -121,7 +121,6 @@ func UpdateSpanWithErr(span opentracing.Span, err error, errCode yarpcerrors.Cod span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) span.SetTag(yarpc.RpcYarpcStatusCode, errCode) - span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) } return err } From 0344ee79eaf12d6564dc7e56517f71d2442da5dd Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Mon, 12 Aug 2024 21:29:19 -0400 Subject: [PATCH 04/12] saving --- tracing.go | 1 - transport/grpc/handler.go | 3 ++- transport/grpc/outbound.go | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tracing.go b/tracing.go index 4dfecc6d2..137d57b53 100644 --- a/tracing.go +++ b/tracing.go @@ -28,7 +28,6 @@ import ( const ( RpcYarpcStatusCode = "rpc.yarpc.status_code" - RpcYarpcComponent = "component" Yarpc = "yarpc" ) diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index 1212ff883..970472e95 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -21,6 +21,7 @@ package grpc import ( + "github.com/opentracing/opentracing-go/ext" "strings" "time" @@ -145,10 +146,10 @@ func (h *handler) handleStream( } ctx, span := extractOpenTracingSpan.Do(ctx, transportRequest) defer span.Finish() + ext.Component.Set(span, yarpc.Yarpc) stream := newServerStream(ctx, &transport.StreamRequest{Meta: transportRequest.ToRequestMeta()}, serverStream) tServerStream, err := transport.NewServerStream(stream) - span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) if err != nil { return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index 215c2992a..de55c1b5c 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -23,6 +23,7 @@ package grpc import ( "bytes" "context" + "github.com/opentracing/opentracing-go/ext" "io/ioutil" "strings" "time" @@ -170,7 +171,7 @@ func (o *Outbound) invoke( } ctx, span := createOpenTracingSpan.Do(ctx, request) defer span.Finish() - span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) + ext.Component.Set(span, yarpc.Yarpc) md, err := transportRequestToMetadata(request) if err != nil { return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) @@ -306,7 +307,7 @@ func (o *Outbound) stream( } _, span := createOpenTracingSpan.Do(ctx, treq) defer span.Finish() - span.SetTag(yarpc.RpcYarpcComponent, yarpc.Yarpc) + ext.Component.Set(span, yarpc.Yarpc) if err := validateRequest(treq); err != nil { return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } From 037bb12d6450394836038e0cded117eca544a05e Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Wed, 14 Aug 2024 17:21:06 -0400 Subject: [PATCH 05/12] saving --- api/transport/propagation.go | 4 ++-- tracing.go | 7 +++++-- transport/grpc/handler.go | 2 -- transport/grpc/outbound.go | 25 +++++++++++-------------- transport/http/outbound.go | 2 +- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 0902ef1a8..468d02d20 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -22,11 +22,11 @@ package transport import ( "context" - opentracinglog "github.com/opentracing/opentracing-go/log" "time" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" + opentracinglog "github.com/opentracing/opentracing-go/log" "go.uber.org/yarpc" "go.uber.org/yarpc/yarpcerrors" ) @@ -120,7 +120,7 @@ func UpdateSpanWithErr(span opentracing.Span, err error, errCode yarpcerrors.Cod if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) - span.SetTag(yarpc.RpcYarpcStatusCode, errCode) + span.SetTag(yarpc.TracingTagStatusCode, errCode) } return err } diff --git a/tracing.go b/tracing.go index 137d57b53..e89cb0b48 100644 --- a/tracing.go +++ b/tracing.go @@ -27,12 +27,15 @@ import ( ) const ( - RpcYarpcStatusCode = "rpc.yarpc.status_code" - Yarpc = "yarpc" + //TracingTagStatusCode is the span tag key for the YAPRC status code. + TracingTagStatusCode = "rpc.yarpc.status_code" + //TracingComponentName helps determine the attribution of a span. + TracingComponentName = "yarpc" ) // OpentracingTags are tags with YARPC metadata. var OpentracingTags = opentracing.Tags{ "yarpc.version": Version, "go.version": runtime.Version(), + "component": TracingComponentName, } diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index 970472e95..29cffb660 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -21,7 +21,6 @@ package grpc import ( - "github.com/opentracing/opentracing-go/ext" "strings" "time" @@ -146,7 +145,6 @@ func (h *handler) handleStream( } ctx, span := extractOpenTracingSpan.Do(ctx, transportRequest) defer span.Finish() - ext.Component.Set(span, yarpc.Yarpc) stream := newServerStream(ctx, &transport.StreamRequest{Meta: transportRequest.ToRequestMeta()}, serverStream) tServerStream, err := transport.NewServerStream(stream) diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index de55c1b5c..e83500af8 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -23,7 +23,6 @@ package grpc import ( "bytes" "context" - "github.com/opentracing/opentracing-go/ext" "io/ioutil" "strings" "time" @@ -171,7 +170,6 @@ func (o *Outbound) invoke( } ctx, span := createOpenTracingSpan.Do(ctx, request) defer span.Finish() - ext.Component.Set(span, yarpc.Yarpc) md, err := transportRequestToMetadata(request) if err != nil { return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) @@ -209,16 +207,16 @@ func (o *Outbound) invoke( return err } - err = transport.UpdateSpanWithErr( - span, - grpcPeer.clientConn.Invoke( - metadata.NewOutgoingContext(ctx, md), - fullMethod, - bytes, - responseBody, - callOptions..., - ), - yarpcerrors.FromError(err).Code()) + if err := grpcPeer.clientConn.Invoke( + metadata.NewOutgoingContext(ctx, md), + fullMethod, + bytes, + responseBody, + callOptions..., + ); err != nil { + return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + } + if err != nil { return invokeErrorToYARPCError(err, *responseMD) } @@ -226,7 +224,7 @@ func (o *Outbound) invoke( if match, resSvcName := checkServiceMatch(request.Service, *responseMD); !match { // If service doesn't match => we got response => span must not be nil return transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), yarpcerrors.FromError(err).Code()) + "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), yarpcerrors.CodeInternal) } return nil } @@ -307,7 +305,6 @@ func (o *Outbound) stream( } _, span := createOpenTracingSpan.Do(ctx, treq) defer span.Finish() - ext.Component.Set(span, yarpc.Yarpc) if err := validateRequest(treq); err != nil { return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) } diff --git a/transport/http/outbound.go b/transport/http/outbound.go index 6df8851d2..553c31af1 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -328,7 +328,7 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request) (*transpor } return nil, transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ - "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), yarpcerrors.FromError(err).Code()) + "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), yarpcerrors.CodeInternal) } tres := &transport.Response{ From c068467450cd00341ee256e39124d86cefaff507 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Wed, 14 Aug 2024 23:18:18 -0400 Subject: [PATCH 06/12] saving --- api/transport/propagation.go | 10 +++++--- encoding/protobuf/error_integration_test.go | 27 +++++++-------------- tracing.go | 2 -- transport/grpc/integration_test.go | 22 ++++++++--------- 4 files changed, 26 insertions(+), 35 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 468d02d20..83f2fe346 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -27,7 +27,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" opentracinglog "github.com/opentracing/opentracing-go/log" - "go.uber.org/yarpc" "go.uber.org/yarpc/yarpcerrors" ) @@ -39,6 +38,11 @@ type CreateOpenTracingSpan struct { ExtraTags opentracing.Tags } +const ( + //TracingTagStatusCode is the span tag key for the YAPRC status code. + TracingTagStatusCode = "rpc.yarpc.status_code" +) + // Do creates a new context that has a reference to the started span. // This should be called before a Outbound makes a call func (c *CreateOpenTracingSpan) Do( @@ -114,13 +118,11 @@ func (e *ExtractOpenTracingSpan) Do( // UpdateSpanWithErr sets the error tag on a span, if an error is given. // Returns the given error -// Please refer here for the numeric status code of the yarpc reques -// https://github.com/yarpc/yarpc-go/blob/dev/yarpcerrors/codes.go#L29 func UpdateSpanWithErr(span opentracing.Span, err error, errCode yarpcerrors.Code) error { if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) - span.SetTag(yarpc.TracingTagStatusCode, errCode) + span.SetTag(TracingTagStatusCode, errCode) } return err } diff --git a/encoding/protobuf/error_integration_test.go b/encoding/protobuf/error_integration_test.go index 301abd7a2..973a35430 100644 --- a/encoding/protobuf/error_integration_test.go +++ b/encoding/protobuf/error_integration_test.go @@ -113,12 +113,9 @@ func TestProtoGrpcServerErrorDetails(t *testing.T) { _, err = client.Unary(ctx, &testpb.TestMessage{Value: errorMsg}) assert.NotNil(t, err, "unexpected nil error") st := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeInvalidArgument, st.Code(), "unexpected error code") - assert.Equal(t, errorMsg, st.Message(), "unexpected error message") - expectedDetails := []interface{}{ - &types.StringValue{Value: "string value"}, - &types.Int32Value{Value: 100}, - } + assert.Equal(t, yarpcerrors.CodeUnknown, st.Code(), "unexpected error code") + assert.Equal(t, "rpc error: code = InvalidArgument desc = error msg", st.Message(), "unexpected error message") + expectedDetails := []interface{}(nil) actualDetails := protobuf.GetErrorDetails(err) assert.Equal(t, expectedDetails, actualDetails, "unexpected error details") } @@ -247,16 +244,13 @@ func TestRawGrpcServerErrorDetails(t *testing.T) { _, err = client.Call(ctx, "test::unary", nil) assert.NotNil(t, err, "unexpected nil error") yarpcStatus := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeInvalidArgument, yarpcStatus.Code(), "unexpected error code") - assert.Equal(t, "error message", yarpcStatus.Message(), "unexpected error message") + assert.Equal(t, yarpcerrors.CodeUnknown, yarpcStatus.Code(), "unexpected error code") + assert.Equal(t, "rpc error: code = InvalidArgument desc = error message", yarpcStatus.Message(), "unexpected error message") var rpcStatus rpc.Status proto.Unmarshal(yarpcStatus.Details(), &rpcStatus) status := status.FromProto(&rpcStatus) - expectedDetails := []interface{}{ - &types.StringValue{Value: "string value"}, - &types.Int32Value{Value: 100}, - } + expectedDetails := []interface{}{} assert.Equal(t, expectedDetails, status.Details(), "unexpected error details") } @@ -311,16 +305,13 @@ func TestJSONGrpcServerErrorDetails(t *testing.T) { err = client.Call(ctx, "test", nil, nil) assert.NotNil(t, err, "unexpected nil error") yarpcStatus := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeInvalidArgument, yarpcStatus.Code(), "unexpected error code") - assert.Equal(t, "error message", yarpcStatus.Message(), "unexpected error message") + assert.Equal(t, yarpcerrors.CodeUnknown, yarpcStatus.Code(), "unexpected error code") + assert.Equal(t, "rpc error: code = InvalidArgument desc = error message", yarpcStatus.Message(), "unexpected error message") var rpcStatus rpc.Status proto.Unmarshal(yarpcStatus.Details(), &rpcStatus) status := status.FromProto(&rpcStatus) - expectedDetails := []interface{}{ - &types.StringValue{Value: "string value"}, - &types.Int32Value{Value: 100}, - } + expectedDetails := []interface{}{} assert.Equal(t, expectedDetails, status.Details(), "unexpected error details") } diff --git a/tracing.go b/tracing.go index e89cb0b48..df84c0c8e 100644 --- a/tracing.go +++ b/tracing.go @@ -27,8 +27,6 @@ import ( ) const ( - //TracingTagStatusCode is the span tag key for the YAPRC status code. - TracingTagStatusCode = "rpc.yarpc.status_code" //TracingComponentName helps determine the attribution of a span. TracingComponentName = "yarpc" ) diff --git a/transport/grpc/integration_test.go b/transport/grpc/integration_test.go index 86f15e3e4..eb58d99d4 100644 --- a/transport/grpc/integration_test.go +++ b/transport/grpc/integration_test.go @@ -71,7 +71,7 @@ func TestYARPCBasic(t *testing.T) { } te.do(t, func(t *testing.T, e *testEnv) { _, err := e.GetValueYARPC(context.Background(), "foo") - assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeNotFound, "foo"), err) + assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeNotFound), "foo"), err) assert.NoError(t, e.SetValueYARPC(context.Background(), "foo", "bar")) value, err := e.GetValueYARPC(context.Background(), "foo") assert.NoError(t, err) @@ -98,7 +98,7 @@ func TestYARPCWellKnownError(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(status.Error(codes.FailedPrecondition, "bar 1")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeFailedPrecondition, "bar 1"), err) + assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeFailedPrecondition), "bar 1"), err) }) } @@ -108,7 +108,7 @@ func TestYARPCNamedError(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "baz 1")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "baz 1"), err) + assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeUnknown), "bar: baz 1"), err) }) } @@ -118,7 +118,7 @@ func TestYARPCNamedErrorNoMessage(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", ""), err) + assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeUnknown), "bar"), err) }) } @@ -187,7 +187,7 @@ func TestYARPCResponseAndError(t *testing.T) { e.KeyValueYARPCServer.SetNextError(status.Error(codes.FailedPrecondition, "bar 1")) value, err := e.GetValueYARPC(context.Background(), "foo") assert.Equal(t, "bar", value) - assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeFailedPrecondition, "bar 1"), err) + assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeFailedPrecondition), "bar 1"), err) }) } @@ -216,7 +216,7 @@ func TestYARPCMaxMsgSize(t *testing.T) { err := e.SetValueYARPC(ctx, "foo", value) - assert.Equal(t, yarpcerrors.CodeResourceExhausted.String(), yarpcerrors.FromError(err).Code().String()) + assert.Equal(t, "unknown", yarpcerrors.FromError(err).Code().String()) }) }) t.Run("just right", func(t *testing.T) { @@ -265,7 +265,7 @@ func TestApplicationErrorPropagation(t *testing.T) { protobuf.Encoding, transport.Headers{}, ) - require.Equal(t, yarpcerrors.NotFoundErrorf("foo"), err) + require.Equal(t, status.Error(codes.Code(yarpcerrors.CodeNotFound), "foo"), err) require.True(t, response.ApplicationError) response, err = e.Call( @@ -285,7 +285,7 @@ func TestApplicationErrorPropagation(t *testing.T) { "bad_encoding", transport.Headers{}, ) - require.True(t, yarpcerrors.IsInvalidArgument(err)) + require.True(t, yarpcerrors.IsUnknown(err)) require.False(t, response.ApplicationError) }) } @@ -328,7 +328,7 @@ func TestGRPCCompression(t *testing.T) { { msg: "fail compression of request", compressor: _badCompressor, - wantErr: "code:internal message:grpc: error while compressing: assert.AnError general error for testing", + wantErr: "rpc error: code = Internal desc = grpc: error while compressing: assert.AnError general error for testing", wantMetrics: []metric{ {0, tagsCompression}, }, @@ -336,7 +336,7 @@ func TestGRPCCompression(t *testing.T) { { msg: "fail decompression of request", compressor: _badDecompressor, - wantErr: "code:internal message:grpc: failed to decompress the received message assert.AnError general error for testing", + wantErr: "rpc error: code = Internal desc = grpc: failed to decompress the received message assert.AnError general error for testing", wantMetrics: []metric{ {32777, tagsCompression}, {0, tagsDecompression}, @@ -1043,6 +1043,6 @@ func TestYARPCErrorsConverted(t *testing.T) { }) require.Error(t, err) - assert.True(t, yarpcerrors.IsUnimplemented(err)) + assert.True(t, yarpcerrors.IsUnknown(err)) }) } From 805cf883e26ac849f7eec2f5e1f5045493096d3e Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Thu, 15 Aug 2024 19:07:12 -0400 Subject: [PATCH 07/12] fix test --- encoding/protobuf/error_integration_test.go | 27 ++++++++++++++------- transport/grpc/integration_test.go | 22 ++++++++--------- transport/grpc/outbound.go | 6 ++--- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/encoding/protobuf/error_integration_test.go b/encoding/protobuf/error_integration_test.go index 973a35430..301abd7a2 100644 --- a/encoding/protobuf/error_integration_test.go +++ b/encoding/protobuf/error_integration_test.go @@ -113,9 +113,12 @@ func TestProtoGrpcServerErrorDetails(t *testing.T) { _, err = client.Unary(ctx, &testpb.TestMessage{Value: errorMsg}) assert.NotNil(t, err, "unexpected nil error") st := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeUnknown, st.Code(), "unexpected error code") - assert.Equal(t, "rpc error: code = InvalidArgument desc = error msg", st.Message(), "unexpected error message") - expectedDetails := []interface{}(nil) + assert.Equal(t, yarpcerrors.CodeInvalidArgument, st.Code(), "unexpected error code") + assert.Equal(t, errorMsg, st.Message(), "unexpected error message") + expectedDetails := []interface{}{ + &types.StringValue{Value: "string value"}, + &types.Int32Value{Value: 100}, + } actualDetails := protobuf.GetErrorDetails(err) assert.Equal(t, expectedDetails, actualDetails, "unexpected error details") } @@ -244,13 +247,16 @@ func TestRawGrpcServerErrorDetails(t *testing.T) { _, err = client.Call(ctx, "test::unary", nil) assert.NotNil(t, err, "unexpected nil error") yarpcStatus := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeUnknown, yarpcStatus.Code(), "unexpected error code") - assert.Equal(t, "rpc error: code = InvalidArgument desc = error message", yarpcStatus.Message(), "unexpected error message") + assert.Equal(t, yarpcerrors.CodeInvalidArgument, yarpcStatus.Code(), "unexpected error code") + assert.Equal(t, "error message", yarpcStatus.Message(), "unexpected error message") var rpcStatus rpc.Status proto.Unmarshal(yarpcStatus.Details(), &rpcStatus) status := status.FromProto(&rpcStatus) - expectedDetails := []interface{}{} + expectedDetails := []interface{}{ + &types.StringValue{Value: "string value"}, + &types.Int32Value{Value: 100}, + } assert.Equal(t, expectedDetails, status.Details(), "unexpected error details") } @@ -305,13 +311,16 @@ func TestJSONGrpcServerErrorDetails(t *testing.T) { err = client.Call(ctx, "test", nil, nil) assert.NotNil(t, err, "unexpected nil error") yarpcStatus := yarpcerrors.FromError(err) - assert.Equal(t, yarpcerrors.CodeUnknown, yarpcStatus.Code(), "unexpected error code") - assert.Equal(t, "rpc error: code = InvalidArgument desc = error message", yarpcStatus.Message(), "unexpected error message") + assert.Equal(t, yarpcerrors.CodeInvalidArgument, yarpcStatus.Code(), "unexpected error code") + assert.Equal(t, "error message", yarpcStatus.Message(), "unexpected error message") var rpcStatus rpc.Status proto.Unmarshal(yarpcStatus.Details(), &rpcStatus) status := status.FromProto(&rpcStatus) - expectedDetails := []interface{}{} + expectedDetails := []interface{}{ + &types.StringValue{Value: "string value"}, + &types.Int32Value{Value: 100}, + } assert.Equal(t, expectedDetails, status.Details(), "unexpected error details") } diff --git a/transport/grpc/integration_test.go b/transport/grpc/integration_test.go index eb58d99d4..86f15e3e4 100644 --- a/transport/grpc/integration_test.go +++ b/transport/grpc/integration_test.go @@ -71,7 +71,7 @@ func TestYARPCBasic(t *testing.T) { } te.do(t, func(t *testing.T, e *testEnv) { _, err := e.GetValueYARPC(context.Background(), "foo") - assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeNotFound), "foo"), err) + assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeNotFound, "foo"), err) assert.NoError(t, e.SetValueYARPC(context.Background(), "foo", "bar")) value, err := e.GetValueYARPC(context.Background(), "foo") assert.NoError(t, err) @@ -98,7 +98,7 @@ func TestYARPCWellKnownError(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(status.Error(codes.FailedPrecondition, "bar 1")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeFailedPrecondition), "bar 1"), err) + assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeFailedPrecondition, "bar 1"), err) }) } @@ -108,7 +108,7 @@ func TestYARPCNamedError(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "baz 1")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeUnknown), "bar: baz 1"), err) + assert.Equal(t, intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "baz 1"), err) }) } @@ -118,7 +118,7 @@ func TestYARPCNamedErrorNoMessage(t *testing.T) { te.do(t, func(t *testing.T, e *testEnv) { e.KeyValueYARPCServer.SetNextError(intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", "")) err := e.SetValueYARPC(context.Background(), "foo", "bar") - assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeUnknown), "bar"), err) + assert.Equal(t, intyarpcerrors.NewWithNamef(yarpcerrors.CodeUnknown, "bar", ""), err) }) } @@ -187,7 +187,7 @@ func TestYARPCResponseAndError(t *testing.T) { e.KeyValueYARPCServer.SetNextError(status.Error(codes.FailedPrecondition, "bar 1")) value, err := e.GetValueYARPC(context.Background(), "foo") assert.Equal(t, "bar", value) - assert.Equal(t, status.Error(codes.Code(yarpcerrors.CodeFailedPrecondition), "bar 1"), err) + assert.Equal(t, yarpcerrors.Newf(yarpcerrors.CodeFailedPrecondition, "bar 1"), err) }) } @@ -216,7 +216,7 @@ func TestYARPCMaxMsgSize(t *testing.T) { err := e.SetValueYARPC(ctx, "foo", value) - assert.Equal(t, "unknown", yarpcerrors.FromError(err).Code().String()) + assert.Equal(t, yarpcerrors.CodeResourceExhausted.String(), yarpcerrors.FromError(err).Code().String()) }) }) t.Run("just right", func(t *testing.T) { @@ -265,7 +265,7 @@ func TestApplicationErrorPropagation(t *testing.T) { protobuf.Encoding, transport.Headers{}, ) - require.Equal(t, status.Error(codes.Code(yarpcerrors.CodeNotFound), "foo"), err) + require.Equal(t, yarpcerrors.NotFoundErrorf("foo"), err) require.True(t, response.ApplicationError) response, err = e.Call( @@ -285,7 +285,7 @@ func TestApplicationErrorPropagation(t *testing.T) { "bad_encoding", transport.Headers{}, ) - require.True(t, yarpcerrors.IsUnknown(err)) + require.True(t, yarpcerrors.IsInvalidArgument(err)) require.False(t, response.ApplicationError) }) } @@ -328,7 +328,7 @@ func TestGRPCCompression(t *testing.T) { { msg: "fail compression of request", compressor: _badCompressor, - wantErr: "rpc error: code = Internal desc = grpc: error while compressing: assert.AnError general error for testing", + wantErr: "code:internal message:grpc: error while compressing: assert.AnError general error for testing", wantMetrics: []metric{ {0, tagsCompression}, }, @@ -336,7 +336,7 @@ func TestGRPCCompression(t *testing.T) { { msg: "fail decompression of request", compressor: _badDecompressor, - wantErr: "rpc error: code = Internal desc = grpc: failed to decompress the received message assert.AnError general error for testing", + wantErr: "code:internal message:grpc: failed to decompress the received message assert.AnError general error for testing", wantMetrics: []metric{ {32777, tagsCompression}, {0, tagsDecompression}, @@ -1043,6 +1043,6 @@ func TestYARPCErrorsConverted(t *testing.T) { }) require.Error(t, err) - assert.True(t, yarpcerrors.IsUnknown(err)) + assert.True(t, yarpcerrors.IsUnimplemented(err)) }) } diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index e83500af8..52be45279 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -214,12 +214,10 @@ func (o *Outbound) invoke( responseBody, callOptions..., ); err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) - } - - if err != nil { + err := transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) return invokeErrorToYARPCError(err, *responseMD) } + // Service name match validation, return yarpcerrors.CodeInternal error if not match if match, resSvcName := checkServiceMatch(request.Service, *responseMD); !match { // If service doesn't match => we got response => span must not be nil From 4aa9fa6fcd982345104e1a5b09d29f1eaa9f4fc9 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Fri, 16 Aug 2024 01:17:14 -0400 Subject: [PATCH 08/12] make errCode optional for UpdateSpanWithErr --- api/transport/propagation.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 83f2fe346..b73e107bd 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -118,11 +118,14 @@ func (e *ExtractOpenTracingSpan) Do( // UpdateSpanWithErr sets the error tag on a span, if an error is given. // Returns the given error -func UpdateSpanWithErr(span opentracing.Span, err error, errCode yarpcerrors.Code) error { +func UpdateSpanWithErr(span opentracing.Span, err error, errCode ...yarpcerrors.Code) error { if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) - span.SetTag(TracingTagStatusCode, errCode) + + if len(errCode) > 0 { + span.SetTag(TracingTagStatusCode, errCode[0]) + } } return err } From b5b74967aefbb636eba938694f5eb0eabddcf535 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Fri, 16 Aug 2024 13:54:18 -0400 Subject: [PATCH 09/12] fix UpdateSpanWithErr --- api/transport/propagation.go | 19 +++++++++++++------ transport/grpc/handler.go | 6 +++--- transport/grpc/outbound.go | 24 ++++++++++++------------ transport/grpc/stream.go | 2 +- transport/http/outbound.go | 2 +- 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index b73e107bd..25bc2075e 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -116,16 +116,23 @@ func (e *ExtractOpenTracingSpan) Do( return ctx, span } -// UpdateSpanWithErr sets the error tag on a span, if an error is given. -// Returns the given error -func UpdateSpanWithErr(span opentracing.Span, err error, errCode ...yarpcerrors.Code) error { +// Deprecated: Use UpdateSpanWithErrAndCode instead. +// UpdateSpanWithErr logs an error to the span. Prefer UpdateSpanWithErrAndCode +// for including an error code in addition to the error message. +func UpdateSpanWithErr(span opentracing.Span, err error) error { if err != nil { span.SetTag("error", true) span.LogFields(opentracinglog.String("event", err.Error())) + } + return err +} - if len(errCode) > 0 { - span.SetTag(TracingTagStatusCode, errCode[0]) - } +// UpdateSpanWithErrAndCode sets the error tag with errcode on a span, if an error is given. +// Returns the given error +func UpdateSpanWithErrAndCode(span opentracing.Span, err error, errCode yarpcerrors.Code) error { + err = UpdateSpanWithErr(span, err) + if err != nil { + span.SetTag(TracingTagStatusCode, errCode) } return err } diff --git a/transport/grpc/handler.go b/transport/grpc/handler.go index 29cffb660..20da6e80c 100644 --- a/transport/grpc/handler.go +++ b/transport/grpc/handler.go @@ -149,7 +149,7 @@ func (h *handler) handleStream( stream := newServerStream(ctx, &transport.StreamRequest{Meta: transportRequest.ToRequestMeta()}, serverStream) tServerStream, err := transport.NewServerStream(stream) if err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } apperr := transport.InvokeStreamHandler(transport.StreamInvokeRequest{ Stream: tServerStream, @@ -157,7 +157,7 @@ func (h *handler) handleStream( Logger: h.logger, }) apperr = handlerErrorToGRPCError(apperr, nil) - return transport.UpdateSpanWithErr(span, apperr, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, apperr, yarpcerrors.FromError(err).Code()) } func (h *handler) handleUnary( @@ -230,7 +230,7 @@ func (h *handler) handleUnaryBeforeErrorConversion( defer span.Finish() err := h.callUnary(ctx, transportRequest, handler, responseWriter) - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } func (h *handler) callUnary(ctx context.Context, transportRequest *transport.Request, unaryHandler transport.UnaryHandler, responseWriter *responseWriter) error { diff --git a/transport/grpc/outbound.go b/transport/grpc/outbound.go index 52be45279..3a4fd71bd 100644 --- a/transport/grpc/outbound.go +++ b/transport/grpc/outbound.go @@ -172,16 +172,16 @@ func (o *Outbound) invoke( defer span.Finish() md, err := transportRequestToMetadata(request) if err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } bytes, err := ioutil.ReadAll(request.Body) if err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } fullMethod, err := procedureNameToFullMethod(request.Procedure) if err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } var callOptions []grpc.CallOption if responseMD != nil { @@ -192,7 +192,7 @@ func (o *Outbound) invoke( } apiPeer, onFinish, err := o.peerChooser.Choose(ctx, request) if err != nil { - return transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } defer func() { onFinish(retErr) }() grpcPeer, ok := apiPeer.(*grpcPeer) @@ -214,14 +214,14 @@ func (o *Outbound) invoke( responseBody, callOptions..., ); err != nil { - err := transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + err := transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) return invokeErrorToYARPCError(err, *responseMD) } // Service name match validation, return yarpcerrors.CodeInternal error if not match if match, resSvcName := checkServiceMatch(request.Service, *responseMD); !match { // If service doesn't match => we got response => span must not be nil - return transport.UpdateSpanWithErr(span, yarpcerrors.InternalErrorf("service name sent from the request "+ + return transport.UpdateSpanWithErrAndCode(span, yarpcerrors.InternalErrorf("service name sent from the request "+ "does not match the service name received in the response: sent %q, got: %q", request.Service, resSvcName), yarpcerrors.CodeInternal) } return nil @@ -304,22 +304,22 @@ func (o *Outbound) stream( _, span := createOpenTracingSpan.Do(ctx, treq) defer span.Finish() if err := validateRequest(treq); err != nil { - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } md, err := transportRequestToMetadata(treq) if err != nil { - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } fullMethod, err := procedureNameToFullMethod(req.Meta.Procedure) if err != nil { - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } apiPeer, onFinish, err := o.peerChooser.Choose(ctx, treq) if err != nil { - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } grpcPeer, ok := apiPeer.(*grpcPeer) @@ -329,13 +329,13 @@ func (o *Outbound) stream( ExpectedType: "*grpcPeer", } onFinish(err) - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } if err := tracer.Inject(span.Context(), opentracing.HTTPHeaders, mdReadWriter(md)); err != nil { span.Finish() onFinish(err) - return nil, transport.UpdateSpanWithErr(span, err, yarpcerrors.FromError(err).Code()) + return nil, transport.UpdateSpanWithErrAndCode(span, err, yarpcerrors.FromError(err).Code()) } streamCtx := metadata.NewOutgoingContext(ctx, md) diff --git a/transport/grpc/stream.go b/transport/grpc/stream.go index e5f7c4e41..71093911b 100644 --- a/transport/grpc/stream.go +++ b/transport/grpc/stream.go @@ -174,7 +174,7 @@ func (cs *clientStream) Headers() (transport.Headers, error) { func (cs *clientStream) closeWithErr(err error) error { if !cs.closed.Swap(true) { - err = transport.UpdateSpanWithErr(cs.span, err, yarpcerrors.FromError(err).Code()) + err = transport.UpdateSpanWithErrAndCode(cs.span, err, yarpcerrors.FromError(err).Code()) cs.span.Finish() cs.release(err) } diff --git a/transport/http/outbound.go b/transport/http/outbound.go index 553c31af1..d3385b527 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -326,7 +326,7 @@ func (o *Outbound) call(ctx context.Context, treq *transport.Request) (*transpor if err = response.Body.Close(); err != nil { return nil, yarpcerrors.Newf(yarpcerrors.CodeInternal, err.Error()) } - return nil, transport.UpdateSpanWithErr(span, + return nil, transport.UpdateSpanWithErrAndCode(span, yarpcerrors.InternalErrorf("service name sent from the request "+ "does not match the service name received in the response, sent %q, got: %q", treq.Service, resSvcName), yarpcerrors.CodeInternal) } From 10754a0b6eb7bad8488d53d4a633aa7c72a13a2e Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Fri, 16 Aug 2024 16:08:38 -0400 Subject: [PATCH 10/12] fix linter --- api/transport/propagation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 25bc2075e..ea0d779fb 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -116,9 +116,9 @@ func (e *ExtractOpenTracingSpan) Do( return ctx, span } -// Deprecated: Use UpdateSpanWithErrAndCode instead. // UpdateSpanWithErr logs an error to the span. Prefer UpdateSpanWithErrAndCode // for including an error code in addition to the error message. +// Deprecated: Use UpdateSpanWithErrAndCode instead. func UpdateSpanWithErr(span opentracing.Span, err error) error { if err != nil { span.SetTag("error", true) From 5f87bc0389df0cfc501cdc38bb209f9549354159 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Mon, 19 Aug 2024 13:18:52 -0400 Subject: [PATCH 11/12] saving --- api/transport/propagation.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index ea0d779fb..6b427aac4 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -130,9 +130,8 @@ func UpdateSpanWithErr(span opentracing.Span, err error) error { // UpdateSpanWithErrAndCode sets the error tag with errcode on a span, if an error is given. // Returns the given error func UpdateSpanWithErrAndCode(span opentracing.Span, err error, errCode yarpcerrors.Code) error { - err = UpdateSpanWithErr(span, err) if err != nil { span.SetTag(TracingTagStatusCode, errCode) } - return err + return UpdateSpanWithErr(span, err) } From 9050ffad859c0fdbc12d8bddfdf8c9d193812cd0 Mon Sep 17 00:00:00 2001 From: Kexiong Liu Date: Wed, 21 Aug 2024 15:29:31 -0400 Subject: [PATCH 12/12] saving --- api/transport/propagation.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/transport/propagation.go b/api/transport/propagation.go index 6b427aac4..ead1ffcb7 100644 --- a/api/transport/propagation.go +++ b/api/transport/propagation.go @@ -122,7 +122,10 @@ func (e *ExtractOpenTracingSpan) Do( func UpdateSpanWithErr(span opentracing.Span, err error) error { if err != nil { span.SetTag("error", true) - span.LogFields(opentracinglog.String("event", err.Error())) + span.LogFields( + opentracinglog.String("event", "error"), + opentracinglog.String("message", err.Error()), + ) } return err }