Hello,
I'm trying to delete a persistent message from a large queue with 
removeMatchingMessages. The code works well until the memory that is used to 
browse the queue hits the systemUsage.memoryUsage.limit.

I've changed the LargeQueueSparseDelete test case to reproduce this problem 
(see below). In the Queue.java code I can see that doPageIn(true) will never 
page in any more messages and the loop in removeMatchingMessages will never 
end. It seems that the reference count for the messages in that loop is never 
decreased.

Thanks,
Heiko

package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;

/**
 * This unit test creates a fixed size queue and tries to remove some messages 
in the
 * queueThe test is used to verify the performance of
 * {@link 
org.apache.activemq.broker.region.Queue#removeMatchingMessages(org.apache.activemq.broker.ConnectionContext,
 String, int)}.
 */
public class LargeQueueLowMemoryRemoveTest extends EmbeddedBrokerTestSupport {
    private static final Logger LOG = 
LoggerFactory.getLogger(LargeQueueLowMemoryRemoveTest.class);

    /**
     * {@inheritDoc}
     */
    @Override
    protected void setUp() throws Exception {
        super.useTopic = false;
        super.setUp();
    }

    /**
     * @return whether or not persistence should be used
     */
    protected boolean isPersistent() {
        return true;
    }


    public void testRemoveMessages() throws Exception {
        final int QUEUE_SIZE = 30000;
        final int QUEUE_CHUNK_SIZE = 100;
        final long TEST_TIMEOUT = 6000;

        // Populate a test queue with uniquely-identifiable messages.
        Connection conn = createConnection();
        
        // Set the maximum memory to be used by the broker:
        this.broker.getSystemUsage().getMemoryUsage().setLimit(10000000);
        this.broker.getSystemUsage().getStoreUsage().setLimit( 200000000);
        this.broker.getSystemUsage().getTempUsage().setLimit(  300000000);

        int count = 0;
        conn.start();
        while (count < QUEUE_SIZE) {
                try {
                    Session session = conn.createSession(false,
                            Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = 
session.createProducer(destination);
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 0; i < QUEUE_CHUNK_SIZE && count < QUEUE_SIZE; 
i++) {
                        Message message = session.createMessage();
                        message.setIntProperty("id", count);
                        
                        producer.send(message);
                        count += 1;
                    }
                LOG.info("count =  " + count);
                session.close();
                } finally {
                }
        }
        conn.close();
        // Access the implementation of the test queue and remove a message.
        Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(
                destination);

        ConnectionContext context = new ConnectionContext(
                new NonCachedMessageEvaluationContext());
        context.setBroker(broker.getBroker());
        context.getMessageEvaluationContext().setDestination(destination);

        long startTimeMillis = System.currentTimeMillis();
        Assert.assertEquals(1,
            queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1)));

        // Never reached this end, if the memory limit is lower than the queue 
size.
        long durationMillis = System.currentTimeMillis() - startTimeMillis;

        LOG.info("It took " + durationMillis
                + "ms to remove the last message from a queue a " + QUEUE_SIZE
                + " messages.");
    }
}


Reply via email to