Oh, this is using ActiveMQ 5.3.1 and Spring 2.5.6.
On Fri, May 7, 2010 at 10:19 PM, Jamie McCrindle <jamiemccrin...@gmail.com> wrote: > Greetings all, > > After some weeks of scratching my head, I _believe_ I have found the > magic combination that appears to be causing messages to become stuck > in our network of brokers. It's so convoluted that it's entirely > likely that the error isn't what I think it is, but I have managed to > create a test case that mirrors the behaviour we're seeing live. And > it goes something like this: > > We have two brokers in a network of brokers. Producers were publishing > to a queue on one of the brokers and consumers were reading off the queue > on the other broker. After a short while, messages would suddenly pile > up on the 'producer' broker and not get read off on the 'consumer' > broker. > > Pause for lots of random testing... > > It appeared that the network bridge subscription from the 'consumer' > broker was disappearing from the 'producer' broker, causing the pile > up. > > More testing later... > > And it looks like if we have a DefaultMessageListenerContainer with > the following configuration: > > maxMessagesPerTask: 1 > cacheLevel: CONSUMER > maxConcurrentConsumers: 3 (more than 1, basically) > concurrentConsumers: 1 > sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE > > When Spring scales back the dynamic number of consumers, the network > subscription appears to get lost and messages pile up on the producer > side. > > Workaround: > > Use concurrentConsumers instead of maxConcurrentConsumers so that > there is a static number of consumers (not setting maxMessagesPerTask > also seems to work). 
> > I'll add it this to Jira if you'd like but the test case to replicate > the behaviour is as follows: > > > package org.example.activemq; > > import java.util.concurrent.Callable; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.TimeUnit; > > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageListener; > import javax.jms.Session; > import javax.jms.TextMessage; > > import junit.framework.TestCase; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.command.ActiveMQQueue; > import org.apache.activemq.network.NetworkConnector; > import org.apache.activemq.pool.PooledConnectionFactory; > import org.apache.activemq.store.memory.MemoryPersistenceAdapter; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessageCreator; > import org.springframework.jms.listener.DefaultMessageListenerContainer; > > public class NetworkTest extends TestCase { > > public void testNetworkOfBrokers() throws Exception { > BrokerService brokerService1 = null; > BrokerService brokerService2 = null; > > final int total = 100; > final CountDownLatch latch = new CountDownLatch(total); > > try { > > { > brokerService1 = new BrokerService(); > brokerService1.setBrokerName("one"); > brokerService1.setUseJmx(false); > brokerService1.setPersistenceAdapter(new > MemoryPersistenceAdapter()); > brokerService1.addConnector("tcp://0.0.0.0:61616"); > NetworkConnector network1 = > brokerService1.addNetworkConnector("static:(tcp://localhost:51515)"); > network1.setName("network1"); > network1.setDynamicOnly(true); > network1.setNetworkTTL(3); > network1.setPrefetchSize(1); > brokerService1.start(); > } > > { > brokerService2 = new BrokerService(); > brokerService2.setBrokerName("two"); > brokerService2.setUseJmx(false); > 
brokerService2.setPersistenceAdapter(new > MemoryPersistenceAdapter()); > brokerService2.addConnector("tcp://0.0.0.0:51515"); > NetworkConnector network2 = > brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); > network2.setName("network1"); > network2.setDynamicOnly(true); > network2.setNetworkTTL(3); > network2.setPrefetchSize(1); > brokerService2.start(); > } > > ExecutorService pool = Executors.newSingleThreadExecutor(); > > ActiveMQConnectionFactory connectionFactory1 = new > ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false"); > > > final DefaultMessageListenerContainer container = new > DefaultMessageListenerContainer(); > container.setConnectionFactory(connectionFactory1); > container.setMaxConcurrentConsumers(10); > container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); > > container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); > container.setDestination(new ActiveMQQueue("testingqueue")); > container.setMessageListener(new MessageListener() { > public void onMessage(Message message) { > latch.countDown(); > } > }); > container.setMaxMessagesPerTask(1); > container.afterPropertiesSet(); > container.start(); > > pool.submit(new Callable<Object>() { > public Object call() throws Exception { > try { > final int batch = 10; > ActiveMQConnectionFactory connectionFactory2 = new > ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false"); > PooledConnectionFactory pooledConnectionFactory = > new PooledConnectionFactory(connectionFactory2); > JmsTemplate template = new > JmsTemplate(pooledConnectionFactory); > ActiveMQQueue queue = new ActiveMQQueue("testingqueue"); > for(int b = 0; b < batch; b++) { > for(int i = 0; i < (total / batch); i++) { > template.send(queue, new MessageCreator() { > public Message createMessage(Session > session) throws JMSException { > TextMessage message = > session.createTextMessage(); > 
message.setText("Hello World!"); > return message; > } > }); > } > // give spring time to scale back again > while(container.getActiveConsumerCount() > 1) { > System.out.println("active consumer count: > " + container.getActiveConsumerCount()); > System.out.println("concurrent consumer > count: " + container.getConcurrentConsumers()); > Thread.sleep(1000); > } > } > pooledConnectionFactory.stop(); > } catch(Throwable t) { > t.printStackTrace(); > } > return null; > } > }); > > pool.shutdown(); > pool.awaitTermination(10, TimeUnit.SECONDS); > > int count = 0; > > // give it 20 seconds > while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) { > System.out.println("count " + latch.getCount()); > } > > > container.destroy(); > > } finally { > try { if(brokerService1 != null) { brokerService1.stop(); > }} catch(Throwable t) { t.printStackTrace(); } > try { if(brokerService2 != null) { brokerService2.stop(); > }} catch(Throwable t) { t.printStackTrace(); } > } > > if(latch.getCount() > 0) { > fail("latch should have gone down to 0 but was " + > latch.getCount()); > } > > } > > } >