Turns out it doesn't have anything to do with the JMSTemplate. As far as I can tell, producer flow control doesn't work for topics. Unless I've missed something fundamental, I've included a test case that proves that. If you run the code you'll see the producer halt when PFC kicks in, and never start up again, even when the consumer has consumed all outstanding messages. If you change the Destination from a topic to a queue, PFC does work as excepted.
Regards, Maarten import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQTopic; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class BrokerTest implements MessageListener { private static final Log log = LogFactory.getLog(BrokerTest.class); private static final String brokerName = "testBroker"; private static final String brokerUrl = "vm://testBroker"; private static final int destinationMemLimit = 2097152; // 2MB private static final AtomicLong produced = new AtomicLong(); private static final AtomicLong consumed = new AtomicLong(); public static void main(String[] args) throws Exception { // Setup and start the broker BrokerService broker = new BrokerService(); broker.setBrokerName(brokerName); broker.setPersistent(false); broker.setSchedulerSupport(false); broker.setUseJmx(false); broker.setUseShutdownHook(false); broker.addConnector(brokerUrl); // Setup the destination policy PolicyMap pm = new PolicyMap(); // Setup the topic destination policy PolicyEntry tpe = new PolicyEntry(); tpe.setTopic(">"); tpe.setMemoryLimit(destinationMemLimit); tpe.setProducerFlowControl(true); // Setup the topic destination policy PolicyEntry qpe = new PolicyEntry(); qpe.setQueue(">"); qpe.setMemoryLimit(destinationMemLimit); qpe.setProducerFlowControl(true); qpe.setQueuePrefetch(1); pm.setPolicyEntries(Arrays.asList(new PolicyEntry[] { tpe, qpe })); broker.setDestinationPolicy(pm); // Start the broker broker.start(); Destination destination = new ActiveMQTopic("test"); //Destination destination = new ActiveMQQueue("test"); // Create the connection factory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); connectionFactory.setAlwaysSyncSend(true); connectionFactory.setProducerWindowSize(1024); // Start the test destination listener Connection c = connectionFactory.createConnection(); c.start(); c.createSession(false, 1).createConsumer(destination).setMessageListener(new BrokerTest()); // Start producing the test messages Session s = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer p = s.createProducer(destination); for (long i = 0; i < 2000000L; i++) { p.send(s.createTextMessage("test")); long count = produced.incrementAndGet(); if (count % 1000 == 0) { log.debug("Produced " + count / 1000 + "k messages"); } } } @Override public void onMessage(Message arg0) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } long count = consumed.incrementAndGet(); if (count % 1000 == 0) { log.debug("\tConsumed " + count / 1000 + "k messages"); } } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Producer-flow-control-question-tp3092808p3162214.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.