This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.10.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.10.x by this push:
new 487f59c436d CAMEL-22278: Improve rabbitmq producer performance (#18738)
487f59c436d is described below
commit 487f59c436dac16bf1984ec80e2c3887603fae00
Author: Federico Mariani <[email protected]>
AuthorDate: Sun Jul 27 11:32:28 2025 +0200
CAMEL-22278: Improve rabbitmq producer performance (#18738)
---
.../springrabbit/SpringRabbitMQProducer.java | 17 ++--
.../integration/RabbitMQProducerIT.java | 94 ++++++++++++++++++++++
2 files changed, 104 insertions(+), 7 deletions(-)
diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
index 373073e770c..78d9bc13477 100644
--- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
+++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQProducer.java
@@ -212,14 +212,17 @@ public class SpringRabbitMQProducer extends
DefaultAsyncProducer {
}
final long timeout = getEndpoint().getConfirmTimeout() <= 0 ?
Long.MAX_VALUE : getEndpoint().getConfirmTimeout();
try {
- Boolean sent = getInOnlyTemplate().invoke(t -> {
- t.send(ex, rk, msg);
- if (confirm) {
+ boolean sent;
+ if (confirm) {
+ sent = getInOnlyTemplate().invoke(t -> {
+ t.send(ex, rk, msg);
return t.waitForConfirms(timeout);
- } else {
- return true;
- }
- });
+ });
+ } else {
+ getInOnlyTemplate().send(ex, rk, msg);
+ sent = true;
+ }
+
if (Boolean.FALSE == sent) {
exchange.setException(new TimeoutException("Message not sent
within " + timeout + " millis"));
}
diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
index d3ed4743268..8c3e8a4f49d 100644
--- a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
+++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
@@ -18,12 +18,20 @@ package org.apache.camel.component.springrabbit.integration;
import java.nio.charset.Charset;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
@@ -39,6 +47,92 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitMQProducerIT extends RabbitMQITSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(RabbitMQProducerIT.class);
+
+ @Disabled("Load test used to verify producer performance CAMEL-22278")
+ @Test
+ public void testProducerLoadAndReceive() throws Exception {
+ // --- 1. AMQP Setup ---
+ // Ensures the RabbitMQ queue, exchange, and binding exist before the test.
+ ConnectionFactory cf =
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
+ Queue q = new Queue("myqueue");
+ TopicExchange t = new TopicExchange("foo");
+ AmqpAdmin admin = new RabbitAdmin(cf);
+ admin.declareQueue(q);
+ admin.declareExchange(t);
+ admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
+
+ // --- 2. Test Configuration ---
+ final int numThreads = 10;
+ final int totalDurationSeconds = 10;
+ final int snapshotTimeSeconds = 5;
+
+ // --- 3. Concurrency Tools ---
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ final AtomicInteger messageCounter = new AtomicInteger(0);
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ // --- 4. Define the Message Sending Task ---
+ Runnable senderTask = () -> {
+ while (running.get()) {
+ template.sendBody("direct:start", "Load test message #" +
messageCounter.get());
+ messageCounter.incrementAndGet();
+ }
+ };
+
+ // --- 5. Start the Load Test ---
+ LOG.info("Starting load test with {} threads for {} seconds...",
numThreads, totalDurationSeconds);
+ for (int i = 0; i < numThreads; i++) {
+ executor.submit(senderTask);
+ }
+
+ // --- 6. Wait and Count at 5 Seconds ---
+ Thread.sleep(TimeUnit.SECONDS.toMillis(snapshotTimeSeconds));
+
+ int countAfter5Seconds = messageCounter.get();
+ LOG.info(">>> Messages sent after {} seconds: {}",
snapshotTimeSeconds, countAfter5Seconds);
+ Assertions.assertTrue(countAfter5Seconds > 0, "No messages were sent
after 5 seconds.");
+
+ // --- 7. Wait for the Remaining Time ---
+ Thread.sleep(TimeUnit.SECONDS.toMillis(totalDurationSeconds -
snapshotTimeSeconds));
+
+ // --- 8. Stop Threads and Gather Final Sent Count ---
+ LOG.info("Stopping threads...");
+ running.set(false);
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+
+ int finalSentCount = messageCounter.get();
+ LOG.info(">>> Total messages sent in {} seconds: {}",
totalDurationSeconds, finalSentCount);
+
+ // --- 9. NEW: Receive All Messages from the Queue ---
+ LOG.info("\n--- Draining and Verifying Queue Content ---");
+ AmqpTemplate rabbitConsumerTemplate = new RabbitTemplate(cf);
+
+ int receivedCount = 0;
+ long lastMessageTime = System.currentTimeMillis();
+ final long idleTimeoutMillis = 2000; // 2 seconds
+ while (true) {
+ Object receivedMessage =
rabbitConsumerTemplate.receiveAndConvert("myqueue");
+
+ if (receivedMessage != null) {
+ // If we got a message, increment count and reset the idle timer.
+ receivedCount++;
+ lastMessageTime = System.currentTimeMillis();
+ } else {
+ // If we got no message, check if the idle timeout has been exceeded.
+ if (System.currentTimeMillis() - lastMessageTime >
idleTimeoutMillis) {
+ LOG.info(
+ "No messages received for " + (idleTimeoutMillis /
1000) + " seconds. Exiting consumer loop.");
+ break; // Exit the loop
+ }
+ }
+ }
+ LOG.info("Total messages received from queue: " + receivedCount);
+
+ // --- 10. Final Verification ---
+ Assertions.assertEquals(finalSentCount, receivedCount, "The number of
sent and received messages should match.");
+ }
@Test
public void testProducer() throws Exception {