Greetings all,

After some weeks of scratching my head, I _believe_ I have found the
magic combination that appears to be causing messages to become stuck
in our network of brokers. It's so convoluted that it's entirely
likely that the error isn't what I think it is but I have managed to
create a test case that mirrors the behaviour we're seeing live. And
it goes something like this:

We have two brokers in a network of brokers. Producers were publishing
to a queue on one of the brokers and consumers reading off the queue
in the other broker. After a short while, messages would suddenly pile
up on the 'producer' broker and not get read off on the 'consumer'
broker.

Pause for lots of random testing...

It appeared that the network bridge subscription from the 'consumer'
broker was disappearing from the 'producer' broker, causing the pile
up.

More testing later...

And it looks like if we have a DefaultMessageListenerContainer with
the following configuration:

maxMessagesPerTask: 1
cacheLevel: CONSUMER
maxConcurrentConsumer: 3 (more than 1, basically)
concurrentConsumers: 1
sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE

When Spring scales back the dynamic amount of consumers, the network
subscription appears to get lost and messages pile up on the producer
side.

Workaround:

Use concurrentConsumers instead of maxConcurrentConsumers so that
there are a static number of consumers (not setting maxMessagesPerTask
also seems to work).

I'll add it this to Jira if you'd like but the test case to replicate
the behaviour is as follows:


package org.example.activemq;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class NetworkTest extends TestCase {

    public void testNetworkOfBrokers() throws Exception {
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;

        final int total = 100;
        final CountDownLatch latch = new CountDownLatch(total);

        try {

        {
            brokerService1 = new BrokerService();
            brokerService1.setBrokerName("one");
            brokerService1.setUseJmx(false);
            brokerService1.setPersistenceAdapter(new
MemoryPersistenceAdapter());
            brokerService1.addConnector("tcp://0.0.0.0:61616");
            NetworkConnector network1 =
brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
            network1.setName("network1");
            network1.setDynamicOnly(true);
            network1.setNetworkTTL(3);
            network1.setPrefetchSize(1);
            brokerService1.start();
        }

        {
            brokerService2 = new BrokerService();
            brokerService2.setBrokerName("two");
            brokerService2.setUseJmx(false);
            brokerService2.setPersistenceAdapter(new
MemoryPersistenceAdapter());
            brokerService2.addConnector("tcp://0.0.0.0:51515");
            NetworkConnector network2 =
brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
            network2.setName("network1");
            network2.setDynamicOnly(true);
            network2.setNetworkTTL(3);
            network2.setPrefetchSize(1);
            brokerService2.start();
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();

        ActiveMQConnectionFactory connectionFactory1 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");


        final DefaultMessageListenerContainer container = new
DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory1);
        container.setMaxConcurrentConsumers(10);
        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        container.setDestination(new ActiveMQQueue("testingqueue"));
        container.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                latch.countDown();
            }
        });
        container.setMaxMessagesPerTask(1);
        container.afterPropertiesSet();
        container.start();

        pool.submit(new Callable<Object>() {
            public Object call() throws Exception {
                try {
                    final int batch = 10;
                    ActiveMQConnectionFactory connectionFactory2 = new
ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");
                    PooledConnectionFactory pooledConnectionFactory =
new PooledConnectionFactory(connectionFactory2);
                    JmsTemplate template = new
JmsTemplate(pooledConnectionFactory);
                    ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
                    for(int b = 0; b < batch; b++) {
                        for(int i = 0; i < (total / batch); i++) {
                            template.send(queue, new MessageCreator() {
                                public Message createMessage(Session
session) throws JMSException {
                                    TextMessage message =
session.createTextMessage();
                                    message.setText("Hello World!");
                                    return message;
                                }
                            });
                        }
                        // give spring time to scale back again
                        while(container.getActiveConsumerCount() > 1) {
                            System.out.println("active consumer count:
" + container.getActiveConsumerCount());
                            System.out.println("concurrent consumer
count: " + container.getConcurrentConsumers());
                            Thread.sleep(1000);
                        }
                    }
                    pooledConnectionFactory.stop();
                } catch(Throwable t) {
                    t.printStackTrace();
                }
                return null;
            }
        });

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        int count = 0;

        // give it 20 seconds
        while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
            System.out.println("count " + latch.getCount());
        }


        container.destroy();

        } finally {
            try { if(brokerService1 != null) { brokerService1.stop();
}} catch(Throwable t) { t.printStackTrace(); }
            try { if(brokerService2 != null) { brokerService2.stop();
}} catch(Throwable t) { t.printStackTrace(); }
        }

        if(latch.getCount() > 0) {
            fail("latch should have gone down to 0 but was " +
latch.getCount());
        }

    }

}

Reply via email to