This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new cc9a6af8ff7 CAMEL-21309: camel-cxf - Force using sync client when
using tracing/opentelemetry as otherwise spans are not working correctly.
(#15816)
cc9a6af8ff7 is described below
commit cc9a6af8ff7ad6d73d1e2bd97eb5e9db7ebb0d3f
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Oct 3 13:16:14 2024 +0200
CAMEL-21309: camel-cxf - Force using sync client when using
tracing/opentelemetry as otherwise spans are not working correctly. (#15816)
* CAMEL-21309: camel-cxf - Force using sync client when using
tracing/opentelemetry as otherwise spans are not working correctly.
---
.../camel/component/cxf/jaxrs/CxfRsProducer.java | 24 ++++++++++++++++++++--
.../camel/component/cxf/jaxws/CxfProducer.java | 17 +++++++++++++--
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
index d03dfcee08f..952f38c2632 100644
---
a/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
+++
b/components/camel-cxf/camel-cxf-rest/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsProducer.java
@@ -42,6 +42,7 @@ import jakarta.ws.rs.core.Response;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
import org.apache.camel.component.cxf.common.CxfOperationException;
import org.apache.camel.component.cxf.common.message.CxfConstants;
@@ -59,6 +60,8 @@ import org.apache.cxf.jaxrs.client.WebClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.Exchange.ACTIVE_SPAN;
+
/**
* CxfRsProducer binds a Camel exchange to a CXF exchange, acts as a CXF JAXRS
client, it will turn the normal Object
* invocation to a RESTful request according to resource annotation. Any
response will be bound to Camel exchange.
@@ -108,6 +111,17 @@ public class CxfRsProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ // if using camel-tracer then execute this synchronously due to
CXF-9063
+ if (exchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) != null) {
+ try {
+ process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ callback.done(true);
+ return true;
+ }
+
try {
Message inMessage = exchange.getIn();
Boolean httpClientAPI =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.class);
@@ -122,7 +136,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
return false;
} catch (Exception exception) {
- LOG.error("Error invoking request", exception);
exchange.setException(exception);
callback.done(true);
return true;
@@ -130,6 +143,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeAsyncHttpClient(Exchange exchange, final
AsyncCallback callback) throws Exception {
+ LOG.trace("Process exchange: {} (asynchronously)", exchange);
+
Message inMessage = exchange.getIn();
JAXRSClientFactoryBean cfb =
clientFactoryBeanCache.get(CxfRsEndpointUtils
.getEffectiveAddress(exchange, ((CxfRsEndpoint)
getEndpoint()).getAddress()));
@@ -188,6 +203,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeAsyncProxyClient(Exchange exchange, final
AsyncCallback callback) throws Exception {
+ LOG.trace("Process exchange: {} (asynchronously)", exchange);
+
Message inMessage = exchange.getIn();
Object[] varValues =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME,
String.class);
@@ -263,7 +280,6 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void setupClientMatrix(WebClient client, Exchange exchange)
throws Exception {
-
org.apache.cxf.message.Message cxfMessage
= (org.apache.cxf.message.Message)
exchange.getIn().getHeader(CxfConstants.CAMEL_CXF_MESSAGE);
if (cxfMessage != null) {
@@ -294,6 +310,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeHttpClient(Exchange exchange) throws Exception {
+ LOG.trace("Process exchange: {} (synchronously)", exchange);
+
Message inMessage = exchange.getIn();
JAXRSClientFactoryBean cfb =
clientFactoryBeanCache.get(CxfRsEndpointUtils
.getEffectiveAddress(exchange, ((CxfRsEndpoint)
getEndpoint()).getAddress()));
@@ -441,6 +459,8 @@ public class CxfRsProducer extends DefaultAsyncProducer {
}
protected void invokeProxyClient(Exchange exchange) throws Exception {
+ LOG.trace("Process exchange: {} (synchronously)", exchange);
+
Message inMessage = exchange.getIn();
Object[] varValues =
inMessage.getHeader(CxfConstants.CAMEL_CXF_RS_VAR_VALUES, Object[].class);
String methodName = inMessage.getHeader(CxfConstants.OPERATION_NAME,
String.class);
diff --git
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
index ca713e0ebe9..a666b632923 100644
---
a/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
+++
b/components/camel-cxf/camel-cxf-soap/src/main/java/org/apache/camel/component/cxf/jaxws/CxfProducer.java
@@ -32,6 +32,7 @@ import javax.xml.namespace.QName;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.cxf.common.CxfPayload;
import org.apache.camel.component.cxf.common.DataFormat;
@@ -51,6 +52,8 @@ import org.apache.cxf.service.model.BindingOperationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.Exchange.ACTIVE_SPAN;
+
/**
* CxfProducer binds a Camel exchange to a CXF exchange, acts as a CXF client,
and sends the request to a CXF
* server. Any response will be bound to Camel exchange.
@@ -99,8 +102,18 @@ public class CxfProducer extends DefaultAsyncProducer {
// so we don't delegate the sync process call to the async process
@Override
public boolean process(Exchange camelExchange, AsyncCallback callback) {
- LOG.trace("Process exchange: {} in an async way.", camelExchange);
+ // if using camel-tracer then execute this synchronously due to
CXF-9063
+ if (camelExchange.getProperty(ExchangePropertyKey.ACTIVE_SPAN) !=
null) {
+ try {
+ process(camelExchange);
+ } catch (Exception e) {
+ camelExchange.setException(e);
+ }
+ callback.done(true);
+ return true;
+ }
+ LOG.trace("Process exchange: {} (asynchronously)", camelExchange);
try {
// create CXF exchange
ExchangeImpl cxfExchange = new ExchangeImpl();
@@ -137,7 +150,7 @@ public class CxfProducer extends DefaultAsyncProducer {
*/
@Override
public void process(Exchange camelExchange) throws Exception {
- LOG.trace("Process exchange: {} in sync way.", camelExchange);
+ LOG.trace("Process exchange: {} (synchronously)", camelExchange);
// create CXF exchange
ExchangeImpl cxfExchange = new ExchangeImpl();