First, you need to use the FilePendingQueueMessageStoragePolicy, as it
offloads message references to the file system when the
SystemUsage.MemoryUsage limit is reached.

So 1) add the following to the broker policy entry:
        PendingQueueMessageStoragePolicy pendingQueuePolicy =
                new FilePendingQueueMessageStoragePolicy();
        policy.setPendingQueuePolicy(pendingQueuePolicy);

With flow control on, you need to configure a lower SystemUsage limit,
because the use of disk space by the file-based cursors is determined by the
shared SystemUsage.memoryLimit, which by default is the same value as the
memory limit for a destination. With a single destination, flow control
kicks in before the system usage limit is reached, so no spooling to disk
occurs.

2) Configure a SystemUsage.MemoryLimit that is less than the default
destination memory limit of 64M:
   brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
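
One thing to watch when writing these limits as multiplications: setLimit()
takes a long, but 1024 * 1024 * 63 is evaluated as an int first. That is fine
for 63MB, but a larger cap such as the 10GB from your requirement 5 would
silently overflow. A minimal sketch of the arithmetic using only the JDK; the
class and method names (ByteLimits, megabytes, gigabytes, naiveGigabytes) are
made up for illustration and are not part of ActiveMQ:

```java
// Hypothetical helper, not an ActiveMQ class: computes byte limits safely.
final class ByteLimits {

    private ByteLimits() {
    }

    // The long parameter forces 64-bit multiplication throughout.
    static long megabytes(long n) {
        return n * 1024 * 1024;
    }

    static long gigabytes(long n) {
        return n * 1024 * 1024 * 1024;
    }

    // What happens if the same product is computed in 32-bit int arithmetic:
    // the result wraps around once it exceeds Integer.MAX_VALUE.
    static int naiveGigabytes(int n) {
        return n * 1024 * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println("63MB as long: " + megabytes(63));
        System.out.println("10GB as long: " + gigabytes(10));
        System.out.println("10GB as int:  " + naiveGigabytes(10));
    }
}
```

Writing the first factor as a long literal (for example 10L * 1024 * 1024 *
1024) has the same effect as the helper methods.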

This should do it once you also set a temp store limit
(SystemUsage.getTempUsage()) to implement your requirement 5.
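
Putting the steps together, here is a rough sketch of how they could be
applied to the brokerService and policy objects from your code. It assumes
the ActiveMQ 5.3 broker jar is on the classpath and that it is called after
setPersistenceAdapter() and before start(). The class and method names
(SpoolingLimits, apply) are made up for illustration, and the 63MB and 10GB
figures are simply the ones from this thread, so treat it as a starting point
rather than a tested configuration:

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.usage.SystemUsage;

// Hypothetical helper class; the name is illustrative only.
public final class SpoolingLimits {

    private SpoolingLimits() {
    }

    public static void apply(BrokerService brokerService, PolicyEntry policy) {
        // Step 1: file-based pending cursor for the queues matched by the policy.
        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());

        SystemUsage usage = brokerService.getSystemUsage();

        // Step 2: shared memory limit just below the 64M destination default.
        usage.getMemoryUsage().setLimit(63L * 1024 * 1024);

        // Requirement 5: cap the temp store used by the file-based cursors
        // (assumed value of 10GB, taken from the original question).
        usage.getTempUsage().setLimit(10L * 1024 * 1024 * 1024);

        // Already present in the original configuration: fail sends instead
        // of blocking when a limit is hit.
        usage.setSendFailIfNoSpace(true);
    }
}
```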


On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote:

>
> I am trying to setup a queue with the following requirements:
>
>
> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
> 1. ) VM Transport
> 2. ) Persistent with KahaPersistenceAdaptor
> 4. ) JVM Memory usage is capped at something like 64MB
>        - When this limit is reached the producers should continue to store
> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will work,
> since the former is the default that is the one I have been using.)
> 5. ) File System usage is capped at something like 10GB
>        - When this limit is reached the producers should start throwing
> javax.jms.ResourceAllocationExceptions to the Producers
>
> Number 5 is the least important, as it will be difficult to fill up disk
> space in production. My current setup configures ActiveMQ programatically.
> I don't think this is introducing problems, let me know if there are issues
> with programatic configuration.
>
>
> Default settings:
>        If I do not configure the SystemUsage or the Flow control, then 64MB
> default memory usage is reached and the producers are halted even though
> the
> queues are persistent and have much more space.  Should the default
> StoreBasedCursor behave this way?
>
>
> Turn off Flow Control:
>        When I turn off Flow Control with default SystemUseage settings,
> then the
> JVM memory is not capped.  After about 5 million messages with no consumers
> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
>
>
> So what setting do I need to cap the memory and allow the messages to be
> stored to disk even when the cap is reached?
>
>
>
> This is how I programtically configure my BrokerService
>
>        System.setProperty("defaultBinSize", "16384");//Only way to set
> HashIndex bin size for KahaPersistenceAdapter
>        try {
>            uri = new URI("vm://"+brokerName);
>        } catch (URISyntaxException e) {
>            throw new RuntimeException(e);
>        }
>        brokerService = new BrokerService();
>        brokerService.setBrokerName(brokerName);
>        brokerService.setUseJmx(true);
>        brokerService.setUseLoggingForShutdownErrors(true);
>
>
>        PolicyMap policyMap = new PolicyMap();
>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>        PolicyEntry policy = new PolicyEntry();
>        policy.setProducerFlowControl(true);
>        policy.setQueue(">");
>        entries.add(policy);
>        policyMap.setPolicyEntries(entries);
>        brokerService.setDestinationPolicy(policyMap);
>
>        //PERSISTENCE
>        brokerService.setPersistent(true);
>        KahaPersistenceAdapter persistenceAdapter = new
> KahaPersistenceAdapter();
>        persistenceAdapter.setDirectory(new
> File("/tmp/activemq-"+brokerName+"/kaha"));
>        brokerService.setDataDirectoryFile(new
> File("/tmp/activemq-"+brokerName+"/data"));
>        brokerService.setTmpDataDirectory(new
> File("/tmp/activemq-"+brokerName+"/temp"));
>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>
>        try {
>            brokerService.setPersistenceAdapter(persistenceAdapter);
>        } catch (IOException e) {
>            throw new RuntimeException(e);
>        }
>        try {
>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>            brokerService.addConnector(uri);
>            brokerService.start();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>
>
>
> Here is a Producer:
>
> public class Producer implements Runnable{
>
>    private BrokerService brokerService;
>    private long numberOfMessages;
>
>    public Producer(BrokerService brokerService, long n){
>        this.brokerService = brokerService;
>        this.numberOfMessages = n;
>    }
>
>    public void run(){
>        ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>        try {
>            Connection conn = factory.createConnection();
>            conn.start();
>            for (int i = 0; i < numberOfMessages; i++) {
>                Session session = conn.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                Destination destination = session.createQueue("test-queue");
>                MessageProducer producer =
> session.createProducer(destination);
>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>                BytesMessage message = session.createBytesMessage();
>                message.writeBytes(new
>
> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>                try {
>                    producer.send(message);
>                } catch (ResourceAllocationException e) {
>                    e.printStackTrace();
>                }
>                session.close();
>            }
>        } catch (JMSException e) {
>            throw new RuntimeException(e);
>         }
>
>    }
> }
>
>
> rajdavies wrote:
> >
> > Hi Scott,
> >
> > just change the below config to enable flow control - i.e:
> >
> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> >
> > in 5.3 - producerFlowControl is on by default - so just remove the
> > producerFlowControl entry from your configuration.
> >
> > If this all sounds double dutch - send in your config - and we'll help
> > with the correct settings :)
> >
> >
> > On 12 Feb 2010, at 20:49, scot.hale wrote:
> >
> >>
> >> Fred,
> >>
> >> Were you able to configure ActiveMQ to grow without surpassing the
> >> memory
> >> setting?  I am trying to figure out how to do the same thing.
> >>
> >> -Scot
> >>
> >>
> >> Fred Moore-3 wrote:
> >>>
> >>> Hi,
> >>>
> >>> going back to Cursors and
> >>>
> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
> >>> ...
> >>>
> >>> ...can anyone shed some light on the actual role of memoryLimit in:
> >>>   <policyEntry topic=">" producerFlowControl="false"
> >>> memoryLimit="1mb">
> >>>   <policyEntry queue=">" producerFlowControl="false"
> >>> memoryLimit="1mb">
> >>>
> >>> ...moreover: *when* will producerFlowControl start slowing down
> >>> consumers?
> >>>
> >>> Cheers,
> >>> F.
> >>>
> >>>
> >>
> >> --
> >> View this message in context:
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >
> > Rob Davies
> > http://twitter.com/rajdavies
> > I work here: http://fusesource.com
> > My Blog: http://rajdavies.blogspot.com/
> > I'm writing this: http://www.manning.com/snyder/
> >
> >
> >
> >
> >
> >
> >
>
> --
> View this message in context:
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com
