This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 865f05a1500b9a3890bcbe5fc1cea9cef58d578f
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Jan 31 11:26:15 2025 +0100

    (chores) camel-core: refactor large methods to help inlining
---
 .../org/apache/camel/processor/LoopProcessor.java  |  26 ++--
 .../org/apache/camel/processor/SendProcessor.java  | 172 +++++++++++----------
 .../apache/camel/processor/ThreadsProcessor.java   |  24 +--
 .../errorhandler/RedeliveryErrorHandler.java       |  84 +++++-----
 4 files changed, 166 insertions(+), 140 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 776ef59184e..e4e659a6026 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -170,21 +170,25 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     callback.done(false);
                 }
             } catch (Exception e) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Processing failed for exchangeId: {} >>> {}", 
exchange.getExchangeId(), e.getMessage());
-                }
-                if (expression != null) {
-                    // if we should stop due to an exception etc, then make 
sure to dec task count
-                    int gap = count - index;
-                    while (gap-- > 0) {
-                        taskCount.decrement();
-                    }
-                }
-                exchange.setException(e);
+                handleException(e);
                 callback.done(false);
             }
         }
 
+        private void handleException(Exception e) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing failed for exchangeId: {} >>> {}", 
exchange.getExchangeId(), e.getMessage());
+            }
+            if (expression != null) {
+                // if we should stop due to an exception etc, then make sure 
to dec task count
+                int gap = count - index;
+                while (gap-- > 0) {
+                    taskCount.decrement();
+                }
+            }
+            exchange.setException(e);
+        }
+
         @Override
         public String toString() {
             return "LoopState";
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index 37dd575fccd..77d56b66b7c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -154,76 +154,94 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
 
         // if we have a producer then use that as its optimized
         if (producer != null) {
-            final Exchange target = exchange;
+            return sendUsingProducer(exchange, callback, existingPattern, 
originalBody, originalHeaders);
+        } else {
             // we can send with a different MEP pattern
-            if (destinationExchangePattern != null || pattern != null) {
-                target.setPattern(destinationExchangePattern != null ? 
destinationExchangePattern : pattern);
-            }
-            // set property which endpoint we send to
-            exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
destination.getEndpointUri());
-
-            final boolean sending = 
camelContext.getCamelContextExtension().isEventNotificationApplicable()
-                    && 
EventHelper.notifyExchangeSending(exchange.getContext(), target, destination);
-            // record timing for sending the exchange using the producer
-            StopWatch watch;
-            if (sending) {
-                watch = new StopWatch();
-            } else {
-                watch = null;
-            }
+            return sendUsingPattern(exchange, callback, existingPattern, 
originalBody, originalHeaders);
+        }
+    }
 
-            // optimize to only create a new callback if really needed, 
otherwise we can use the provided callback as-is
-            AsyncCallback ac = callback;
-            boolean newCallback = watch != null || existingPattern != 
target.getPattern() || variableReceive != null;
-            if (newCallback) {
-                ac = doneSync -> {
-                    try {
-                        // result should be stored in variable instead of 
message body/headers
-                        if (ExchangeHelper.shouldSetVariableResult(target, 
variableReceive)) {
-                            
ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive,
-                                    target.getMessage());
-                            target.getMessage().setBody(originalBody);
-                            target.getMessage().setHeaders(originalHeaders);
-                        }
-                        // restore previous MEP
-                        target.setPattern(existingPattern);
-                        // emit event that the exchange was sent to the 
endpoint
-                        if (watch != null) {
-                            long timeTaken = watch.taken();
-                            
EventHelper.notifyExchangeSent(target.getContext(), target, destination, 
timeTaken);
-                        }
-                    } finally {
-                        callback.done(doneSync);
-                    }
-                };
-            }
-            try {
-                // replace message body with variable
-                if (variableSend != null) {
-                    Object value = ExchangeHelper.getVariable(exchange, 
variableSend);
-                    exchange.getMessage().setBody(value);
-                }
+    private boolean sendUsingPattern(
+            Exchange exchange, AsyncCallback callback, ExchangePattern 
existingPattern, Object originalBody,
+            Map<String, Object> originalHeaders) {
+        if (destinationExchangePattern != null || pattern != null) {
+            exchange.setPattern(destinationExchangePattern != null ? 
destinationExchangePattern : pattern);
+        }
+        // set property which endpoint we send to
+        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
destination.getEndpointUri());
 
-                LOG.debug(">>>> {} {}", destination, exchange);
-                boolean sync = producer.process(exchange, ac);
-                if (!sync) {
-                    
EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
-                }
-                return sync;
-            } catch (Exception throwable) {
-                exchange.setException(throwable);
-                callback.done(true);
-            }
+        // replace message body with variable
+        if (variableSend != null) {
+            Object value = ExchangeHelper.getVariable(exchange, variableSend);
+            exchange.getMessage().setBody(value);
+        }
 
