Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released?  

Gary Tully wrote:
> 
> OK, there is a problem and a workaround. See:
> https://issues.apache.org/activemq/browse/AMQ-2610
> The test case[1] attached to the JIRA works as expected with the
> workaround.
> The test is based on the code posted by Scot.
> 
> As the file cursor and the queue share the same usage (they are split for
> VM cursors), the key to ensuring the cursor limit kicks in before the queue
> memory limit (and producer flow control) is to configure a cursor
> high-water mark, policy.setCursorMemoryHighWaterMark(50), that is lower
> than the 70% value used by the Queue. This ensures that messages are
> spooled to disk once 50% of the system usage is reached.
> 
> Have a peek at the test case, [1]
> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
> 
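
To make the ordering of the two limits concrete, here is a small, hypothetical
Java sketch. It is not taken from the attached test case and it does not call
any ActiveMQ API; the class and method names are made up for illustration. It
takes the 50% cursor high-water mark and the 70% queue figure from the message
above at face value and applies them to an assumed 32 MB memory limit.

```java
// Illustrative arithmetic only: this class is hypothetical and not part of ActiveMQ.
final class UsageThresholds {

    private UsageThresholds() {}

    /** Bytes of usage at which a percentage high-water mark is crossed. */
    public static long thresholdBytes(long limitBytes, int percent) {
        if (limitBytes < 0 || percent < 0 || percent > 100) {
            throw new IllegalArgumentException("limit must be >= 0 and percent in [0, 100]");
        }
        return limitBytes * percent / 100;
    }

    /** True when the cursor mark is crossed before the queue mark, so spooling would start first. */
    public static boolean spoolsBeforeFlowControl(long limitBytes, int cursorPercent, int queuePercent) {
        return thresholdBytes(limitBytes, cursorPercent) < thresholdBytes(limitBytes, queuePercent);
    }

    public static void main(String[] args) {
        long limit = 32L * 1024 * 1024; // assumed example value: a 32 MB memory limit
        System.out.println("cursor mark (50%): " + thresholdBytes(limit, 50) + " bytes");
        System.out.println("queue mark (70%):  " + thresholdBytes(limit, 70) + " bytes");
        System.out.println("spools first:      " + spoolsBeforeFlowControl(limit, 50, 70));
    }
}
```

If the cursor percentage is raised to 70 or above, the check returns false,
which matches the situation described in the thread where producers block
before anything is spooled to disk.
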
> 
> On 16 February 2010 11:38, Gary Tully <gary.tu...@gmail.com> wrote:
> 
>> There is something not right here. Let me build a test case to
>> investigate a bit.
>>
>>
>> On 16 February 2010 01:19, scot.hale <scot.h...@gmail.com> wrote:
>>
>>>
>>> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume
>>> that this needs to be added to the queue destination policy
>>> specifically. However, I added it to default and Topic just to be sure
>>> (not shown here).
>>>
>>> I turned on flow control, but was unable to figure out what memory
>>> settings are needed.  What I gathered from your post is that I need to
>>> set the queue destination memory limit higher than the default
>>> SystemUsage memory limit. Is that right?  For example:
>>>
>>>        brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>>        brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>>
>>>        PolicyMap policyMap = new PolicyMap();
>>>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>
>>>        PolicyEntry policy = new PolicyEntry();
>>>        policy.setProducerFlowControl(true);
>>>        policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
>>>        policy.setQueue(">");
>>>        policy.setMemoryLimit(64*1024*1024);
>>>        entries.add(policy);
>>>        policyMap.setPolicyEntries(entries);
>>>
>>>        brokerService.setDestinationPolicy(policyMap);
>>>
>>>
>>> I tried it the other way around as well and it still stops (meaning
>>> producers are blocked or ResourceAllocationExceptions are thrown from
>>> Queue.send()) when it gets to the lower of the two memory limits. I am
>>> definitely missing something.
>>>
>>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the
>>> same behavior I am looking for?
>>>
>>> C.) I didn't understand the last sentence in your post.  Does this mean
>>> that brokerService.getSystemUsage().getTempUsage() is the disk usage
>>> limit that should generate ResourceAllocationExceptions (assuming
>>> SendFailIfNoSpace is set to true)?  In my configuration, would it mean
>>> that when the 128MB is used up by the temp cursor references on disk,
>>> no more resources are available?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Gary Tully wrote:
>>> >
>>> > First thing is you need to use the
>>> > FilePendingQueueMessageStoragePolicy(), as that will offload message
>>> > references to the file system when the SystemUsage.MemoryUsage limit
>>> > is reached.
>>> >
>>> > So 1) add the following to the broker policy entry
>>> >         PendingQueueMessageStoragePolicy pendingQueuePolicy =
>>> >             new FilePendingQueueMessageStoragePolicy();
>>> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
>>> >
>>> > With flow control on, you need to configure a lower SystemUsage, as
>>> > the use of disk space by the file-based cursors is determined by the
>>> > shared SystemUsage.memoryLimit, which by default is the same value as
>>> > the memory limit for a destination. With a single destination, the
>>> > flow control kicks in before the system usage limit, so no spooling
>>> > to disk occurs.
>>> >
>>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
>>> > destination memory limit of 64M
>>> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
>>> >
>>> > This should do it once you add a TempStore() limit to implement 5.
>>> >
>>> >
>>> > On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote:
>>> >
>>> >>
>>> >> I am trying to setup a queue with the following requirements:
>>> >>
>>> >>
>>> >> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
>>> >> 1. ) VM Transport
>>> >> 2. ) Persistent with KahaPersistenceAdaptor
>>> >> 4. ) JVM Memory usage is capped at something like 64MB
>>> >>        - When this limit is reached the producers should continue to
>>> >> store incoming messages to disk  (StoreBasedCursor or FileBasedCursor
>>> >> will work; since the former is the default, that is the one I have
>>> >> been using.)
>>> >> 5. ) File System usage is capped at something like 10GB
>>> >>        - When this limit is reached the broker should start throwing
>>> >> javax.jms.ResourceAllocationExceptions to the producers
>>> >>
>>> >> Number 5 is the least important, as it will be difficult to fill up
>>> >> disk space in production. My current setup configures ActiveMQ
>>> >> programmatically. I don't think this is introducing problems; let me
>>> >> know if there are issues with programmatic configuration.
>>> >>
>>> >>
>>> >> Default settings:
>>> >>        If I do not configure the SystemUsage or the flow control,
>>> >> then the 64MB default memory usage is reached and the producers are
>>> >> halted even though the queues are persistent and have much more
>>> >> space.  Should the default StoreBasedCursor behave this way?
>>> >>
>>> >>
>>> >> Turn off Flow Control:
>>> >>        When I turn off flow control with default SystemUsage
>>> >> settings, the JVM memory is not capped.  After about 5 million
>>> >> messages with no consumers, the JVM, which is assigned 1GB of memory,
>>> >> starts returning OutOfMemoryErrors.
>>> >>
>>> >>
>>> >> So what setting do I need to cap the memory and allow the messages to
>>> >> be stored to disk even when the cap is reached?
>>> >>
>>> >>
>>> >>
>>> >> This is how I programmatically configure my BrokerService
>>> >>
>>> >>        // Only way to set HashIndex bin size for KahaPersistenceAdapter
>>> >>        System.setProperty("defaultBinSize", "16384");
>>> >>        try {
>>> >>            uri = new URI("vm://"+brokerName);
>>> >>        } catch (URISyntaxException e) {
>>> >>            throw new RuntimeException(e);
>>> >>        }
>>> >>        brokerService = new BrokerService();
>>> >>        brokerService.setBrokerName(brokerName);
>>> >>        brokerService.setUseJmx(true);
>>> >>        brokerService.setUseLoggingForShutdownErrors(true);
>>> >>
>>> >>
>>> >>        PolicyMap policyMap = new PolicyMap();
>>> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>> >>        PolicyEntry policy = new PolicyEntry();
>>> >>        policy.setProducerFlowControl(true);
>>> >>        policy.setQueue(">");
>>> >>        entries.add(policy);
>>> >>        policyMap.setPolicyEntries(entries);
>>> >>        brokerService.setDestinationPolicy(policyMap);
>>> >>
>>> >>        //PERSISTENCE
>>> >>        brokerService.setPersistent(true);
>>> >>        KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
>>> >>        persistenceAdapter.setDirectory(new File("/tmp/activemq-"+brokerName+"/kaha"));
>>> >>        brokerService.setDataDirectoryFile(new File("/tmp/activemq-"+brokerName+"/data"));
>>> >>        brokerService.setTmpDataDirectory(new File("/tmp/activemq-"+brokerName+"/temp"));
>>> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>>> >>
>>> >>        try {
>>> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
>>> >>        } catch (IOException e) {
>>> >>            throw new RuntimeException(e);
>>> >>        }
>>> >>        try {
>>> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>>> >>            brokerService.addConnector(uri);
>>> >>            brokerService.start();
>>> >>        } catch (Exception e) {
>>> >>            throw new RuntimeException(e);
>>> >>        }
>>> >>
>>> >>
>>> >>
>>> >> Here is a Producer:
>>> >>
>>> >> public class Producer implements Runnable{
>>> >>
>>> >>    private BrokerService brokerService;
>>> >>    private long numberOfMessages;
>>> >>
>>> >>    public Producer(BrokerService brokerService, long n){
>>> >>        this.brokerService = brokerService;
>>> >>        this.numberOfMessages = n;
>>> >>    }
>>> >>
>>> >>    public void run(){
>>> >>        ActiveMQConnectionFactory factory =
>>> >>            new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>>> >>        try {
>>> >>            Connection conn = factory.createConnection();
>>> >>            conn.start();
>>> >>            for (int i = 0; i < numberOfMessages; i++) {
>>> >>                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>> >>                Destination destination = session.createQueue("test-queue");
>>> >>                MessageProducer producer = session.createProducer(destination);
>>> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>> >>                BytesMessage message = session.createBytesMessage();
>>> >>                message.writeBytes(new byte[]{
>>> >>                    0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,
>>> >>                    48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,
>>> >>                    114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>>> >>                try {
>>> >>                    producer.send(message);
>>> >>                } catch (ResourceAllocationException e) {
>>> >>                    e.printStackTrace();
>>> >>                }
>>> >>                session.close();
>>> >>            }
>>> >>        } catch (JMSException e) {
>>> >>            throw new RuntimeException(e);
>>> >>         }
>>> >>
>>> >>    }
>>> >> }
>>> >>
>>> >>
>>> >> rajdavies wrote:
>>> >> >
>>> >> > Hi Scott,
>>> >> >
>>> >> > just change the below config to enable flow control - i.e:
>>> >> >
>>> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>>> >> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
>>> >> >
>>> >> > in 5.3 - producerFlowControl is on by default - so just remove the
>>> >> > producerFlowControl entry from your configuration.
>>> >> >
>>> >> > If this all sounds double dutch - send in your config - and we'll
>>> >> > help with the correct settings :)
>>> >> >
>>> >> >
>>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>>> >> >
>>> >> >>
>>> >> >> Fred,
>>> >> >>
>>> >> >> Were you able to configure ActiveMQ to grow without surpassing the
>>> >> >> memory
>>> >> >> setting?  I am trying to figure out how to do the same thing.
>>> >> >>
>>> >> >> -Scot
>>> >> >>
>>> >> >>
>>> >> >> Fred Moore-3 wrote:
>>> >> >>>
>>> >> >>> Hi,
>>> >> >>>
>>> >> >>> going back to Cursors and
>>> >> >>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>> >> >>> ...
>>> >> >>>
>>> >> >>> ...can anyone shed some light on the actual role of memoryLimit in:
>>> >> >>>   <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb">
>>> >> >>>   <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
>>> >> >>>
>>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>>> >> >>> consumers?
>>> >> >>>
>>> >> >>> Cheers,
>>> >> >>> F.
>>> >> >>>
>>> >> >>>
>>> >> >>
>>> >> >> --
>>> >> >> View this message in context:
>>> >> >>
>>> >>
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >> >>
>>> >> >
>>> >> > Rob Davies
>>> >> > http://twitter.com/rajdavies
>>> >> > I work here: http://fusesource.com
>>> >> > My Blog: http://rajdavies.blogspot.com/
>>> >> > I'm writing this: http://www.manning.com/snyder/
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >> --
>>> >> View this message in context:
>>> >>
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
>>> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > http://blog.garytully.com
>>> >
>>> > Open Source Integration
>>> > http://fusesource.com
>>> >
>>> >
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>> --
>> http://blog.garytully.com
>>
>> Open Source Integration
>> http://fusesource.com
>>
> 
> 
> 
> -- 
> http://blog.garytully.com
> 
> Open Source Integration
> http://fusesource.com
> 
> 

-- 
View this message in context: 
http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27615569.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
