I have not looked into the test case yet, but with prefetch=0 a consumer polls for messages: the broker will not actively dispatch messages to that consumer. On each call to receive, the consumer sends a pull command to the broker and then waits for a message dispatch. If failover occurs after the pull is sent but before the dispatch arrives, the receive will remain blocked, because the pull command is lost and is not replayed on reconnect.
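To make that race concrete, here is a minimal sketch in plain Java. It is a toy model and not ActiveMQ code: `ToyBroker`, `receive(...)` and the `replayPulls` flag are hypothetical names made up for illustration, and the replay branch only mimics what a tracker that remembered outstanding pulls might do on reconnect.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of a prefetch=0 consumer. All names here are hypothetical. */
final class PullFailoverSketch {

    /** Toy broker: holds queued messages and counts the pulls it has seen. */
    static final class ToyBroker {
        final Queue<String> messages = new ArrayDeque<>();
        int pendingPulls = 0;

        /** With prefetch=0 a message is dispatched only against an outstanding pull. */
        String dispatch() {
            if (pendingPulls > 0 && !messages.isEmpty()) {
                pendingPulls--;
                return messages.poll();
            }
            return null; // nothing is pushed to the consumer without a pull
        }
    }

    /**
     * Simulates a single receive() call.
     *
     * @param failoverAfterPull the broker restarts after the pull is sent, before dispatch
     * @param replayPulls       assumption: a tracker remembers pulls and replays them
     * @return the dispatched message, or null if the receive would stay blocked
     */
    static String receive(boolean failoverAfterPull, boolean replayPulls) {
        ToyBroker broker = new ToyBroker();
        int trackedPulls = 0;

        // 1. The consumer sends a pull command to the broker.
        broker.pendingPulls++;
        if (replayPulls) {
            trackedPulls++;
        }

        // 2. Failover: the restarted broker knows nothing about the earlier pull,
        //    unless the client replays what it tracked.
        if (failoverAfterPull) {
            broker = new ToyBroker();
            broker.pendingPulls += trackedPulls;
        }

        // 3. A message becomes available on the queue and the broker tries to dispatch it.
        broker.messages.add("msg-1");
        return broker.dispatch();
    }

    public static void main(String[] args) {
        System.out.println("no failover:              " + receive(false, false));
        System.out.println("failover, pull lost:      " + receive(true, false));
        System.out.println("failover, pull replayed:  " + receive(true, true));
    }
}
```

In this model only the middle case returns nothing, which corresponds to the receive that stays blocked after the broker restart.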
One solution is to have the state tracker track message pull commands so that they can be replayed. Afaik that is not there at the moment. Can you raise an enhancement request for that?

For your use case though, is prefetch=1 an option?

On 15 February 2010 17:35, Pavel <[email protected]> wrote:
> Here is another example, this time activemq + Spring; no CXF at all.
>
> Any ideas?
>
> Thanks,
> Pavel
>
>
> On Sat, Feb 13, 2010 at 3:04 PM, Pavel <[email protected]> wrote:
>
>> Hi,
>>
>> I'm hitting an issue with activeMQ reconnect.
>>
>> We are using CXF and SOAP over JMS with ActiveMQ. Also, at some point we
>> started using consumer.prefetchSize=0. This is to prevent deadlocks when
>> doing cyclic synchronous webservice calls, like A.foo()->B.bar()->A.baz().
>>
>> So, attached is a small example, echo-like webservice called over JMS
>> every 2 seconds.
>> ActiveMQ 5.3 or Fuse 5.3.0.5.
>>
>> activemq.broker.url=failover:(tcp://localhost:61616)
>> jms.destinationQueueName=SAMPLE___SERVICE___QUEUE?consumer.prefetchSize=0
>>
>>
>> All works well, until AMQ restart. Once restart happens, client seems to
>> reconnect (I can see consumer in a web console), but starts timing out [1].
>> AMQ logs contain several messages like this:
>> DEBUG | SAMPLE___SERVICE___QUEUE toPageIn: 1, Inflight: 0, pagedInMessages.size 5
>>
>> at some point this follows by a bunch of
>> DEBUG | Message expired Message ID:EPBYMINW0436-4688-1266064267343-0:0:22:1:1 dropped=false acked=false locked=false
>>
>>
>> After application restart things are back to normal - until the next
>> reconnect.
>>
>> If I simply start consumers with AMQ down, then start AMQ - connect works
>> fine and messages start flowing. So it looks like the issue is with
>> reconnect only.
>>
>> Am I hitting some known issue? Are there solutions/workarounds for that?
>>
>>
>> [1] Client stacktrace.
>> Caused by: java.lang.RuntimeException: Timeout receiving message with correlationId e10ec6062aec42d899e3c36d1171c6530000000000000032
>>     at org.apache.cxf.transport.jms.JMSConduit.sendExchange(JMSConduit.java:206)
>>     at org.apache.cxf.transport.jms.JMSOutputStream.doClose(JMSOutputStream.java:56)
>>     at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:185)
>>     at org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
>>     at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188)
>>     at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
>>     at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
>>     at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236)
>>     at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:472)
>>     at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:302)
>>     at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:254)
>>     at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
>>     at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:123)
>>
>>
>> Thanks,
>> Pavel
>
>

--
http://blog.garytully.com

Open Source Integration
http://fusesource.com
