Hi, I've been investigating Camel recently and today I was taking a look at
message throughput in ActiveMQ when Camel routing is being used.

I have been doing simplistic testing so far, just using the example message
Producer class to pump lots of text messages through a queue.  When it's a
simple queue with no consumer, things seem fine.

However, if I have a Camel RouteBuilder as a consumer of this queue,
forwarding to another queue in the same ActiveMQ instance, I can
reproducibly make the queue hang.  This is a permanent hang: as far as I
can tell, ActiveMQ has to be restarted to fix it.

I was in two minds about whether to raise this in the Camel forum or this
one, but the problem seems to lie in ActiveMQ (judging by the absence of
Camel classes in any of the stack traces), and I suspect Camel is just a
convenient way to trigger whatever situation causes the problem.

The story, briefly:

1. Start ActiveMQ (see version info below) with a simple Camel route enabled
   (from activemq:a to activemq:b, for instance).
2. Start a modified Producer class that just sends lots of messages (flat
   out) to "a".  Each message is 25500 characters (~50KB).
3. After a while the ActiveMQ console indicates that the Kaha store is
   being used (this happens after a few thousand messages for me, but will
   be machine specific).
4. Shortly after this, the producer stalls.
5. Restarting the producer does not help.
6. Running a consumer to clear the queue does not help.
7. Restarting ActiveMQ does help (of course!).
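
For anyone trying to reproduce this, a payload of that size could be built
along the following lines.  This is only a sketch and not the actual modified
Producer class; the class and method names are made up for illustration.  It
also shows where the ~50KB figure comes from: 25500 characters at two bytes
per character (UTF-16) is 51000 bytes, whereas the same ASCII text encoded as
UTF-8 is half that.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadSize {

    // Builds a text body of the requested length (hypothetical helper).
    static String payload(int chars) {
        char[] buf = new char[chars];
        Arrays.fill(buf, 'x');
        return new String(buf);
    }

    // Size at two bytes per character (no byte-order mark).
    static int utf16Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_16BE).length;
    }

    // Size when the same text is encoded as UTF-8.
    static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String body = payload(25500);
        System.out.println(body.length() + " chars, "
                + utf16Bytes(body) + " bytes as UTF-16, "
                + utf8Bytes(body) + " bytes as UTF-8");
    }
}
```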

I have tried this with three AMQ builds so far: 4.1.1 hangs for no apparent
reason, the 5.0 snapshot from 12th August hits a thread deadlock, and the 5.0
snapshot from 15th August hangs for no apparent reason.

In each case, the producer is stuck writing to a socket (I've seen this
reported against AMQ 4.0.1, but no fix was put in place as the cause was not
identified).  The stack trace is:

"main" prio=6 tid=0x00038278 nid=0xe9c runnable [0x0007f000..0x0007fc3c]
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
        at java.io.DataOutputStream.flush(DataOutputStream.java:106)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:120)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:144)
        - locked <0x22f99060> (a org.apache.activemq.transport.InactivityMonitor$2)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:90)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x22f9c5f0> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1148)
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1583)
        - locked <0x22fa0118> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:226)
        at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:240)
        at ProducerTool.sendLoop(ProducerTool.java:137)
        at ProducerTool.run(ProducerTool.java:99)
        at ProducerTool.main(ProducerTool.java:60)
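
One possible reading of this trace (an assumption on my part, not something
the dump proves) is that the broker has stopped reading from the connection,
so the TCP buffers fill up and the blocking write never returns.  The
stand-alone sketch below reproduces just that general behaviour with plain
sockets and no ActiveMQ involved; the class and method names are invented for
the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;

public class BlockedWriteDemo {

    /** Returns true if the writer stops making progress while its thread is still alive. */
    static boolean writerStalls() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
             Socket peer = server.accept()) {      // accepted, but never read from

            AtomicLong written = new AtomicLong();
            Thread writer = new Thread(() -> {
                byte[] chunk = new byte[1024];
                try {
                    OutputStream out = client.getOutputStream();
                    while (true) {
                        out.write(chunk);          // blocks once the TCP buffers are full
                        written.addAndGet(chunk.length);
                    }
                } catch (IOException closed) {
                    // the socket is closed when the demo finishes
                }
            }, "writer");
            writer.setDaemon(true);
            writer.start();

            // Poll the byte counter; two identical readings mean the write has stalled.
            long last = -1;
            for (int i = 0; i < 100; i++) {
                Thread.sleep(200);
                long now = written.get();
                if (now > 0 && now == last) {
                    return writer.isAlive();
                }
                last = now;
            }
            return false;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("writer stalled: " + writerStalls());
    }
}
```

If that reading is right, the producer is only the victim here, and the
interesting question is why the broker side stopped reading.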


The deadlock found in the AMQ 5.0 snapshot from 12th August is as follows:

Found one Java-level deadlock:
=============================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
  waiting to lock monitor 0x0003f84c (object 0x094f3700, a java.lang.Object),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4443"
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
  waiting to lock monitor 0x0003f72c (object 0x094f5ee8, a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4445"

Java stack information for the threads listed above:
===================================================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
        at org.apache.activemq.memory.UsageManager.increaseUsage(UsageManager.java:157)
        - waiting to lock <0x094f3700> (a java.lang.Object)
        at org.apache.activemq.command.Message.incrementReferenceCount(Message.java:585)
        at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.next(FilePendingMessageCursor.java:192)
        - locked <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
        at org.apache.activemq.broker.region.cursors.StoreQueueCursor.next(StoreQueueCursor.java:129)
        - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1015)
        - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1064)
        - locked <0x094f3998> (a java.lang.Object)
        at org.apache.activemq.broker.region.Queue.sendMessage(Queue.java:993)
        at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:478)
        at org.apache.activemq.broker.region.Queue.send(Queue.java:436)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:221)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:474)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
        - locked <0x09502968> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
        at java.lang.Thread.run(Thread.java:595)
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
        at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.onMemoryUseChanged(FilePendingMessageCursor.java:249)
        - waiting to lock <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
        at org.apache.activemq.memory.UsageManager.fireEvent(UsageManager.java:358)
        at org.apache.activemq.memory.UsageManager.setPercentUsage(UsageManager.java:328)
        - locked <0x094f3700> (a java.lang.Object)
        at org.apache.activemq.memory.UsageManager.decreaseUsage(UsageManager.java:181)
        at org.apache.activemq.command.Message.decrementReferenceCount(Message.java:602)
        - locked <0x0961f980> (a org.apache.activemq.command.ActiveMQTextMessage)
        at org.apache.activemq.broker.region.IndirectMessageReference.drop(IndirectMessageReference.java:137)
        - locked <0x09612fe0> (a org.apache.activemq.broker.region.IndirectMessageReference)
        at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:56)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:193)
        - locked <0x094f0178> (a org.apache.activemq.broker.region.QueueSubscription)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:191)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:480)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:184)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
        - locked <0x094e9eb0> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
        at java.lang.Thread.run(Thread.java:595)

Found 1 deadlock.
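
Reading the two stacks together, this looks like a classic lock-order
inversion: the 4445 thread (message send) holds the FilePendingMessageCursor
monitor and wants the java.lang.Object lock used in UsageManager, while the
4443 thread (message ack) holds that lock and wants the cursor monitor.  The
sketch below reproduces only that pattern with two plain objects; it is not
ActiveMQ code, and the names usageLock, cursorLock, "send-path" and
"ack-path" are stand-ins I made up for the example.  It then asks the JVM to
confirm the deadlock, the same check that produced the report above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderDemo {

    // Takes 'first', waits until the other worker holds its own first lock, then tries 'second'.
    private static Thread worker(String name, Object first, Object second, CountDownLatch bothHold) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHold.countDown();
                try {
                    bothHold.await();
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {   // never entered: the other worker holds it
                    System.out.println(name + " acquired both locks");
                }
            }
        }, name);
        t.setDaemon(true);                // let the JVM exit despite the deadlock
        t.start();
        return t;
    }

    /** Returns how many of the two workers the JVM reports as monitor-deadlocked. */
    static int deadlockedWorkers() {
        Object usageLock = new Object();   // stand-in for the UsageManager lock
        Object cursorLock = new Object();  // stand-in for the cursor monitor
        CountDownLatch bothHold = new CountDownLatch(2);
        Thread send = worker("send-path", cursorLock, usageLock, bothHold);
        Thread ack = worker("ack-path", usageLock, cursorLock, bothHold);

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 50; i++) {
            long[] ids = mx.findMonitorDeadlockedThreads();
            int hits = 0;
            if (ids != null) {
                for (long id : ids) {
                    if (id == send.getId() || id == ack.getId()) {
                        hits++;
                    }
                }
            }
            if (hits == 2) {
                return hits;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return hits;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked workers: " + deadlockedWorkers());
    }
}
```

In this toy version the usual cure is to take the two locks in the same order
on both paths, or to fire the listener callback after releasing the first
lock.  Whether either is practical inside the broker I can't say.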

The "hang for no reason" thread dump is in this file:
http://www.nabble.com/file/p12181133/AMQ5%2BThreadDump.txt
In this one, I am posting to a queue called DataQueue and this is being
forwarded to a queue called TOOL.DEFAULT.

I'm not an expert on AMQ, so perhaps I've just not configured something
correctly.  It seems to me that when the number of entries in the "a" queue
gets too large, AMQ decides to buffer them in a persistent store (Kaha), and
this is where things fall over.  The use of Camel as a router slows down the
consumption of messages from queue "a" sufficiently to allow the backlog to
build up and cause Kaha to kick in.

Some things don't make sense to me though.  For instance, if I just send
messages to a queue and don't consume them, should they not also end up
in a Kaha store once I've sent enough?  My first experiment once I found
this problem was to send messages to queue "a" without any consumer, and this
didn't cause a problem until AMQ ran out of memory.  Does this suggest that
Kaha didn't kick in at all?

Anyhow, sorry for the long post but this seems a significant problem.  I
realise the message throughput I have used is pretty extreme but there does
seem to be some nasty race condition between message arrival, consumption
and the kaha store which could perhaps arise at any time.

Thanks,

-Dominic

-- 
View this message in context: 
http://www.nabble.com/Queue-%22hangs%22-when-Kaha-Store-kicks-in.-Restart-required.-tf4279563s2354.html#a12181133
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
