I doubt it is a bug... When you redeliver there are a few things that
could happen, including getting from the next buffer.. and things like
that...

I have been writing Messaging System since JBoss MQ, JBoss Messaging
-> HornetQ -> ActiveMQ 5 and ActiveMQ Artemis, as an user on a few
other systems..  and this is usually a common problem with read ahead
on the client.

Your best bet if you really care about ordering and redeliver is to
disable fetch ahead. with ActiveMQ:
http://activemq.apache.org/what-is-the-prefetch-limit-for.html


For ActiveMQ Artemis, you can set consumerWindowSize to 0 and that
will make the consumer to no cache any messages on the client.


I'm not sure if that's actually your issue. but it does sound like it.

On Fri, May 26, 2017 at 3:31 PM, Fabian Gonzalez
<fabian.gonza...@mulesoft.com> wrote:
> Hello,
>
> I'm planning to send a PR in order to deal with the issue I reported in
>
> https://issues.apache.org/jira/browse/AMQ-6658
>
> I would like some feedback (basically, if it makes sense).
>
> The issue concerns redelivery of messages: it does not preserve order under
> certain conditions when prefetch is performed.
>
> I think I found what causes the issue.
>
> There are two active threads which are involved in an apparent race
> condition:
>
> ActiveMQConnection[xx]Scheduler
> ActiveMQ Session Task
>
> 1) A Session Task thread verifies in ActiveMQSessionExecutor.iterate
> whether there are messages queued on the session (there are 60 messages
> prefetched).
> 2) It dispatchs the threads through ActiveMQMessageConsumer
> 3) if unconsumedMessages is not running in
> ActiveMQMessageConsumer.dispatch, it enqueues the message in
> unconsumedMessages and it is not delivered.
>
> So if ActiveMQ Session Task polls messages several times before
> ActiveMQConnection[xx]Scheduler starts unconsumedMessages in
> ActiveMQMessageConsumer:1889 (unconsumedMessages.start(), there would be
> several items in unconsumedMessages.
>
> The problem arises if ActiveMQ Session Task polls messages several times,
> it polls a new message, and then ActiveMQConnection[xx]Scheduler starts
> unconsumedMessage. In that case unconsumedMessages is running, so the
> message is delivered (but the first message which was enqueued in
> unconsumedMessage was the one which should have been delivered first
> instead of the one which was currently being considered by the session
> thread).
>
> I think the dispatch message has to deliver always the oldest message which
> was enqueued in unconsumedMessages.
>
> For example in the dispatch method in ActiveMQMessageConsumer, something
> like this should be done:
>
> @Override
> public void dispatch(MessageDispatch md)
> {
> MessageListener listener = this.messageListener.get();
> try
> {
> clearMessagesInProgress();
> clearDeliveredList();
> synchronized (unconsumedMessages.getMutex())
> {
> if (!unconsumedMessages.isClosed())
> {
> if (this.info.isBrowser() || !session.connection.isDuplicate(this,
> md.getMessage()))
> {
> if (listener != null && unconsumedMessages.isRunning())
> {
> // otherwise I do not preserve the order for
> // redelivery
> unconsumedMessages.enqueue(md);
> *md = unconsumedMessages.dequeueNoWait();*
>
> Thank you in advance for any feedback.
>
> Fabian
>
> On Tue, Apr 18, 2017 at 12:51 AM, Tim Bain <tb...@alumni.duke.edu> wrote:
>
>> I'm not an expert on the dispatch code, but this isn't the behavior I'd
>> expect to see, so please submit a bug in JIRA for it so we can get someone
>> who knows that code well can fix it (or explain why this is the intended
>> behavior, if that's the case).
>>
>> Tim
>>
>> On Tue, Apr 11, 2017 at 1:15 PM, Fabian Gonzalez <
>> fabian.gonza...@mulesoft.com> wrote:
>>
>> > Hello,
>> >
>> > This is my  first message so greetings for all.
>> > I am facing a situation  where activemq seems not to respect the order
>> for
>> > dispatched messages when a redilevery is needed using activemq-client
>> > 5.14.3.
>> >
>> > I am sending 60 messages to a queue with a single consumer, and I've
>> > noticed that sometimes when a relivery of the message is needed, as a
>> > consequence of rollback, another message from those 60 message is served
>> > before the redelivered message. There is no maxRedelivery set.
>> >
>> > What I notice debugging ActiveMQMessageConsumer is that the following
>> > behaviour may occur:
>> >
>> > - The 60 messages are dispatched in order in:
>> >
>> > ActiveMQMessageConsumer:1376:
>> >
>> >     @Override
>> >     public void dispatch(MessageDispatch md) {
>> >         MessageListener listener = this.messageListener.get();
>> >         try {
>> >             clearMessagesInProgress();
>> >             ...
>> >
>> > unconsumedMessage is running so the message is sent to the listener.
>> >
>> > - a rollback is performed and the message is redelivered (with a default
>> > delay):
>> >
>> > ActiveMQMessageConsumer:1305:
>> >
>> >                         if (redeliveryDelay > 0 &&
>> > !unconsumedMessages.isClosed()) {
>> >                             // Start up the delivery again a little
>> later.
>> >                             session.getScheduler().executeAfterDelay(new
>> > Runnable() {
>> >                                 @Override
>> >                                 public void run() {
>> >                                     try {
>> >                                         if (started.get()) {
>> >                                             start();
>> >                                         }
>> >                                     } catch (JMSException e) {
>> >
>> > session.connection.onAsyncException(e);
>> >                                     }
>> >                                 }
>> >                             }, redeliveryDelay);
>> >                         } else {
>> >                             start();
>> >                         }
>> >
>> > Periodically, the messages enqueued in the session are attempted to be
>> > consumed (as the unconsumedMessages from the consumer is not running they
>> > are not sent to the listener to be consumed and they are enqueued as
>> > unconsumedMessages).
>> > But if the thread scheduled from redelivery is started when the iteration
>> > from the unconsumed messages is being performed, the unconsumedMessages
>> is
>> > started in:
>> >
>> >     public void start() throws JMSException {
>> >         if (unconsumedMessages.isClosed()) {
>> >             return;
>> >         }
>> >         started.set(true);
>> >         unconsumedMessages.start();
>> >         session.executor.wakeup();
>> >     }
>> >
>> > and the message that is being considered from session (in the other
>> thread)
>> > is sent to the listener before the redelivered message, which may be an
>> > error in order.
>> >
>> > Is this the expected behaviour? I expected that the order was mantained
>> in
>> > this cases.
>> >
>> > Thanks in advance for your help and clarification
>> >
>>



-- 
Clebert Suconic

Reply via email to