This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.10.x by this push:
     new 487f59c436d CAMEL-22278: Improve rabbitmq producer performance (#18738)
487f59c436d is described below

commit 487f59c436dac16bf1984ec80e2c3887603fae00
Author: Federico Mariani <[email protected]>
AuthorDate: Sun Jul 27 11:32:28 2025 +0200

    CAMEL-22278: Improve rabbitmq producer performance (#18738)
---
 .../springrabbit/SpringRabbitMQProducer.java       | 17 ++--
 .../integration/RabbitMQProducerIT.java            | 94 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 373073e770c..78d9bc13477 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -212,14 +212,17 @@ public class SpringRabbitMQProducer extends 
DefaultAsyncProducer {
         }
         final long timeout = getEndpoint().getConfirmTimeout() <= 0 ? 
Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
         try {
-            Boolean sent = getInOnlyTemplate().invoke(t -> {
-                t.send(ex, rk, msg);
-                if (confirm) {
+            boolean sent;
+            if (confirm) {
+                sent = getInOnlyTemplate().invoke(t -> {
+                    t.send(ex, rk, msg);
                     return t.waitForConfirms(timeout);
-                } else {
-                    return true;
-                }
-            });
+                });
+            } else {
+                getInOnlyTemplate().send(ex, rk, msg);
+                sent = true;
+            }
+
             if (Boolean.FALSE == sent) {
                 exchange.setException(new TimeoutException("Message not sent 
within " + timeout + " millis"));
             }
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
index d3ed4743268..8c3e8a4f49d 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
@@ -18,12 +18,20 @@ package org.apache.camel.component.springrabbit.integration;
 
 import java.nio.charset.Charset;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.AmqpAdmin;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.amqp.core.BindingBuilder;
@@ -39,6 +47,92 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 
 public class RabbitMQProducerIT extends RabbitMQITSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQProducerIT.class);
+
+    @Disabled("Load test used to verify producer performance CAMEL-22278")
+    @Test
+    public void testProducerLoadAndReceive() throws Exception {
+        // --- 1. AMQP Setup ---
+        // Ensures the RabbitMQ queue, exchange, and binding exist before the 
test.
+        ConnectionFactory cf = 
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
+        Queue q = new Queue("myqueue");
+        TopicExchange t = new TopicExchange("foo");
+        AmqpAdmin admin = new RabbitAdmin(cf);
+        admin.declareQueue(q);
+        admin.declareExchange(t);
+        admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+
+        // --- 2. Test Configuration ---
+        final int numThreads = 10;
+        final int totalDurationSeconds = 10;
+        final int snapshotTimeSeconds = 5;
+
+        // --- 3. Concurrency Tools ---
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        final AtomicInteger messageCounter = new AtomicInteger(0);
+        final AtomicBoolean running = new AtomicBoolean(true);
+
+        // --- 4. Define the Message Sending Task ---
+        Runnable senderTask = () -> {
+            while (running.get()) {
+                template.sendBody("direct:start", "Load test message #" + 
messageCounter.get());
+                messageCounter.incrementAndGet();
+            }
+        };
+
+        // --- 5. Start the Load Test ---
+        LOG.info("Starting load test with {} threads for {} seconds...", 
numThreads, totalDurationSeconds);
+        for (int i = 0; i < numThreads; i++) {
+            executor.submit(senderTask);
+        }
+
+        // --- 6. Wait and Count at 5 Seconds ---
+        Thread.sleep(TimeUnit.SECONDS.toMillis(snapshotTimeSeconds));
+
+        int countAfter5Seconds = messageCounter.get();
+        LOG.info(">>> Messages sent after {} seconds: {}", 
snapshotTimeSeconds, countAfter5Seconds);
+        Assertions.assertTrue(countAfter5Seconds > 0, "No messages were sent 
after 5 seconds.");
+
+        // --- 7. Wait for the Remaining Time ---
+        Thread.sleep(TimeUnit.SECONDS.toMillis(totalDurationSeconds - 
snapshotTimeSeconds));
+
+        // --- 8. Stop Threads and Gather Final Sent Count ---
+        LOG.info("Stopping threads...");
+        running.set(false);
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+
+        int finalSentCount = messageCounter.get();
+        LOG.info(">>> Total messages sent in {} seconds: {}", 
totalDurationSeconds, finalSentCount);
+
+        // --- 9. NEW: Receive All Messages from the Queue ---
+        LOG.info("\n--- Draining and Verifying Queue Content ---");
+        AmqpTemplate rabbitConsumerTemplate = new RabbitTemplate(cf);
+
+        int receivedCount = 0;
+        long lastMessageTime = System.currentTimeMillis();
+        final long idleTimeoutMillis = 2000; // 2 seconds
+        while (true) {
+            Object receivedMessage = 
rabbitConsumerTemplate.receiveAndConvert("myqueue");
+
+            if (receivedMessage != null) {
+                // If we got a message, increment count and reset the idle 
timer.
+                receivedCount++;
+                lastMessageTime = System.currentTimeMillis();
+            } else {
+                // If we got no message, check if the idle timeout has been 
exceeded.
+                if (System.currentTimeMillis() - lastMessageTime > 
idleTimeoutMillis) {
+                    LOG.info(
+                            "No messages received for " + (idleTimeoutMillis / 
1000) + " seconds. Exiting consumer loop.");
+                    break; // Exit the loop
+                }
+            }
+        }
+        LOG.info("Total messages received from queue: " + receivedCount);
+
+        // --- 10. Final Verification ---
+        Assertions.assertEquals(finalSentCount, receivedCount, "The number of 
sent and received messages should match.");
+    }
 
     @Test
     public void testProducer() throws Exception {

Reply via email to