Hello,

Here is a small test case. I am not familiar with Camel testing, so I hope
my test is correct.
It sends 100 messages to a TCP endpoint and forwards each one to another TCP
endpoint. I don't know how to intercept the error, but if you set the
logging level to DEBUG, you see this error:

[            New I/O  worker #9] ServerChannelHandler           DEBUG Incoming message: 99
[            New I/O  worker #9] SendProcessor                  DEBUG >>>> Endpoint[mock://incoming] Exchange[Message: 99]
[            New I/O  worker #9] MockEndpoint                   DEBUG mock://incoming >>>> 99 : Exchange[Message: 99] with body: 99 and headers:{breadcrumbId=ID-jack-5708-1344355497315-0-696, CamelNettyRemoteAddress=/127.0.0.1:5713, CamelNettyChannelHandlerContext=org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext@c736e4, CamelNettyMessageEvent=[id: 0x0058cca9, /127.0.0.1:5713 => /127.0.0.1:6520] RECEIVED: 99}
[            New I/O  worker #9] SendProcessor                  DEBUG >>>> Endpoint[tcp://localhost:233] Exchange[Message: 99]
[            New I/O  worker #9] NettyProducer                  DEBUG Writing body: 99

[           New I/O  worker #41] ClientChannelHandler           DEBUG Closing channel as an exception was thrown from Netty
java.lang.IllegalStateException: Queue full
        at java.util.AbstractQueue.add(AbstractQueue.java:71)[:1.6.0_26]
        at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209)[:1.6.0_26]
        at org.apache.camel.impl.DefaultServicePool.release(DefaultServicePool.java:96)[camel-core-2.10.0.jar:2.10.0]
        at org.apache.camel.impl.ProducerCache$1.done(ProducerCache.java:304)[camel-core-2.10.0.jar:2.10.0]
        at org.apache.camel.processor.SendProcessor$2$1.done(SendProcessor.java:125)[camel-core-2.10.0.jar:2.10.0]
        at org.apache.camel.component.netty.handlers.ClientChannelHandler.messageReceived(ClientChannelHandler.java:162)[camel-netty-2.10.0.jar:2.10.0]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:71)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:423)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:404)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:268)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:91)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:373)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:247)[netty-3.5.1.Final.jar:]
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)[netty-3.5.1.Final.jar:]
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_26]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_26]
        at java.lang.Thread.run(Thread.java:662)[:1.6.0_26]
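
As far as I can tell from the top two frames, the exception itself is plain
JDK behaviour: ArrayBlockingQueue.add() throws IllegalStateException("Queue
full") when the queue is already at capacity, whereas offer() just returns
false. The sketch below only reproduces that behaviour with the JDK; it is
not Camel code, the class and method names (QueueFullDemo,
addBeyondCapacity, offerBeyondCapacity) are made up for illustration, and it
says nothing about why the pool in my test ends up full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: JDK only, no Camel involved.
class QueueFullDemo {

    // Fills a bounded queue to capacity, then calls add() once more.
    // Returns the exception message, or null if no exception was thrown.
    static String addBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("item-" + i);
        }
        try {
            queue.add("one-too-many");
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Same scenario with offer(), which reports a full queue by returning false.
    static boolean offerBeyondCapacity(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("item-" + i);
        }
        return queue.offer("one-too-many");
    }

    public static void main(String[] args) {
        System.out.println("add:   " + addBeyondCapacity(2));
        System.out.println("offer: " + offerBeyondCapacity(2));
    }
}
```

So the trace suggests that DefaultServicePool.release() tried to put a
producer back into a pool that was already full, but I have not verified
that in the Camel source.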

I noticed that if I replace callback.done(false) with callback.done(true),
the error disappears.


package test;


import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class NettyTest extends CamelTestSupport {

    @Test
    public void testConcurrentProducers() throws Exception {
        doSendMessages(100, 2);
    }
        
    private void doSendMessages(int files, int poolSize) throws Exception {
        getMockEndpoint("mock:forward").expectedMessageCount(files);
        
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Map<Integer, Future<String>> responses =
                new ConcurrentHashMap<Integer, Future<String>>();
        for (int i = 0; i < files; i++) {
            final int index = i;
            Future<String> out = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    return template.requestBody(
                            "netty:tcp://localhost:6520?sync=true&textline=true",
                            index, String.class);
                }
            });
            
            responses.put(index, out);
        }
        
        assertMockEndpointsSatisfied();
        
        executor.shutdownNow();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("netty:tcp://localhost:6520?sync=true&textline=true")
                        .to("mock:incoming")
                        .to("netty:tcp://localhost:233?textline=true");
                
                from("netty:tcp://localhost:233?textline=true")
                        .process(new AsyncProcessor() {

                            @Override
                            public void process(Exchange exchange) throws Exception {
                            }

                            @Override
                            public boolean process(Exchange exchange, AsyncCallback callback) {
                                String body = exchange.getIn().getBody(String.class);
                                exchange.getOut().setBody("ackOfForward::" + body);
                                // With done(true) here instead, the error disappears.
                                callback.done(false);
                                return true;
                            }
                        })
                        .to("mock:forward");
            }
        };
    }

}

Could you help me understand what is going wrong?

Thank you.

Regards.

Suray Philippe



--
View this message in context: 
http://camel.465427.n5.nabble.com/Error-in-DefaultServicePool-on-Netty-Producer-Queue-is-full-tp5716882p5716932.html
Sent from the Camel - Users mailing list archive at Nabble.com.
