I have some prototype JMS queue consumer code that is not distributing messages to consumers as I would expect. My code creates two threads which consume messages from a single queue. Each thread is given a reference to the same JMS ConnectionFactory with prefetchPolicy.all set to 0 (see config files below). Each thread then creates a single JMS Session with 3 Consumers, each with a different selector (2 pairs of identical Consumers, one per thread). Finally, the two threads enter an infinite message consuming loop in which they iterate through their 3 consumers and poll the queue for messages using Consumer.receiveNoWait(). The only difference between the two threads is that one thread is "fast"--it just prints out the message and continues--while the other thread has a 5 second sleep embedded in it.
If I put 100 messages onto the queue before starting the process that has the two consumers, then I get the expected behavior. That is, the fast thread processes 99 messages very quickly and the slow thread handles just 1 message (remember that prefetch is set to 0). All work is done in 5 seconds. However, if I start the consumers first, and then run my program to put things onto the queue, the behavior is very different. In this case, there is a nearly 50/50 split (though not exactly 50/50) between the distribution of jobs to the fast thread and the distribution of jobs to the slow thread. In one run, the fast thread processed 54 jobs very quickly (< 1 second) while the slow thread processed 46 jobs slowly (with a 5 second interval between jobs). In this case, all work is done in 230 (46 * 5) seconds. This is very different behavior--I don't understand why allocation of messages to consumers has anything to do with when the consumers were started vs. when messages were sent to the queue. In other words, I don't understand why, in the second case, messages are not distributed to my second thread's set of JMS consumers when it is just sitting there in a loop asking over and over again (using Consumer.receiveNoWait()) for a message when the other thread is busy. I've attached all the code and config files that I'm using to reproduce this issue below... ---------------------------------------------------------------------- Here's my client spring config file: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.1.0.xsd"> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616?jms.prefetchPolicy.all=0</value> </property> </bean> </property> </bean> <amq:queue id="destination" physicalName="com.infopia.inventory?consumer.prefetchSize=0" /> <bean id="consJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory"/> </property> </bean> <bean id="jobConsumer" class="com.infopia.inventory.queue.JobConsumer" scope="prototype"> <property name="jmsConnectionFactory" ref="jmsFactory"/> <property name="destination" ref="destination"/> </bean> </beans> ---------------------------------------------------------------------- Here's my JMS consumer prototype code: package com.infopia.inventory.queue; import java.util.ArrayList; import java.util.List; import javax.jms.ConnectionFactory; import javax.jms.Queue; import org.springframework.context.support.ClassPathXmlApplicationContext; public class JobConsumer { private ConnectionFactory mFactory; private Queue mQueue; private final List<Thread> mThreads = new ArrayList<Thread>(); private static final int NUM_PROCESSORS = 2; public static void main(String[] args) { ClassPathXmlApplicationContext resources = new ClassPathXmlApplicationContext(new String[] { "consContext.xml" }); JobConsumer jc = (JobConsumer)resources.getBean("jobConsumer"); jc.start(); } public void setJmsConnectionFactory(ConnectionFactory factory) { mFactory = factory; } public void setDestination(Queue queue) { mQueue = queue; } public void start() { // The JMS initialization is happening inside of the JobProcessor // constructor. For purposes of testing it is often convenient to have the // consumer threads start as close to the same time as possible, so that's // the reason we have the 2 loops here (and we don't start the threads in // the first loop)... for (int index = 0; index < NUM_PROCESSORS; index++) { JobProcessor jp = new JobProcessor(index, mQueue, mFactory); Thread thread = new Thread(jp); mThreads.add(thread); } for (Thread t : mThreads) { t.start(); } } } package com.infopia.inventory.queue; import java.util.ArrayList; import java.util.List; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; class JobProcessor implements Runnable { private final int mId; private final Queue mQueue; private final ConnectionFactory mFactory; private final Connection mConnection; private final Session mSession; private final List<MessageConsumer> mConsumers; private final static int SLEEP_NO_ITEMS_MILLIS = 1000; JobProcessor(int id, Queue queue, ConnectionFactory factory) { mId = id; mQueue = queue; mFactory = factory; try { mConnection = mFactory.createConnection(); mSession = mConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); mConsumers = new ArrayList<MessageConsumer>(); for (int index = 0; index < JobProducer.MAX_CLIENT_ID; index++) { MessageConsumer consumer = mSession.createConsumer(mQueue, "clientId = " + index); mConsumers.add(consumer); } } catch (JMSException e) { System.out.format("Processor %d exception: %s%n", mId, e); throw new RuntimeException(e); } } @Override public void run() { try { mConnection.start(); boolean itemFound = true; while (true) { if (!itemFound) { try { System.out.format("Processor %d: No items found for any clients, sleeping %d ms.%n", mId, SLEEP_NO_ITEMS_MILLIS); Thread.sleep(SLEEP_NO_ITEMS_MILLIS); } catch (InterruptedException e) { } } itemFound = false; for (int index = 0; index < JobProducer.MAX_CLIENT_ID; index++) { MessageConsumer consumer = mConsumers.get(index); Message msg = consumer.receiveNoWait(); if (msg != null) { itemFound = true; if (msg instanceof TextMessage) { int clientId = msg.getIntProperty("clientId"); long timestamp = msg.getJMSTimestamp(); TextMessage textMsg = (TextMessage)msg; System.out.format("Processor %d: Message received for client %s at %d: %s%n", mId, clientId, timestamp, textMsg.getText()); if (mId == 0) { try { Thread.sleep(5000); } catch (InterruptedException e) { } } msg.acknowledge(); } else { throw new IllegalArgumentException("Unknown JMS message type: " + msg.getJMSType()); } } } } } catch (JMSException e) { System.out.format("Processor %d exception: %s%n", mId, e); throw new RuntimeException(e); } finally { try { mConnection.close(); } catch (JMSException e) { } } } } ---------------------------------------------------------------------- Here's my activemq.xml... <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- START SNIPPET: example --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data"> <!-- Destination specific policies using destination names or wildcards --> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" memoryLimit="5mb"/> <policyEntry topic=">" memoryLimit="5mb"> <dispatchPolicy> <strictOrderDispatchPolicy/> </dispatchPolicy> <subscriptionRecoveryPolicy> <lastImageSubscriptionRecoveryPolicy/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- Use the following to configure how ActiveMQ is exposed in JMX --> <managementContext> <managementContext createConnector="false"/> </managementContext> <!-- The store and forward broker networks ActiveMQ will listen to --> <networkConnectors> <!-- by default just auto discover the other brokers --> <networkConnector name="default-nc" uri="multicast://default"/> <!-- Example of a static configuration: <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> --> </networkConnectors> <persistenceAdapter> <amqPersistenceAdapter syncOnWrite="false" directory="${activemq.base}/data" maxFileLength="20 mb"/> </persistenceAdapter> <!-- Use the following if you wish to configure the journal with JDBC --> <!-- <persistenceAdapter> <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#postgres-ds"/> </persistenceAdapter> --> <!-- Or if you want to use pure JDBC without a journal --> <!-- <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#postgres-ds"/> </persistenceAdapter> --> <!-- The maximum about of space the broker will use before slowing down producers --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> <storeUsage> <storeUsage limit="1 gb" name="foo"/> </storeUsage> <tempUsage> <tempUsage limit="100 mb"/> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors ActiveMQ will listen to --> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> <transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> <transportConnector name="xmpp" uri="xmpp://localhost:61222"/> </transportConnectors> <destinations> <queue name="inventory" physicalName="com.infopia.inventory"/> </destinations> </broker> <!-- ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker ** For more details see ** ** http://activemq.apache.org/enterprise-integration-patterns.html --> <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring"> <!-- You can use a <package> element for each root package to search for Java routes --> <package>org.foo.bar</package> <!-- You can use Spring XML syntax to define the routes here using the <route> element --> <route> <from uri="activemq:example.A"/> <to uri="activemq:example.B"/> </route> </camelContext> <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic --> <!-- <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost"/> --> <!-- An embedded servlet engine for serving up the Admin console --> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> <connectors> <nioConnector port="8161"/> </connectors> <handlers> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/> <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/> <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/> </handlers> </jetty> <!-- This xbean configuration file supports all the standard spring xml configuration options --> <!-- Postgres DataSource Sample Setup --> <!-- <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource"> <property name="serverName" value="localhost"/> <property name="databaseName" value="activemq"/> <property name="portNumber" value="0"/> <property name="user" value="activemq"/> <property name="password" value="activemq"/> <property name="dataSourceName" value="postgres"/> <property name="initialConnections" value="1"/> <property name="maxConnections" value="10"/> </bean> --> <!-- MySql DataSource Sample Setup --> <!-- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> --> <!-- Oracle DataSource Sample Setup --> <!-- <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/> <property name="username" value="scott"/> <property name="password" value="tiger"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> --> <!-- Embedded Derby DataSource Sample Setup --> <!-- <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> --> </beans> <!-- END SNIPPET: example --> ---------------------------------------------------------------------- Here's my producer config file: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.1.0.xsd"> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <amq:queue id="destination" physicalName="com.infopia.inventory"/> <bean id="pubJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory"/> </property> </bean> <bean id="jobProducer" class="com.infopia.inventory.queue.JobProducer" scope="prototype"> <property name="jmsTemplate" ref="pubJmsTemplate"/> <property name="destination" ref="destination"/> </bean> </beans> ---------------------------------------------------------------------- Here's my JMS production java code: package com.infopia.inventory.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class JobProducer { private JmsTemplate mJmsTemplate; private Queue mQueue; public static final int MAX_CLIENT_ID = 3; public static void main(String[] args) { ClassPathXmlApplicationContext resources = new ClassPathXmlApplicationContext(new String[] { "prodContext.xml" }); JobProducer jp = (JobProducer)resources.getBean("jobProducer"); jp.createJobs(); System.exit(0); } public void setJmsTemplate(JmsTemplate template) { mJmsTemplate = template; } public void setDestination(Queue queue) { mQueue = queue; } private void createJobs() { for (int index = 0; index < 100; index++) { createJob(index); /* * try { Thread.sleep(1000); } catch (InterruptedException e) { } */ } } private void createJob(final int count) { mJmsTemplate.send(mQueue, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); msg.setIntProperty("clientId", count % MAX_CLIENT_ID); msg.setText("hello: " + count); return msg; } }); } } -- View this message in context: http://www.nabble.com/message-distribution-problem-with-prefetch-set-to-zero-tp18449355p18449355.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.