Hello, I'm trying to delete a persistent message from a large queue with removeMatchingMessages. The code works well until the memory that is used to browse the queue hits the systemUsage.memoryUsage.limit.
I've changed the LargeQueueSparseDelete test case to reproduce this problem (see below). In the Queue.java code I can see that doPageIn(true) will never page in any more messages and the loop in removeMatchingMessages will never end. It seems that the reference count for the messages in that loop is never decreased. Thanks, Heiko

package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.ConnectionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;

/**
 * This unit test fills a queue with a fixed number of persistent messages
 * while the broker memory limit is lowered, and then tries to remove a single
 * message from the queue by selector.
 * <p>
 * It is a variation of the LargeQueueSparseDelete test case and exercises
 * {@link org.apache.activemq.broker.region.Queue#removeMatchingMessages(org.apache.activemq.broker.ConnectionContext, String, int)}.
 * As reported, the removal call does not return once the memory used to
 * browse the queue reaches the configured memory usage limit.
 */
public class LargeQueueLowMemoryRemoveTest extends EmbeddedBrokerTestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(LargeQueueLowMemoryRemoveTest.class);

    /**
     * {@inheritDoc}
     */
    @Override
    protected void setUp() throws Exception {
        // The test operates on a queue, not a topic.
        super.useTopic = false;
        super.setUp();
    }

    /**
     * @return whether or not persistence should be used
     */
    protected boolean isPersistent() {
        return true;
    }

    /**
     * Sends {@code QUEUE_SIZE} persistent messages, each tagged with a unique
     * integer {@code id} property, and then removes the message carrying the
     * last id through {@code Queue.removeMatchingMessages}.
     *
     * @throws Exception if the broker, the JMS client or the removal fails
     */
    public void testRemoveMessages() throws Exception {
        final int QUEUE_SIZE = 30000;
        final int QUEUE_CHUNK_SIZE = 100;

        // Populate a test queue with uniquely-identifiable messages.
        Connection conn = createConnection();
        try {
            // Set the maximum memory to be used by the broker:
            this.broker.getSystemUsage().getMemoryUsage().setLimit(10000000);
            this.broker.getSystemUsage().getStoreUsage().setLimit(200000000);
            this.broker.getSystemUsage().getTempUsage().setLimit(300000000);

            int count = 0;
            conn.start();
            while (count < QUEUE_SIZE) {
                // One session per chunk of QUEUE_CHUNK_SIZE messages.
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    MessageProducer producer = session.createProducer(destination);
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 0; i < QUEUE_CHUNK_SIZE && count < QUEUE_SIZE; i++) {
                        Message message = session.createMessage();
                        message.setIntProperty("id", count);
                        producer.send(message);
                        count += 1;
                    }
                    LOG.info("count = " + count);
                } finally {
                    // Release the session even when sending fails part-way.
                    session.close();
                }
            }
        } finally {
            // Release the connection even when populating the queue fails.
            conn.close();
        }

        // Access the implementation of the test queue and remove a message.
        Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);

        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
        context.setBroker(broker.getBroker());
        context.getMessageEvaluationContext().setDestination(destination);

        long startTimeMillis = System.currentTimeMillis();
        Assert.assertEquals(1, queue.removeMatchingMessages("id=" + (QUEUE_SIZE - 1)));

        // Never reached this end, if the memory limit is lower than the queue size.
        long durationMillis = System.currentTimeMillis() - startTimeMillis;

        LOG.info("It took " + durationMillis + "ms to remove the last message from a queue a " + QUEUE_SIZE + " messages.");
    }
}