As I said on the JIRA, I knew it was about prefetch.
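
To illustrate why a client-side buffer can reorder delivery, here is a minimal, single-threaded Java sketch of the interleaving Fabian describes in the quoted message below. It is a toy model, not ActiveMQ code: ToyConsumer, dispatch, start, drain and simulate are hypothetical names, and the real race involves two threads rather than a fixed sequence of calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a consumer-side buffer. Hypothetical names; not ActiveMQ code. */
public class RedeliveryOrderSketch {

    /** Stand-in for a consumer: a buffer of unconsumed messages plus a "running" flag. */
    static final class ToyConsumer {
        private final Deque<String> unconsumed = new ArrayDeque<>();
        private final List<String> delivered = new ArrayList<>();
        private final boolean oldestFirst;
        private boolean running = false; // stopped, e.g. while a redelivery delay is pending

        ToyConsumer(boolean oldestFirst) {
            this.oldestFirst = oldestFirst;
        }

        /** Models the scheduler thread restarting the consumer. */
        void start() {
            running = true;
        }

        /** Models the session thread handing one message to the consumer. */
        void dispatch(String msg) {
            if (!running) {
                unconsumed.addLast(msg); // buffered, not delivered
                return;
            }
            if (oldestFirst) {
                // Variant from the thread: enqueue, then deliver the oldest buffered message.
                unconsumed.addLast(msg);
                delivered.add(unconsumed.pollFirst());
            } else {
                // Deliver the message in hand, bypassing anything still buffered.
                delivered.add(msg);
            }
        }

        /** Delivers whatever is still buffered, oldest first. */
        void drain() {
            while (!unconsumed.isEmpty()) {
                delivered.add(unconsumed.pollFirst());
            }
        }
    }

    /** Replays one fixed interleaving and returns the resulting delivery order. */
    static List<String> simulate(boolean oldestFirst) {
        ToyConsumer consumer = new ToyConsumer(oldestFirst);
        consumer.dispatch("m1"); // consumer stopped: buffered
        consumer.dispatch("m2"); // consumer stopped: buffered
        consumer.start();        // restart lands between two dispatches
        consumer.dispatch("m3"); // consumer running again
        consumer.drain();
        return consumer.delivered;
    }

    public static void main(String[] args) {
        System.out.println("direct delivery: " + simulate(false)); // [m3, m1, m2]
        System.out.println("oldest first:    " + simulate(true));  // [m1, m2, m3]
    }
}
```

With direct delivery the model hands over m3 ahead of the two buffered messages; with the enqueue-then-dequeue-oldest variant the arrival order is kept. Whether that change is appropriate for the real client is a separate question from what this sketch shows.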

On Fri, May 26, 2017 at 3:38 PM, Clebert Suconic <clebert.suco...@gmail.com> wrote:
> I doubt it is a bug... When you redeliver there are a few things that
> could happen, including getting from the next buffer.. and things like
> that...
>
> I have been writing messaging systems since JBoss MQ, JBoss Messaging
> -> HornetQ -> ActiveMQ 5 and ActiveMQ Artemis, and as a user on a few
> other systems.. and this is usually a common problem with read-ahead
> on the client.
>
> Your best bet if you really care about ordering and redelivery is to
> disable fetch-ahead. With ActiveMQ:
> http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>
>
> For ActiveMQ Artemis, you can set consumerWindowSize to 0 and that
> will make the consumer not cache any messages on the client.
>
>
> I'm not sure if that's actually your issue, but it does sound like it.
>
> On Fri, May 26, 2017 at 3:31 PM, Fabian Gonzalez
> <fabian.gonza...@mulesoft.com> wrote:
>> Hello,
>>
>> I'm planning to send a PR in order to deal with the issue I reported in
>>
>> https://issues.apache.org/jira/browse/AMQ-6658
>>
>> I would like some feedback (basically, if it makes sense).
>>
>> The issue concerns redelivery of messages: it does not preserve order under
>> certain conditions when prefetch is performed.
>>
>> I think I found what causes the issue.
>>
>> There are two active threads which are involved in an apparent race
>> condition:
>>
>> ActiveMQConnection[xx]Scheduler
>> ActiveMQ Session Task
>>
>> 1) A Session Task thread verifies in ActiveMQSessionExecutor.iterate
>> whether there are messages queued on the session (there are 60 messages
>> prefetched).
>> 2) It dispatches them through ActiveMQMessageConsumer.
>> 3) If unconsumedMessages is not running in
>> ActiveMQMessageConsumer.dispatch, it enqueues the message in
>> unconsumedMessages and it is not delivered.
>>
>> So if ActiveMQ Session Task polls messages several times before
>> ActiveMQConnection[xx]Scheduler starts unconsumedMessages in
>> ActiveMQMessageConsumer:1889 (unconsumedMessages.start()), there would be
>> several items in unconsumedMessages.
>>
>> The problem arises if ActiveMQ Session Task polls messages several times,
>> then polls a new message, and then ActiveMQConnection[xx]Scheduler starts
>> unconsumedMessages. In that case unconsumedMessages is running, so the
>> message is delivered (but the first message which was enqueued in
>> unconsumedMessages was the one which should have been delivered first
>> instead of the one which was currently being considered by the session
>> thread).
>>
>> I think the dispatch method must always deliver the oldest message which
>> was enqueued in unconsumedMessages.
>>
>> For example in the dispatch method in ActiveMQMessageConsumer, something
>> like this should be done:
>>
>> @Override
>> public void dispatch(MessageDispatch md)
>> {
>>     MessageListener listener = this.messageListener.get();
>>     try
>>     {
>>         clearMessagesInProgress();
>>         clearDeliveredList();
>>         synchronized (unconsumedMessages.getMutex())
>>         {
>>             if (!unconsumedMessages.isClosed())
>>             {
>>                 if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
>>                 {
>>                     if (listener != null && unconsumedMessages.isRunning())
>>                     {
>>                         // otherwise I do not preserve the order for
>>                         // redelivery
>>                         unconsumedMessages.enqueue(md);
>>                         *md = unconsumedMessages.dequeueNoWait();*
>>
>> Thank you in advance for any feedback.
>>
>> Fabian
>>
>> On Tue, Apr 18, 2017 at 12:51 AM, Tim Bain <tb...@alumni.duke.edu> wrote:
>>
>>> I'm not an expert on the dispatch code, but this isn't the behavior I'd
>>> expect to see, so please submit a bug in JIRA for it so that someone
>>> who knows that code well can fix it (or explain why this is the intended
>>> behavior, if that's the case).
>>>
>>> Tim
>>>
>>> On Tue, Apr 11, 2017 at 1:15 PM, Fabian Gonzalez <
>>> fabian.gonza...@mulesoft.com> wrote:
>>>
>>> > Hello,
>>> >
>>> > This is my first message so greetings to all.
>>> > I am facing a situation where activemq seems not to respect the order
>>> > for dispatched messages when a redelivery is needed using activemq-client
>>> > 5.14.3.
>>> >
>>> > I am sending 60 messages to a queue with a single consumer, and I've
>>> > noticed that sometimes when a redelivery of the message is needed, as a
>>> > consequence of rollback, another message from those 60 messages is served
>>> > before the redelivered message. There is no maxRedelivery set.
>>> >
>>> > What I notice debugging ActiveMQMessageConsumer is that the following
>>> > behaviour may occur:
>>> >
>>> > - The 60 messages are dispatched in order in:
>>> >
>>> > ActiveMQMessageConsumer:1376:
>>> >
>>> > @Override
>>> > public void dispatch(MessageDispatch md) {
>>> >     MessageListener listener = this.messageListener.get();
>>> >     try {
>>> >         clearMessagesInProgress();
>>> >         ...
>>> >
>>> > unconsumedMessages is running so the message is sent to the listener.
>>> >
>>> > - a rollback is performed and the message is redelivered (with a default
>>> > delay):
>>> >
>>> > ActiveMQMessageConsumer:1305:
>>> >
>>> > if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
>>> >     // Start up the delivery again a little later.
>>> >     session.getScheduler().executeAfterDelay(new Runnable() {
>>> >         @Override
>>> >         public void run() {
>>> >             try {
>>> >                 if (started.get()) {
>>> >                     start();
>>> >                 }
>>> >             } catch (JMSException e) {
>>> >                 session.connection.onAsyncException(e);
>>> >             }
>>> >         }
>>> >     }, redeliveryDelay);
>>> > } else {
>>> >     start();
>>> > }
>>> >
>>> > Periodically, an attempt is made to consume the messages enqueued in the
>>> > session (as the consumer's unconsumedMessages is not running, they are
>>> > not sent to the listener and are instead enqueued in unconsumedMessages).
>>> > But if the thread scheduled from redelivery is started when the iteration
>>> > from the unconsumed messages is being performed, the unconsumedMessages
>>> > is started in:
>>> >
>>> > public void start() throws JMSException {
>>> >     if (unconsumedMessages.isClosed()) {
>>> >         return;
>>> >     }
>>> >     started.set(true);
>>> >     unconsumedMessages.start();
>>> >     session.executor.wakeup();
>>> > }
>>> >
>>> > and the message that is being considered from session (in the other
>>> > thread) is sent to the listener before the redelivered message, which
>>> > may be an error in order.
>>> >
>>> > Is this the expected behaviour? I expected the order to be maintained
>>> > in these cases.
>>> >
>>> > Thanks in advance for your help and clarification
>>> >
>
>
>
> --
> Clebert Suconic

--
Clebert Suconic