I have some prototype JMS queue consumer code that is not distributing
messages to consumers as I would expect.  My code creates two threads that
consume messages from a single queue.  Each thread is given a reference to
the same JMS ConnectionFactory with prefetchPolicy.all set to 0 (see the
config files below).  Each thread then creates a single JMS Session with 3
Consumers, each with a different selector, so the two threads hold identical
sets of Consumers.  Finally, the two threads enter an infinite
message-consuming loop in which they iterate through their 3 consumers and
poll the queue for messages using MessageConsumer.receiveNoWait().  The only
difference between the two threads is that one thread is "fast" (it just
prints the message and continues) while the other has a 5 second sleep after
each message it receives.

If I put 100 messages onto the queue before starting the process that has
the two consumers, then I get the expected behavior.  That is, the fast
thread processes 99 messages very quickly and the slow thread handles just 1
message (remember that prefetch is set to 0).  All work is done in about 5
seconds.  However, if I start the consumers first and then run my program
to put messages onto the queue, the behavior is very different.  In this
case, the jobs are split nearly (though not exactly) 50/50 between the fast
thread and the slow thread.  In one run, the fast thread processed 54 jobs
very quickly (< 1 second) while the slow thread processed 46 jobs slowly
(with a 5 second interval between jobs), so all work took 230 (46 * 5)
seconds.  I don't understand why the allocation of messages to consumers
depends on whether the consumers were started before or after the messages
were sent to the queue.  In other words, I don't understand why, in the
second case, messages are not delivered to the fast thread's JMS consumers
while the slow thread is busy, given that the fast thread is sitting in a
loop asking over and over again (using receiveNoWait()) for a message.
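
As a rough check of the two timing figures above, the following sketch
computes the elapsed time from the number of messages that land on the slow
thread.  It is not part of the prototype: the class and method names are
made up for illustration, and it assumes that the fast thread's per-message
cost is negligible and that both threads run in parallel, so the slow thread
alone determines the total time.

```java
public class ConsumeTimeSketch {
  // Per-message delay of the slow thread (the 5 second sleep described above).
  static final int SLOW_SECONDS_PER_MESSAGE = 5;

  // Assumption: the fast thread costs roughly nothing per message and runs in
  // parallel, so elapsed time is driven only by the slow thread's share.
  static int elapsedSeconds(int messagesOnSlowThread) {
    return messagesOnSlowThread * SLOW_SECONDS_PER_MESSAGE;
  }

  public static void main(String[] args) {
    // Queue filled before the consumers start: slow thread handled 1 message.
    System.out.println("queue filled first:      " + elapsedSeconds(1) + " s");
    // Consumers started before the producer: slow thread handled 46 messages.
    System.out.println("consumers started first: " + elapsedSeconds(46) + " s");
  }
}
```

Under those assumptions the first run takes 5 seconds and the second takes
230 seconds, which matches what I observed.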

I've attached all the code and config files that I'm using to reproduce this
issue below...

----------------------------------------------------------------------

Here's my client spring config file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core
      http://activemq.apache.org/schema/core/activemq-core-5.1.0.xsd">

  <bean id="jmsFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616?jms.prefetchPolicy.all=0</value>
        </property>
      </bean>
    </property>
  </bean>

  <amq:queue id="destination"
      physicalName="com.infopia.inventory?consumer.prefetchSize=0" />

  <bean id="consJmsTemplate"
      class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref local="jmsFactory"/>
    </property>
  </bean>

  <bean id="jobConsumer" class="com.infopia.inventory.queue.JobConsumer"
      scope="prototype">
    <property name="jmsConnectionFactory" ref="jmsFactory"/>
    <property name="destination" ref="destination"/>
  </bean>  

</beans>

----------------------------------------------------------------------

Here's my JMS consumer prototype code:

package com.infopia.inventory.queue;

import java.util.ArrayList;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JobConsumer {
  private ConnectionFactory mFactory;
  private Queue mQueue;

  private final List<Thread> mThreads = new ArrayList<Thread>();

  private static final int NUM_PROCESSORS = 2;

  public static void main(String[] args) {
    ClassPathXmlApplicationContext resources =
        new ClassPathXmlApplicationContext(new String[] { "consContext.xml" });

    JobConsumer jc = (JobConsumer)resources.getBean("jobConsumer");
    jc.start();
  }

  public void setJmsConnectionFactory(ConnectionFactory factory) {
    mFactory = factory;
  }

  public void setDestination(Queue queue) {
    mQueue = queue;
  }

  public void start() {
    // The JMS initialization is happening inside of the JobProcessor
    // constructor. For purposes of testing it is often convenient to have
    // the consumer threads start as close to the same time as possible, so
    // that's the reason we have the 2 loops here (and we don't start the
    // threads in the first loop)...
    for (int index = 0; index < NUM_PROCESSORS; index++) {
      JobProcessor jp = new JobProcessor(index, mQueue, mFactory);
      Thread thread = new Thread(jp);
      mThreads.add(thread);
    }
    for (Thread t : mThreads) {
      t.start();
    }
  }
}

package com.infopia.inventory.queue;

import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

class JobProcessor implements Runnable {
  private final int mId;
  private final Queue mQueue;
  private final ConnectionFactory mFactory;
  private final Connection mConnection;
  private final Session mSession;
  private final List<MessageConsumer> mConsumers;

  private final static int SLEEP_NO_ITEMS_MILLIS = 1000;

  JobProcessor(int id, Queue queue, ConnectionFactory factory) {
    mId = id;
    mQueue = queue;
    mFactory = factory;
    try {
      mConnection = mFactory.createConnection();
      mSession = mConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
      mConsumers = new ArrayList<MessageConsumer>();
      for (int index = 0; index < JobProducer.MAX_CLIENT_ID; index++) {
        MessageConsumer consumer =
            mSession.createConsumer(mQueue, "clientId = " + index);
        mConsumers.add(consumer);
      }
    } catch (JMSException e) {
      System.out.format("Processor %d exception: %s%n", mId, e);
      throw new RuntimeException(e);
    }
  }

  @Override
  public void run() {
    try {
      mConnection.start();
      boolean itemFound = true;
      while (true) {
        if (!itemFound) {
          try {
            System.out.format(
                "Processor %d: No items found for any clients, sleeping %d ms.%n",
                mId, SLEEP_NO_ITEMS_MILLIS);
            Thread.sleep(SLEEP_NO_ITEMS_MILLIS);
          } catch (InterruptedException e) {
          }
        }
        itemFound = false;
        for (int index = 0; index < JobProducer.MAX_CLIENT_ID; index++) {
          MessageConsumer consumer = mConsumers.get(index);
          Message msg = consumer.receiveNoWait();
          if (msg != null) {
            itemFound = true;
            if (msg instanceof TextMessage) {
              int clientId = msg.getIntProperty("clientId");
              long timestamp = msg.getJMSTimestamp();
              TextMessage textMsg = (TextMessage)msg;
              System.out.format(
                  "Processor %d: Message received for client %s at %d: %s%n",
                  mId, clientId, timestamp, textMsg.getText());
              if (mId == 0) {
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
              }
              msg.acknowledge();
            } else {
              throw new IllegalArgumentException(
                  "Unknown JMS message type: " + msg.getJMSType());
            }
          }
        }
      }
    } catch (JMSException e) {
      System.out.format("Processor %d exception: %s%n", mId, e);
      throw new RuntimeException(e);
    } finally {
      try {
        mConnection.close();
      } catch (JMSException e) {
      }
    }
  }
}

----------------------------------------------------------------------

Here's my activemq.xml...

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at
   
    http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core
      http://activemq.apache.org/schema/core/activemq-core.xsd
      http://activemq.apache.org/camel/schema/spring
      http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!-- Allows us to use system properties as variables in this
configuration file -->
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

    <broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="${activemq.base}/data">

        <!-- Destination specific policies using destination names or
wildcards -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" memoryLimit="5mb"/>
                    <policyEntry topic=">" memoryLimit="5mb">
                        <dispatchPolicy>
                            <strictOrderDispatchPolicy/>
                        </dispatchPolicy>
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy/>
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX
-->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!-- The store and forward broker networks ActiveMQ will listen to
-->
        <networkConnectors>
            <!-- by default just auto discover the other brokers -->
            <networkConnector name="default-nc" uri="multicast://default"/>
            <!-- Example of a static configuration:
            <networkConnector name="host1 and host2"
uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
            -->
        </networkConnectors>

        <persistenceAdapter>
            <amqPersistenceAdapter syncOnWrite="false"
directory="${activemq.base}/data" maxFileLength="20 mb"/>
        </persistenceAdapter>

        <!-- Use the following if you wish to configure the journal with
JDBC -->
        <!--
        <persistenceAdapter>
            <journaledJDBC dataDirectory="${activemq.base}/data"
dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!-- Or if you want to use pure JDBC without a journal -->
        <!--
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#postgres-ds"/>
        </persistenceAdapter>
        -->

        <!-- The maximum amount of space the broker will use before slowing
             down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>


        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616"
discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
        </transportConnectors>

        <destinations>
            <queue name="inventory" physicalName="com.infopia.inventory"/>
        </destinations>

    </broker>

    <!--
    ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ
Message Broker
    ** For more details see
    **
    ** http://activemq.apache.org/enterprise-integration-patterns.html
    -->
    <camelContext id="camel"
        xmlns="http://activemq.apache.org/camel/schema/spring">

        <!-- You can use a <package> element for each root package to search
for Java routes -->
        <package>org.foo.bar</package>

        <!-- You can use Spring XML syntax to define the routes here using
the <route> element -->
        <route>
            <from uri="activemq:example.A"/>
            <to uri="activemq:example.B"/>
        </route>
    </camelContext>


    <!-- Uncomment to create a command agent to respond to message based
admin commands on the ActiveMQ.Agent topic -->
    <!--
    <commandAgent xmlns="http://activemq.apache.org/schema/core"
        brokerUrl="vm://localhost"/>
    -->


    <!-- An embedded servlet engine for serving up the Admin console -->
    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
        <connectors>
            <nioConnector port="8161"/>
        </connectors>

        <handlers>
            <webAppContext contextPath="/admin"
resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
            <webAppContext contextPath="/demo"
resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
            <webAppContext contextPath="/fileserver"
resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
        </handlers>
    </jetty>

    <!--  This xbean configuration file supports all the standard spring xml
configuration options -->

    <!-- Postgres DataSource Sample Setup -->
    <!--
    <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
      <property name="serverName" value="localhost"/>
      <property name="databaseName" value="activemq"/>
      <property name="portNumber" value="0"/>
      <property name="user" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="dataSourceName" value="postgres"/>
      <property name="initialConnections" value="1"/>
      <property name="maxConnections" value="10"/>
    </bean>
    -->

    <!-- MySql DataSource Sample Setup -->
    <!--
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->

    <!-- Oracle DataSource Sample Setup -->
    <!--
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
      <property name="driverClassName"
value="oracle.jdbc.driver.OracleDriver"/>
      <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
      <property name="username" value="scott"/>
      <property name="password" value="tiger"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>
    -->

    <!-- Embedded Derby DataSource Sample Setup -->
    <!--
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
      <property name="databaseName" value="derbydb"/>
      <property name="createDatabase" value="create"/>
    </bean>
    -->

</beans>
<!-- END SNIPPET: example -->

----------------------------------------------------------------------

Here's my producer config file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core
      http://activemq.apache.org/schema/core/activemq-core-5.1.0.xsd">
      
  <bean id="jmsFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
  </bean>

  <amq:queue id="destination"  physicalName="com.infopia.inventory"/>
  
  <bean id="pubJmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref local="jmsFactory"/>
    </property>
  </bean>

  <bean id="jobProducer" class="com.infopia.inventory.queue.JobProducer"
scope="prototype">
    <property name="jmsTemplate" ref="pubJmsTemplate"/>
    <property name="destination" ref="destination"/>
  </bean>  
</beans>  

----------------------------------------------------------------------

Here's my JMS producer Java code:

package com.infopia.inventory.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class JobProducer {
  private JmsTemplate mJmsTemplate;
  private Queue mQueue;

  public static final int MAX_CLIENT_ID = 3;

  public static void main(String[] args) {
    ClassPathXmlApplicationContext resources =
        new ClassPathXmlApplicationContext(new String[] { "prodContext.xml" });

    JobProducer jp = (JobProducer)resources.getBean("jobProducer");
    jp.createJobs();
    System.exit(0);
  }

  public void setJmsTemplate(JmsTemplate template) {
    mJmsTemplate = template;
  }

  public void setDestination(Queue queue) {
    mQueue = queue;
  }

  private void createJobs() {
    for (int index = 0; index < 100; index++) {
      createJob(index);
      /*
       * try { Thread.sleep(1000); } catch (InterruptedException e) { }
       */
    }
  }

  private void createJob(final int count) {
    mJmsTemplate.send(mQueue, new MessageCreator() {
      public Message createMessage(Session session) throws JMSException {
        TextMessage msg = session.createTextMessage();
        msg.setIntProperty("clientId", count % MAX_CLIENT_ID);
        msg.setText("hello: " + count);
        return msg;
      }
    });
  }
}
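
For reference, the producer above tags message number `count` with
clientId = count % MAX_CLIENT_ID, so the 100 messages are not split evenly
across the three selectors.  The standalone sketch below (a hypothetical
helper class, not part of the prototype) reproduces that arithmetic without
any JMS dependency; the two constants simply mirror the values used in
JobProducer.

```java
public class ClientIdSplitSketch {
  // Mirrors JobProducer.MAX_CLIENT_ID and the loop bound in createJobs().
  static final int MAX_CLIENT_ID = 3;
  static final int MESSAGE_COUNT = 100;

  // Counts how many messages carry each clientId property value.
  static int[] countPerClientId() {
    int[] counts = new int[MAX_CLIENT_ID];
    for (int index = 0; index < MESSAGE_COUNT; index++) {
      counts[index % MAX_CLIENT_ID]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    int[] counts = countPerClientId();
    for (int clientId = 0; clientId < counts.length; clientId++) {
      System.out.println("clientId = " + clientId + ": " + counts[clientId] + " messages");
    }
  }
}
```

This gives 34 messages for clientId 0 and 33 each for clientId 1 and 2, so
each selector sees roughly a third of the queue.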

-- 
View this message in context: 
http://www.nabble.com/message-distribution-problem-with-prefetch-set-to-zero-tp18449355p18449355.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
