Turns out it doesn't have anything to do with the JMSTemplate. As far as I
can tell, producer flow control doesn't work for topics. Unless I've missed
something fundamental, I've included a test case that proves that. If you
run the code you'll see the producer halt when PFC kicks in, and never start
up again, even when the consumer has consumed all outstanding messages. If
you change the Destination from a topic to a queue, PFC does work as
expected.

Regards,
Maarten

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class BrokerTest implements MessageListener {
  private static final Log log = LogFactory.getLog(BrokerTest.class);
  private static final String brokerName = "testBroker";
  private static final String brokerUrl = "vm://testBroker";
  private static final int destinationMemLimit = 2097152; // 2MB
  private static final AtomicLong produced = new AtomicLong();
  private static final AtomicLong consumed = new AtomicLong();

  public static void main(String[] args) throws Exception {
    // Setup and start the broker
    BrokerService broker = new BrokerService();
    broker.setBrokerName(brokerName);
    broker.setPersistent(false);
    broker.setSchedulerSupport(false);
    broker.setUseJmx(false);
    broker.setUseShutdownHook(false);
    broker.addConnector(brokerUrl);

    // Setup the destination policy
    PolicyMap pm = new PolicyMap();

    // Setup the topic destination policy
    PolicyEntry tpe = new PolicyEntry();
    tpe.setTopic(">");
    tpe.setMemoryLimit(destinationMemLimit);
    tpe.setProducerFlowControl(true);

    // Setup the topic destination policy
    PolicyEntry qpe = new PolicyEntry();
    qpe.setQueue(">");
    qpe.setMemoryLimit(destinationMemLimit);
    qpe.setProducerFlowControl(true);
    qpe.setQueuePrefetch(1);

    pm.setPolicyEntries(Arrays.asList(new PolicyEntry[] { tpe, qpe }));

    broker.setDestinationPolicy(pm);

    // Start the broker
    broker.start();

    Destination destination = new ActiveMQTopic("test");
    //Destination destination = new ActiveMQQueue("test");

    // Create the connection factory
    ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(brokerUrl);
    connectionFactory.setAlwaysSyncSend(true);
    connectionFactory.setProducerWindowSize(1024);

    // Start the test destination listener
    Connection c = connectionFactory.createConnection();
    c.start();
    c.createSession(false,
1).createConsumer(destination).setMessageListener(new BrokerTest());

    // Start producing the test messages
    Session s = connectionFactory.createConnection().createSession(false,
Session.AUTO_ACKNOWLEDGE);
    MessageProducer p = s.createProducer(destination);

    for (long i = 0; i < 2000000L; i++) {
      p.send(s.createTextMessage("test"));

      long count = produced.incrementAndGet();
      if (count % 1000 == 0) {
        log.debug("Produced " + count / 1000 + "k messages");
      }
    }
  }

  @Override
  public void onMessage(Message arg0) {
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    long count = consumed.incrementAndGet();
    if (count % 1000 == 0) {
      log.debug("\tConsumed " + count / 1000 + "k messages");
    }
  }
}
-- 
View this message in context: 
http://activemq.2283324.n4.nabble.com/Producer-flow-control-question-tp3092808p3162214.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to