Should this work in 5.3.0, or do we need to wait for 5.4.0 to be released?
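
For anyone following along, here is a rough sketch of the arithmetic behind the workaround quoted below. It is a toy model only and does not touch ActiveMQ. The 50% figure is the value passed to policy.setCursorMemoryHighWaterMark(50) in Gary's message, the 70% figure is the queue value he mentions, and the 32MB limit is taken from the quoted example. The class and method names (SpoolThresholds, thresholdBytes, spoolsBeforeFlowControl) are made up for illustration.

```java
public class SpoolThresholds {

    /** Bytes at which a percentage high-water mark of a shared usage limit is crossed. */
    public static long thresholdBytes(long limitBytes, int percent) {
        return limitBytes * percent / 100;
    }

    /**
     * True when the cursor high-water mark is crossed strictly before the
     * queue's mark, i.e. spooling to disk would start before flow control.
     * This only compares two percentages of the same limit; it is an
     * assumption-level model, not ActiveMQ's actual accounting.
     */
    public static boolean spoolsBeforeFlowControl(long limitBytes,
                                                  int cursorPercent,
                                                  int queuePercent) {
        return thresholdBytes(limitBytes, cursorPercent)
                < thresholdBytes(limitBytes, queuePercent);
    }

    public static void main(String[] args) {
        long limit = 32L * 1024 * 1024; // illustrative limit from the quoted example
        int cursorPercent = 50;         // from policy.setCursorMemoryHighWaterMark(50)
        int queuePercent = 70;          // queue value stated in the quoted message

        System.out.println("cursor spools at  " + thresholdBytes(limit, cursorPercent) + " bytes");
        System.out.println("queue blocks at   " + thresholdBytes(limit, queuePercent) + " bytes");
        System.out.println("spool comes first " + spoolsBeforeFlowControl(limit, cursorPercent, queuePercent));
    }
}
```

With both marks equal, the model reports that spooling does not come first, which matches the behaviour described in the thread where producers block before anything is spooled.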

Gary Tully wrote:
>
> Ok. There is a problem and a workaround. See:
> https://issues.apache.org/activemq/browse/AMQ-2610
> The test case [1] attached to the jira works as expected with the
> workaround. The test is based on the code posted by Scot.
>
> As the file cursor and the queue share the same usage (they are split
> for vmcursors), the key to ensuring the cursor limit kicks in before the
> queue memory limit (and producer flow control) is to configure a cursor
> high-water mark with policy.setCursorMemoryHighWaterMark(50); which is
> less than the 70% value used by the Queue. This ensures that it will
> spool messages to disk once 50% of the system usage is reached.
>
> Have a peek at the test case, [1]
> https://issues.apache.org/activemq/secure/attachment/18932/UnlimitedEnqueueTest.java
>
> On 16 February 2010 11:38, Gary Tully <gary.tu...@gmail.com> wrote:
>
>> There is something not right here. Let me build a test case to
>> investigate a bit.
>>
>> On 16 February 2010 01:19, scot.hale <scot.h...@gmail.com> wrote:
>>
>>> A.) I tried using the FilePendingQueueMessageStoragePolicy. I assume
>>> that this needs to be added to the queue destination policy
>>> specifically. However, I added it to default and Topic just to be sure
>>> (not shown here).
>>>
>>> I turned on flow control, but was unable to figure out what memory
>>> settings are needed. What I gathered from your post is that I need to
>>> set the queue destination memory limit higher than the default
>>> SystemUsage memory limit. Is that right?
>>> For example:
>>>
>>> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>>> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>>>
>>> PolicyMap policyMap = new PolicyMap();
>>> List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>>
>>> PolicyEntry policy = new PolicyEntry();
>>> policy.setProducerFlowControl(true);
>>> policy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
>>> policy.setQueue(">");
>>> policy.setMemoryLimit(64*1024*1024);
>>> entries.add(policy);
>>> policyMap.setPolicyEntries(entries);
>>>
>>> brokerService.setDestinationPolicy(policyMap);
>>>
>>> I tried it the other way around as well and it still stops (meaning
>>> producers are blocked or ResourceAllocationExceptions are thrown from
>>> Queue.send()) when it gets to the lower of the two memory limits. I am
>>> definitely missing something.
>>>
>>> B.) Would using the StorePendingQueueMessageStoragePolicy provide the
>>> same behavior I am looking for?
>>>
>>> C.) I didn't understand the last sentence in your post. Does this mean
>>> that brokerService.getSystemUsage().getTempUsage() is the disk usage
>>> limit that should generate ResourceAllocationExceptions (assuming
>>> SendFailIfNoSpace is set to true)? In my configuration, would that mean
>>> that once the 128MB is used up by the temp cursor references on disk,
>>> no more resources are available?
>>>
>>> Gary Tully wrote:
>>> >
>>> > First thing is you need to use the
>>> > FilePendingQueueMessageStoragePolicy() as that will offload message
>>> > references to the file system when the SystemUsage.MemoryUsage limit
>>> > is reached.
>>> >
>>> > So 1) add the following to the broker policy entry
>>> >     PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
>>> >     policy.setPendingQueuePolicy(pendingQueuePolicy);
>>> >
>>> > With flow control on, you need to configure a lower SystemUsage as the
>>> > use of disk space by the file based cursors is determined by the
>>> > shared SystemUsage.memoryLimit, which by default is the same value as
>>> > the memory limit for a destination. With a single destination, the
>>> > flow control kicks in before the system usage so no spooling to disk
>>> > occurs.
>>> >
>>> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
>>> > destination memory limit of 64M
>>> >     brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 63);
>>> >
>>> > This should do it once you add a TempStore() limit to implement 5.
>>> >
>>> > On 15 February 2010 17:22, scot.hale <scot.h...@gmail.com> wrote:
>>> >
>>> >> I am trying to set up a queue with the following requirements:
>>> >>
>>> >> ActiveMQ 5.1 or 5.3 (I have been testing with 5.3)
>>> >> 1. ) VM Transport
>>> >> 2. ) Persistent with KahaPersistenceAdapter
>>> >> 4. ) JVM memory usage is capped at something like 64MB
>>> >>      - When this limit is reached the producers should continue to
>>> >>        store incoming messages to disk (StoreBasedCursor or
>>> >>        FileBasedCursor will work; since the former is the default,
>>> >>        that is the one I have been using.)
>>> >> 5. ) File system usage is capped at something like 10GB
>>> >>      - When this limit is reached the producers should start getting
>>> >>        javax.jms.ResourceAllocationExceptions
>>> >>
>>> >> Number 5 is the least important, as it will be difficult to fill up
>>> >> disk space in production. My current setup configures ActiveMQ
>>> >> programmatically.
>>> >> I don't think this is introducing problems; let me know if there
>>> >> are issues with programmatic configuration.
>>> >>
>>> >> Default settings:
>>> >>     If I do not configure the SystemUsage or the flow control, then
>>> >>     the 64MB default memory usage is reached and the producers are
>>> >>     halted even though the queues are persistent and have much more
>>> >>     space. Should the default StoreBasedCursor behave this way?
>>> >>
>>> >> Turn off Flow Control:
>>> >>     When I turn off flow control with default SystemUsage settings,
>>> >>     the JVM memory is not capped. After about 5 million messages
>>> >>     with no consumers, the JVM (assigned 1GB of memory) starts
>>> >>     returning OutOfMemoryErrors.
>>> >>
>>> >> So what setting do I need to cap the memory and allow the messages
>>> >> to be stored to disk even when the cap is reached?
>>> >>
>>> >> This is how I programmatically configure my BrokerService:
>>> >>
>>> >>     //Only way to set HashIndex bin size for KahaPersistenceAdapter
>>> >>     System.setProperty("defaultBinSize", "16384");
>>> >>     try {
>>> >>         uri = new URI("vm://"+brokerName);
>>> >>     } catch (URISyntaxException e) {
>>> >>         throw new RuntimeException(e);
>>> >>     }
>>> >>     brokerService = new BrokerService();
>>> >>     brokerService.setBrokerName(brokerName);
>>> >>     brokerService.setUseJmx(true);
>>> >>     brokerService.setUseLoggingForShutdownErrors(true);
>>> >>
>>> >>     PolicyMap policyMap = new PolicyMap();
>>> >>     List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>>> >>     PolicyEntry policy = new PolicyEntry();
>>> >>     policy.setProducerFlowControl(true);
>>> >>     policy.setQueue(">");
>>> >>     entries.add(policy);
>>> >>     policyMap.setPolicyEntries(entries);
>>> >>     brokerService.setDestinationPolicy(policyMap);
>>> >>
>>> >>     //PERSISTENCE
>>> >>     brokerService.setPersistent(true);
>>> >>     KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
>>> >>     persistenceAdapter.setDirectory(new File("/tmp/activemq-"+brokerName+"/kaha"));
>>> >>     brokerService.setDataDirectoryFile(new File("/tmp/activemq-"+brokerName+"/data"));
>>> >>     brokerService.setTmpDataDirectory(new File("/tmp/activemq-"+brokerName+"/temp"));
>>> >>     persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
>>> >>
>>> >>     try {
>>> >>         brokerService.setPersistenceAdapter(persistenceAdapter);
>>> >>     } catch (IOException e) {
>>> >>         throw new RuntimeException(e);
>>> >>     }
>>> >>     try {
>>> >>         brokerService.getSystemUsage().setSendFailIfNoSpace(true);
>>> >>         brokerService.addConnector(uri);
>>> >>         brokerService.start();
>>> >>     } catch (Exception e) {
>>> >>         throw new RuntimeException(e);
>>> >>     }
>>> >>
>>> >> Here is a Producer:
>>> >>
>>> >> public class Producer implements Runnable{
>>> >>
>>> >>     private BrokerService brokerService;
>>> >>     private long numberOfMessages;
>>> >>
>>> >>     public Producer(BrokerService brokerService, long n){
>>> >>         this.brokerService = brokerService;
>>> >>         this.numberOfMessages = n;
>>> >>     }
>>> >>
>>> >>     public void run(){
>>> >>         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
>>> >>         try {
>>> >>             Connection conn = factory.createConnection();
>>> >>             conn.start();
>>> >>             for (int i = 0; i < numberOfMessages; i++) {
>>> >>                 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>> >>                 Destination destination = session.createQueue("test-queue");
>>> >>                 MessageProducer producer = session.createProducer(destination);
>>> >>                 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>>> >>                 BytesMessage message = session.createBytesMessage();
>>> >>                 message.writeBytes(new byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
>>> >>                 try {
>>> >>                     producer.send(message);
>>> >>                 } catch (ResourceAllocationException e) {
>>> >>                     e.printStackTrace();
>>> >>                 }
>>> >>                 session.close();
>>> >>             }
>>> >>         } catch (JMSException e) {
>>> >>             throw new RuntimeException(e);
>>> >>         }
>>> >>     }
>>> >> }
>>> >>
>>> >> rajdavies wrote:
>>> >> >
>>> >> > Hi Scott,
>>> >> >
>>> >> > just change the below config to enable flow control - i.e:
>>> >> >
>>> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
>>> >> > <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
>>> >> >
>>> >> > in 5.3 - producerFlowControl is on by default - so just remove the
>>> >> > producerFlowControl entry from your configuration.
>>> >> >
>>> >> > If this all sounds double dutch - send in your config - and we'll
>>> >> > help with the correct settings :)
>>> >> >
>>> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
>>> >> >
>>> >> >> Fred,
>>> >> >>
>>> >> >> Were you able to configure ActiveMQ to grow without surpassing the
>>> >> >> memory setting? I am trying to figure out how to do the same thing.
>>> >> >>
>>> >> >> -Scot
>>> >> >>
>>> >> >> Fred Moore-3 wrote:
>>> >> >>>
>>> >> >>> Hi,
>>> >> >>>
>>> >> >>> going back to Cursors and
>>> >> >>> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
>>> >> >>> ...
>>> >> >>>
>>> >> >>> ...can anyone shed some light on the actual role of memoryLimit in:
>>> >> >>> <policyEntry topic=">" producerFlowControl="false" memoryLimit="1mb">
>>> >> >>> <policyEntry queue=">" producerFlowControl="false" memoryLimit="1mb">
>>> >> >>>
>>> >> >>> ...moreover: *when* will producerFlowControl start slowing down
>>> >> >>> consumers?
>>> >> >>>
>>> >> >>> Cheers,
>>> >> >>> F.
>>> >> >>
>>> >> >> --
>>> >> >> View this message in context:
>>> >> >> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
>>> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>> >> >
>>> >> > Rob Davies
>>> >> > http://twitter.com/rajdavies
>>> >> > I work here: http://fusesource.com
>>> >> > My Blog: http://rajdavies.blogspot.com/
>>> >> > I'm writing this: http://www.manning.com/snyder/
>>> >
>>> > --
>>> > http://blog.garytully.com
>>> >
>>> > Open Source Integration
>>> > http://fusesource.com

--
View this message in context: http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27615569.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.