This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fb82a00 CAMEL-16619: camel-rabbitmq - Producer destroys rabbit
channels when returns it back to the pool (#5584)
fb82a00 is described below
commit fb82a007637f070a556571889bbd35fa4eb25cb7
Author: Anton Ovcharenko <[email protected]>
AuthorDate: Wed May 19 07:18:36 2021 +0300
CAMEL-16619: camel-rabbitmq - Producer destroys rabbit channels when
returns it back to the pool (#5584)
- for RabbitMQProducer specify `GenericObjectPool._maxIdle` property of
channel pool with `RabbitMQEndpoint.channelPoolMaxSize` value
Co-authored-by: Anton Ovcharenko <[email protected]>
---
.../camel/component/rabbitmq/RabbitMQProducer.java | 14 +++++++++--
.../component/rabbitmq/RabbitMQProducerTest.java | 27 ++++++++++++++++++----
2 files changed, 35 insertions(+), 6 deletions(-)
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 1d44015..f9cb2bd 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -109,10 +109,20 @@ public class RabbitMQProducer extends
DefaultAsyncProducer {
LOG.debug("Created connection: {}", conn);
LOG.trace("Creating channel pool...");
+ int channelPoolMaxSize = getEndpoint().getChannelPoolMaxSize();
channelPool = new GenericObjectPool<>(
- new PoolableChannelFactory(this.conn),
getEndpoint().getChannelPoolMaxSize(),
+ new PoolableChannelFactory(this.conn),
+ channelPoolMaxSize,
GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
- getEndpoint().getChannelPoolMaxWait());
+ getEndpoint().getChannelPoolMaxWait(),
+ channelPoolMaxSize,
+ GenericObjectPool.DEFAULT_MIN_IDLE,
+ GenericObjectPool.DEFAULT_TEST_ON_BORROW,
+ GenericObjectPool.DEFAULT_TEST_ON_RETURN,
+ GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS,
+ GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN,
+ GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+ GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
attemptDeclaration();
}
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 0bf14df..0751bd1 100644
---
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -26,24 +26,27 @@ import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultMessage;
+import org.apache.camel.util.ReflectionHelper;
+import org.apache.commons.pool.impl.GenericObjectPool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
public class RabbitMQProducerTest {
+ private CamelContext context = new DefaultCamelContext();
private RabbitMQEndpoint endpoint = Mockito.mock(RabbitMQEndpoint.class);
private Exchange exchange = Mockito.mock(Exchange.class);
- private Message message = new DefaultMessage(new DefaultCamelContext());
+ private Message message = new DefaultMessage(context);
private Connection conn = Mockito.mock(Connection.class);
@BeforeEach
@@ -55,6 +58,7 @@ public class RabbitMQProducerTest {
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(null);
Mockito.when(endpoint.getMessageConverter()).thenReturn(converter);
+ Mockito.when(endpoint.getCamelContext()).thenReturn(context);
}
@Test
@@ -202,6 +206,21 @@ public class RabbitMQProducerTest {
assertNull(props.getHeaders().get("invalidHeader"));
}
+ @Test
+ public void testChannelPoolConfiguration() throws Exception {
+ RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+ Mockito.when(endpoint.getChannelPoolMaxSize()).thenReturn(123);
+ Mockito.when(endpoint.getChannelPoolMaxWait()).thenReturn(321L);
+ producer.doStart();
+ Object channelPool =
ReflectionHelper.getField(producer.getClass().getDeclaredField("channelPool"),
producer);
+ assertNotNull(channelPool);
+ assertTrue(channelPool instanceof GenericObjectPool);
+ GenericObjectPool<Channel> genericObjectPool =
(GenericObjectPool<Channel>) channelPool;
+ assertEquals(123, genericObjectPool.getMaxActive());
+ assertEquals(123, genericObjectPool.getMaxIdle());
+ assertEquals(321L, genericObjectPool.getMaxWait());
+ }
+
private static class Something {
}
}