I have a multi-threaded consumer setup; all message consumption is asynchronous.

The consumers, both the dispatch consumer and the "child" consumers, are set up like this:

    public synchronized void startJMS() throws JMSException {
        try {
            connection = createQueueConnection(queueName, "0");
            session = createQueueSession(connection, queueName, true, Session.AUTO_ACKNOWLEDGE);
            queue = createQueue(session, queueName);
            receiver = createReceiver(session, queue, this);
            queueCount = Integer.parseInt(idList.get(0));

            for (int i = 0; i < queueCount; i++) {
                Integer anIntId = new Integer(i);
                QueueConnection childConnection = createQueueConnection(queueName, String.valueOf(i + 1));
                QueueSession childSession = createQueueSession(childConnection, queueName, true, Session.AUTO_ACKNOWLEDGE);
                //QueueSession childSession = session;
                Queue tempQueue = childSession.createTemporaryQueue();
                // attach message listener to the queue
                MessageListener dataProcessor = new TTDataProcessor(controller, childSession, new Integer(i + 1), idList);
                createReceiver(childSession, tempQueue, dataProcessor);
                queueMap.put(anIntId, tempQueue);
                childConnection.start();
            }

            connection.start();
        } catch (JMSException e) {
            closeReceiver(receiver);
            closeSession(session);
            closeConnection(connection);
            throw e;
        }
    }

Note: I have a separate connection/session for each consumer.

The main dispatch consumer does the following with each message:

    QueueSender sender = session.createSender(null);
    sender.send(childQueue, message);
    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
        acknowledgeMessage(message, messageId);
    }
    log.logMessage("Commiting session", ILogPriority.IMPORTANT);
    session.commit();
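
As far as I understand the JMS API, the acknowledge mode argument is ignored when a session is created as transacted, and getAcknowledgeMode() then reports SESSION_TRANSACTED, so the CLIENT_ACKNOWLEDGE check above would never fire for my sessions. A broker-free sketch of that rule follows. The constants are copied from javax.jms.Session; the class name AckModeSketch and the helper effectiveAckMode are made up for illustration and are not part of JMS or ActiveMQ.

```java
// Broker-free model of the rule; it does not talk to ActiveMQ.
// Constant values mirror javax.jms.Session; names below are hypothetical.
final class AckModeSketch {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    /** Models what Session.getAcknowledgeMode() is documented to return. */
    static int effectiveAckMode(boolean transacted, int requestedAckMode) {
        // A transacted session ignores the requested mode entirely.
        return transacted ? SESSION_TRANSACTED : requestedAckMode;
    }

    public static void main(String[] args) {
        // Same arguments my createQueueSession helper is given: transacted = true.
        int mode = effectiveAckMode(true, AUTO_ACKNOWLEDGE);
        System.out.println("reported mode: " + mode);
        System.out.println("explicit ack branch taken: " + (mode == CLIENT_ACKNOWLEDGE));
    }
}
```

If that model is right, the only thing that should remove messages from the dispatch queue in my setup is session.commit().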

The web console shows 1050 messages pending. If I restart my application, nothing is consumed.

If I use JMeter to push 1000 messages into the main dispatch queue, message consumption begins, and afterwards Hermes shows that there are 2050 messages on the queue.

In my mind there's something wrong here. With 2050 messages on the queue, no messages are delivered when I restart my application, yet delivery starts working once I pump more messages into the queue. Even then, nothing ever seems to be removed from the queue, which is very strange.

This is with ActiveMQ 5.2.0.

I've seen others report similar behaviour in archived threads, so is there an explanation for this?


On Sep 15, 2009, at 12:56 AM, Rob Davies wrote:


On 15 Sep 2009, at 06:42, Robert Nicholson wrote:

So, I have a pretty standard JMS message listener, but when using ActiveMQ it often just doesn't receive any messages even when there are messages on the queue. The Session is a transacted one created with AUTO_ACKNOWLEDGE, but for some reason the messages are never taken off the queue.

I believe AUTO_ACKNOWLEDGE is ignored and the session ends up reporting SESSION_TRANSACTED, but what I don't understand is why the messages are still on the queue after I commit the session.

Why is that?





Er - not sure - can you share your code, or provide a test case?

cheers,

Rob

Rob Davies
http://twitter.com/rajdavies
I work here: http://fusesource.com
My Blog: http://rajdavies.blogspot.com/
I'm writing this: http://www.manning.com/snyder/





