Hello, all

I have opened a ticket (AMQ-6775
<https://issues.apache.org/jira/browse/AMQ-6775>) to verify an issue which
I explain below. I have added a possible fix,

https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc

but I would like to know if I am correctly understanding the logic:

I found this situacion in
org.apache.activemq.ActiveMQSessionExecutor.iterate:
As I understand, the idea is that if there are messages queued on the
consumers they are delivered to the listeners and if that is not the case,
you should dispatch the messages queued in the session:
    public boolean iterate() {

        // Deliver any messages queued on the consumer to their listeners.
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }

        // No messages left queued on the listeners.. so now dispatch
messages
        // queued on the session
        MessageDispatch message = messageQueue.dequeueNoWait();
        if (message == null) {
            return false;
        } else {
            dispatch(message);
            return !messageQueue.isEmpty();
        }
    }
Now the following race condition arises:
1) thread A (ActiveMQ Session Task) invokes .ActiveMQSessionExecutor
2) When this part of the code is executed:
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }
ActiveMQMessageConsumer.iterate is invoked. There are messages unconsumed
in the consumer, but as unconsumedMessages is not started, a null is
returned as if there were no messages queued.
3) Thread A is interrupted
4) Thread B (ActiveMQConnection[xx]Scheduler) invokes
ActiveMQMessageConsumer.start, unconsumed messages are started.
5) Thread A continues to deliver the messages queued in the session (notice
that if thread B starts unconsumed messages before thread A which happens
most of times the messages in the consumer queue would have been
dispatched).
6) Thread A dispatch a message from the session when there are messages
from the consumer pending.
This race condition makes that in some cases a message which has been
rollbacked and get queued back in the consumer is redelivered after another
message in the session consumer (which was enqueued after the former
message).
Maybe the following commit in my fork would fix the issue, adding a further
verification to make sure that there are no messages queued in the
consumers before deliver the messages in the session:
https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd9a79a183bc

Thanks in advance for any comment,

Fabian

Reply via email to