I've been trying for a while now to figure out how to get full control over the
threads used by the JMS/ActiveMQ components... and so far I've had zero luck.
Seems like no matter what I try, the JMS component always goes and makes a
bunch of threads.
I'm testing with some simple routes:
from("jms:topic:test.>").bean(new Tester("root"));
from("jms:topic:test.a").bean(new Tester("a"));
from("jms:topic:test.b").bean(new Tester("b"));
from("jms:topic:test.c").bean(new Tester("c"));
from("jms:topic:test.d").bean(new Tester("d"));
from("jms:topic:test.e").bean(new Tester("e"));
Tester simply logs the message body with a prefix of its constructor argument.
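For reference, a minimal sketch of what such a Tester could look like in plain Java. The method names (`format`, `process`), the `prefix: body` layout, and the use of java.util.logging are assumptions made for illustration, not the actual class from the test:

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the Tester bean described above.
class Tester {
    private static final Logger LOG = Logger.getLogger(Tester.class.getName());

    private final String prefix;

    Tester(String prefix) {
        this.prefix = prefix;
    }

    // Builds the log line: the constructor argument, then the message body.
    String format(Object body) {
        return prefix + ": " + body;
    }

    // Logs the message body with the prefix.
    public void process(Object body) {
        LOG.info(format(body));
    }

    public static void main(String[] args) {
        new Tester("root").process("a");
    }
}
```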
The "jms" component is basically:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(connectionFactory);
singleConnectionFactory.setReconnectOnException(true);
ActiveMQConfiguration jmsConfig = new ActiveMQConfiguration();
jmsConfig.setConnectionFactory(singleConnectionFactory);
jmsConfig.setConsumerType(ConsumerType.Simple);
ActiveMQComponent jms = new ActiveMQComponent(jmsConfig);
I then start everything up and send off some messages:
for (int i = 0; i < 10; i++) {
    producer.sendBody("jms:topic:test.a", "a");
    producer.sendBody("jms:topic:test.b", "b");
    producer.sendBody("jms:topic:test.c", "c");
    producer.sendBody("jms:topic:test.d", "d");
    producer.sendBody("jms:topic:test.e", "e");
    producer.sendBody("jms:topic:test.f", "f");
    producer.sendBody("jms:topic:test.g", "g");
}
This ends up creating a bunch of JmsConsumer threads:
"Camel (camel-1) thread #0 - JmsConsumer[test.a]"@1,678 in group "main": RUNNING
"Camel (camel-1) thread #1 - JmsConsumer[test.>]"@1,681 in group "main": WAIT
"Camel (camel-1) thread #10 - JmsConsumer[test.>]"@1,717 in group "main": WAIT
"Camel (camel-1) thread #11 - JmsConsumer[test.>]"@1,722 in group "main": WAIT
"Camel (camel-1) thread #12 - JmsConsumer[test.>]"@1,725 in group "main": WAIT
"Camel (camel-1) thread #13 - JmsConsumer[test.a]"@1,726 in group "main": WAIT
"Camel (camel-1) thread #15 - JmsConsumer[test.>]"@1,730 in group "main": WAIT
"Camel (camel-1) thread #16 - JmsConsumer[test.b]"@1,731 in group "main": WAIT
"Camel (camel-1) thread #17 - JmsConsumer[test.>]"@1,735 in group "main": WAIT
"Camel (camel-1) thread #18 - JmsConsumer[test.c]"@1,736 in group "main": WAIT
"Camel (camel-1) thread #19 - JmsConsumer[test.>]"@1,737 in group "main": WAIT
"Camel (camel-1) thread #2 - JmsConsumer[test.>]"@1,701 in group "main": WAIT
"Camel (camel-1) thread #20 - JmsConsumer[test.d]"@1,738 in group "main": WAIT
"Camel (camel-1) thread #21 - JmsConsumer[test.>]"@1,739 in group "main": WAIT
"Camel (camel-1) thread #22 - JmsConsumer[test.e]"@1,740 in group "main": WAIT
"Camel (camel-1) thread #23 - JmsConsumer[test.>]"@1,741 in group "main": WAIT
"Camel (camel-1) thread #24 - JmsConsumer[test.>]"@1,742 in group "main": WAIT
"Camel (camel-1) thread #25 - JmsConsumer[test.>]"@1,743 in group "main": WAIT
"Camel (camel-1) thread #26 - JmsConsumer[test.a]"@1,744 in group "main": WAIT
"Camel (camel-1) thread #27 - JmsConsumer[test.>]"@1,746 in group "main": WAIT
"Camel (camel-1) thread #28 - JmsConsumer[test.b]"@1,747 in group "main": WAIT
"Camel (camel-1) thread #29 - JmsConsumer[test.>]"@1,748 in group "main": WAIT
"Camel (camel-1) thread #3 - JmsConsumer[test.b]"@1,702 in group "main": WAIT
"Camel (camel-1) thread #30 - JmsConsumer[test.c]"@1,749 in group "main": WAIT
"Camel (camel-1) thread #31 - JmsConsumer[test.d]"@1,752 in group "main": WAIT
"Camel (camel-1) thread #32 - JmsConsumer[test.>]"@1,751 in group "main": WAIT
"Camel (camel-1) thread #33 - JmsConsumer[test.>]"@1,755 in group "main": WAIT
"Camel (camel-1) thread #34 - JmsConsumer[test.e]"@1,756 in group "main": WAIT
"Camel (camel-1) thread #35 - JmsConsumer[test.>]"@1,758 in group "main": WAIT
"Camel (camel-1) thread #36 - JmsConsumer[test.>]"@1,759 in group "main": WAIT
"Camel (camel-1) thread #37 - JmsConsumer[test.>]"@1,760 in group "main": WAIT
"Camel (camel-1) thread #39 - JmsConsumer[test.>]"@1,762 in group "main": WAIT
"Camel (camel-1) thread #4 - JmsConsumer[test.>]"@1,705 in group "main": WAIT
"Camel (camel-1) thread #5 - JmsConsumer[test.c]"@1,706 in group "main": WAIT
"Camel (camel-1) thread #6 - JmsConsumer[test.>]"@1,709 in group "main": WAIT
"Camel (camel-1) thread #7 - JmsConsumer[test.d]"@1,710 in group "main": WAIT
"Camel (camel-1) thread #8 - JmsConsumer[test.>]"@1,713 in group "main": WAIT
"Camel (camel-1) thread #9 - JmsConsumer[test.e]"@1,714 in group "main": WAIT
And the ExecutorServiceStrategy looks like it's got a pool for each of these
components + one more:
• executorServiceStrategy = {org.apache.camel.impl.DefaultExecutorServiceStrategy@1771}
• executorServices = {java.util.ArrayList@2666} size = 8
• [0] = {java.util.concurrent.ThreadPoolExecutor@1773}
• [1] = {java.util.concurrent.ThreadPoolExecutor@2687}
• [2] = {java.util.concurrent.ScheduledThreadPoolExecutor@2688}
• [3] = {java.util.concurrent.ThreadPoolExecutor@2689}
• [4] = {java.util.concurrent.ThreadPoolExecutor@2690}
• [5] = {java.util.concurrent.ThreadPoolExecutor@2691}
• [6] = {java.util.concurrent.ThreadPoolExecutor@2692}
• [7] = {java.util.concurrent.ThreadPoolExecutor@2693}
Does each route get its own thread pool?
I've tried to set the default thread pool profile to something small, like:
ThreadPoolProfile defaultProfile = new ThreadPoolProfileSupport("defaultThreadPoolProfile2");
defaultProfile.setPoolSize(1);
defaultProfile.setMaxPoolSize(1);
defaultProfile.setKeepAliveTime(60L);
defaultProfile.setTimeUnit(TimeUnit.SECONDS);
defaultProfile.setMaxQueueSize(1000);
defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
executorServiceStrategy.setDefaultThreadPoolProfile(defaultProfile);
But this does not seem to have much effect on the number of threads created.
I've tried setting the executorServiceRef explicitly via
threads().executorServiceRef() but that also doesn't seem to affect the
configuration used to create the JmsConsumer threads.
* * *
I'm trying to set up some topic subscriptions (possibly many of them) which
flow over a single connection using the JMS message listener's session thread
(not a polling receive, as with the Default consumer type), and to get all of
those consumers to hand off to a single small thread pool (~5 threads) for
processing.
Is that possible with Camel & the JMS/ActiveMQ components, or do I need to
write a custom JMS component which provides the desired thread semantics? I may
have to write a custom component anyway, to get rid of the ~5 MB of Spring
dependencies which these components pull in, but that's another matter. I'd
still really like to know how to get full control over the threads used by the
JMS/ActiveMQ components as they are today.
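To make the desired thread semantics concrete, here is a rough sketch in plain java.util.concurrent, with no Camel or JMS involved. The class `SharedPoolDispatcher` and its methods are hypothetical names made up for illustration; each "listener" stands in for a JMS MessageListener callback that does nothing on the session thread except enqueue the work onto one shared, fixed-size pool:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration: many subscriptions, one small shared worker pool.
class SharedPoolDispatcher {
    private final ExecutorService pool;
    // Names of the worker threads that actually ran a handler.
    private final Set<String> workerNames = ConcurrentHashMap.newKeySet();

    SharedPoolDispatcher(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    // Wraps a handler so the calling (session) thread only enqueues the work.
    Consumer<String> listenerFor(Consumer<String> handler) {
        return body -> pool.execute(() -> {
            workerNames.add(Thread.currentThread().getName());
            handler.accept(body);
        });
    }

    int distinctWorkerThreads() {
        return workerNames.size();
    }

    // Stops accepting work and waits for already queued work to finish.
    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        SharedPoolDispatcher dispatcher = new SharedPoolDispatcher(5);
        AtomicInteger handled = new AtomicInteger();

        // One listener per subscription; all of them share the same pool.
        String[] topics = {"test.>", "test.a", "test.b", "test.c", "test.d", "test.e"};
        for (String topic : topics) {
            Consumer<String> listener =
                    dispatcher.listenerFor(body -> handled.incrementAndGet());
            for (int i = 0; i < 10; i++) {
                listener.accept(topic + " message " + i);
            }
        }

        dispatcher.shutdownAndWait();
        System.out.println("handled=" + handled.get()
                + " workerThreads=" + dispatcher.distinctWorkerThreads());
    }
}
```

However many subscriptions are added, the number of processing threads stays bounded by the pool size, which is the behavior I'd like to get out of the JMS/ActiveMQ components.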
Can anyone drop some knowledge?
--jason