No worries. This is a small Unit Test that shows the problem. Hope it helps:
--
package netty;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class NettyTest extends TestCase
{
private final static Logger logger =
LoggerFactory.getLogger(NettyTest.class);
private final static CamelContext serverContext = new
DefaultCamelContext();
private final CamelContext clientContext = new DefaultCamelContext();
private final AtomicInteger responseCounter = new AtomicInteger(0);
private final AtomicBoolean passedTen = new AtomicBoolean(false);
private Boolean disconnectClient;
public NettyTest(Boolean disconnectClient)
{
this.disconnectClient = disconnectClient;
}
@Parameters
public static Collection<Object[]> configs()
{
return Arrays.asList(new Object[][] { { true }, { false } });
}
@BeforeClass
public static void createServer() throws Exception
{
serverContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
.setExchangePattern(ExchangePattern.InOut)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws
Exception
{
Object body = exchange.getIn().getBody();
logger.info("Request received : Value = {}",
body);
}
})
.transform(constant(3)).stop();
}
});
serverContext.start();
}
@Before
public void createClient() throws Exception
{
clientContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
// Generate an Echo message and ensure a Response is sent
from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
.setExchangePattern(ExchangePattern.InOut)
.transform()
.constant(2)
.to(ExchangePattern.InOut,
"netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect="
+ disconnectClient.toString())
.process(new Processor()
{
@Override
public void process(Exchange exchange) throws
Exception
{
Object body = exchange.getIn().getBody();
logger.info("Response number {} : Value =
{}",
responseCounter.incrementAndGet(),
body);
if (responseCounter.get() > 10) {
passedTen.set(true);
}
}
}).stop();
}
});
}
@Test
public void test() throws Exception
{
clientContext.getShutdownStrategy().setTimeout(1);
clientContext.start();
logger.info("Disconnect = {}", this.disconnectClient);
Thread.sleep(TimeUnit.SECONDS.toMillis(15));
clientContext.stop();
assertTrue("More than 10 responses have been received",
passedTen.get());
}
}
--
View this message in context:
http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-tp4844805p4852442.html
Sent from the Camel - Users mailing list archive at Nabble.com.