Thank you very much, Tim! On Thu, Jul 20, 2017 at 11:20 PM, Tim Bain <[email protected]> wrote:
> I forwarded this to the developer mailing list ( > http://activemq.2283324.n4.nabble.com/Fwd-Messages-in- > session-queue-delivered-before-messages-enqueued-in- > consumers-td4728758.html), > so between this thread, that thread, and the JIRA entry and pull request, > hopefully you'll get the feedback you're hoping for. But I can't say for > sure which place you'll get responses, so watch them all. > > Tim > > On Jul 20, 2017 10:13 AM, "Fabian Gonzalez" <[email protected]> > wrote: > > Hello, all > > I have opened a ticket (AMQ-6775 > <https://issues.apache.org/jira/browse/AMQ-6775>) to verify an issue which > I explain below. I have added a possible fix, > > https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd > 9a79a183bc > > but I would like to know if I am correctly understanding the logic: > > I found this situacion in > org.apache.activemq.ActiveMQSessionExecutor.iterate: > As I understand, the idea is that if there are messages queued on the > consumers they are delivered to the listeners and if that is not the case, > you should dispatch the messages queued in the session: > public boolean iterate() { > > // Deliver any messages queued on the consumer to their listeners. > for (ActiveMQMessageConsumer consumer : this.session.consumers) { > if (consumer.iterate()) { > return true; > } > } > > // No messages left queued on the listeners.. so now dispatch > messages > // queued on the session > MessageDispatch message = messageQueue.dequeueNoWait(); > if (message == null) { > return false; > } else { > dispatch(message); > return !messageQueue.isEmpty(); > } > } > Now the following race condition arises: > 1) thread A (ActiveMQ Session Task) invokes .ActiveMQSessionExecutor > 2) When this part of the code is executed: > for (ActiveMQMessageConsumer consumer : this.session.consumers) { > if (consumer.iterate()) { > return true; > } > } > ActiveMQMessageConsumer.iterate is invoked. There are messages unconsumed > in the consumer, but as unconsumedMessages is not started, a null is > returned as if there were no messages queued. > 3) Thread A is interrupted > 4) Thread B (ActiveMQConnection[xx]Scheduler) invokes > ActiveMQMessageConsumer.start, unconsumed messages are started. > 5) Thread A continues to deliver the messages queued in the session (notice > that if thread B starts unconsumed messages before thread A which happens > most of times the messages in the consumer queue would have been > dispatched). > 6) Thread A dispatch a message from the session when there are messages > from the consumer pending. > This race condition makes that in some cases a message which has been > rollbacked and get queued back in the consumer is redelivered after another > message in the session consumer (which was enqueued after the former > message). > Maybe the following commit in my fork would fix the issue, adding a further > verification to make sure that there are no messages queued in the > consumers before deliver the messages in the session: > https://github.com/fsgonz/activemq/commit/0411242fddb16edba38ead18fa99dd > 9a79a183bc > > Thanks in advance for any comment, > > Fabian >
