This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit e9f5a12071e4318e284189f156b0be8aa4b2bc71 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 19 09:56:15 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../main/java/org/apache/camel/spi/UnitOfWork.java | 7 ++ .../camel/impl/engine/CamelInternalProcessor.java | 12 +-- .../camel/impl/engine/DefaultUnitOfWork.java | 95 ++++++++++++---------- .../apache/camel/impl/engine/MDCUnitOfWork.java | 6 ++ .../org/apache/camel/support/DefaultExchange.java | 10 ++- 5 files changed, 77 insertions(+), 53 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java index 20284fa..e94840a 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -48,6 +48,13 @@ public interface UnitOfWork extends Service { void reset(); /** + * Prepares this unit of work with the given input {@link Exchange} + * + * @param exchange the exchange + */ + void onExchange(Exchange exchange); + + /** * Adds a synchronization hook * * @param synchronization the hook diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 8fba8fe..255b894 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -650,22 +650,24 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW UnitOfWork created = null; + UnitOfWork uow = exchange.getUnitOfWork(); - if (exchange.getUnitOfWork() == null) { + if (uow == null) { // If there is no existing UoW, then we should start one and // terminate it once processing is completed for the exchange. created = createUnitOfWork(exchange); ExtendedExchange ee = (ExtendedExchange) exchange; ee.setUnitOfWork(created); created.start(); + uow = created; + } else { + // reuse existing exchange + uow.onExchange(exchange); } // for any exchange we should push/pop route context so we can keep track of which route we are routing if (route != null) { - UnitOfWork existing = exchange.getUnitOfWork(); - if (existing != null) { - existing.pushRoute(route); - } + uow.pushRoute(route); } return created; diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 975b8a0..7e51696 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -49,20 +49,16 @@ import org.slf4j.LoggerFactory; */ public class DefaultUnitOfWork implements UnitOfWork, Service { private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class); + + // instances used by MDCUnitOfWork final InflightRepository inflightRepository; final boolean allowUseOriginalMessage; final boolean useBreadcrumb; - // TODO: This implementation seems to have transformed itself into a to broad concern - // where unit of work is doing a bit more work than the transactional aspect that ties - // to its name. Maybe this implementation should be named ExchangeContext and we can - // introduce a simpler UnitOfWork concept. This would also allow us to refactor the - // SubUnitOfWork into a general parent/child unit of work concept. However this - // requires API changes and thus is best kept for future Camel work - private final Deque<Route> routes = new ArrayDeque<>(8); - private final Exchange exchange; private final ExtendedCamelContext context; + private final Deque<Route> routes = new ArrayDeque<>(8); private Logger log; + private Exchange exchange; private List<Synchronization> synchronizations; private Message originalInMessage; private Set<Object> transactedBy; @@ -80,59 +76,68 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { public DefaultUnitOfWork(Exchange exchange, InflightRepository inflightRepository, boolean allowUseOriginalMessage, boolean useBreadcrumb) { - this.exchange = exchange; this.log = LOG; this.allowUseOriginalMessage = allowUseOriginalMessage; this.useBreadcrumb = useBreadcrumb; this.context = (ExtendedCamelContext) exchange.getContext(); this.inflightRepository = inflightRepository; + onExchange(exchange); + } - if (allowUseOriginalMessage) { - // special for JmsMessage as it can cause it to loose headers later. - if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) { - this.originalInMessage = new DefaultMessage(context); - this.originalInMessage.setBody(exchange.getIn().getBody()); - this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders()); - } else { - this.originalInMessage = exchange.getIn().copy(); - } - // must preserve exchange on the original in message - if (this.originalInMessage instanceof MessageSupport) { - ((MessageSupport) this.originalInMessage).setExchange(exchange); - } - } + UnitOfWork newInstance(Exchange exchange) { + return new DefaultUnitOfWork(exchange, inflightRepository, allowUseOriginalMessage, useBreadcrumb); + } - // inject breadcrumb header if enabled - if (useBreadcrumb) { - // create or use existing breadcrumb - String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); - if (breadcrumbId == null) { - // no existing breadcrumb, so create a new one based on the exchange id - breadcrumbId = exchange.getExchangeId(); - exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); + @Override + public void onExchange(Exchange exchange) { + if (this.exchange == null) { + // unit of work is reused, so setup for this exchange + this.exchange = exchange; + + if (allowUseOriginalMessage) { + // special for JmsMessage as it can cause it to loose headers later. + if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) { + this.originalInMessage = new DefaultMessage(context); + this.originalInMessage.setBody(exchange.getIn().getBody()); + this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders()); + } else { + this.originalInMessage = exchange.getIn().copy(); + } + // must preserve exchange on the original in message + if (this.originalInMessage instanceof MessageSupport) { + ((MessageSupport) this.originalInMessage).setExchange(exchange); + } } - } - // fire event - if (context.isEventNotificationApplicable()) { - try { - EventHelper.notifyExchangeCreated(context, exchange); - } catch (Throwable e) { - // must catch exceptions to ensure the exchange is not failing due to notification event failed - log.warn("Exception occurred during event notification. This exception will be ignored.", e); + // inject breadcrumb header if enabled + if (useBreadcrumb) { + // create or use existing breadcrumb + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); + if (breadcrumbId == null) { + // no existing breadcrumb, so create a new one based on the exchange id + breadcrumbId = exchange.getExchangeId(); + exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); + } } - } - // register to inflight registry - inflightRepository.add(exchange); - } + // fire event + if (context.isEventNotificationApplicable()) { + try { + EventHelper.notifyExchangeCreated(context, exchange); + } catch (Throwable e) { + // must catch exceptions to ensure the exchange is not failing due to notification event failed + log.warn("Exception occurred during event notification. This exception will be ignored.", e); + } + } - UnitOfWork newInstance(Exchange exchange) { - return new DefaultUnitOfWork(exchange, inflightRepository, allowUseOriginalMessage, useBreadcrumb); + // register to inflight registry + inflightRepository.add(exchange); + } } @Override public void reset() { + this.exchange = null; routes.clear(); if (synchronizations != null) { synchronizations.clear(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index f6229a2..d111302 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -202,6 +202,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork { } @Override + public void reset() { + super.reset(); + clear(); + } + + @Override public String toString() { return "MDCUnitOfWork"; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index ad238b2..808bfb6 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -53,7 +53,7 @@ public final class DefaultExchange implements ExtendedExchange { private Exception exception; private String exchangeId; private UnitOfWork unitOfWork; - private ExchangePattern originalPattern; + private final ExchangePattern originalPattern; private ExchangePattern pattern; private Endpoint fromEndpoint; private String fromRouteId; @@ -125,16 +125,20 @@ public final class DefaultExchange implements ExtendedExchange { this.properties.clear(); this.exchangeId = null; this.created = 0; + // TODO: optimize in/out to keep as default message (if original message is this kind) this.in = null; this.out = null; this.exception = null; - this.unitOfWork = null; + // reset uow + if (this.unitOfWork != null) { + this.unitOfWork.reset(); + } // reset pattern to original this.pattern = originalPattern; - // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again if (this.onCompletions != null) { this.onCompletions.clear(); } + // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again this.externalRedelivered = null; this.historyNodeId = null; this.historyNodeLabel = null;
