This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 553ddd5f4a68a201189f310fb7f2d90a339a4a07 Author: Pasquale Congiusti <[email protected]> AuthorDate: Mon May 12 15:12:09 2025 +0200 feat(components): micrometer observability telemetry impl Closes CAMEL-21869 --- catalog/camel-allcomponents/pom.xml | 5 + components/camel-micrometer-observability/pom.xml | 130 +++++++++++ .../MicrometerObservabilityTracerConfigurer.java | 69 ++++++ ...ter.observability.MicrometerObservabilityTracer | 2 + .../apache/camel/micrometer-observability-tracer | 2 + .../services/org/apache/camel/other.properties | 7 + .../resources/micrometer-observability.json | 15 ++ .../src/main/docs/micrometer-observability.adoc | 69 ++++++ .../MicrometerObservabilitySpanAdapter.java | 83 +++++++ .../MicrometerObservabilityTracer.java | 160 +++++++++++++ .../micrometer/observability/AsyncCXFTest.java | 171 ++++++++++++++ .../micrometer/observability/AsyncDirectTest.java | 146 ++++++++++++ .../micrometer/observability/AsyncWiretapTest.java | 149 ++++++++++++ .../observability/CamelOpenTelemetryExtension.java | 257 +++++++++++++++++++++ .../observability/DisableEndpointTest.java | 80 +++++++ .../observability/EnableProcessorsTest.java | 109 +++++++++ .../MicrometerObservabilityTrace.java | 53 +++++ ...rObservabilityTracerPropagationTestSupport.java | 77 ++++++ .../MicrometerObservabilityTracerTest.java | 110 +++++++++ .../MicrometerObservabilityTracerTestSupport.java | 70 ++++++ .../observability/SpanPropagationTest.java | 82 +++++++ .../src/test/resources/log4j2.properties | 30 +++ parent/pom.xml | 15 ++ 23 files changed, 1891 insertions(+) diff --git a/catalog/camel-allcomponents/pom.xml b/catalog/camel-allcomponents/pom.xml index 279c7f1d08f..6a4910f05ed 100644 --- a/catalog/camel-allcomponents/pom.xml +++ b/catalog/camel-allcomponents/pom.xml @@ -1271,6 +1271,11 @@ <artifactId>camel-micrometer</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-micrometer-observability</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-micrometer-prometheus</artifactId> diff --git a/components/camel-micrometer-observability/pom.xml b/components/camel-micrometer-observability/pom.xml new file mode 100644 index 00000000000..a03b8e9fc05 --- /dev/null +++ b/components/camel-micrometer-observability/pom.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>4.12.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-micrometer-observability</artifactId> + <packaging>jar</packaging> + <name>Camel :: Micrometer :: Observability 2</name> + <description>Micrometer Observability implementation of Camel Telemetry</description> + + <properties> + <firstVersion>4.15.0</firstVersion> + <label>monitoring,microservice</label> + <title>Micrometer Observability 2</title> + <supportLevel>Preview</supportLevel> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-telemetry</artifactId> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-core</artifactId> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing</artifactId> + </dependency> + <!-- Used as default development tracer option --> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing-test</artifactId> + </dependency> + + + <!-- START Required to validate a real context propagation implementation --> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing-bridge-otel</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + <version>${opentelemetry-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-testing</artifactId> + <version>${opentelemetry-version}</version> + <scope>test</scope> + </dependency> + <!-- END Required to validate a real context propagation implementation --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + + <!-- Required to test CXF async --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf-rest</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-cxf-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-transports-http-undertow</artifactId> + <version>${cxf-version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-servlet</artifactId> + </exclusion> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-servlet-jakarta</artifactId> + </exclusion> + <exclusion> + <groupId>io.undertow</groupId> + <artifactId>undertow-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-undertow</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/components/camel-micrometer-observability/src/generated/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerConfigurer.java b/components/camel-micrometer-observability/src/generated/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerConfigurer.java new file mode 100644 index 00000000000..014c73913f0 --- /dev/null +++ b/components/camel-micrometer-observability/src/generated/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerConfigurer.java @@ -0,0 +1,69 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.micrometer.observability; + +import javax.annotation.processing.Generated; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.ExtendedPropertyConfigurerGetter; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.spi.ConfigurerStrategy; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.micrometer.observability.MicrometerObservabilityTracer; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo") +@SuppressWarnings("unchecked") +public class MicrometerObservabilityTracerConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter { + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.micrometer.observability.MicrometerObservabilityTracer target = (org.apache.camel.micrometer.observability.MicrometerObservabilityTracer) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": target.setCamelContext(property(camelContext, org.apache.camel.CamelContext.class, value)); return true; + case "excludepatterns": + case "excludePatterns": target.setExcludePatterns(property(camelContext, java.lang.String.class, value)); return true; + case "spanlifecyclemanager": + case "spanLifecycleManager": target.setSpanLifecycleManager(property(camelContext, org.apache.camel.telemetry.SpanLifecycleManager.class, value)); return true; + case "traceprocessors": + case "traceProcessors": target.setTraceProcessors(property(camelContext, boolean.class, value)); return true; + default: return false; + } + } + + @Override + public Class<?> getOptionType(String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": return org.apache.camel.CamelContext.class; + case "excludepatterns": + case "excludePatterns": return java.lang.String.class; + case "spanlifecyclemanager": + case "spanLifecycleManager": return org.apache.camel.telemetry.SpanLifecycleManager.class; + case "traceprocessors": + case "traceProcessors": return boolean.class; + default: return null; + } + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.micrometer.observability.MicrometerObservabilityTracer target = (org.apache.camel.micrometer.observability.MicrometerObservabilityTracer) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "camelcontext": + case "camelContext": return target.getCamelContext(); + case "excludepatterns": + case "excludePatterns": return target.getExcludePatterns(); + case "spanlifecyclemanager": + case "spanLifecycleManager": return target.getSpanLifecycleManager(); + case "traceprocessors": + case "traceProcessors": return target.isTraceProcessors(); + default: return null; + } + } +} + diff --git a/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.micrometer.observability.MicrometerObservabilityTracer b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.micrometer.observability.MicrometerObservabilityTracer new file mode 100644 index 00000000000..878f9e382dc --- /dev/null +++ b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.micrometer.observability.MicrometerObservabilityTracer @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.micrometer.observability.MicrometerObservabilityTracerConfigurer diff --git a/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/micrometer-observability-tracer b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/micrometer-observability-tracer new file mode 100644 index 00000000000..e2c07c68f1d --- /dev/null +++ b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/micrometer-observability-tracer @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.micrometer.observability.MicrometerObservabilityTracer diff --git a/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/other.properties new file mode 100644 index 00000000000..c2cf3f73b4f --- /dev/null +++ b/components/camel-micrometer-observability/src/generated/resources/META-INF/services/org/apache/camel/other.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +name=micrometer-observability +groupId=org.apache.camel +artifactId=camel-micrometer-observability +version=4.12.0-SNAPSHOT +projectName=Camel :: Micrometer :: Observability +projectDescription=Micrometer Observability implementation of Camel Telemetry diff --git a/components/camel-micrometer-observability/src/generated/resources/micrometer-observability.json b/components/camel-micrometer-observability/src/generated/resources/micrometer-observability.json new file mode 100644 index 00000000000..afce5fe6f33 --- /dev/null +++ b/components/camel-micrometer-observability/src/generated/resources/micrometer-observability.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "micrometer-observability", + "title": "Micrometer Observability", + "description": "Micrometer Observability implementation of Camel Telemetry", + "deprecated": false, + "firstVersion": "4.12.0", + "label": "monitoring,microservice", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-micrometer-observability", + "version": "4.12.0-SNAPSHOT" + } +} diff --git a/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc b/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc new file mode 100644 index 00000000000..e2d8e70461e --- /dev/null +++ b/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc @@ -0,0 +1,69 @@ += Telemetry Dev Component +:doctitle: Micrometer Observability +:shortname: micrometed-observability +:artifactid: camel-micrometed-observability +:description: Implementation of Camel Micrometer Observability based on the Camel Telemetry spec +:since: 4.15 +:supportlevel: Preview +:tabs-sync-option: + +*Since Camel {since}* + +This module is the implementation of the common `camel-telemetry` interface based on https://micrometer.io/[Micrometer Application Observability] technology. The name used here as `Micrometer Observability 2` is done to distinguish on the existing `camel-observation` component which was based on an older Camel tracing and Micrometer specification. You're invited to start replacing the older `camel-observation` with this one instead as it may become the default component in future version [...] + +NOTE: this component has slight differences compared to the `camel-observation` and is meant to solve a few inconsistencies identified. + +== Configuration + +The configuration properties for the component are: + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Option |Default |Description +|`excludePatterns` | | Sets exclude pattern(s) that will disable tracing for Camel +messages that matches the pattern. The content is a Set<String> where the key is a pattern. The pattern +uses the rules from Intercept. +|`traceProcessors` | `false` | Setting this to true will create new OpenTelemetry Spans for each Camel Processors. +Use the excludePattern property to filter out Processors +|======================================================================= + +== Tracing format + +The component provides a few different traces representation formats you can use, depending on the scope of your telemetry or the integration with any third party telemetry collectors. + +=== Tree + +As the component is mainly targeted to have a quick visualization of a running application, then, the `tree` format is the recommended one. The output will be like the following: + +``` +| d6c36bc6f3374ed0bd6edb638535f130 +├── | timer://start (camel-timer) [2030 millis] <-- + ├── | process (to1-to) [2021 millis] + ├── | direct://new (camel-direct) [2020 millis] --> + ├── | direct://new (camel-direct) [2019 millis] <-- + ├── | process (delay1-delay) [2002 millis] + ├── | process (log2-log) [2 millis] + └── | process (to3-to) [3 millis] + └── | log://new (camel-log) [2 millis] --> + └── | process (to4-to) [1 millis] + └── | mock://end (camel-mock) [0 millis] --> + ├── | process (log1-log) [0 millis] + └── | process (to2-to) [2 millis] + └── | log://info (camel-log) [1 millis] --> +``` + +This is a quick visual tree Trace representation of a given execution of a Camel route. You can quickly verify the Camel URI, the component and the time spent to execute each Span. + +NOTE: the --> and <-- represents an outgoing or incoming message. + +=== Json + +This format is suitable if you need to integrate with any third party. The output of each trace will come in the following format: + +```json +{"traceId":"4ee161eb055644fca713f4c02ab2fb50","spans":[{"logEntries":[],"traceid":"4ee161eb055644fca713f4c02ab2fb50","spanid":"8785debcdc09409abeed65d08aaaca1b","exchangeId":"D1B1814C509A376-0000000000000000","op":"EVENT_RECEIVED","component":"camel-timer","url.path":"start","initTimestamp":"4085653460994","camel.uri":"timer://start","url.scheme":"timer","endTimestamp":"4087698388888","isDone":"true"},{"logEntries":[],"traceid":"4ee161eb055644fca713f4c02ab2fb50","spanid":"51cc3929d34e452 [...] +``` + +=== Default + +Default is a very simple serialization into the default Java object `toString()` representation. diff --git a/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilitySpanAdapter.java b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilitySpanAdapter.java new file mode 100644 index 00000000000..44bdff3e920 --- /dev/null +++ b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilitySpanAdapter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.util.Map; + +import org.apache.camel.telemetry.Span; + +public class MicrometerObservabilitySpanAdapter implements Span { + + private static final String DEFAULT_EVENT_NAME = "log"; + + private final io.micrometer.tracing.Span span; + + public MicrometerObservabilitySpanAdapter(io.micrometer.tracing.Span span) { + this.span = span; + } + + @Override + public void log(Map<String, String> fields) { + String event = fields.get("event"); + if ("error".equalsIgnoreCase(event)) { + setError(true); + } else if (fields.get("message") != null) { + // this is really the only expected event for logging + this.span.event("message=" + fields.get("message")); + } else { + this.span.event(DEFAULT_EVENT_NAME); + } + } + + @Override + public void setComponent(String component) { + this.span.tag("component", component); + } + + @Override + public void setError(boolean isError) { + this.span.tag("error", isError); + } + + @Override + public void setTag(String key, String value) { + this.span.tag(key, value); + } + + protected io.micrometer.tracing.Span getSpan() { + return this.span; + } + + protected void activate() { + this.span.start(); + } + + protected void close() { + this.span.end(); + } + + protected void deactivate() { + + } + + @Override + public String toString() { + return "MicrometerObservabilitySpanAdapter [traceId=" + + span.context().traceId() + " spanId=" + span.context().spanId() + "]"; + } + +} diff --git a/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracer.java b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracer.java new file mode 100644 index 00000000000..fa6bba48ab7 --- /dev/null +++ b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracer.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.tracing.Span.Builder; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SimpleTracer; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.telemetry.Span; +import org.apache.camel.telemetry.SpanContextPropagationExtractor; +import org.apache.camel.telemetry.SpanContextPropagationInjector; +import org.apache.camel.telemetry.SpanLifecycleManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@JdkService("micrometer-observability-tracer") +@Configurer +@ManagedResource(description = "MicrometerObservabilityTracer") +public class MicrometerObservabilityTracer extends org.apache.camel.telemetry.Tracer { + + private static final Logger LOG = LoggerFactory.getLogger(MicrometerObservabilityTracer.class); + + private Tracer tracer; + private ObservationRegistry observationRegistry; + private Propagator propagator; + + @Override + protected void initTracer() { + if (tracer == null) { + tracer = CamelContextHelper.findSingleByType(getCamelContext(), Tracer.class); + } + if (tracer == null) { + tracer = new SimpleTracer(); + LOG.warn("No tracer was provided. A default inmemory tracer is used. " + + "This can be useful for development only, avoid this in a production environment."); + } + if (observationRegistry == null) { + observationRegistry = CamelContextHelper.findSingleByType(getCamelContext(), ObservationRegistry.class); + } + if (observationRegistry == null) { + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + this.observationRegistry = ObservationRegistry.create(); + this.observationRegistry.observationConfig().observationHandler( + new DefaultMeterObservationHandler(meterRegistry)); + LOG.warn("No observation registry was provided. A default inmemory observation registry is used. " + + "This can be useful for development only, avoid this in a production environment."); + } + + if (propagator == null) { + propagator = CamelContextHelper.findSingleByType(getCamelContext(), Propagator.class); + } + if (propagator == null) { + propagator = Propagator.NOOP; + LOG.warn("No propagator was provided. A NOOP implementation is used, you won't be able to trace " + + "upstream activity. " + + "This can be useful for development only, avoid this in a production environment."); + } + + this.setSpanLifecycleManager(new MicrometerObservabilitySpanLifecycleManager()); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + LOG.info("Micrometer Observability enabled"); + } + + private class MicrometerObservabilitySpanLifecycleManager implements SpanLifecycleManager { + + private MicrometerObservabilitySpanLifecycleManager() { + observationRegistry.observationConfig().observationHandler( + new ObservationHandler.FirstMatchingCompositeObservationHandler( + new PropagatingSenderTracingObservationHandler<>(tracer, propagator), + new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), + new DefaultTracingObservationHandler(tracer))); + } + + @Override + public Span create(String spanName, Span parent, SpanContextPropagationExtractor extractor) { + io.micrometer.tracing.Span span; + if (parent != null) { + MicrometerObservabilitySpanAdapter microObsParentSpan = (MicrometerObservabilitySpanAdapter) parent; + span = tracer.nextSpan(microObsParentSpan.getSpan()); + } else if (extractor.get("traceparent") != null || extractor.get("X-B3-TraceId") != null) { + /* + * This part is a bit tricky. We need to verify if the extractor + * (ie, the Camel Exchange) holds a propagated parent. + * As the micrometer-observability is technology agnostic, we need to check against + * the available implementations (Opentelemetry and Zipkin at the moment of writing this comment). + * TODO: we could do this configurable if it is required. + */ + Builder builder = propagator.extract(extractor, (carrier, key) -> { + return extractor.get(key) == null ? null : (String) extractor.get(key); + }); + + span = builder.start(); + } else { + span = tracer.nextSpan(); + } + + span.name(spanName); + + return new MicrometerObservabilitySpanAdapter(span); + } + + @Override + public void activate(Span span) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) span; + microObsSpan.activate(); + } + + @Override + public void close(Span span) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) span; + microObsSpan.close(); + } + + @Override + public void deactivate(Span span) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) span; + microObsSpan.deactivate(); + } + + @Override + public void inject(Span span, SpanContextPropagationInjector injector) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) span; + propagator.inject( + microObsSpan.getSpan().context(), + injector, + (carrier, key, value) -> carrier.put(key, value)); + } + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncCXFTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncCXFTest.java new file mode 100644 index 00000000000..b77e25a0b67 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncCXFTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.micrometer.observability.CamelOpenTelemetryExtension.OtelTrace; +import org.apache.camel.telemetry.Op; +import org.apache.camel.test.AvailablePortFinder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/* + * AsyncCXFTest tests the execution of CXF async which was reported as a potential candidate to + * inconsistent Span creation in async mode. + * + * This test is also special as it requires a different setting to inherit the + * Opentelemetry propagation mechanism. + */ +public class AsyncCXFTest extends MicrometerObservabilityTracerPropagationTestSupport { + + private static int cxfPort = AvailablePortFinder.getNextRandomAvailable(); + + @Test + void testRouteMultipleRequests() throws InterruptedException, IOException { + int j = 10; + MockEndpoint mock = getMockEndpoint("mock:end"); + mock.expectedMessageCount(j); + mock.setAssertPeriod(5000); + for (int i = 0; i < j; i++) { + context.createProducerTemplate().sendBody("direct:start", "Hello!"); + } + mock.assertIsSatisfied(1000); + Map<String, OtelTrace> traces = otelExtension.getTraces(); + // Each trace should have a unique trace id. It is enough to assert that + // the number of elements in the map is the same of the requests to prove + // all traces have been generated uniquely. + assertEquals(j, traces.size()); + // Each trace should have the same structure + for (OtelTrace trace : traces.values()) { + checkTrace(trace); + } + } + + private void checkTrace(OtelTrace trace) { + List<SpanData> spans = trace.getSpans(); + assertEquals(8, spans.size()); + SpanData testProducer = getSpan(spans, "direct://start", Op.EVENT_SENT); + SpanData direct = getSpan(spans, "direct://start", Op.EVENT_RECEIVED); + SpanData directSendTo = getSpan(spans, "direct://send", Op.EVENT_SENT); + SpanData directSendFrom = getSpan(spans, "direct://send", Op.EVENT_RECEIVED); + SpanData cxfRs = getSpan( + spans, + "cxfrs://http://localhost:" + cxfPort + "/rest/helloservice/sayHello?synchronous=false", + Op.EVENT_SENT); + SpanData rest = getSpan( + spans, + "rest://post:/rest/helloservice:/sayHello?routeId=direct-hi", + Op.EVENT_RECEIVED); + SpanData log = getSpan(spans, "log://hi", Op.EVENT_SENT); + SpanData mock = getSpan(spans, "mock://end", Op.EVENT_SENT); + + // Validate span completion + assertTrue(testProducer.hasEnded()); + assertTrue(direct.hasEnded()); + assertTrue(directSendTo.hasEnded()); + assertTrue(directSendFrom.hasEnded()); + assertTrue(cxfRs.hasEnded()); + assertTrue(rest.hasEnded()); + assertTrue(log.hasEnded()); + assertTrue(mock.hasEnded()); + + // Validate same trace + assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), directSendTo.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), directSendFrom.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), cxfRs.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), rest.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), mock.getSpanContext().getTraceId()); + + // Validate different Exchange ID + assertNotEquals(testProducer.getAttributes().get(AttributeKey.stringKey("exchangeId")), + rest.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(testProducer.getAttributes().get(AttributeKey.stringKey("exchangeId")), + direct.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(testProducer.getAttributes().get(AttributeKey.stringKey("exchangeId")), + directSendTo.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(testProducer.getAttributes().get(AttributeKey.stringKey("exchangeId")), + directSendFrom.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(testProducer.getAttributes().get(AttributeKey.stringKey("exchangeId")), + cxfRs.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(rest.getAttributes().get(AttributeKey.stringKey("exchangeId")), + log.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + assertEquals(rest.getAttributes().get(AttributeKey.stringKey("exchangeId")), + mock.getAttributes().get(AttributeKey.stringKey("exchangeId"))); + + // Validate hierarchy + assertFalse(testProducer.getParentSpanContext().isValid()); + assertEquals(testProducer.getSpanContext().getSpanId(), direct.getParentSpanContext().getSpanId()); + assertEquals(direct.getSpanContext().getSpanId(), directSendTo.getParentSpanContext().getSpanId()); + assertEquals(directSendTo.getSpanContext().getSpanId(), directSendFrom.getParentSpanContext().getSpanId()); + assertEquals(directSendFrom.getSpanContext().getSpanId(), cxfRs.getParentSpanContext().getSpanId()); + assertEquals(cxfRs.getSpanContext().getSpanId(), rest.getParentSpanContext().getSpanId()); + assertEquals(rest.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); + assertEquals(rest.getSpanContext().getSpanId(), mock.getParentSpanContext().getSpanId()); + + // Validate message logging + // The micrometer framework sets the name instead of the opentelemetry attributes + assertEquals("message=A direct message", directSendFrom.getEvents().get(0).getName()); + assertEquals("message=say-hi", rest.getEvents().get(0).getName()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("myRoute") + .to("direct:send"); + + from("direct:send") + .log("A direct message") + .to("cxfrs:http://localhost:" + cxfPort + + "/rest/helloservice/sayHello?synchronous=false"); + + restConfiguration() + .port(cxfPort); + + rest("/rest/helloservice") + .post("/sayHello") + .routeId("rest-GET-say-hi") + .to("direct:hi"); + + from("direct:hi") + .routeId("direct-hi") + .delay(2000) + .log("say-hi") + .to("log:hi") + .to("mock:end"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncDirectTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncDirectTest.java new file mode 100644 index 00000000000..399cf87e3ea --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncDirectTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.telemetry.Op; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class AsyncDirectTest extends MicrometerObservabilityTracerTestSupport { + + @Test + void testRouteMultipleRequests() throws InterruptedException, IOException { + int j = 10; + MockEndpoint mock = getMockEndpoint("mock:end"); + mock.expectedMessageCount(j); + mock.setAssertPeriod(5000); + for (int i = 0; i < j; i++) { + context.createProducerTemplate().sendBody("direct:start", "Hello!"); + } + mock.assertIsSatisfied(1000); + Map<String, MicrometerObservabilityTrace> traces = traces(); + // Each trace should have a unique trace id. It is enough to assert that + // the number of elements in the map is the same of the requests to prove + // all traces have been generated uniquely. + assertEquals(j, traces.size()); + // Each trace should have the same structure + for (MicrometerObservabilityTrace trace : traces.values()) { + checkTrace(trace, "Hello!"); + } + + } + + private void checkTrace(MicrometerObservabilityTrace trace, String expectedBody) { + List<SimpleSpan> spans = trace.getSpans(); + assertEquals(7, spans.size()); + SimpleSpan testProducer = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://start", Op.EVENT_SENT); + SimpleSpan direct = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://start", Op.EVENT_RECEIVED); + SimpleSpan newDirectTo = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://new", Op.EVENT_SENT); + SimpleSpan log = MicrometerObservabilityTracerTestSupport.getSpan(spans, "log://info", Op.EVENT_SENT); + SimpleSpan newDirectFrom = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://new", Op.EVENT_RECEIVED); + SimpleSpan newLog = MicrometerObservabilityTracerTestSupport.getSpan(spans, "log://new", Op.EVENT_SENT); + SimpleSpan newMock = MicrometerObservabilityTracerTestSupport.getSpan(spans, "mock://end", Op.EVENT_SENT); + + // Validate span completion + assertNotEquals("", testProducer.getEndTimestamp()); + assertNotEquals("", direct.getEndTimestamp()); + assertNotEquals("", newDirectTo.getEndTimestamp()); + assertNotEquals("", log.getEndTimestamp()); + assertNotEquals("", newDirectFrom.getEndTimestamp()); + assertNotEquals("", newLog.getEndTimestamp()); + assertNotEquals("", newMock.getEndTimestamp()); + + // Validate same trace + assertEquals(testProducer.getTraceId(), direct.getTraceId()); + assertEquals(testProducer.getTraceId(), newDirectTo.getTraceId()); + assertEquals(testProducer.getTraceId(), log.getTraceId()); + assertEquals(testProducer.getTraceId(), newDirectFrom.getTraceId()); + assertEquals(testProducer.getTraceId(), newLog.getTraceId()); + assertEquals(testProducer.getTraceId(), newMock.getTraceId()); + + // Validate same Exchange ID + // As it's a "direct" component, we expect the logic to happen within the same + // Exchange boundary + assertEquals(testProducer.getTags().get("exchangeId"), direct.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), newDirectTo.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), newDirectFrom.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), log.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), newLog.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), newMock.getTags().get("exchangeId")); + + // // Validate hierarchy + assertEquals("", testProducer.getParentId()); + assertEquals(testProducer.getSpanId(), direct.getParentId()); + assertEquals(direct.getSpanId(), newDirectTo.getParentId()); + assertEquals(direct.getSpanId(), log.getParentId()); + assertEquals(newDirectTo.getSpanId(), newDirectFrom.getParentId()); + assertEquals(newDirectFrom.getSpanId(), newLog.getParentId()); + assertEquals(newDirectFrom.getSpanId(), newMock.getParentId()); + + // Validate message logging + assertEquals("message=A direct message", direct.getEvents().iterator().next().getValue()); + assertEquals("message=A new message", newDirectFrom.getEvents().iterator().next().getValue()); + + if (expectedBody == null) { + assertEquals( + "message=Exchange[ExchangePattern: InOut, BodyType: null, Body: [Body is null]]", + log.getEvents().iterator().next().getValue()); + assertEquals( + "message=Exchange[ExchangePattern: InOut, BodyType: null, Body: [Body is null]]", + newLog.getEvents().iterator().next().getValue()); + } else { + assertEquals( + "message=Exchange[ExchangePattern: InOnly, BodyType: String, Body: " + expectedBody + "]", + log.getEvents().iterator().next().getValue()); + assertEquals( + "message=Exchange[ExchangePattern: InOnly, BodyType: String, Body: " + expectedBody + "]", + newLog.getEvents().iterator().next().getValue()); + } + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .to("direct:new") + .log("A direct message") + .to("log:info"); + + from("direct:new") + .delay(2000) + .routeId("new") + .log("A new message") + .to("log:new") + .to("mock:end"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncWiretapTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncWiretapTest.java new file mode 100644 index 00000000000..3458d30ff06 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/AsyncWiretapTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.telemetry.Op; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +/* + * WiretappedRouteTest tests the execution of a new spin off component which would create a new exchange, + * for example, using the wiretap component. + */ +public class AsyncWiretapTest extends MicrometerObservabilityTracerTestSupport { + + @Test + void testRouteMultipleRequests() throws InterruptedException, IOException { + int j = 10; + MockEndpoint mock = getMockEndpoint("mock:end"); + mock.expectedMessageCount(j); + mock.setAssertPeriod(5000); + for (int i = 0; i < j; i++) { + context.createProducerTemplate().sendBody("direct:start", "Hello!"); + } + mock.assertIsSatisfied(1000); + Map<String, MicrometerObservabilityTrace> traces = traces(); + // Each trace should have a unique trace id. It is enough to assert that + // the number of elements in the map is the same of the requests to prove + // all traces have been generated uniquely. + assertEquals(j, traces.size()); + // Each trace should have the same structure + for (MicrometerObservabilityTrace trace : traces.values()) { + checkTrace(trace, "Hello!"); + } + + } + + private void checkTrace(MicrometerObservabilityTrace trace, String expectedBody) { + List<SimpleSpan> spans = trace.getSpans(); + assertEquals(7, spans.size()); + SimpleSpan testProducer = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://start", Op.EVENT_SENT); + SimpleSpan direct = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://start", Op.EVENT_RECEIVED); + SimpleSpan wiretapDirectTo = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://tap", Op.EVENT_SENT); + SimpleSpan wiretapDirectFrom + = MicrometerObservabilityTracerTestSupport.getSpan(spans, "direct://tap", Op.EVENT_RECEIVED); + SimpleSpan log = MicrometerObservabilityTracerTestSupport.getSpan(spans, "log://info", Op.EVENT_SENT); + SimpleSpan wiretapLog = MicrometerObservabilityTracerTestSupport.getSpan(spans, "log://tapped", Op.EVENT_SENT); + SimpleSpan wiretapMock = MicrometerObservabilityTracerTestSupport.getSpan(spans, "mock://end", Op.EVENT_SENT); + + // Validate span completion + assertNotEquals("", testProducer.getEndTimestamp()); + assertNotEquals("", direct.getEndTimestamp()); + assertNotEquals("", wiretapDirectTo.getEndTimestamp()); + assertNotEquals("", log.getEndTimestamp()); + assertNotEquals("", wiretapDirectFrom.getEndTimestamp()); + assertNotEquals("", wiretapLog.getEndTimestamp()); + assertNotEquals("", wiretapMock.getEndTimestamp()); + + // Validate same trace + assertEquals(testProducer.getTraceId(), direct.getTraceId()); + assertEquals(testProducer.getTraceId(), wiretapDirectTo.getTraceId()); + assertEquals(testProducer.getTraceId(), log.getTraceId()); + assertEquals(testProducer.getTraceId(), wiretapDirectFrom.getTraceId()); + assertEquals(testProducer.getTraceId(), wiretapLog.getTraceId()); + assertEquals(testProducer.getTraceId(), wiretapMock.getTraceId()); + + // Validate different Exchange ID + assertNotEquals(testProducer.getTags().get("exchangeId"), wiretapDirectTo.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), direct.getTags().get("exchangeId")); + assertEquals(testProducer.getTags().get("exchangeId"), log.getTags().get("exchangeId")); + assertEquals(wiretapDirectTo.getTags().get("exchangeId"), wiretapDirectFrom.getTags().get("exchangeId")); + assertEquals(wiretapDirectTo.getTags().get("exchangeId"), wiretapLog.getTags().get("exchangeId")); + assertEquals(wiretapDirectTo.getTags().get("exchangeId"), wiretapMock.getTags().get("exchangeId")); + + // Validate hierarchy + assertEquals("", testProducer.getParentId()); + assertEquals(testProducer.getSpanId(), direct.getParentId()); + assertEquals(direct.getSpanId(), wiretapDirectTo.getParentId()); + assertEquals(direct.getSpanId(), log.getParentId()); + assertEquals(wiretapDirectTo.getSpanId(), wiretapDirectFrom.getParentId()); + assertEquals(wiretapDirectFrom.getSpanId(), wiretapLog.getParentId()); + assertEquals(wiretapDirectFrom.getSpanId(), wiretapMock.getParentId()); + + // Validate message logging + assertEquals("message=A direct message", direct.getEvents().iterator().next().getValue()); + assertEquals("message=A tapped message", wiretapDirectFrom.getEvents().iterator().next().getValue()); + + if (expectedBody == null) { + assertEquals( + "message=Exchange[ExchangePattern: InOut, BodyType: null, Body: [Body is null]]", + log.getEvents().iterator().next().getValue()); + assertEquals( + "message=Exchange[ExchangePattern: InOut, BodyType: null, Body: [Body is null]]", + wiretapLog.getEvents().iterator().next().getValue()); + } else { + assertEquals( + "message=Exchange[ExchangePattern: InOnly, BodyType: String, Body: " + expectedBody + "]", + log.getEvents().iterator().next().getValue()); + assertEquals( + "message=Exchange[ExchangePattern: InOnly, BodyType: String, Body: " + expectedBody + "]", + wiretapLog.getEvents().iterator().next().getValue()); + } + + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .wireTap("direct:tap") + .log("A direct message") + .to("log:info"); + + from("direct:tap") + .delay(2000) + .routeId("wiretapped") + .log("A tapped message") + .to("log:tapped") + .to("mock:end"); + } + }; + } +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/CamelOpenTelemetryExtension.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/CamelOpenTelemetryExtension.java new file mode 100644 index 00000000000..a5e8a199b77 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/CamelOpenTelemetryExtension.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +//import io.opentelemetry.sdk.extension.incubator.trace.LeakDetectingSpanProcessor; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +/** + * Adapted from + * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk/testing/src/main/java/io/opentelemetry/sdk/testing/junit5/OpenTelemetryExtension.java + */ +final class CamelOpenTelemetryExtension implements BeforeEachCallback, AfterEachCallback { + + /** + * Returns an extension with a default SDK initialized with an in-memory span exporter and W3C trace context + * propagation. + */ + static CamelOpenTelemetryExtension create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + //.addSpanProcessor(LeakDetectingSpanProcessor.create()) + .addSpanProcessor(new LoggingSpanProcessor()) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + InMemoryLogRecordExporter logRecordExporter = InMemoryLogRecordExporter.create(); + SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() + .addLogRecordProcessor(SimpleLogRecordProcessor.create(logRecordExporter)) + .build(); + ContextPropagators propagators = ContextPropagators.create(W3CTraceContextPropagator.getInstance()); + OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder() + .setPropagators(propagators) + .setTracerProvider(tracerProvider) + .setMeterProvider(meterProvider) + .setLoggerProvider(loggerProvider) + .build(); + + return new CamelOpenTelemetryExtension( + openTelemetry, spanExporter, metricReader, logRecordExporter, propagators); + } + + private final OpenTelemetrySdk openTelemetry; + private final InMemorySpanExporter spanExporter; + private final InMemoryMetricReader metricReader; + private final InMemoryLogRecordExporter logRecordExporter; + private final ContextPropagators propagators; + + private CamelOpenTelemetryExtension( + OpenTelemetrySdk openTelemetry, + InMemorySpanExporter spanExporter, + InMemoryMetricReader metricReader, + InMemoryLogRecordExporter logRecordExporter, + ContextPropagators propagators) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + this.metricReader = metricReader; + this.logRecordExporter = logRecordExporter; + this.propagators = propagators; + } + + /** + * Returns the {@link OpenTelemetrySdk} created by this extension. + */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + public ContextPropagators getPropagators() { + return this.propagators; + } + + /** + * Returns all the exported {@link SpanData} so far. + */ + public List<SpanData> getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Returns the current {@link MetricData} in {@link AggregationTemporality#CUMULATIVE} format. + */ + public List<MetricData> getMetrics() { + return new ArrayList<>(metricReader.collectAllMetrics()); + } + + /** + * Returns all the exported {@link LogRecordData} so far. + */ + public List<LogRecordData> getLogRecords() { + return new ArrayList<>(logRecordExporter.getFinishedLogRecordItems()); + } + + /** + * Clears the collected exported {@link SpanData}. Consider making your test smaller instead of manually clearing + * state using this method. + */ + public void clearSpans() { + spanExporter.reset(); + } + + /** + * Clears all registered metric instruments, such that {@link #getMetrics()} is empty. + */ + public void clearMetrics() { + SdkMeterProviderUtil.resetForTest(openTelemetry.getSdkMeterProvider()); + } + + /** + * Clears the collected exported {@link LogRecordData}. Consider making your test smaller instead of manually + * clearing state using this method. + */ + public void clearLogRecords() { + logRecordExporter.reset(); + } + + @Override + public void beforeEach(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + public void afterEach(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + openTelemetry.close(); + } + + static class LoggingSpanProcessor implements SpanProcessor { + private static final Logger LOG = LoggerFactory.getLogger(LoggingSpanProcessor.class); + private static final Marker OTEL_MARKER = MarkerFactory.getMarker("OTEL"); + + @Override + public void onStart(Context context, ReadWriteSpan readWriteSpan) { + LOG.info(OTEL_MARKER, "Span started: name - '{}', kind - '{}', id - '{}-{}", readWriteSpan.getName(), + readWriteSpan.getKind(), + readWriteSpan.getSpanContext().getTraceId(), readWriteSpan.getSpanContext().getSpanId()); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan readableSpan) { + LOG.info(OTEL_MARKER, "Span ended: name - '{}', kind - '{}', id - '{}-{}", readableSpan.getName(), + readableSpan.getKind(), + readableSpan.getSpanContext().getTraceId(), readableSpan.getSpanContext().getSpanId()); + } + + @Override + public boolean isEndRequired() { + return true; + } + } + + Map<String, OtelTrace> getTraces() { + Map<String, OtelTrace> answer = new HashMap<>(); + for (SpanData span : this.getSpans()) { + String traceId = span.getTraceId(); + OtelTrace trace = answer.get(traceId); + if (trace == null) { + trace = new OtelTrace(traceId); + answer.put(traceId, trace); + } + trace.addSpan(span); + } + + // Sort the spans for all traces + answer.forEach((id, trace) -> Collections.sort(trace.getSpans(), new SpanComparator())); + + return answer; + } + + class OtelTrace { + String traceId; + List<SpanData> spans; + + OtelTrace(String traceId) { + this.traceId = traceId; + this.spans = new ArrayList<>(); + } + + void addSpan(SpanData span) { + this.spans.add(span); + } + + List<SpanData> getSpans() { + return this.spans; + } + + @Override + public String toString() { + return traceId + " " + spans; + } + } + + class SpanComparator implements java.util.Comparator<SpanData> { + @Override + public int compare(SpanData a, SpanData b) { + Long nanosA = a.getStartEpochNanos(); + Long nanosB = b.getStartEpochNanos(); + return (int) (nanosA - nanosB); + } + } +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/DisableEndpointTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/DisableEndpointTest.java new file mode 100644 index 00000000000..5be3c7a361d --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/DisableEndpointTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class DisableEndpointTest extends MicrometerObservabilityTracerTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + tst.setTraceProcessors(true); + tst.setExcludePatterns("log*,to*"); + return super.createCamelContext(); + } + + @Test + void testProcessorsTraceRequest() throws IOException { + template.sendBody("direct:start", "my-body"); + Map<String, MicrometerObservabilityTrace> traces = traces(); + assertEquals(1, traces.size()); + checkTrace(traces.values().iterator().next()); + } + + private void checkTrace(MicrometerObservabilityTrace trace) { + List<SimpleSpan> spans = trace.getSpans(); + assertEquals(2, spans.size()); + SimpleSpan testProducer = spans.get(0); + SimpleSpan direct = spans.get(1); + + // Validate span completion + assertNotEquals("", testProducer.getEndTimestamp()); + assertNotEquals("", direct.getEndTimestamp()); + + // Validate same trace + assertEquals(testProducer.getTraceId(), direct.getTraceId()); + + // Validate hierarchy + assertEquals("", testProducer.getParentId()); + assertEquals(testProducer.getSpanId(), direct.getParentId()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .log("A message") + .to("log:info"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/EnableProcessorsTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/EnableProcessorsTest.java new file mode 100644 index 00000000000..33a1325b8d1 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/EnableProcessorsTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.telemetry.Op; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class EnableProcessorsTest extends MicrometerObservabilityTracerTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + tst.setTraceProcessors(true); + return super.createCamelContext(); + } + + @Test + void testProcessorsTraceRequest() throws IOException { + template.sendBody("direct:start", "my-body"); + Map<String, MicrometerObservabilityTrace> traces = traces(); + assertEquals(1, traces.size()); + checkTrace(traces.values().iterator().next()); + } + + private void checkTrace(MicrometerObservabilityTrace trace) { + List<SimpleSpan> spans = trace.getSpans(); + assertEquals(6, spans.size()); + + SimpleSpan testProducer = spans.get(0); + SimpleSpan direct = spans.get(1); + SimpleSpan innerLog = spans.get(2); + SimpleSpan innerProcessor = spans.get(3); + SimpleSpan log = spans.get(4); + SimpleSpan innerToLog = spans.get(5); + + // Validate span completion + assertNotEquals("", testProducer.getEndTimestamp()); + assertNotEquals("", direct.getEndTimestamp()); + assertNotEquals("", innerLog.getEndTimestamp()); + assertNotEquals("", innerProcessor.getEndTimestamp()); + assertNotEquals("", log.getEndTimestamp()); + assertNotEquals("", innerToLog.getEndTimestamp()); + + // Validate same trace + assertEquals(testProducer.getTraceId(), direct.getTraceId()); + assertEquals(testProducer.getTraceId(), innerLog.getTraceId()); + assertEquals(testProducer.getTraceId(), innerProcessor.getTraceId()); + assertEquals(testProducer.getTraceId(), log.getTraceId()); + assertEquals(testProducer.getTraceId(), innerToLog.getTraceId()); + + // Validate op + assertEquals(Op.EVENT_RECEIVED.toString(), direct.getTags().get("op")); + assertEquals(Op.EVENT_PROCESS.toString(), innerProcessor.getTags().get("op")); + + // Validate hierarchy + assertEquals("", testProducer.getParentId()); + assertEquals(testProducer.getSpanId(), direct.getParentId()); + assertEquals(direct.getSpanId(), innerLog.getParentId()); + assertEquals(direct.getSpanId(), innerProcessor.getParentId()); + assertEquals(direct.getSpanId(), log.getParentId()); + assertEquals(log.getSpanId(), innerToLog.getParentId()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .log("A message") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader("operation", "fake"); + } + }) + .to("log:info"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTrace.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTrace.java new file mode 100644 index 00000000000..07e00335bc1 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTrace.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.util.ArrayList; +import java.util.List; + +import io.micrometer.tracing.test.simple.SimpleSpan; + +/* + * This class is used for testing purposes only. It groups an array of Spans belonging to the same Trace. + */ +public class MicrometerObservabilityTrace { + + private String traceId; + private List<SimpleSpan> spans; + + public MicrometerObservabilityTrace(String traceId) { + this.traceId = traceId; + this.spans = new ArrayList<>(); + } + + public void add(SimpleSpan span) { + this.spans.add(span); + } + + public List<SimpleSpan> getSpans() { + return this.spans; + } + + public String getTraceId() { + return this.traceId; + } + + public String toString() { + return "Trace Id: " + traceId + " " + spans; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerPropagationTestSupport.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerPropagationTestSupport.java new file mode 100644 index 00000000000..915b6fc27d8 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerPropagationTestSupport.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.util.List; + +import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext; +import io.micrometer.tracing.otel.bridge.OtelPropagator; +import io.micrometer.tracing.otel.bridge.OtelTracer; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.telemetry.Op; +import org.apache.camel.telemetry.TagConstants; +import org.apache.camel.test.junit5.ExchangeTestSupport; + +/** + * This test is special as it requires a different setting to inherit the Opentelemetry propagation mechanism. + */ +public class MicrometerObservabilityTracerPropagationTestSupport extends ExchangeTestSupport { + + protected CamelOpenTelemetryExtension otelExtension = CamelOpenTelemetryExtension.create(); + + @Override + protected CamelContext createCamelContext() throws Exception { + MicrometerObservabilityTracer tst = new MicrometerObservabilityTracer(); + CamelContext context = super.createCamelContext(); + + ContextPropagators propagators = otelExtension.getPropagators(); + io.opentelemetry.api.trace.Tracer otelTracer = otelExtension.getOpenTelemetry().getTracer("traceTest"); + + OtelPropagator otelPropagator = new OtelPropagator(propagators, otelTracer); + OtelCurrentTraceContext currentTraceContext = new OtelCurrentTraceContext(); + // We must convert the Otel Tracer into a micrometer Tracer + io.micrometer.tracing.Tracer micrometerTracer = new OtelTracer( + otelTracer, + currentTraceContext, + null); + + context.getRegistry().bind("MicrometerObservabilityTracer", micrometerTracer); + context.getRegistry().bind("OpentelemetryPropagators", otelPropagator); + + CamelContextAware.trySetCamelContext(tst, context); + tst.init(context); + return context; + } + + protected SpanData getSpan(List<SpanData> trace, String uri, Op op) { + for (SpanData span : trace) { + String camelURI = span.getAttributes().get(AttributeKey.stringKey("camel.uri")); + if (camelURI != null && camelURI.equals(uri)) { + String operation = span.getAttributes().get(AttributeKey.stringKey(TagConstants.OP)); + if (operation != null && operation.equals(op.toString())) { + return span; + } + } + } + throw new IllegalArgumentException("Trying to get a non existing span!"); + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTest.java new file mode 100644 index 00000000000..7203edf70bb --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.telemetry.Op; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class MicrometerObservabilityTracerTest extends MicrometerObservabilityTracerTestSupport { + + @Test + void testRouteSingleRequest() throws IOException { + template.request("direct:start", null); + Map<String, MicrometerObservabilityTrace> traces = traces(); + assertEquals(1, traces.size()); + checkTrace(traces.values().iterator().next(), null); + } + + @Test + void testRouteMultipleRequests() throws IOException { + for (int i = 1; i <= 10; i++) { + context.createProducerTemplate().sendBody("direct:start", "Hello!"); + } + Map<String, MicrometerObservabilityTrace> traces = traces(); + // Each trace should have a unique trace id. It is enough to assert that + // the number of elements in the map is the same of the requests to prove + // all traces have been generated uniquely. + assertEquals(10, traces.size()); + // Each trace should have the same structure + for (MicrometerObservabilityTrace trace : traces.values()) { + checkTrace(trace, "Hello!"); + } + } + + private void checkTrace(MicrometerObservabilityTrace trace, String expectedBody) { + List<SimpleSpan> spans = trace.getSpans(); + assertEquals(3, spans.size()); + SimpleSpan testProducer = spans.get(0); + SimpleSpan direct = spans.get(1); + SimpleSpan log = spans.get(2); + + // Validate span completion + assertNotEquals("", testProducer.getEndTimestamp()); + assertNotEquals("", direct.getEndTimestamp()); + assertNotEquals("", log.getEndTimestamp()); + + // Validate same trace + assertEquals(testProducer.getTraceId(), direct.getTraceId()); + assertEquals(direct.getTraceId(), log.getTraceId()); + + // Validate hierarchy + assertEquals("", testProducer.getParentId()); + assertEquals(testProducer.getSpanId(), direct.getParentId()); + assertEquals(direct.getSpanId(), log.getParentId()); + + // Validate operations + assertEquals(Op.EVENT_SENT.toString(), testProducer.getTags().get("op")); + assertEquals(Op.EVENT_RECEIVED.toString(), direct.getTags().get("op")); + + // Validate message logging + assertEquals("message=A message", direct.getEvents().iterator().next().getValue()); + if (expectedBody == null) { + assertEquals( + "message=Exchange[ExchangePattern: InOut, BodyType: null, Body: [Body is null]]", + log.getEvents().iterator().next().getValue()); + } else { + assertEquals( + "message=Exchange[ExchangePattern: InOnly, BodyType: String, Body: " + expectedBody + "]", + log.getEvents().iterator().next().getValue()); + } + + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .log("A message") + .to("log:info"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTestSupport.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTestSupport.java new file mode 100644 index 00000000000..74c509308fa --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/MicrometerObservabilityTracerTestSupport.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.test.simple.SimpleSpan; +import io.micrometer.tracing.test.simple.SimpleTracer; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.telemetry.Op; +import org.apache.camel.telemetry.TagConstants; +import org.apache.camel.test.junit5.ExchangeTestSupport; + +public class MicrometerObservabilityTracerTestSupport extends ExchangeTestSupport { + + protected SimpleTracer tracer = new SimpleTracer(); + protected MicrometerObservabilityTracer tst = new MicrometerObservabilityTracer(); + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getRegistry().bind("MicrometerObservabilityTracer", tracer); + CamelContextAware.trySetCamelContext(tst, context); + tst.init(context); + return context; + } + + protected Map<String, MicrometerObservabilityTrace> traces() { + HashMap<String, MicrometerObservabilityTrace> map = new HashMap<>(); + for (SimpleSpan span : tracer.getSpans()) { + String traceId = span.getTraceId(); + MicrometerObservabilityTrace trace = map.get(traceId); + if (trace == null) { + trace = new MicrometerObservabilityTrace(traceId); + map.put(traceId, trace); + } + trace.add(span); + } + return map; + } + + protected static SimpleSpan getSpan(List<SimpleSpan> trace, String uri, Op op) { + for (SimpleSpan span : trace) { + if (span.getTags().get("camel.uri") != null && span.getTags().get("camel.uri").equals(uri)) { + if (span.getTags().get(TagConstants.OP).equals(op.toString())) { + return span; + } + } + } + throw new IllegalArgumentException("Trying to get a non existing span!"); + } + +} diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/SpanPropagationTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/SpanPropagationTest.java new file mode 100644 index 00000000000..70c4c719187 --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/SpanPropagationTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.micrometer.observability.CamelOpenTelemetryExtension.OtelTrace; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * This test is special as it requires a different setting to inherit the Opentelemetry propagation mechanism. + */ +public class SpanPropagationTest extends MicrometerObservabilityTracerPropagationTestSupport { + + @Test + void testPropagateUpstreamTraceRequest() throws IOException { + template.requestBodyAndHeader("direct:start", "sample body", + "traceparent", "00-0af044aea5c127fd5ab5f839de2b8ae2-d362a8a943c2b289-01"); + Map<String, OtelTrace> traces = otelExtension.getTraces(); + assertEquals(1, traces.size()); + checkTrace(traces.values().iterator().next()); + } + + private void checkTrace(OtelTrace trace) { + List<SpanData> spans = trace.getSpans(); + assertEquals(3, spans.size()); + SpanData testProducer = spans.get(0); + SpanData direct = spans.get(1); + SpanData log = spans.get(2); + + // Validate span completion + assertTrue(testProducer.hasEnded()); + assertTrue(direct.hasEnded()); + assertTrue(log.hasEnded()); + + // Validate same trace + assertEquals("0af044aea5c127fd5ab5f839de2b8ae2", testProducer.getSpanContext().getTraceId()); + assertEquals(testProducer.getSpanContext().getTraceId(), direct.getSpanContext().getTraceId()); + assertEquals(direct.getSpanContext().getTraceId(), log.getSpanContext().getTraceId()); + + // Validate hierarchy + assertEquals("d362a8a943c2b289", testProducer.getParentSpanContext().getSpanId()); + assertEquals(testProducer.getSpanContext().getSpanId(), direct.getParentSpanContext().getSpanId()); + assertEquals(direct.getSpanContext().getSpanId(), log.getParentSpanContext().getSpanId()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .routeId("start") + .log("A message") + .to("log:info"); + } + }; + } + +} diff --git a/components/camel-micrometer-observability/src/test/resources/log4j2.properties b/components/camel-micrometer-observability/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..80f108cf6fc --- /dev/null +++ b/components/camel-micrometer-observability/src/test/resources/log4j2.properties @@ -0,0 +1,30 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +appender.console.type = Console +appender.console.name = console +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-micrometer-observability-test.log +appender.file.append = true +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n + +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/parent/pom.xml b/parent/pom.xml index 76eb6db9c31..c5ff0290cd6 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -1922,6 +1922,11 @@ <artifactId>camel-micrometer</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-micrometer-observability</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-micrometer-prometheus</artifactId> @@ -3287,6 +3292,16 @@ <artifactId>micrometer-tracing</artifactId> <version>${micrometer-tracing-version}</version> </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing-bridge-otel</artifactId> + <version>${micrometer-tracing-version}</version> + </dependency> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-tracing-test</artifactId> + <version>${micrometer-tracing-version}</version> + </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId>
