Hi, I've been investigating Camel recently and today I was taking a look at message throughput in ActiveMQ when Camel routing is being used.
I have been doing simplistic testing so far, just using the example message Producer class to pump lots of text messages through a queue. When it's a simple queue with no consumer, things seem fine. However, if I have a Camel RouteBuilder as a consumer of this queue, forwarding to another queue in the same ActiveMQ instance, then it seems possible to reproducibly make the queue hang. This is a permanent hang; as far as I can tell, a restart of ActiveMQ is required to fix it.

I was in two minds about whether this should be raised in the Camel forum or this one, but the problem seems to lie in ActiveMQ (judging by the absence of Camel classes in any of the stack traces), and I suspect Camel is just a useful way to introduce an unknown situation which causes the problem.

The story briefly is:

- Start ActiveMQ (see version info later) with a simple Camel route enabled (from activemq:a to activemq:b, for instance).
- Start up a modified Producer class that just sends lots of messages (flat out) to "a". Each message is 25500 characters (~50KB).
- After a while the ActiveMQ console will indicate that the Kaha store is being used. (This happens after a few thousand messages for me, but will be machine specific.)
- Shortly after this, the producer will stall.
- Restarting the producer does not help.
- Running a consumer to clear the queue does not help.
- Restarting ActiveMQ does help (of course!).

I have tried this with three AMQ builds so far: 4.1.1 hangs with no apparent reason, the 5.0 snapshot from 12th August gets a thread deadlock, and the 5.0 snapshot from 15th August hangs for no apparent reason. In each case, the producer is stuck writing to a socket (I've seen this reported against AMQ 4.0.1, but no fix was put in place as the cause was not identified).
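As an aside on the thread deadlock seen with the 12th August snapshot: going by the deadlock report further down, it looks like a lock-order inversion. The send path holds the FilePendingMessageCursor monitor and then wants a lock object inside UsageManager, while the acknowledge path holds that UsageManager lock and then wants the cursor monitor. Below is a minimal, standalone sketch of that pattern. It is not ActiveMQ code: the class, the two locks and the method names are all hypothetical stand-ins, and it uses tryLock with a timeout so that it reports the inversion instead of hanging.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class LockOrderSketch {
    // Hypothetical stand-ins for the two monitors named in the deadlock report.
    static final ReentrantLock usageLock = new ReentrantLock();   // lock object inside UsageManager
    static final ReentrantLock cursorLock = new ReentrantLock();  // FilePendingMessageCursor monitor

    /** Takes 'first', waits until the peer thread holds its own first lock, then tries 'second'. */
    static boolean holdThenTry(ReentrantLock first, ReentrantLock second, CyclicBarrier barrier) {
        first.lock();
        try {
            barrier.await();  // both threads now hold their first lock
            boolean got = second.tryLock(200, TimeUnit.MILLISECONDS);
            if (got) {
                second.unlock();
            }
            barrier.await();  // keep 'first' held until the peer has finished trying
            return got;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        } finally {
            first.unlock();
        }
    }

    /** Runs both paths with opposite lock orders; returns whether each got its second lock. */
    static boolean[] runInversion() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        boolean[] acquired = new boolean[2];
        // Send path: cursor monitor first, then the usage lock.
        Thread sendPath = new Thread(() -> acquired[0] = holdThenTry(cursorLock, usageLock, barrier));
        // Acknowledge path: usage lock first, then the cursor monitor.
        Thread ackPath = new Thread(() -> acquired[1] = holdThenTry(usageLock, cursorLock, barrier));
        sendPath.start();
        ackPath.start();
        try {
            sendPath.join();
            ackPath.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return acquired;
    }

    public static void main(String[] args) {
        boolean[] acquired = runInversion();
        System.out.println("send path got the usage lock: " + acquired[0]);
        System.out.println("ack path got the cursor lock: " + acquired[1]);
    }
}
```

Both lines print false: each thread is still holding the lock the other one needs, so neither tryLock can succeed. With plain synchronized blocks, as in the broker, there is no timeout and the two threads would simply wait on each other forever.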
The producer's stack trace is:

"main" prio=6 tid=0x00038278 nid=0xe9c runnable [0x0007f000..0x0007fc3c]
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:120)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:144)
    - locked <0x22f99060> (a org.apache.activemq.transport.InactivityMonitor$2)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:90)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    - locked <0x22f9c5f0> (a java.lang.Object)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1148)
    at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1583)
    - locked <0x22fa0118> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:226)
    at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:240)
    at ProducerTool.sendLoop(ProducerTool.java:137)
    at ProducerTool.run(ProducerTool.java:99)
    at ProducerTool.main(ProducerTool.java:60)

The deadlock found in the AMQ 5 snapshot from 12th August is as follows:

Found one Java-level deadlock:
=============================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
  waiting to lock monitor 0x0003f84c (object 0x094f3700, a java.lang.Object),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4443"
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
  waiting to lock monitor 0x0003f72c (object 0x094f5ee8, a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4445"

Java stack information for the threads listed above:
===================================================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
    at org.apache.activemq.memory.UsageManager.increaseUsage(UsageManager.java:157)
    - waiting to lock <0x094f3700> (a java.lang.Object)
    at org.apache.activemq.command.Message.incrementReferenceCount(Message.java:585)
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.next(FilePendingMessageCursor.java:192)
    - locked <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
    at org.apache.activemq.broker.region.cursors.StoreQueueCursor.next(StoreQueueCursor.java:129)
    - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
    at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1015)
    - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
    at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1064)
    - locked <0x094f3998> (a java.lang.Object)
    at org.apache.activemq.broker.region.Queue.sendMessage(Queue.java:993)
    at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:478)
    at org.apache.activemq.broker.region.Queue.send(Queue.java:436)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:221)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:474)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    - locked <0x09502968> (a org.apache.activemq.transport.InactivityMonitor$1)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Thread.java:595)
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.onMemoryUseChanged(FilePendingMessageCursor.java:249)
    - waiting to lock <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
    at org.apache.activemq.memory.UsageManager.fireEvent(UsageManager.java:358)
    at org.apache.activemq.memory.UsageManager.setPercentUsage(UsageManager.java:328)
    - locked <0x094f3700> (a java.lang.Object)
    at org.apache.activemq.memory.UsageManager.decreaseUsage(UsageManager.java:181)
    at org.apache.activemq.command.Message.decrementReferenceCount(Message.java:602)
    - locked <0x0961f980> (a org.apache.activemq.command.ActiveMQTextMessage)
    at org.apache.activemq.broker.region.IndirectMessageReference.drop(IndirectMessageReference.java:137)
    - locked <0x09612fe0> (a org.apache.activemq.broker.region.IndirectMessageReference)
    at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:56)
    at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:193)
    - locked <0x094f0178> (a org.apache.activemq.broker.region.QueueSubscription)
    at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340)
    at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427)
    at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:191)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
    at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
    at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:480)
    at org.apache.activemq.command.MessageAck.visit(MessageAck.java:184)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    - locked <0x094e9eb0> (a org.apache.activemq.transport.InactivityMonitor$1)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Thread.java:595)

Found 1 deadlock.

The "hang for no reason" thread dump is in this file: http://www.nabble.com/file/p12181133/AMQ5%2BThreadDump.txt (AMQ5+ThreadDump.txt). In this one, I am posting to a queue called DataQueue and this is being forwarded to a queue called TOOL.DEFAULT.

I'm not an expert on AMQ, so perhaps I've just not configured something correctly. It seems to me that when the backlog in the "a" queue gets too long, AMQ decides to buffer the messages in a persistent store (Kaha), and this is where things fall over. The use of Camel as a router slows down the consumption of messages from queue "a" sufficiently to allow the backlog to build up and cause Kaha to kick in.

Some things don't make sense to me though. For instance, if I just send messages to a queue and don't consume them, should they not also end up in a Kaha store once I've sent enough? My first experiment once I found this problem was to send messages to queue "a" without any consumer; this didn't cause a problem until AMQ ran out of memory. Does this suggest that Kaha didn't kick in at all?

Anyhow, sorry for the long post, but this seems a significant problem. I realise the message throughput I have used is pretty extreme, but there does seem to be some nasty race condition between message arrival, consumption and the Kaha store which could perhaps arise at any time.

Thanks,
-Dominic

--
View this message in context: http://www.nabble.com/Queue-%22hangs%22-when-Kaha-Store-kicks-in.-Restart-required.-tf4279563s2354.html#a12181133
Sent from the ActiveMQ - User mailing list archive at Nabble.com.