Skip to content

Commit

Permalink
Merge pull request #2210 from sajinieKavindya/new-master
Browse files Browse the repository at this point in the history
Add correlation_id as a span tagger and handle NPE
  • Loading branch information
sajinieKavindya authored Aug 22, 2024
2 parents ac4ebd5 + ca31ff3 commit c3a88cb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,5 @@ public class TelemetryConstants {
public static final String STATUS_CODE_ATTRIBUTE_KEY = "Status code";
public static final String STATUS_DESCRIPTION_ATTRIBUTE_KEY = "Status description";
public static final String ENDPOINT_ATTRIBUTE_KEY = "Endpoint";
public static final String CORRELATION_ID_ATTRIBUTE_KEY = "CorrelationId";
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ private void startSpan(StatisticDataUnit statisticDataUnit, MessageContext synCt
if (isOuterLevelSpan(statisticDataUnit, spanStore)) {
// Extract span context from headers
context = extract(headersMap);
} else {
} else if (parentSpan != null) {
context = Context.current().with(parentSpan);
} else {
context = Context.current();
}
span = tracer.spanBuilder(statisticDataUnit.getComponentName()).setParent(context).startSpan();

Expand Down Expand Up @@ -352,12 +354,12 @@ private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit,
}
if (!Objects.equals(spanWrapper, spanStore.getOuterLevelSpanWrapper())) {
// A non-outer level span
spanStore.finishSpan(spanWrapper);
spanStore.finishSpan(spanWrapper, synCtx);
} else {
// An outer level span
if (tracingScope.isEventCollectionFinished(synCtx)) {
cleanupContinuationStateSequences(spanStore);
spanStore.finishSpan(spanWrapper);
cleanupContinuationStateSequences(spanStore, synCtx);
spanStore.finishSpan(spanWrapper, synCtx);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
// Else - Absorb. Will be handled when all the callbacks are completed
Expand All @@ -367,13 +369,14 @@ private void finishSpan(BasicStatisticDataUnit basicStatisticDataUnit,
/**
* Cleans up remaining unfinished continuation state sequences before ending the outer level span.
* @param spanStore Span store object.
* @param synCtx Synapse message context
*/
private void cleanupContinuationStateSequences(SpanStore spanStore) {
private void cleanupContinuationStateSequences(SpanStore spanStore, MessageContext synCtx) {
if (!spanStore.getContinuationStateSequenceInfos().isEmpty()) {
List<ContinuationStateSequenceInfo> continuationStateSequences =
spanStore.getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo continuationStateSequence : continuationStateSequences) {
finishSpanForContinuationStateSequence(continuationStateSequence, spanStore);
finishSpanForContinuationStateSequence(continuationStateSequence, spanStore, synCtx);
}
continuationStateSequences.clear();
}
Expand All @@ -385,10 +388,10 @@ private void cleanupContinuationStateSequences(SpanStore spanStore) {
* @param spanStore Span store object.
*/
private void finishSpanForContinuationStateSequence(ContinuationStateSequenceInfo continuationStateSequenceInfo,
SpanStore spanStore) {
SpanStore spanStore, MessageContext synCtx) {
String spanWrapperId = continuationStateSequenceInfo.getSpanReferenceId();
SpanWrapper spanWrapper = spanStore.getSpanWrapper(spanWrapperId);
spanStore.finishSpan(spanWrapper);
spanStore.finishSpan(spanWrapper, synCtx);
}

@Override
Expand Down Expand Up @@ -418,9 +421,9 @@ private void handleCallbackFinishEvent(MessageContext messageContext) {
// The last callback received in a scope will finish the outer level span
if (tracingScope.isEventCollectionFinished(messageContext)) {
synchronized (tracingScope.getSpanStore()) {
cleanupContinuationStateSequences(tracingScope.getSpanStore());
cleanupContinuationStateSequences(tracingScope.getSpanStore(), messageContext);
SpanWrapper outerLevelSpanWrapper = tracingScope.getSpanStore().getOuterLevelSpanWrapper();
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper);
tracingScope.getSpanStore().finishSpan(outerLevelSpanWrapper, messageContext);
tracingScopeManager.cleanupTracingScope(tracingScope.getTracingScopeId());
}
}
Expand Down Expand Up @@ -452,7 +455,8 @@ public void handleStateStackRemoval(ContinuationState continuationState, Message
true);
if (continuationStateSequenceInfo != null) {
continuationStateSequenceInfo.setSpanActive(false);
finishSpanForContinuationStateSequence(continuationStateSequenceInfo, tracingScope.getSpanStore());
finishSpanForContinuationStateSequence(continuationStateSequenceInfo, tracingScope.getSpanStore(),
synCtx);
tracingScope.getSpanStore().getContinuationStateSequenceInfos()
.remove(continuationStateSequenceInfo);
}
Expand Down Expand Up @@ -501,7 +505,7 @@ public void handleStateStackClearance(MessageContext synCtx) {
List<ContinuationStateSequenceInfo> stackedSequences =
tracingScope.getSpanStore().getContinuationStateSequenceInfos();
for (ContinuationStateSequenceInfo stackedSequence : stackedSequences) {
finishSpanForContinuationStateSequence(stackedSequence, tracingScope.getSpanStore());
finishSpanForContinuationStateSequence(stackedSequence, tracingScope.getSpanStore(), synCtx);
}
stackedSequences.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers;

import io.opentelemetry.api.trace.Span;
import org.apache.synapse.MessageContext;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticsLog;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.OpenTelemetryManagerHolder;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.TelemetryConstants;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.SpanWrapper;
import org.apache.synapse.commons.CorrelationConstants;

/**
* Applies tags to Spans.
Expand All @@ -39,8 +41,9 @@ private SpanTagger() {}
* Sets tags to the span which is contained in the provided span wrapper, from information acquired from the
* given basic statistic data unit.
* @param spanWrapper Span wrapper that contains the target span.
* @param synCtx Synapse message context
*/
public static void setSpanTags(SpanWrapper spanWrapper) {
public static void setSpanTags(SpanWrapper spanWrapper, MessageContext synCtx) {
StatisticsLog openStatisticsLog = new StatisticsLog(spanWrapper.getStatisticDataUnit());
Span span = spanWrapper.getSpan();
if (OpenTelemetryManagerHolder.isCollectingPayloads() || OpenTelemetryManagerHolder.isCollectingProperties()) {
Expand Down Expand Up @@ -113,5 +116,9 @@ public static void setSpanTags(SpanWrapper spanWrapper) {
span.setAttribute(TelemetryConstants.ENDPOINT_ATTRIBUTE_KEY,
String.valueOf(openStatisticsLog.getEndpoint().getJsonRepresentation()));
}
if (synCtx.getProperty(CorrelationConstants.CORRELATION_ID) != null) {
span.setAttribute(TelemetryConstants.CORRELATION_ID_ATTRIBUTE_KEY,
synCtx.getProperty(CorrelationConstants.CORRELATION_ID).toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ public SpanWrapper addSpanWrapper(String spanId,
* Denotes the end of a span.
* Adds tags to the span and removes reference to the appropriate span wrapper in activeSpanWrappers.
* @param spanWrapper Span wrapper object, which has been already created
* @param synCtx Synapse message context
*/
public void finishSpan(SpanWrapper spanWrapper) {
public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx) {
if (spanWrapper != null && spanWrapper.getSpan() != null) {
if (spanWrapper.getStatisticDataUnit() != null) {
SpanTagger.setSpanTags(spanWrapper);
SpanTagger.setSpanTags(spanWrapper, synCtx);
}
spanWrapper.getSpan().end();
activeSpanWrappers.remove(spanWrapper);
Expand Down

0 comments on commit c3a88cb

Please sign in to comment.