Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add correlation_id as a span tagger and handle NPE #2210

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,5 @@ public class TelemetryConstants {
public static final String STATUS_CODE_ATTRIBUTE_KEY = "Status code";
public static final String STATUS_DESCRIPTION_ATTRIBUTE_KEY = "Status description";
public static final String ENDPOINT_ATTRIBUTE_KEY = "Endpoint";
public static final String CORRELATION_ID_ATTRIBUTE_KEY = "CorrelationId";
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ private void startSpan(StatisticDataUnit statisticDataUnit, MessageContext synCt
if (isOuterLevelSpan(statisticDataUnit, spanStore)) {
// Extract span context from headers
context = extract(headersMap);
} else {
} else if (parentSpan != null) {
context = Context.current().with(parentSpan);
} else {
context = Context.current();
}
span = tracer.spanBuilder(statisticDataUnit.getComponentName()).setParent(context).startSpan();

Expand Down Expand Up @@ -352,12 +354,12 @@ private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit,
}
if (!Objects.equals(spanWrapper, spanStore.getOuterLevelSpanWrapper())) {
// A non-outer level span
spanStore.finishSpan(spanWrapper);
spanStore.finishSpan(spanWrapper, synCtx);
} else {
// An outer level span
if (tracingScope.isEventCollectionFinished(synCtx)) {
cleanupContinuationStateSequences(spanStore);
spanStore.finishSpan(spanWrapper);
cleanupContinuationStateSequences(spanStore, synCtx);
spanStore.finishSpan(spanWrapper, synCtx);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
// Else - Absorb. Will be handled when all the callbacks are completed
Expand All @@ -367,13 +369,14 @@ private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit,
/**
* Cleans up remaining unfinished continuation state sequences before ending the outer level span.
* @param spanStore Span store object.
* @param synCtx Synapse message context
*/
private void cleanupContinuationStateSequences(SpanStore spanStore) {
private void cleanupContinuationStateSequences(SpanStore spanStore, MessageContext synCtx) {
if (!spanStore.getContinuationStateSequenceInfos().isEmpty()) {
List<ContinuationStateSequenceInfo> continuationStateSequences =
spanStore.getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo continuationStateSequence : continuationStateSequences) {
finishSpanForContinuationStateSequence(continuationStateSequence, spanStore);
finishSpanForContinuationStateSequence(continuationStateSequence, spanStore, synCtx);
}
continuationStateSequences.clear();
}
Expand All @@ -385,10 +388,10 @@ private void cleanupContinuationStateSequences(SpanStore spanStore) {
* @param spanStore Span store object.
*/
private void finishSpanForContinuationStateSequence(ContinuationStateSequenceInfo continuationStateSequenceInfo,
SpanStore spanStore) {
SpanStore spanStore, MessageContext synCtx) {
String spanWrapperId = continuationStateSequenceInfo.getSpanReferenceId();
SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId);
spanStore.finishSpan(spanWrapper);
spanStore.finishSpan(spanWrapper, synCtx);
}

@Override
Expand Down Expand Up @@ -418,9 +421,9 @@ private void handleCallbackFinishEvent(MessageContext messageContext) {
// The last callback received in a scope will finish the outer level span
if (tracingScope.isEventCollectionFinished(messageContext)) {
synchronized (tracingScope.getSpanStore()) {
cleanupContinuationStateSequences(tracingScope.getSpanStore());
cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper);
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
}
Expand Down Expand Up @@ -452,7 +455,8 @@ public void handleStateStackRemoval(ContinuationState continuationState, Message
true);
if (continuationStateSequenceInfo != null) {
continuationStateSequenceInfo.setSpanActive(false);
finishSpanForContinuationStateSequence(continuationStateSequenceInfo, tracingScope.getSpanStore());
finishSpanForContinuationStateSequence(continuationStateSequenceInfo, tracingScope.getSpanStore(),
synCtx);
tracingScope.getSpanStore().getContinuationStateSequenceInfos()
.remove(continuationStateSequenceInfo);
}
Expand Down Expand Up @@ -501,7 +505,7 @@ public void handleStateStackClearance(MessageContext synCtx) {
List<ContinuationStateSequenceInfo> stackedSequences =
tracingScope.getSpanStore().getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo stackedSequence : stackedSequences) {
finishSpanForContinuationStateSequence(stackedSequence, tracingScope.getSpanStore());
finishSpanForContinuationStateSequence(stackedSequence, tracingScope.getSpanStore(), synCtx);
}
stackedSequences.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers;

import io.opentelemetry.api.trace.Span;
import org.apache.synapse.MessageContext;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticsLog;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.OpenTelemetryManagerHolder;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.TelemetryConstants;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.SpanWrapper;
import org.apache.synapse.commons.CorrelationConstants;

/**
* Applies tags to Spans.
Expand All @@ -39,8 +41,9 @@ private SpanTagger() {}
* Sets tags to the span which is contained in the provided span wrapper, from information acquired from the
* given basic statistic data unit.
* @param spanWrapper Span wrapper that contains the target span.
* @param synCtx Synapse message context
*/
public static void setSpanTags(SpanWrapper spanWrapper) {
public static void setSpanTags(SpanWrapper spanWrapper, MessageContext synCtx) {
StatisticsLog openStatisticsLog = new StatisticsLog(spanWrapper.getStatisticDataUnit());
Span span = spanWrapper.getSpan();
if (OpenTelemetryManagerHolder.isCollectingPayloads() || OpenTelemetryManagerHolder.isCollectingProperties()) {
Expand Down Expand Up @@ -113,5 +116,9 @@ public static void setSpanTags(SpanWrapper spanWrapper) {
span.setAttribute(TelemetryConstants.ENDPOINT_ATTRIBUTE_KEY,
String.valueOf(openStatisticsLog.getEndpoint().getJsonRepresentation()));
}
if (synCtx.getProperty(CorrelationConstants.CORRELATION_ID) != null) {
span.setAttribute(TelemetryConstants.CORRELATION_ID_ATTRIBUTE_KEY,
synCtx.getProperty(CorrelationConstants.CORRELATION_ID).toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ public SpanWrapper addSpanWrapper(String spanId,
* Denotes the end of a span.
* Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers.
* @param spanWrapper Span wrapper object, which has been already created
* @param synCtx Synapse message context
*/
public void finishSpan(SpanWrapper spanWrapper) {
public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx) {
if (spanWrapper != null && spanWrapper.getSpan() != null) {
if (spanWrapper.getStatisticDataUnit() != null) {
SpanTagger.setSpanTags(spanWrapper);
SpanTagger.setSpanTags(spanWrapper, synCtx);
}
spanWrapper.getSpan().end();
activeSpanWrappers.remove(spanWrapper);
Expand Down
Loading