Oh, this is using:

ActiveMQ 5.3.1
Spring 2.5.6

On Fri, May 7, 2010 at 10:19 PM, Jamie McCrindle
<jamiemccrin...@gmail.com> wrote:
> Greetings all,
>
> After some weeks of scratching my head, I _believe_ I have found the
> magic combination that appears to be causing messages to become stuck
> in our network of brokers. It's so convoluted that it's entirely
> likely that the error isn't what I think it is but I have managed to
> create a test case that mirrors the behaviour we're seeing live. And
> it goes something like this:
>
> We have two brokers in a network of brokers. Producers were publishing
> to a queue on one of the brokers and consumers reading off the queue
> in the other broker. After a short while, messages would suddenly pile
> up on the 'producer' broker and not get read off on the 'consumer'
> broker.
>
> Pause for lots of random testing...
>
> It appeared that the network bridge subscription from the 'consumer'
> broker was disappearing from the 'producer' broker, causing the pile
> up.
>
> More testing later...
>
> And it looks like if we have a DefaultMessageListenerContainer with
> the following configuration:
>
> maxMessagesPerTask: 1
> cacheLevel: CONSUMER
> maxConcurrentConsumer: 3 (more than 1, basically)
> concurrentConsumers: 1
> sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE
>
> When Spring scales back the dynamic amount of consumers, the network
> subscription appears to get lost and messages pile up on the producer
> side.
>
> Workaround:
>
> Use concurrentConsumers instead of maxConcurrentConsumers so that
> there are a static number of consumers (not setting maxMessagesPerTask
> also seems to work).
>
> I'll add it this to Jira if you'd like but the test case to replicate
> the behaviour is as follows:
>
>
> package org.example.activemq;
>
> import java.util.concurrent.Callable;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.network.NetworkConnector;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
>
> public class NetworkTest extends TestCase {
>
>    public void testNetworkOfBrokers() throws Exception {
>        BrokerService brokerService1 = null;
>        BrokerService brokerService2 = null;
>
>        final int total = 100;
>        final CountDownLatch latch = new CountDownLatch(total);
>
>        try {
>
>        {
>            brokerService1 = new BrokerService();
>            brokerService1.setBrokerName("one");
>            brokerService1.setUseJmx(false);
>            brokerService1.setPersistenceAdapter(new
> MemoryPersistenceAdapter());
>            brokerService1.addConnector("tcp://0.0.0.0:61616");
>            NetworkConnector network1 =
> brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
>            network1.setName("network1");
>            network1.setDynamicOnly(true);
>            network1.setNetworkTTL(3);
>            network1.setPrefetchSize(1);
>            brokerService1.start();
>        }
>
>        {
>            brokerService2 = new BrokerService();
>            brokerService2.setBrokerName("two");
>            brokerService2.setUseJmx(false);
>            brokerService2.setPersistenceAdapter(new
> MemoryPersistenceAdapter());
>            brokerService2.addConnector("tcp://0.0.0.0:51515");
>            NetworkConnector network2 =
> brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
>            network2.setName("network1");
>            network2.setDynamicOnly(true);
>            network2.setNetworkTTL(3);
>            network2.setPrefetchSize(1);
>            brokerService2.start();
>        }
>
>        ExecutorService pool = Executors.newSingleThreadExecutor();
>
>        ActiveMQConnectionFactory connectionFactory1 = new
> ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");
>
>
>        final DefaultMessageListenerContainer container = new
> DefaultMessageListenerContainer();
>        container.setConnectionFactory(connectionFactory1);
>        container.setMaxConcurrentConsumers(10);
>        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
>        
> container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
>        container.setDestination(new ActiveMQQueue("testingqueue"));
>        container.setMessageListener(new MessageListener() {
>            public void onMessage(Message message) {
>                latch.countDown();
>            }
>        });
>        container.setMaxMessagesPerTask(1);
>        container.afterPropertiesSet();
>        container.start();
>
>        pool.submit(new Callable<Object>() {
>            public Object call() throws Exception {
>                try {
>                    final int batch = 10;
>                    ActiveMQConnectionFactory connectionFactory2 = new
> ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");
>                    PooledConnectionFactory pooledConnectionFactory =
> new PooledConnectionFactory(connectionFactory2);
>                    JmsTemplate template = new
> JmsTemplate(pooledConnectionFactory);
>                    ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
>                    for(int b = 0; b < batch; b++) {
>                        for(int i = 0; i < (total / batch); i++) {
>                            template.send(queue, new MessageCreator() {
>                                public Message createMessage(Session
> session) throws JMSException {
>                                    TextMessage message =
> session.createTextMessage();
>                                    message.setText("Hello World!");
>                                    return message;
>                                }
>                            });
>                        }
>                        // give spring time to scale back again
>                        while(container.getActiveConsumerCount() > 1) {
>                            System.out.println("active consumer count:
> " + container.getActiveConsumerCount());
>                            System.out.println("concurrent consumer
> count: " + container.getConcurrentConsumers());
>                            Thread.sleep(1000);
>                        }
>                    }
>                    pooledConnectionFactory.stop();
>                } catch(Throwable t) {
>                    t.printStackTrace();
>                }
>                return null;
>            }
>        });
>
>        pool.shutdown();
>        pool.awaitTermination(10, TimeUnit.SECONDS);
>
>        int count = 0;
>
>        // give it 20 seconds
>        while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
>            System.out.println("count " + latch.getCount());
>        }
>
>
>        container.destroy();
>
>        } finally {
>            try { if(brokerService1 != null) { brokerService1.stop();
> }} catch(Throwable t) { t.printStackTrace(); }
>            try { if(brokerService2 != null) { brokerService2.stop();
> }} catch(Throwable t) { t.printStackTrace(); }
>        }
>
>        if(latch.getCount() > 0) {
>            fail("latch should have gone down to 0 but was " +
> latch.getCount());
>        }
>
>    }
>
> }
>

Reply via email to