First, I think you're right that it's problematic that KahaDB checkpoint operations and "real" usage of the memory store are able to block one another, though my reasons for saying so may not be quite the same as yours. Fundamentally, the memory store and the persistence store are intended to be two independent storage areas used for different purposes (one stores persistent messages, the other non-persistent messages). Either one should be able to fill (and invoke flow control on the destinations that use it) without affecting the other. Yet what's actually happened is that we needed the ability to read messages from the persistent store into memory, and instead of recognizing that the memory store is an abstraction for non-persistent messages (and therefore not for the temporarily-in-memory copies of persistent messages as they are read into cursors), we treated the memory store as a catch-all for any copy of a message that isn't in a persistent store. Obviously those copies have to go somewhere, but there's already a catch-all place for data that's being used for current processing (it's called the heap), and we should have used it for copies of messages read in by cursors (or by any other mechanism that reads messages from the persistent store) instead of counting them against the memory store.
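To be concrete about terminology, since several memory limits are in play in this thread: by "memory store" I mean the space bounded by the memoryUsage limit inside systemUsage in activemq.xml, which is the limit the cursors check before paging persistent messages in. A minimal sketch, with a purely illustrative limit rather than a recommendation for your hardware:

  <systemUsage>
    <systemUsage>
      <memoryUsage>
        <!-- the "memory store": cursor-paged copies of persistent messages are counted against this limit -->
        <memoryUsage limit="500 mb" />
      </memoryUsage>
    </systemUsage>
  </systemUsage>

The heap (-Xmx) is separate; it's where those message copies physically live regardless of which limit they're counted against.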
One argument for putting the messages into the memory store might have been to ensure that lots of cursor-intensive operations don't cause the broker to OOM. I don't agree with that argument. Overloading a piece of software beyond the memory capacity you've chosen to configure it with will always cause it to OOM; if you connect to the broker with a billion separate connections, it's going to OOM, so I don't see why doing too much simultaneous browsing of the persistent store should be any different. The answer will always be, "if the way you're using the broker causes it to run out of memory, then either give it more memory so you don't run out, or stop doing whatever it is that's causing it to use so much memory"; there's no need to press the memory store into service as protection against that problem.

So I roughly agree with the general premise that KahaDB operations should be unaffected by a full memory store and should not cause the memory store to fill, though my proposal for how to solve the problem differs from yours, and I don't think AMQ-6115 really captures what I'm proposing. I'll submit a separate JIRA for it when I get some time this weekend.

Putting that aside, it's clear from your description that the way you've currently configured the broker is insufficient for the usage patterns you're trying to support. (If it were sufficient, the checkpoint would succeed and everything would be fine, as you've proven by your experiment of increasing the memory store size and the heap size.) So the same OOM argument from the previous paragraph applies to you here: you're trying to do too much for the memory resources you've allocated to your broker, so you either need to increase your resource allocation or decrease your workload. (Or we need to decrease the amount of memory used by the checkpointing algorithm; more on that in a minute.)

Increasing your resource allocation is simple, but you say it's not possible. I'm very skeptical that it's truly not possible, especially when you say the host has 16GB total but you're only allowed 1GB, and I'd recommend questioning your assumptions (and those of your bosses, systems engineers, customers, etc.) to see whether it's actually not possible or just not easy; but ultimately you know your scenario, and if it's not possible then it's not possible.

Decreasing your workload in this case means reducing the amount of work required to checkpoint your KahaDB instance. I don't have a lot of expertise on the KahaDB checkpoint algorithm, but I'd assume you'll use less of the memory store if you have fewer messages in your KahaDB instance, and that you might also use less if you have the same number of messages spread across fewer destinations (since I assume the cursors only keep one page worth of messages loaded at a time, so fewer cursors would mean fewer pages loaded and therefore less memory in use at any one time). So one option might be to rearchitect your use of destinations to push messages into fewer destinations and use selectors to ensure that the right consumers get the right messages. Another solution would be to keep messages around for less time, either by discarding them if they're unconsumed after a certain amount of time, by changing your consumers to reduce the maximum possible time before they consume, or by consuming them yourself and storing them into a database to be retrieved later.
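If expiring unconsumed messages is acceptable for your use case, one broker-side way to do it (rather than changing every producer to set a TTL) would be the timestamp plugin, which can stamp an expiration onto messages that arrive without one. I haven't tested this against your configuration, so treat it as a sketch with illustrative values:

  <plugins>
    <!-- illustrative: expire messages that arrive without a TTL after 3 days, and cap any client-supplied TTL at 3 days (values in milliseconds) -->
    <timeStampingBrokerPlugin zeroExpirationOverride="259200000" ttlCeiling="259200000" />
  </plugins>

Whether that actually shrinks your store depends on something discarding the expired messages, so weigh it against the expireMessagesPeriod="0" setting you describe below before adopting it.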
On the database point: ActiveMQ is not a database and is not optimized to act as one. It's optimized for delivering messages shortly after they are produced, and by keeping large amounts of data available for days at a time you're using it in ways it's not optimized for (as you've discovered). Other products are optimized to be used as a database, and you could consume content yourself and write it into one of those products if it's unconsumed after a certain amount of time.

A third solution would be to provide dedicated brokers for each consumer in a hub-and-spoke topology, so that there are fewer messages in any given broker (giving you a better chance of not running out of memory store space while checkpointing) and one consumer going offline doesn't affect the other brokers, since they won't see the offline consumer's messages. A fourth option could be to shard your destinations across multiple central brokers (standalone, not part of a network of brokers), where each one is responsible for only certain destinations and consumers connect to all brokers and know which destinations are found on which broker.

There might be other ways to do this, but those are the four I can think of: reduce your number of destinations, reduce your message volume by not keeping messages on the broker as long, reduce your messages per broker by using a network of brokers, or reduce your messages per broker by sharding across multiple standalone brokers.

I promised to come back to whether the ActiveMQ code can be changed to reduce the memory footprint of checkpointing, so here it is. I haven't looked at the code that does the checkpointing, but I would assume it could be made configurable in ways that don't use as much of the memory store, possibly at the cost of having checkpoints take longer. One possibility might be to reduce the page size of the cursors used for checkpointing each destination, so that each one puts fewer messages at a time into the memory store. Another might be to limit the number of destinations that can be checkpointed in parallel, so there are at most N * pageSize messages in the memory store at a time due to checkpointing. Maybe there are other options as well, but those are the two that jump to mind without having read the code. Both changes might increase the time required to perform a checkpoint, so they'd need to be exposed as configuration options rather than changes to the default behavior.

I don't want to submit a JIRA enhancement request without having read the code to confirm that it works the way I've assumed, so I'm not going to file anything about these ideas at the moment. If you have time to find and read the relevant source and evaluate whether these proposals make sense (and whether there's a better option), please do; otherwise I'll try to get to it when I have some time. Either way, this won't give you an immediate fix, and there's no guarantee that any enhancement you request will be implemented, so in the meantime I'd encourage you to consider some of the other things I've recommended, so you have options between now and whenever a version containing one of these enhancements is released.

Tim

On Wed, Jan 13, 2016 at 8:43 AM, Klaus Pittig <klaus.pit...@futura4retail.com> wrote:

> a.) Regarding your last answer (thanks for your effort, by the way):
>
> I'm aware of the relation between the heap and the systemUsage memoryLimit, and we make sure that there are no illogical settings.
>
> The primary requirement is to have a stable system running 'forever' w/o any memory issues at any time, independent of the load/throughput. No one really wants to deal with memory settings on the edge of the limits.
>
> You're right: the memory is completely consumed. And I can't guarantee that the checkpoint/cleanup finishes completely, so the system can be stalled without giving GC a chance to release some memory.
>
> It's the expiry check causing this. The persistent stores themselves seem to be managed as expected (no issues, no inconsistency, no loss); our situation is independent of the storage (reproducible for LevelDB and KahaDB). For KahaDB we have used 16mb journal files for years (this helps save a huge amount of space required for pending messages not consumed for some days due to offline situations on the client side). Anyway, here is the current configuration you requested:
>
> <persistenceAdapter>
>   <kahaDB directory="${activemq.base}/data/kahadb" enableIndexWriteAsync="true" journalMaxFileLength="16mb" indexWriteBatchSize="10000" indexCacheSize="10000" />
>   <!--
>   <levelDB directory="${activemq.base}/data/leveldb" logSize="33554432" />
>   -->
> </persistenceAdapter>
>
>
> b.) A proposal concerning AMQ-6115:
>
> In my point of view, it's worth discussing the one and only memoryLimit parameter being used for both the regular browse/consume threads and the checkpoint/cleanup threads. There should always be enough space to browse/consume any queue, at least with prefetch 1, i.e. at least the next pending message. Maybe, in this case, two well-balanced memoryLimit parameters with priority on consumption rather than checkpoint/cleanup would give better regulation. Or something near it.
>
>
> c.) Our results and an acceptable solution so far:
>
> After a thorough investigation (w/o changing ActiveMQ source code), the result for now is that we need to accept the limitations defined by the single memoryLimit parameter used both for the #checkpoint/cleanup process and for browsing/consuming queues.
>
> **1.) Memory**
>
> There is no problem if we use a much higher memoryLimit (together with a higher max-heap) to support both the message caching per destination during the #checkpoint/cleanup workflow and our requirements to browse/consume messages.
>
> But more memory is not an option in our scenario; we need to deal with a 1024m max-heap and a 500m memoryLimit.
>
> Besides this, constantly setting higher memoryLimits just because of more persistent queues containing hundreds/thousands of pending messages, together with certain offline/inactive consumer scenarios, should be discussed in detail (IMHO).
>
>
> **2.) Persistent Adapters**
>
> We ruled out persistence adapters as the cause of the problem, because the behaviour doesn't change if we switch between different types of persistent stores (KahaDB, LevelDB, JDBC-PostgreSQL).
>
> During the debugging sessions with KahaDB we also see regular checkpoint handling, and the storage is managed as expected.
>
>
> **3.) Destination Policy / Expiration Check**
>
> Our problem completely disappears if we disable caching and the expiration check, which is the actual cause of the problem.
>
> The corresponding properties are documented, and there is a nice blog article about Message Priorities with a description quite suitable for our scenario:
>
> - http://activemq.apache.org/how-can-i-support-priority-queues.html
> - http://blog.christianposta.com/activemq/activemq-message-priorities-how-it-works/
>
> We simply added useCache="false" and expireMessagesPeriod="0" to the policyEntry:
>
> <destinationPolicy>
>   <policyMap>
>     <policyEntries>
>       <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit="128mb" timeBeforeDispatchStarts="1000" useCache="false" expireMessagesPeriod="0">
>         <dispatchPolicy>
>           <strictOrderDispatchPolicy />
>         </dispatchPolicy>
>         <pendingQueuePolicy>
>           <storeCursor />
>         </pendingQueuePolicy>
>       </policyEntry>
>     </policyEntries>
>   </policyMap>
> </destinationPolicy>
>
> The consequences are clear if we no longer use in-memory caching and never check for message expiration.
>
> Since we use neither message expiration nor message priorities, and the current message dispatching is fast enough for us, this trade-off is acceptable given the system limitations.
>
> One should also think about well-defined prefetch limits for memory consumption during specific workflows. Message sizes in our scenario range from 2 bytes up to approx. 100 KB, so more individual policyEntries and client consumer configurations could be helpful to optimize system behaviour concerning performance and memory usage (see http://activemq.apache.org/per-destination-policies.html).
>
>
> Cheers
> Klaus
>
>
> On 11.01.16 at 15:35, Tim Bain wrote:
> > I believe you are correct: browsing a persistent queue uses bytes from the memory store, because those bytes must be read from the persistence store into the memory store before they can be handed off to browsers or consumers. If all available bytes in the memory store are already in use, the messages can't be paged into the memory store, and so the operation that required them to be paged in will hang/fail.
> >
> > You can work around the problem by increasing your memory store size via trial and error until the problem goes away. Note that the broker itself needs some amount of memory, so you can't give the whole heap over to the memory store or you'll risk getting OOMs, which means you may need to increase the heap size as well. You can estimate how much memory the broker needs aside from the memory store by subtracting the bytes used for the memory store (539 MB) from the total heap bytes used as measured via JConsole or similar tools. I'd double (or more) that number to be safe, if it were me; the last thing I want to deal with in a production application (ActiveMQ or anything else) is running out of memory because I tried to cut the memory limits too close just to save a little RAM.
> >
> > All of that is how to work around the fact that before you try to browse your queue, something else has already consumed all available bytes in the memory store. If you want to dig into why that's happening, we'd need to try to figure out what those bytes are being used for and whether it's possible to change configuration values to reduce the usage so it fits into your current limit. There will definitely be more effort required than simply increasing the memory limit (and max heap size), but we can try if you're not able to increase the limits enough to fix the problem.
> >
> > If you want to go down that path, one thread to pull on is your observation that you "can browse/consume some Queues _until_ the #checkpoint call after 30 seconds." I assume from your reference to checkpointing that you're using KahaDB as your persistence store. Can you post the KahaDB portion of your config?
> >
> > Your statements here and in your StackOverflow post (http://stackoverflow.com/questions/34679854/how-to-avoid-blocking-of-queue-browsing-after-activemq-checkpoint-call) indicate that you think the problem is that memory isn't getting garbage collected after the operation that needed it (i.e. the checkpoint) completes, but it's also possible that the checkpoint operation isn't completing because it can't get enough messages read into the memory store. Have you confirmed via the thread dump that there is not a checkpoint operation still in progress? Also, how large are the journal files that are getting checkpointed? If they're large enough that all messages for one file won't fit into the memory store, you might be able to prevent the problem by using smaller files.
> >
> > Tim
> >
> > On Jan 8, 2016 9:32 AM, "Klaus Pittig" <klaus.pit...@futura4retail.com> wrote:
> >
> >> If I increase the JVM max heap size (4GB), the behavior does not change. In my point of view, the configured memoryLimit (500 MB) works as expected (the heap dump shows the same max size for the TextMessage content, i.e. 55002 byte[] instances containing 539 MB total).
> >>
> >> However, trying to browse a queue shows no content, even if there is enough heap memory available.
> >>
> >> As far as I understand the source code, this is also due to the configured memoryLimit, because (I hope this is the answer you expect) the calculation of available space causes hasSpace = false.
> >>
> >> I found this here:
> >>
> >> AbstractPendingMessageCursor {
> >>     public boolean hasSpace() {
> >>         return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
> >>     }
> >>
> >>     public boolean isFull() {
> >>         return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
> >>     }
> >> }
> >>
> >> #hasSpace is in this case called during a click on a queue in the Webconsole; see the 2 stacks during this workflow:
> >>
> >> Daemon Thread [Queue:aaa114] (Suspended (breakpoint at line 107 in QueueStorePrefetch))
> >>     owns: QueueStorePrefetch (id=6036)
> >>     owns: StoreQueueCursor (id=6037)
> >>     owns: Object (id=6038)
> >>     QueueStorePrefetch.doFillBatch() line: 107
> >>     QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >>     QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >>     StoreQueueCursor.reset() line: 159
> >>     Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >>     Queue.pageInMessages(boolean) line: 2119
> >>     Queue.iterate() line: 1596
> >>     DedicatedTaskRunner.runTask() line: 112
> >>     DedicatedTaskRunner$1.run() line: 42
> >>
> >> Daemon Thread [ActiveMQ VMTransport: vm://localhost#1] (Suspended (breakpoint at line 107 in QueueStorePrefetch))
> >>     owns: QueueStorePrefetch (id=5974)
> >>     owns: StoreQueueCursor (id=5975)
> >>     owns: Object (id=5976)
> >>     owns: Object (id=5977)
> >>     QueueStorePrefetch.doFillBatch() line: 107
> >>     QueueStorePrefetch(AbstractStoreCursor).fillBatch() line: 381
> >>     QueueStorePrefetch(AbstractStoreCursor).reset() line: 142
> >>     StoreQueueCursor.reset() line: 159
> >>     Queue.doPageInForDispatch(boolean, boolean) line: 1897
> >>     Queue.pageInMessages(boolean) line: 2119
> >>     Queue.iterate() line: 1596
> >>     Queue.wakeup() line: 1822
> >>     Queue.addSubscription(ConnectionContext, Subscription) line: 491
> >>     ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 399
> >>     ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 427
> >>     ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 244
> >>     AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>     AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 104
> >>     CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>     TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>     StatisticsBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 102
> >>     BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 107
> >>     TransportConnection.processAddConsumer(ConsumerInfo) line: 663
> >>     ConsumerInfo.visit(CommandVisitor) line: 348
> >>     TransportConnection.service(Command) line: 334
> >>     TransportConnection$1.onCommand(Object) line: 188
> >>     ResponseCorrelator.onCommand(Object) line: 116
> >>     MutexTransport.onCommand(Object) line: 50
> >>     VMTransport.iterate() line: 248
> >>     DedicatedTaskRunner.runTask() line: 112
> >>     DedicatedTaskRunner$1.run() line: 42
> >>
> >> Setting queueBrowsePrefetch="1" and queuePrefetch="1" in the PolicyEntry for queue=">" also has no effect.
> >>
> >> On 08.01.16 at 16:32, Tim Bain wrote:
> >>> If you increase your JVM size (4GB, 8GB, etc., the biggest your OS and hardware will support), does the behavior change? Does it truly take all available memory, or just all the memory that you've made available to it (which isn't tiny but really isn't all that big)?
> >>>
> >>> Also, how do you know that the MessageCursor seems to decide that there is not enough memory and stops delivery of queue content to browsers/consumers? What symptom tells you that?
> >>>
> >>> On Jan 8, 2016 8:25 AM, "Klaus Pittig" <klaus.pit...@futura4retail.com> wrote:
> >>>
> >>>> (related issue: https://issues.apache.org/jira/browse/AMQ-6115)
> >>>>
> >>>> There's a problem when using ActiveMQ with a large number of persistent queues (250), each holding 1000 persistent TextMessages of 10 KB each.
> >>>>
> >>>> Our scenario requires these messages to remain in the storage for a long time (days), until they are consumed (large amounts of data are staged for distribution to many consumers, which could be offline for some days).
> >>>>
> >>>> After the persistence store is filled with these messages and after a broker restart, we can browse/consume some queues _until_ the #checkpoint call after 30 seconds.
> >>>>
> >>>> This call causes the broker to use all available memory and never release it for other tasks such as queue browse/consume. Internally the MessageCursor seems to decide that there is not enough memory and stops delivery of queue content to browsers/consumers.
> >>>>
> >>>> => Is there a way to avoid this behaviour by configuration, or is this a bug?
> >>>>
> >>>> The expectation is that we can consume/browse any queue under all circumstances.
> >>>>
> >>>> The settings below have been in production for some time now, and several recommendations found in the ActiveMQ documentation are applied (destination policies, systemUsage, persistence store options, etc.):
> >>>>
> >>>> - Behaviour is tested with ActiveMQ: 5.11.2, 5.13.0 and 5.5.1.
> >>>> - Memory settings: Xmx=1024m
> >>>> - Java: 1.8 or 1.7
> >>>> - OS: Windows, MacOS, Linux
> >>>> - PersistenceAdapter: KahaDB or LevelDB
> >>>> - Disc: enough free space (200 GB) and physical memory (16 GB max).
> >>>>
> >>>> Besides the above-mentioned settings, we use the following settings for the broker (btw: changing the memoryLimit to a lower value like 1mb does not change the situation):
> >>>>
> >>>> <destinationPolicy>
> >>>>   <policyMap>
> >>>>     <policyEntries>
> >>>>       <policyEntry queue=">" producerFlowControl="false" optimizedDispatch="true" memoryLimit="128mb" timeBeforeDispatchStarts="1000">
> >>>>         <dispatchPolicy>
> >>>>           <strictOrderDispatchPolicy />
> >>>>         </dispatchPolicy>
> >>>>         <pendingQueuePolicy>
> >>>>           <storeCursor />
> >>>>         </pendingQueuePolicy>
> >>>>       </policyEntry>
> >>>>     </policyEntries>
> >>>>   </policyMap>
> >>>> </destinationPolicy>
> >>>>
> >>>> <systemUsage>
> >>>>   <systemUsage sendFailIfNoSpace="true">
> >>>>     <memoryUsage>
> >>>>       <memoryUsage limit="50 mb" />
> >>>>     </memoryUsage>
> >>>>     <storeUsage>
> >>>>       <storeUsage limit="80000 mb" />
> >>>>     </storeUsage>
> >>>>     <tempUsage>
> >>>>       <tempUsage limit="1000 mb" />
> >>>>     </tempUsage>
> >>>>   </systemUsage>
> >>>> </systemUsage>
> >>>>
> >>>> Setting the **cursorMemoryHighWaterMark** in the destinationPolicy to a higher value like **150** or **600** (depending on the difference between memoryUsage and the available heap space) relieves the situation a bit as a workaround, but this is not really an option for production systems in my point of view.
> >>>>
> >>>> Screenie with information from Oracle Mission Control showing those ActiveMQTextMessage instances that are never released from memory:
> >>>>
> >>>> http://goo.gl/EjEixV
> >>>>
> >>>>
> >>>> Cheers
> >>>> Klaus