I am trying to set up a queue with the following requirements:

ActiveMQ 5.1 or 5.3 (I have been testing with 5.3)
1.) VM transport
2.) Persistent, using KahaPersistenceAdapter
4.) JVM memory usage is capped at something like 64MB
        - When this limit is reached, producers should still be able to
send, with incoming messages stored to disk. (StoreBasedCursor or
FileBasedCursor would work; I have been using the former because it is the
default.)
5.) File system usage is capped at something like 10GB
        - When this limit is reached, sends should start failing with
javax.jms.ResourceAllocationException on the producer side.
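
A toy model in plain Java of the behaviour that requirements 4 and 5
describe. This is not ActiveMQ code and says nothing about how the broker
implements cursors; every class and method name in it is hypothetical, and
the "store" is only a byte counter standing in for the disk.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model only: hypothetical names, not ActiveMQ classes. */
class SpoolingQueueModel {
    /** Stand-in for javax.jms.ResourceAllocationException (unchecked for brevity). */
    static class StoreFullException extends RuntimeException {
        StoreFullException(String message) { super(message); }
    }

    private final long memoryLimitBytes;
    private final long storeLimitBytes;
    private final Deque<byte[]> inMemory = new ArrayDeque<>();
    private long memoryUsedBytes;
    private long storeUsedBytes;
    private long spooledCount; // messages held only in the simulated store

    SpoolingQueueModel(long memoryLimitBytes, long storeLimitBytes) {
        this.memoryLimitBytes = memoryLimitBytes;
        this.storeLimitBytes = storeLimitBytes;
    }

    void send(byte[] payload) {
        // Requirement 5: a persistent message always needs room in the store.
        if (storeUsedBytes + payload.length > storeLimitBytes) {
            throw new StoreFullException("store limit reached");
        }
        storeUsedBytes += payload.length;

        // Requirement 4: keep a copy in memory only while under the memory cap;
        // past the cap the send still succeeds, the message just lives on disk.
        if (memoryUsedBytes + payload.length <= memoryLimitBytes) {
            inMemory.addLast(payload);
            memoryUsedBytes += payload.length;
        } else {
            spooledCount++;
        }
    }

    int inMemoryCount() { return inMemory.size(); }
    long spooledCount() { return spooledCount; }
    long storeUsedBytes() { return storeUsedBytes; }
}
```

The point of the model is the ordering of the two checks: hitting the memory
cap changes where a message is held, while only the store cap rejects a send.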

Number 5 is the least important, as it will be difficult to fill up disk
space in production. My current setup configures ActiveMQ programmatically.
I don't think this is introducing problems, but let me know if there are
known issues with programmatic configuration.


Default settings:  
        If I configure neither SystemUsage nor flow control, the default 64MB
memory limit is reached and the producers are blocked, even though the
queues are persistent and the store has much more space available.  Should
the default StoreBasedCursor behave this way?


Turn off Flow Control:  
        When I turn off flow control and leave the SystemUsage settings at
their defaults, JVM memory is not capped.  After about 5 million messages
with no consumers, the JVM (which is assigned 1GB of memory) starts throwing
OutOfMemoryErrors.


So what setting do I need to cap the memory and allow the messages to be
stored to disk even when the cap is reached?  



This is how I programmatically configure my BrokerService:

        // Only way to set the HashIndex bin size for KahaPersistenceAdapter
        System.setProperty("defaultBinSize", "16384");
        try {
            uri = new URI("vm://"+brokerName);
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        brokerService = new BrokerService();
        brokerService.setBrokerName(brokerName);
        brokerService.setUseJmx(true);
        brokerService.setUseLoggingForShutdownErrors(true);
        
        
        PolicyMap policyMap = new PolicyMap();        
        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry policy = new PolicyEntry();
        policy.setProducerFlowControl(true);
        policy.setQueue(">");
        entries.add(policy);                
        policyMap.setPolicyEntries(entries);
        brokerService.setDestinationPolicy(policyMap);
        
        //PERSISTENCE
        brokerService.setPersistent(true);
        KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
        persistenceAdapter.setDirectory(new File("/tmp/activemq-"+brokerName+"/kaha"));
        brokerService.setDataDirectoryFile(new File("/tmp/activemq-"+brokerName+"/data"));
        brokerService.setTmpDataDirectory(new File("/tmp/activemq-"+brokerName+"/temp"));
        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
        
        try {
            brokerService.setPersistenceAdapter(persistenceAdapter);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        try {
            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
            brokerService.addConnector(uri);
            brokerService.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
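
The configuration above leaves the SystemUsage limits at their defaults and
only sets sendFailIfNoSpace. A minimal sketch of setting the limits from
requirements 4 and 5 explicitly, using the SystemUsage setters on
BrokerService, is shown here. The UsageLimits class and the temp-store value
are assumptions made for illustration, and whether these limits alone give
the spool-to-disk behaviour asked about is exactly the open question.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.usage.SystemUsage;

/** Hypothetical helper; call before brokerService.start(). */
class UsageLimits {
    static void apply(BrokerService brokerService) {
        SystemUsage usage = brokerService.getSystemUsage();

        // Requirement 4: cap broker memory at 64MB.
        usage.getMemoryUsage().setLimit(64L * 1024 * 1024);

        // Requirement 5: cap the persistent store at 10GB. The L suffix
        // matters: 10 * 1024 * 1024 * 1024 overflows int arithmetic.
        usage.getStoreUsage().setLimit(10L * 1024 * 1024 * 1024);

        // Assumption: the same cap for the temp store, which is not one of
        // the stated requirements.
        usage.getTempUsage().setLimit(10L * 1024 * 1024 * 1024);

        // Fail sends with an exception instead of blocking when a limit is hit.
        usage.setSendFailIfNoSpace(true);
    }
}
```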



Here is a Producer:

public class Producer implements Runnable{
    
    private BrokerService brokerService;   
    private long numberOfMessages;
    
    public Producer(BrokerService brokerService, long n){
        this.brokerService = brokerService;
        this.numberOfMessages = n;
    }
    
    public void run(){
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
        try {
            Connection conn = factory.createConnection();            
            conn.start();
            for (int i = 0; i < numberOfMessages; i++) {
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue("test-queue");
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(new byte[]{
0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,
                });
                try {
                    producer.send(message);
                } catch (ResourceAllocationException e) {
                    e.printStackTrace();
                }
                session.close();
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        
    }
}


rajdavies wrote:
> 
> Hi Scott,
> 
> just change the below config to enable flow control - i.e:
> 
>   <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> 
> in 5.3 - producerFlowControl is on by default - so just remove the  
> producerFlowControl entry from your configuration.
> 
> If this all sounds double dutch - send in your config - and we'll help  
> with the correct settings :)
> 
> 
> On 12 Feb 2010, at 20:49, scot.hale wrote:
> 
>>
>> Fred,
>>
>> Were you able to configure ActiveMQ to grow without surpassing the memory
>> setting?  I am trying to figure out how to do the same thing.
>>
>> -Scot
>>
>>
>> Fred Moore-3 wrote:
>>>
>>> Hi,
>>>
>>> going back to Cursors and
>>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>> ...
>>>
>>> ...can anyone shed some light on the actual role of memoryLimit in:
>>>   <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb">
>>>   <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
>>>
>>> ...moreover: *when* will producerFlowControl start slowing down
>>> consumers?
>>>
>>> Cheers,
>>> F.
>>>
>>>
>>
>> -- 
>> View this message in context:
>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
> 
> Rob Davies
> http://twitter.com/rajdavies
> I work here: http://fusesource.com
> My Blog: http://rajdavies.blogspot.com/
> I'm writing this: http://www.manning.com/snyder/
> 

-- 
View this message in context: 
http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
