This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e9f5a12071e4318e284189f156b0be8aa4b2bc71
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Feb 19 09:56:15 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../main/java/org/apache/camel/spi/UnitOfWork.java |  7 ++
 .../camel/impl/engine/CamelInternalProcessor.java  | 12 +--
 .../camel/impl/engine/DefaultUnitOfWork.java       | 95 ++++++++++++----------
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |  6 ++
 .../org/apache/camel/support/DefaultExchange.java  | 10 ++-
 5 files changed, 77 insertions(+), 53 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index 20284fa..e94840a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -48,6 +48,13 @@ public interface UnitOfWork extends Service {
     void reset();
 
     /**
+     * Prepares this unit of work with the given input {@link Exchange}
+     *
+     * @param exchange the exchange
+     */
+    void onExchange(Exchange exchange);
+
+    /**
      * Adds a synchronization hook
      *
      * @param synchronization the hook
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 8fba8fe..255b894 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -650,22 +650,24 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
 
             // only return UnitOfWork if we created a new as then its us that 
handle the lifecycle to done the created UoW
             UnitOfWork created = null;
+            UnitOfWork uow = exchange.getUnitOfWork();
 
-            if (exchange.getUnitOfWork() == null) {
+            if (uow == null) {
                 // If there is no existing UoW, then we should start one and
                 // terminate it once processing is completed for the exchange.
                 created = createUnitOfWork(exchange);
                 ExtendedExchange ee = (ExtendedExchange) exchange;
                 ee.setUnitOfWork(created);
                 created.start();
+                uow = created;
+            } else {
+                // reuse the existing unit of work for this exchange
+                uow.onExchange(exchange);
             }
 
             // for any exchange we should push/pop route context so we can 
keep track of which route we are routing
             if (route != null) {
-                UnitOfWork existing = exchange.getUnitOfWork();
-                if (existing != null) {
-                    existing.pushRoute(route);
-                }
+                uow.pushRoute(route);
             }
 
             return created;
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 975b8a0..7e51696 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -49,20 +49,16 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultUnitOfWork implements UnitOfWork, Service {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultUnitOfWork.class);
+
+    // fields used by MDCUnitOfWork
     final InflightRepository inflightRepository;
     final boolean allowUseOriginalMessage;
     final boolean useBreadcrumb;
 
-    // TODO: This implementation seems to have transformed itself into a to 
broad concern
-    //   where unit of work is doing a bit more work than the transactional 
aspect that ties
-    //   to its name. Maybe this implementation should be named 
ExchangeContext and we can
-    //   introduce a simpler UnitOfWork concept. This would also allow us to 
refactor the
-    //   SubUnitOfWork into a general parent/child unit of work concept. 
However this
-    //   requires API changes and thus is best kept for future Camel work
-    private final Deque<Route> routes = new ArrayDeque<>(8);
-    private final Exchange exchange;
     private final ExtendedCamelContext context;
+    private final Deque<Route> routes = new ArrayDeque<>(8);
     private Logger log;
+    private Exchange exchange;
     private List<Synchronization> synchronizations;
     private Message originalInMessage;
     private Set<Object> transactedBy;
@@ -80,59 +76,68 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
 
     public DefaultUnitOfWork(Exchange exchange, InflightRepository 
inflightRepository, boolean allowUseOriginalMessage,
                              boolean useBreadcrumb) {
-        this.exchange = exchange;
         this.log = LOG;
         this.allowUseOriginalMessage = allowUseOriginalMessage;
         this.useBreadcrumb = useBreadcrumb;
         this.context = (ExtendedCamelContext) exchange.getContext();
         this.inflightRepository = inflightRepository;
+        onExchange(exchange);
+    }
 
-        if (allowUseOriginalMessage) {
-            // special for JmsMessage as it can cause it to loose headers 
later.
-            if 
(exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage"))
 {
-                this.originalInMessage = new DefaultMessage(context);
-                this.originalInMessage.setBody(exchange.getIn().getBody());
-                
this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
-            } else {
-                this.originalInMessage = exchange.getIn().copy();
-            }
-            // must preserve exchange on the original in message
-            if (this.originalInMessage instanceof MessageSupport) {
-                ((MessageSupport) 
this.originalInMessage).setExchange(exchange);
-            }
-        }
+    UnitOfWork newInstance(Exchange exchange) {
+        return new DefaultUnitOfWork(exchange, inflightRepository, 
allowUseOriginalMessage, useBreadcrumb);
+    }
 
-        // inject breadcrumb header if enabled
-        if (useBreadcrumb) {
-            // create or use existing breadcrumb
-            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
-            if (breadcrumbId == null) {
-                // no existing breadcrumb, so create a new one based on the 
exchange id
-                breadcrumbId = exchange.getExchangeId();
-                exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, 
breadcrumbId);
+    @Override
+    public void onExchange(Exchange exchange) {
+        if (this.exchange == null) {
+            // unit of work is new or has been reset, so set it up for this exchange
+            this.exchange = exchange;
+
+            if (allowUseOriginalMessage) {
+                // special for JmsMessage as it can cause it to lose headers 
later.
+                if 
(exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage"))
 {
+                    this.originalInMessage = new DefaultMessage(context);
+                    this.originalInMessage.setBody(exchange.getIn().getBody());
+                    
this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
+                } else {
+                    this.originalInMessage = exchange.getIn().copy();
+                }
+                // must preserve exchange on the original in message
+                if (this.originalInMessage instanceof MessageSupport) {
+                    ((MessageSupport) 
this.originalInMessage).setExchange(exchange);
+                }
             }
-        }
 
-        // fire event
-        if (context.isEventNotificationApplicable()) {
-            try {
-                EventHelper.notifyExchangeCreated(context, exchange);
-            } catch (Throwable e) {
-                // must catch exceptions to ensure the exchange is not failing 
due to notification event failed
-                log.warn("Exception occurred during event notification. This 
exception will be ignored.", e);
+            // inject breadcrumb header if enabled
+            if (useBreadcrumb) {
+                // create or use existing breadcrumb
+                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
+                if (breadcrumbId == null) {
+                    // no existing breadcrumb, so create a new one based on 
the exchange id
+                    breadcrumbId = exchange.getExchangeId();
+                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, 
breadcrumbId);
+                }
             }
-        }
 
-        // register to inflight registry
-        inflightRepository.add(exchange);
-    }
+            // fire event
+            if (context.isEventNotificationApplicable()) {
+                try {
+                    EventHelper.notifyExchangeCreated(context, exchange);
+                } catch (Throwable e) {
+                    // must catch exceptions to ensure the exchange is not 
failing due to notification event failed
+                    log.warn("Exception occurred during event notification. 
This exception will be ignored.", e);
+                }
+            }
 
-    UnitOfWork newInstance(Exchange exchange) {
-        return new DefaultUnitOfWork(exchange, inflightRepository, 
allowUseOriginalMessage, useBreadcrumb);
+            // register to inflight registry
+            inflightRepository.add(exchange);
+        }
     }
 
     @Override
     public void reset() {
+        this.exchange = null;
         routes.clear();
         if (synchronizations != null) {
             synchronizations.clear();
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index f6229a2..d111302 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -202,6 +202,12 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
     }
 
     @Override
+    public void reset() {
+        super.reset();
+        clear();
+    }
+
+    @Override
     public String toString() {
         return "MDCUnitOfWork";
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index ad238b2..808bfb6 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -53,7 +53,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
     private Exception exception;
     private String exchangeId;
     private UnitOfWork unitOfWork;
-    private ExchangePattern originalPattern;
+    private final ExchangePattern originalPattern;
     private ExchangePattern pattern;
     private Endpoint fromEndpoint;
     private String fromRouteId;
@@ -125,16 +125,20 @@ public final class DefaultExchange implements 
ExtendedExchange {
         this.properties.clear();
         this.exchangeId = null;
         this.created = 0;
+        // TODO: optimize in/out to keep as default message (if original 
message is this kind)
         this.in = null;
         this.out = null;
         this.exception = null;
-        this.unitOfWork = null;
+        // reset uow
+        if (this.unitOfWork != null) {
+            this.unitOfWork.reset();
+        }
         // reset pattern to original
         this.pattern = originalPattern;
-        // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
         if (this.onCompletions != null) {
             this.onCompletions.clear();
         }
+        // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
         this.externalRedelivered = null;
         this.historyNodeId = null;
         this.historyNodeLabel = null;

Reply via email to