This is an automated email from the ASF dual-hosted git repository.
jpoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 26dd138bac5 CAMEL-20643: Add option to propagate Opentelemtry Context
even when tracing is disabled for a Camel Processor
26dd138bac5 is described below
commit 26dd138bac51761fb4e82684dbffb3e9c6195587
Author: John Poth <[email protected]>
AuthorDate: Wed Apr 3 11:11:56 2024 +0200
CAMEL-20643: Add option to propagate Opentelemtry Context even when tracing
is disabled for a Camel Processor
---
.../OpenTelemetryTracingStrategy.java | 72 +++++++++++-
.../CamelOpenTelemetryTestSupport.java | 4 +
...lemetryTracingStrategyPropagateContextTest.java | 128 +++++++++++++++++++++
3 files changed, 201 insertions(+), 3 deletions(-)
diff --git
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
index 7208d53dd8b..c827ca06c5a 100644
---
a/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
+++
b/components/camel-opentelemetry/src/main/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategy.java
@@ -36,6 +36,8 @@ public class OpenTelemetryTracingStrategy implements
InterceptStrategy {
private final OpenTelemetryTracer tracer;
+ private boolean propagateContext;
+
public OpenTelemetryTracingStrategy(OpenTelemetryTracer tracer) {
this.tracer = tracer;
}
@@ -45,11 +47,34 @@ public class OpenTelemetryTracingStrategy implements
InterceptStrategy {
CamelContext camelContext,
NamedNode processorDefinition, Processor target, Processor
nextTarget)
throws Exception {
- if (!shouldTrace(processorDefinition)) {
+ if (shouldTrace(processorDefinition)) {
+ return new PropagateContextAndCreateSpan(processorDefinition,
target);
+ } else if (isPropagateContext()) {
+ return new PropagateContext(target);
+ } else {
return new DelegateAsyncProcessor(target);
}
+ }
+
+ public boolean isPropagateContext() {
+ return propagateContext;
+ }
+
+ public void setPropagateContext(boolean propagateContext) {
+ this.propagateContext = propagateContext;
+ }
- return new DelegateAsyncProcessor((Exchange exchange) -> {
+ private class PropagateContextAndCreateSpan implements Processor {
+ private final NamedNode processorDefinition;
+ private final Processor target;
+
+ public PropagateContextAndCreateSpan(NamedNode processorDefinition,
Processor target) {
+ this.processorDefinition = processorDefinition;
+ this.target = target;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
Span span = null;
OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter)
ActiveSpanManager.getSpan(exchange);
if (spanWrapper != null) {
@@ -86,7 +111,48 @@ public class OpenTelemetryTracingStrategy implements
InterceptStrategy {
processorSpan.end();
}
- });
+ }
+ }
+
+ private class PropagateContext implements Processor {
+ private final Processor target;
+
+ public PropagateContext(Processor target) {
+ this.target = target;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Span span = null;
+ OpenTelemetrySpanAdapter spanWrapper = (OpenTelemetrySpanAdapter)
ActiveSpanManager.getSpan(exchange);
+ if (spanWrapper != null) {
+ span = spanWrapper.getOpenTelemetrySpan();
+ }
+
+ if (span == null) {
+ target.process(exchange);
+ return;
+ }
+
+ boolean activateExchange = !(target instanceof
GetCorrelationContextProcessor
+ || target instanceof SetCorrelationContextProcessor);
+
+ if (activateExchange) {
+ ActiveSpanManager.activate(exchange, new
OpenTelemetrySpanAdapter(span));
+ }
+
+ try {
+ target.process(exchange);
+ } catch (Exception ex) {
+ span.setStatus(StatusCode.ERROR);
+ span.recordException(ex);
+ throw ex;
+ } finally {
+ if (activateExchange) {
+ ActiveSpanManager.deactivate(exchange);
+ }
+ }
+ }
}
private static String getComponentName(NamedNode processorDefinition) {
diff --git
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
index 1bc2bfd1ea4..029a9afdf27 100644
---
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
+++
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/CamelOpenTelemetryTestSupport.java
@@ -94,6 +94,10 @@ class CamelOpenTelemetryTestSupport extends CamelTestSupport
{
return null;
}
+ protected OpenTelemetryTracer getOttracer() {
+ return ottracer;
+ }
+
protected void verify() {
verify(expected, false);
}
diff --git
a/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
new file mode 100644
index 00000000000..bcf27705ab4
--- /dev/null
+++
b/components/camel-opentelemetry/src/test/java/org/apache/camel/opentelemetry/OpenTelemetryTracingStrategyPropagateContextTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.opentelemetry;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.InterceptStrategy;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.apache.camel.test.junit5.TestSupport.fileUri;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OpenTelemetryTracingStrategyPropagateContextTest extends
CamelOpenTelemetryTestSupport {
+
+ @TempDir
+ private static Path tempDirectory;
+
+ private final static SpanTestData[] testdata = {
+ new
SpanTestData().setLabel("camel-process").setOperation("delayed")
+ .setParentId(2),
+ new
SpanTestData().setLabel("camel-process").setOperation("WithSpan.secondMethod")
+ .setParentId(2),
+ new
SpanTestData().setLabel("camel-process").setOperation("file").setKind(SpanKind.SERVER)
+ };
+
+ OpenTelemetryTracingStrategyPropagateContextTest() {
+ super(testdata);
+ }
+
+ @BeforeAll
+ public static void createFile() throws IOException {
+ Files.createFile(tempDirectory.resolve("file.txt"));
+ }
+
+ @Test
+ void testTracingOfProcessors() throws IOException, InterruptedException {
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+ assertTrue(notify.matches(30, TimeUnit.SECONDS));
+ verify(true);
+ }
+
+ @Override
+ protected String getExcludePatterns() {
+ return "longRunningProcess";
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(fileUri(tempDirectory)).routeId("serviceA")
+ .process(exchange -> {
+ longRunningProcess();
+ }).id("longRunningProcess")
+ .delay(simple("${random(0,500)}")).id("delayed");
+ }
+
+ private void longRunningProcess() {
+ firstMethod();
+ secondMethod();
+ }
+
+ private void firstMethod() {
+ // no Span created by Camel
+ }
+
+ // Simulate io.opentelemetry.instrumentation.annotations.@WithSpan
+ // in order to avoid having to start an HTTP sever just to collect
the Spans
+ // see
https://github.com/open-telemetry/opentelemetry-java-examples/tree/main/telemetry-testing
+ //@WithSpan
+ public void secondMethod() {
+ // The Context should be propagated
+ Assertions.assertNotSame(Context.root(), Context.current(),
"OpenTelemetry was not propagated !");
+ // build and start a custom Span similar to what @WithSpan
would do
+ SpanBuilder builder =
getOttracer().getTracer().spanBuilder("WithSpan.secondMethod");
+ Span span = builder.setParent(Context.current())
+ .setAttribute(COMPONENT_KEY, "custom")
+ .startSpan();
+ //noinspection EmptyTryBlock
+ try (Scope ignored = span.makeCurrent()) {
+ // do work
+ } finally {
+ span.end();
+ }
+
+ }
+ };
+ }
+
+ @Override
+ protected Function<OpenTelemetryTracer, InterceptStrategy>
getTracingStrategy() {
+ return (tracer) -> {
+ OpenTelemetryTracingStrategy strategy = new
OpenTelemetryTracingStrategy(tracer);
+ strategy.setPropagateContext(true);
+ return strategy;
+ };
+ }
+}