-            return true;
+        LOG.debug(">>>> {} {}", destination, exchange);
+
+        // send the exchange to the destination using the producer cache for 
the non optimized producers
+        return producerCache.doInAsyncProducer(destination, exchange, callback,
+                (producer, ex, cb) -> producer.process(ex, doneSync -> {
+                    // restore previous MEP
+                    exchange.setPattern(existingPattern);
+                    // result should be stored in variable instead of message 
body/headers
+                    if (ExchangeHelper.shouldSetVariableResult(exchange, 
variableReceive)) {
+                        
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
+                                exchange.getMessage());
+                        exchange.getMessage().setBody(originalBody);
+                        exchange.getMessage().setHeaders(originalHeaders);
+                    }
+                    // signal we are done
+                    cb.done(doneSync);
+                }));
+    }
+
+    private boolean sendUsingProducer(
+            Exchange exchange, AsyncCallback callback, ExchangePattern 
existingPattern, Object originalBody,
+            Map<String, Object> originalHeaders) {
+        final Exchange target = exchange;
+        // we can send with a different MEP pattern
+        if (destinationExchangePattern != null || pattern != null) {
+            target.setPattern(destinationExchangePattern != null ? 
destinationExchangePattern : pattern);
+        }
+        // set property which endpoint we send to
+        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
destination.getEndpointUri());
+
+        final boolean sending = 
camelContext.getCamelContextExtension().isEventNotificationApplicable()
+                && EventHelper.notifyExchangeSending(exchange.getContext(), 
target, destination);
+        // record timing for sending the exchange using the producer
+        StopWatch watch;
+        if (sending) {
+            watch = new StopWatch();
         } else {
-            // we can send with a different MEP pattern
-            if (destinationExchangePattern != null || pattern != null) {
-                exchange.setPattern(destinationExchangePattern != null ? 
destinationExchangePattern : pattern);
-            }
-            // set property which endpoint we send to
-            exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, 
destination.getEndpointUri());
+            watch = null;
+        }
 
+        // optimize to only create a new callback if really needed, otherwise 
we can use the provided callback as-is
+        AsyncCallback ac = callback;
+        boolean newCallback = watch != null || existingPattern != 
target.getPattern() || variableReceive != null;
+        if (newCallback) {
+            ac = doneSync -> {
+                try {
+                    // result should be stored in variable instead of message 
body/headers
+                    if (ExchangeHelper.shouldSetVariableResult(target, 
variableReceive)) {
+                        
ExchangeHelper.setVariableFromMessageBodyAndHeaders(target, variableReceive,
+                                target.getMessage());
+                        target.getMessage().setBody(originalBody);
+                        target.getMessage().setHeaders(originalHeaders);
+                    }
+                    // restore previous MEP
+                    target.setPattern(existingPattern);
+                    // emit event that the exchange was sent to the endpoint
+                    if (watch != null) {
+                        long timeTaken = watch.taken();
+                        EventHelper.notifyExchangeSent(target.getContext(), 
target, destination, timeTaken);
+                    }
+                } finally {
+                    callback.done(doneSync);
+                }
+            };
+        }
+        try {
             // replace message body with variable
             if (variableSend != null) {
                 Object value = ExchangeHelper.getVariable(exchange, 
variableSend);
@@ -231,23 +249,17 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
             }
 
             LOG.debug(">>>> {} {}", destination, exchange);
-
-            // send the exchange to the destination using the producer cache 
for the non optimized producers
-            return producerCache.doInAsyncProducer(destination, exchange, 
callback,
-                    (producer, ex, cb) -> producer.process(ex, doneSync -> {
-                        // restore previous MEP
-                        exchange.setPattern(existingPattern);
-                        // result should be stored in variable instead of 
message body/headers
-                        if (ExchangeHelper.shouldSetVariableResult(exchange, 
variableReceive)) {
-                            
ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive,
-                                    exchange.getMessage());
-                            exchange.getMessage().setBody(originalBody);
-                            exchange.getMessage().setHeaders(originalHeaders);
-                        }
-                        // signal we are done
-                        cb.done(doneSync);
-                    }));
+            boolean sync = producer.process(exchange, ac);
+            if (!sync) {
+                
EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
+            }
+            return sync;
+        } catch (Exception throwable) {
+            exchange.setException(throwable);
+            callback.done(true);
         }
