Hello,
Here is a small test case. I am not familiar with camel testing, I hope my
test is correct.
I try to send 100 messages to a tcp endpoint and forward it to another tcp
endpoint. I don't know how to intercept the error, but if you turn the
logging to DEBUG, you have this error:
[ New I/O worker #9] ServerChannelHandler DEBUG
Incoming message: 99
[ New I/O worker #9] SendProcessor DEBUG >>>>
Endpoint[mock://incoming] Exchange[Message: 99]
[ New I/O worker #9] MockEndpoint DEBUG
mock://incoming >>>> 99 : Exchange[Message: 99] with body: 99 and
headers:{breadcrumbId=ID-jack-5708-1344355497315-0-696,
CamelNettyRemoteAddress=/127.0.0.1:5713,
CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@c736e4,
CamelNettyMessageEvent=[id: 0x0058cca9, /127.0.0.1:5713 => /127.0.0.1:6520]
RECEIVED: 99}
[ New I/O worker #9] SendProcessor DEBUG >>>>
Endpoint[tcp://localhost:233] Exchange[Message: 99]
[ New I/O worker #9] NettyProducer DEBUG
Writing body: 99
[ New I/O worker #41] ClientChannelHandler DEBUG
Closing channel as an exception was thrown from Netty
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:71)[:1.6.0_26]
at
java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)[:1.6.0_26]
at
org.apache.camel.impl.DefaultServicePool.release(DefaultServicePool.java:96)[camel-core-2.10.0.jar:2.10.0]
at
org.apache.camel.impl.ProducerCache$1.done(ProducerCache.java:304)[camel-core-2.10.0.jar:2.10.0]
at
org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:125)[camel-core-2.10.0.jar:2.10.0]
at
org.apache.camel.component.netty.handlers.ClientChannelHandler.messageReceived(ClientChannelHandler.java:162)[camel-netty-2.10.0.jar:2.10.0]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:71)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:423)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:404)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:268)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:373)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:247)[netty-3.5.1.Final.jar:]
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)[netty-3.5.1.Final.jar:]
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_26]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_26]
at java.lang.Thread.run(Thread.java:662)[:1.6.0_26]
I noted that if I replace <code>callback.done(false);</code> by
<code>callback.done(true);</code>, the error disappears.
package test;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
public class NettyTest extends CamelTestSupport {
@Test
public void testConcurrentProducers() throws Exception {
doSendMessages(100, 2);
}
private void doSendMessages(int files, int poolSize) throws Exception {
getMockEndpoint("mock:forward").expectedMessageCount(100);
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
Map<Integer, Future<String>> responses = new
ConcurrentHashMap<Integer, Future<String>>();
for (int i = 0; i < files; i++) {
final int index = i;
Future<String> out = executor.submit(new Callable<String>() {
public String call() throws Exception {
return
template.requestBody("netty:tcp://localhost:6520?sync=true&textline=true",
index , String.class);
}
});
responses.put(index, out);
}
assertMockEndpointsSatisfied();
executor.shutdownNow();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("netty:tcp://localhost:6520?sync=true&textline=true")
.to("mock:incoming")
.to("netty:tcp://localhost:233?textline=true");
from("netty:tcp://localhost:233?textline=true")
.process(new AsyncProcessor() {
@Override
public void process(Exchange
exchange) throws Exception {
}
@Override
public boolean process(Exchange
exchange, AsyncCallback callback) {
String body =
exchange.getIn().getBody(String.class);
exchange.getOut().setBody("ackOfForward::" + body);
callback.done(false);
return true;
}
})
.to("mock:forward");
}
};
}
}
Could you help me to understand?
Thank you.
Regards.
Suray Philippe
--
View this message in context:
http://camel.465427.n5.nabble.com/Error-in-DefaultServicePool-on-Netty-Producer-Queue-is-full-tp5716882p5716932.html
Sent from the Camel - Users mailing list archive at Nabble.com.