Hi,
we're experiencing the same problem.
We also use ActiveMQ 5.1 with the JDBC persistence adapter, and we use
a queuePrefetch=1 on the consumer side.
Our Broker waits at the same Unsafe.park as shown in jconsole:
Name: QueueThread:queue://NEW_FILE
State: WAITING
Total blocked: 120 Total waited: 15.149
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:118)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1841)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:359)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:470)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:674)
java.lang.Thread.run(Thread.java:595)
jconsole also reports QueueSize=6, but browsing the queue via jconsole
shows no entries at all. Looking directly at the DB, the 6 messages
do exist in the activemq msg table.
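
For reference, the broker-side trace above (parked in LinkedBlockingQueue.take
beneath ThreadPoolExecutor.getTask) is also what any idle java.util.concurrent
pool worker looks like while it waits for its next task. On its own it suggests
the QueueThread has nothing scheduled rather than that it is deadlocked. A
minimal JDK-only sketch that reproduces that state follows; the names
IdleWorkerDemo and describeIdleWorker are made up for illustration, and nothing
here touches ActiveMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class IdleWorkerDemo {

    /** Runs one task on a single-thread pool, then reports how the worker looks once it is idle. */
    static String describeIdleWorker() {
        AtomicReference<Thread> worker = new AtomicReference<>();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // Capture the pool's worker thread and wait for the task to finish.
            pool.submit(() -> { worker.set(Thread.currentThread()); }).get();
            Thread t = worker.get();

            // Give the worker up to about 2 seconds to go back to waiting for work.
            for (int i = 0; i < 200 && t.getState() != Thread.State.WAITING; i++) {
                Thread.sleep(10);
            }

            // Look for the same frame that appears in the jconsole trace.
            boolean inTake = false;
            for (StackTraceElement frame : t.getStackTrace()) {
                if (frame.getClassName().endsWith(".LinkedBlockingQueue")
                        && frame.getMethodName().equals("take")) {
                    inTake = true;
                }
            }
            return t.getState() + (inTake ? " in LinkedBlockingQueue.take" : "");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeIdleWorker());
    }
}
```

So the interesting question is probably why no dispatch task is being queued
for the destination, not what the parked thread itself is doing.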
shridharv wrote:
Hi,
Was this issue figured out?
We have the same problem: a consumer stops receiving further
messages because it has not committed after receiving around 1000 messages. We
cannot commit until we have a complete set of messages, and hence the consumer is
waiting indefinitely.
Any suggestions welcome.
ActiveMQ 5.2 on Windows.
-----------------------------------------------
IBeaumont wrote:
I'm sending a mixture of persistent and non persistent messages.
I have seen a negative queue count before so will look at the trunk.
Can't see anything unusual in JConsole, but I have just downloaded the
source, and in ActiveMQMessageConsumer.receiveNoWait there are these
lines...
    if (info.getPrefetchSize() == 0) {
        md = dequeue(-1); // We let the broker let us know when we
                          // timeout.
    } else {
        md = dequeue(0);
    }
So that explains why my consumers seem to hang when things go wrong:
with a prefetchSize of zero, receiveNoWait calls dequeue(-1) and waits forever.
I've now changed my code so that instead of using receiveNoWait, I use
"receive" with a short timeout. Hopefully things will then time out and
messages will continue to be processed. I'll then need to find out whether
any messages get skipped. Although that doesn't help find the cause.
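
The workaround described above (a bounded "receive" instead of receiveNoWait)
could look roughly like the sketch below. To keep it runnable without a broker
it does not use the JMS API directly: TimedReceiver is a hypothetical stand-in
for javax.jms.MessageConsumer.receive(long), which returns null when the
timeout expires, and drain and fromQueue are likewise invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedReceiveSketch {

    /** Hypothetical stand-in for MessageConsumer.receive(long): returns null on timeout. */
    interface TimedReceiver<T> {
        T receive(long timeoutMillis);
    }

    /** Collects messages until one receive call times out or maxMessages is reached. */
    static <T> List<T> drain(TimedReceiver<T> consumer, long timeoutMillis, int maxMessages) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            T msg = consumer.receive(timeoutMillis);
            if (msg == null) {
                break; // timed out: give control back to the caller instead of hanging
            }
            batch.add(msg);
        }
        return batch;
    }

    /** Adapts an in-memory queue so the sketch can run without a broker. */
    static <T> TimedReceiver<T> fromQueue(BlockingQueue<T> queue) {
        return timeoutMillis -> {
            try {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        };
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        System.out.println(drain(fromQueue(queue), 50, 10));
    }
}
```

With a real consumer, note that in JMS a timeout of zero passed to receive
never expires, so the timeout has to be a positive value for this to help.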
Gary Tully wrote:
Your use case sounds typical and reasonable but just to understand,
are you sending persistent and non persistent messages to the same
queue?
Can you check in JConsole, in the ActiveMQ BrokerView - Queues and
Subscriptions:
Do the queue metrics (size, dispatched) look as expected?
A negative queue size is indicative of an issue that was resolved on
trunk recently. If you see that, you could try a snapshot.
Re: prefetch==0. OK, yes, you need that if you want to depend on
redelivered(). That is a known issue: prefetched messages that are
not consumed are deemed redelivered when they are dispatched to the
next consumer.
ActiveMQ redelivered means delivered to an AMQ consumer, not delivered
to the end user. See:
https://issues.apache.org/activemq/browse/AMQ-1952
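
The redelivered behaviour described above can be illustrated with a toy model.
This is only a sketch of the description in this message, not ActiveMQ's
implementation: ToyBroker, Msg, batchSize and flaggedAfterConsumerDeath are all
invented names, and batchSize=1 merely stands in for a consumer that holds one
message at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RedeliveredFlagModel {

    static final class Msg {
        final String id;
        boolean redelivered; // stands in for the JMSRedelivered header
        Msg(String id) { this.id = id; }
    }

    static final class ToyBroker {
        private final Deque<Msg> pending = new ArrayDeque<>();

        void send(String id) { pending.addLast(new Msg(id)); }

        /** Hands up to batchSize messages to a consumer in one go. */
        List<Msg> dispatch(int batchSize) {
            List<Msg> batch = new ArrayList<>();
            while (batch.size() < batchSize && !pending.isEmpty()) {
                batch.add(pending.pollFirst());
            }
            return batch;
        }

        /** The consumer went away: put its unacknowledged messages back, flagged. */
        void requeue(List<Msg> unacked) {
            for (int i = unacked.size() - 1; i >= 0; i--) {
                Msg m = unacked.get(i);
                m.redelivered = true; // flagged although the application never saw it
                pending.addFirst(m);
            }
        }
    }

    /** Returns the ids that the second consumer receives with the flag set. */
    static List<String> flaggedAfterConsumerDeath(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        ToyBroker broker = new ToyBroker();
        broker.send("m1");
        broker.send("m2");
        broker.send("m3");

        // First consumer processes and acknowledges only its first message, then dies.
        List<Msg> first = broker.dispatch(batchSize);
        broker.requeue(first.subList(1, first.size()));

        List<String> flagged = new ArrayList<>();
        for (Msg m : broker.dispatch(batchSize)) {
            if (m.redelivered) {
                flagged.add(m.id);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        System.out.println("batch of 3: " + flaggedAfterConsumerDeath(3));
        System.out.println("batch of 1: " + flaggedAfterConsumerDeath(1));
    }
}
```

In the model, a batch of 3 leaves m2 and m3 flagged for the next consumer even
though no application code ever processed them, while a batch of 1 leaves
nothing flagged.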
2009/1/13 IBeaumont <ibeaum...@categoric.com>:
Hi Gary,
Thanks for the feedback.
As you can see from my stack traces earlier in jconsole, the consumers are
stuck on pulling messages from the queues; they are in:
org.apache.activemq.MessageDispatchChannel.dequeue
The reason I've ended up with the settings I have is that so far they seem to
give me the desired results (apart from this last problem). Maybe it would
be easier to explain the behaviour I need, and someone might be able to advise
on the best settings...
So most messages sent are persistent, and I need to process every message;
I can't afford to drop any. I may have slow "consumers", and I might be
putting messages onto the queues at a far greater rate than they can get
processed (I want any available hard disk space to be used for message
storage). There are a number of consumers and producers, and a number of
different queues.
The pre-fetch size was set to zero to overcome an issue I was having with
redelivery count. Our application uses the JMS redelivered flag to see if it has
already tried processing the message. Using a pre-fetch size bigger than 0
was causing me some issues with this. Out of interest, do you know what
happens to messages that are pre-fetched when the consumer then dies? Do
they get flagged as redelivered when a new consumer picks them up?
Gary Tully wrote:
A good place to start is with JConsole, to have a look at the queues
and consumers to see how many messages are currently in the queues.
The broker is configured with producerFlowControl="false", using TCP, a
pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
The sendFailIfNoSpace setting is disabled if producerFlowControl==false, so
your producers will hang if memory for the queues is exhausted.
sendFailIfNoSpace is considered a flowControl mechanism that does not
block so it needs producerFlowControl to be enabled (the default) and
no producer window to be set (also the default).
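
The rule as stated above can be written down as a small decision function.
This is only a model of the description in this message, not the broker's
code; the enum, method and parameter names are invented for illustration.

```java
final class SendFailIfNoSpaceModel {

    enum Outcome { ACCEPTED, SEND_FAILS, PRODUCER_MAY_BLOCK }

    /**
     * Models what a producer would see on send, following the rule in this thread:
     * sendFailIfNoSpace only takes effect when producerFlowControl is enabled
     * and no producer window is set.
     */
    static Outcome onSend(boolean memoryExhausted,
                          boolean producerFlowControl,
                          boolean sendFailIfNoSpace,
                          boolean producerWindowSet) {
        if (!memoryExhausted) {
            return Outcome.ACCEPTED;
        }
        if (producerFlowControl && sendFailIfNoSpace && !producerWindowSet) {
            return Outcome.SEND_FAILS; // non-blocking flow control: the send is rejected
        }
        return Outcome.PRODUCER_MAY_BLOCK; // sendFailIfNoSpace is not in effect
    }

    public static void main(String[] args) {
        // The configuration from the original post: flow control off, sendFailIfNoSpace on.
        System.out.println(onSend(true, false, true, false));
        // The combination described as required for sendFailIfNoSpace to work.
        System.out.println(onSend(true, true, true, false));
    }
}
```

Under this reading, the configuration from the original post lands in the
PRODUCER_MAY_BLOCK case once queue memory runs out.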
Just a thought, but is it possible that the consumers are blocking
when trying to "stick messages" back on a queue?
Out of interest, why are you using prefetch==0?
2009/1/13 IBeaumont <ibeaum...@categoric.com>:
I've got a fairly complex app that takes msgs, processes them and sticks
them on the same or different queues.
The queues are pre-loaded with persistent messages before the application
starts (50000). Once it starts processing, things work fine for a while and
then the consumers stop receiving any messages. I have 4 consumers for
each queue, and there are 3 different queues.
Looking at jconsole, all consumers are waiting here:
State: WAITING on java.lang.obj...@1801d4a
Total blocked: 20 Total waited: 158
Stack trace:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:75)
org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:412)
org.apache.activemq.ActiveMQMessageConsumer.receiveNoWait(ActiveMQMessageConsumer.java:560)
com.xalert.server.queuing.SessionManager.getAlert(SessionManager.java:236)
If I look at the broker (and I'm not really sure what to look at here), the
thread for my queue that should be dispatching messages looks like this:
Name: QueueThread:queue://csPIQ
State: WAITING on java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@b0a518
Total blocked: 2,365 Total waited: 6,717
Stack trace:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(Unknown Source)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
The broker is configured with producerFlowControl="false", using TCP, a
pre-fetch size of 0 and I've also set sendFailIfNoSpace="true".
Any ideas on what my problem is, or how/where I should look in ActiveMQ to find
the cause, or whether the problem is with the consumer?
TIA
Ian
--
View this message in context:
http://www.nabble.com/Q-Consumers-stop-receiving-messages-tp21438163p21438163.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
--
http://blog.garytully.com
Open Source SOA
http://FUSESource.com
--
Dipl.-Ing. Norbert Pfistner
Software Development
picturesafe GmbH
Simon-von-Utrecht-Straße 31-37
D-20359 Hamburg
http://www.picturesafe.de
fon: +49 40 374127 901
fax: +49 40 374127 999
npfist...@picturesafe.de
Registered office: Hannover
Managing Director: Herbert Wirth
Commercial register: Amtsgericht Hannover HR B 53 366