This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new fdf54e3 CAMEL-15195: Camel-netty - RequestTimeout seems not working
as expected (#3925)
fdf54e3 is described below
commit fdf54e38f5676dc638c550ddb00d1ee8cd5692eb
Author: Amos Feng <[email protected]>
AuthorDate: Wed Jun 17 18:43:01 2020 +0800
CAMEL-15195: Camel-netty - RequestTimeout seems not working as expected
(#3925)
* add back the codes which removes the "timeout" handler after receiving
the response
* add the "timeout" handler when activating from the connection pool
---
.../apache/camel/component/netty/NettyProducer.java | 21 ++++++++++++++++++---
.../netty/handlers/ClientChannelHandler.java | 7 +++++++
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 51143e1..f915b82 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -109,14 +109,14 @@ public class NettyProducer extends DefaultAsyncProducer {
config.timeBetweenEvictionRunsMillis = 30 * 1000L;
config.minEvictableIdleTimeMillis =
configuration.getProducerPoolMinEvictableIdle();
config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
- pool = new GenericObjectPool<>(new
NettyProducerPoolableObjectFactory(), config);
+ pool = new GenericObjectPool<>(new
NettyProducerPoolableObjectFactory(this), config);
if (LOG.isDebugEnabled()) {
LOG.debug("Created NettyProducer pool[maxActive={},
minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
config.maxActive, config.minIdle, config.maxIdle,
config.minEvictableIdleTimeMillis, pool);
}
} else {
- pool = new SharedSingletonObjectPool<>(new
NettyProducerPoolableObjectFactory());
+ pool = new SharedSingletonObjectPool<>(new
NettyProducerPoolableObjectFactory(this));
if (LOG.isDebugEnabled()) {
LOG.debug("Created NettyProducer shared singleton pool -> {}",
pool);
}
@@ -555,6 +555,11 @@ public class NettyProducer extends DefaultAsyncProducer {
* Object factory to create {@link Channel} used by the pool.
*/
private final class NettyProducerPoolableObjectFactory implements
PoolableObjectFactory<ChannelFuture> {
+ private NettyProducer producer;
+
+ public NettyProducerPoolableObjectFactory(NettyProducer producer) {
+ this.producer = producer;
+ }
@Override
public ChannelFuture makeObject() throws Exception {
@@ -603,8 +608,18 @@ public class NettyProducer extends DefaultAsyncProducer {
@Override
public void activateObject(ChannelFuture channelFuture) {
- // noop
LOG.trace("activateObject channel request: {}", channelFuture);
+
+ if (channelFuture.isSuccess() &&
producer.getConfiguration().getRequestTimeout() > 0) {
+ LOG.trace("reset the request timeout as we activate the
channel");
+ Channel channel = channelFuture.channel();
+
+ ChannelHandler handler = channel.pipeline().get("timeout");
+ if (handler == null) {
+ ChannelHandler timeout = new
ReadTimeoutHandler(producer.getConfiguration().getRequestTimeout(),
TimeUnit.MILLISECONDS);
+ channel.pipeline().addBefore("handler", "timeout",
timeout);
+ }
+ }
}
@Override
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index a2bbb21..1947e6e 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.netty.handlers;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.camel.AsyncCallback;
@@ -148,6 +149,12 @@ public class ClientChannelHandler extends
SimpleChannelInboundHandler<Object> {
LOG.trace("Message received: {}", msg);
}
+ ChannelHandler handler = ctx.pipeline().get("timeout");
+ if (handler != null) {
+ LOG.trace("Removing timeout channel as we received message");
+ ctx.pipeline().remove(handler);
+ }
+
NettyCamelState state = getState(ctx, msg);
Exchange exchange = state != null ? state.getExchange() : null;
if (exchange == null) {