+
+        return true;
     }
 
     public String getVariableSend() {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index b9e2ddaa63b..48a0f109cba 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -133,16 +133,20 @@ public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware,
             // tell Camel routing engine we continue routing asynchronous
             return false;
         } catch (Exception e) {
-            if (executorService instanceof ThreadPoolExecutor tpe) {
-                // process the call in synchronous mode
-                ProcessCall call = new ProcessCall(exchange, callback, true);
-                
rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe);
-                return true;
-            } else {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
-            }
+            return handleException(exchange, callback, e);
+        }
+    }
+
+    private boolean handleException(Exchange exchange, AsyncCallback callback, 
Exception e) {
+        if (executorService instanceof ThreadPoolExecutor tpe) {
+            // process the call in synchronous mode
+            ProcessCall call = new ProcessCall(exchange, callback, true);
+            
rejectedPolicy.asRejectedExecutionHandler().rejectedExecution(call, tpe);
+            return true;
+        } else {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
     }
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 164958fb736..66f2af4edb4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -756,48 +756,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                     // okay there is a delay so create a scheduled task to 
have it executed in the future
 
                     if (currentRedeliveryPolicy.isAsyncDelayedRedelivery() && 
!exchange.isTransacted()) {
-
-                        // we are doing a redelivery then a thread pool must 
be configured (see the doStart method)
-                        ObjectHelper.notNull(executorService,
-                                "Redelivery is enabled but ExecutorService has 
not been configured.", this);
-
-                        // schedule the redelivery task
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Scheduling redelivery task to run in {} 
millis for exchangeId: {}", redeliveryDelay,
-                                    exchange.getExchangeId());
-                        }
-                        executorService.schedule(() -> 
reactiveExecutor.schedule(this::redeliver), redeliveryDelay,
-                                TimeUnit.MILLISECONDS);
-
+                        runAsynchronousRedelivery();
                     } else {
                         // async delayed redelivery was disabled or we are 
transacted so we must be synchronous
                         // as the transaction manager requires to execute in 
the same thread context
-                        try {
-                            // we are doing synchronous redelivery and use 
thread sleep, so we keep track using a counter how many are sleeping
-                            redeliverySleepCounter.incrementAndGet();
-                            boolean complete = sleep();
-                            redeliverySleepCounter.decrementAndGet();
-                            if (!complete) {
-                                // the task was rejected
-                                exchange.setException(new 
RejectedExecutionException("Redelivery not allowed while stopping"));
-                                // mark the exchange as redelivery exhausted 
so the failure processor / dead letter channel can process the exchange
-                                
exchange.getExchangeExtension().setRedeliveryExhausted(true);
-                                // jump to start of loop which then detects 
that we are failed and exhausted
-                                reactiveExecutor.schedule(this);
-                            } else {
-                                reactiveExecutor.schedule(this::redeliver);
-                            }
-                        } catch (InterruptedException e) {
-                            redeliverySleepCounter.decrementAndGet();
-                            // we was interrupted so break out
-                            exchange.setException(e);
-                            // mark the exchange to stop continue routing when 
interrupted
-                            // as we do not want to continue routing (for 
example a task has been cancelled)
-                            exchange.setRouteStop(true);
-                            reactiveExecutor.schedule(callback);
-
-                            Thread.currentThread().interrupt();
-                        }
+                        runSynchronousRedelivery();
                     }
                 } else {
                     // execute the task immediately
@@ -819,6 +782,49 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
             }
         }
 
+        private void runAsynchronousRedelivery() {
+            // we are doing a redelivery then a thread pool must be configured 
(see the doStart method)
+            ObjectHelper.notNull(executorService,
+                    "Redelivery is enabled but ExecutorService has not been 
configured.", this);
+
+            // schedule the redelivery task
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Scheduling redelivery task to run in {} millis for 
exchangeId: {}", redeliveryDelay,
+                        exchange.getExchangeId());
+            }
+            executorService.schedule(() -> 
reactiveExecutor.schedule(this::redeliver), redeliveryDelay,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        private void runSynchronousRedelivery() {
+            try {
+                // we are doing synchronous redelivery and use thread sleep, 
so we keep track using a counter how many are sleeping
+                redeliverySleepCounter.incrementAndGet();
+                boolean complete = sleep();
+                redeliverySleepCounter.decrementAndGet();
+                if (!complete) {
+                    // the task was rejected
+                    exchange.setException(new 
RejectedExecutionException("Redelivery not allowed while stopping"));
+                    // mark the exchange as redelivery exhausted so the 
failure processor / dead letter channel can process the exchange
+                    
exchange.getExchangeExtension().setRedeliveryExhausted(true);
+                    // jump to start of loop which then detects that we are 
failed and exhausted
+                    reactiveExecutor.schedule(this);
+                } else {
+                    reactiveExecutor.schedule(this::redeliver);
+                }
+            } catch (InterruptedException e) {
+                redeliverySleepCounter.decrementAndGet();
+                // we was interrupted so break out
+                exchange.setException(e);
+                // mark the exchange to stop continue routing when interrupted
+                // as we do not want to continue routing (for example a task 
has been cancelled)
+                exchange.setRouteStop(true);
+                reactiveExecutor.schedule(callback);
+
+                Thread.currentThread().interrupt();
+            }
+        }
+
         protected boolean isRunAllowed() {
             // if camel context is forcing a shutdown then do not allow running
             if (shutdownStrategy.isForceShutdown()) {

Reply via email to