Hello,

Here is a PoC: https://github.com/fcurvat/amqpoc

Just start an ActiveMQ broker (5.18.3), run the class 'PocAmqApplication' (a
Spring Boot app), and stop it gracefully. The app generates 1000 messages at
startup and has 5 consumers with a prefetch of 100.
The class 'ShutdownEventSource' hooks into the shutdown and stops the AMQ
consumers gracefully.
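
For reference, the hook is conceptually something like this (a minimal sketch, assuming the classic javax.jms client; names and wiring are illustrative, not the exact PoC code):

======
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;

// On context shutdown: close the consumers first so no new messages are
// dispatched, then close the sessions, then the shared connection.
public class ShutdownEventSource implements ApplicationListener<ContextClosedEvent> {

    private final List<MessageConsumer> consumers;
    private final List<Session> sessions;
    private final Connection connection;

    public ShutdownEventSource(List<MessageConsumer> consumers,
                               List<Session> sessions,
                               Connection connection) {
        this.consumers = consumers;
        this.sessions = sessions;
        this.connection = connection;
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        try {
            for (MessageConsumer consumer : consumers) {
                consumer.close(); // stop dispatch to this consumer
            }
            for (Session session : sessions) {
                session.close(); // a transacted session rolls back uncommitted work here
            }
            connection.close(); // closes whatever is left open
        } catch (JMSException e) {
            e.printStackTrace(); // we are shutting down anyway; just log
        }
    }
}
======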

Often I end up with one message that is not acked (checked via the AMQ
console at http://localhost:8161/admin/queues.jsp).
Sometimes the shutdown takes a while because one consumer keeps consuming for
a minute or more (that is not the big issue for me, as we can set a larger
'terminationGracePeriodSeconds' on Kubernetes to let the app stop).

Best,

Fred

On Tue, Mar 19, 2024 at 5:35 PM, Frédéric Curvat <fcur...@gmail.com> wrote:

> Hi,
>
> I will work on setting up a simple project; for now it is tangled with our
> code.
>
> Fred
>
> On Tue, Mar 19, 2024 at 5:09 PM, Matt Pavlovich <mattr...@gmail.com> wrote:
>
>> Hi Frédéric-
>>
>> Do you have a small sample project that is able to reproduce the issue
>> that you can share (preferably a simple GitHub project)?
>>
>> Thanks,
>> Matt Pavlovich
>>
>> > On Mar 19, 2024, at 10:47 AM, Frédéric Curvat <fcur...@gmail.com>
>> wrote:
>> >
>> > Hello,
>> >
>> > Checking back on this case, I played with the shutdown of the consumers
>> > (without using the brave instrumentation).
>> > With 5 consumers reading, if I close the consumers, then the sessions, then
>> > the connection, I almost always get an ack for only 4 messages even though
>> > 5 messages were read.
>> > I almost always get one error of this kind:
>> >
>> > java.lang.NullPointerException: Cannot invoke "java.util.List.get(int)" because "this.synchronizations" is null
>> > at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168) ~[activemq-client-5.18.3.jar:5.18.3]
>> > at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291) ~[activemq-client-5.18.3.jar:5.18.3]
>> > at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606) ~[activemq-client-5.18.3.jar:5.18.3]
>> > at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118) ~[activemq-jms-pool-5.18.3.jar:5.18.3]
>> >
>> > I get the same error if I close the connection first (because the
>> > connection cleans up the sessions).
>> > I wonder whether that is a bug or a misuse on our side.
>> > The client version is 5.18.3.
>> >
>> > Any help / thoughts welcome :)
>> >
>> > Best,
>> >
>> > Fred
>> >
>> > On Mon, Mar 11, 2024 at 4:27 PM, Frédéric Curvat <fcur...@gmail.com> wrote:
>> >
>> >> Hello,
>> >>
>> >> More news about our issue.
>> >>
>> >> We checked the case again and I have some news:
>> >> - Nothing bad in the broker logs (no poison ack).
>> >> - The application logs show that the issue appears on graceful shutdown of
>> >> the application (however, not on every shutdown).
>> >>
>> >> Our shutdown consists of calling the .close() method on all MessageConsumers
>> >> and then calling .close() on all Sessions.
>> >> That seems fair, but reading the javadoc it seems we could have just called
>> >> .close() on the Connection.
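>> >> In other words, roughly (an illustrative sketch, not our actual code):
>> >>
>> >> // what we do today:
>> >> for (MessageConsumer consumer : consumers) consumer.close();
>> >> for (Session session : sessions) session.close();
>> >>
>> >> // what the javadoc suggests is sufficient:
>> >> connection.close(); // transitively closes the sessions and consumers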
>> >>
>> >> We looked at a couple of occurrences of the double read:
>> >> - In one case, we saw the stack trace below; all consumer .close() calls
>> >> are OK but one session fails to close properly.
>> >> - In the other case, all consumers and sessions closed without errors.
>> >>
>> >> ======
>> >> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>> >> at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>> >> at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>> >> at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>> >> at java.base/java.util.Objects.checkIndex(Objects.java:361)
>> >> at java.base/java.util.ArrayList.get(ArrayList.java:427)
>> >> at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>> >> at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>> >> at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>> >> at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>> >> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>> >> at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>> >> at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unregisterAllEventCallbacks(AMQEventSourceEngine.java:297)
>> >> at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unbindAll(AMQEventSourceEngine.java:203)
>> >> at org.talend.ipaas.rt.springboot.common.shutdown.ShutdownEventSource.onApplicationEvent(ShutdownEventSource.java:38)
>> >> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> >> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>> >> at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:343)
>> >> at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:228)
>> >> at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:165)
>> >> at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
>> >> at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
>> >> at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
>> >> at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:437)
>> >> at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
>> >> at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1058)
>> >> at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:173)
>> >> at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1026)
>> >> at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:139)
>> >> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>> >> at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:108)
>> >> at java.base/java.lang.Thread.run(Thread.java:833)
>> >> ======
>> >>
>> >> More specifically:
>> >>> at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>> >>> at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>> >>> at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>> >>> at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>> >>> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>> >>> at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>> >> I wonder whether it is the brave implementation that triggers the rollback
>> >> of the message read. That would explain why the message is eventually read
>> >> by another consumer.
>> >> I don't know whether there is a regression in the brave tracing (or whether
>> >> it has been failing silently for some time), but we have been using this
>> >> tracing for quite a while (and we are not idempotent when a message is
>> >> replayed).
>> >>
>> >> We will dig into the possible brave changes, but any comments or thoughts
>> >> are still welcome.
>> >>
>> >> Best,
>> >>
>> >> Fred
>> >>
>> >> On Mon, Mar 4, 2024 at 10:04 AM, Frédéric Curvat <fcur...@gmail.com> wrote:
>> >>
>> >>> Hello JB!
>> >>>
>> >>> Hope you are well!
>> >>>
>> >>>> 1. The message goes into redelivery (because it expired or the client
>> >>>> rolled back the transaction) and so it can be taken by another consumer.
>> >>>> As you use SESSION_TRANSACTED, the "first" client has to deal with the
>> >>>> rollback.
>> >>> For me that was not very likely, because the messages are read with a
>> >>> 1-second interval on two different service pods. How could I confirm that?
>> >>> Also, logging the messageId would help, right?
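>> >>> For instance, something like this in the listener (a sketch with
>> >>> illustrative names, not our actual code):
>> >>>
>> >>> import javax.jms.JMSException;
>> >>> import javax.jms.Message;
>> >>> import javax.jms.MessageListener;
>> >>>
>> >>> // Log the broker-assigned id and the redelivered flag on every read,
>> >>> // then grep for duplicate ids across the two pods' logs.
>> >>> public class LoggingListener implements MessageListener {
>> >>>     @Override
>> >>>     public void onMessage(Message message) {
>> >>>         try {
>> >>>             System.out.printf("read messageId=%s redelivered=%s%n",
>> >>>                     message.getJMSMessageID(), message.getJMSRedelivered());
>> >>>         } catch (JMSException e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>         // ... actual message processing ...
>> >>>     }
>> >>> }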
>> >>>
>> >>>> 2. Do you see "poison ack" in the logs?
>> >>> No trace in the logs, but we will double-check.
>> >>>
>> >>>> Oh, by the way, what's your consumer prefetch? I guess it's more than 1?
>> >>> Yes, we use a prefetch of 100 (over TCP OpenWire).
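>> >>> e.g., via the connection URI, which is one standard way to set it (a
>> >>> sketch; the host is illustrative):
>> >>>
>> >>> ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
>> >>>         "tcp://broker:61616?jms.prefetchPolicy.queuePrefetch=100");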
>> >>>
>> >>> Thanks for the support! I am off this week, but I will probably ping you
>> >>> directly in the coming weeks since you offered.
>> >>>
>> >>> Best,
>> >>>
>> >>> Fred
>> >>>
>> >>> On Mon, Mar 4, 2024 at 7:28 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >>>
>> >>>> Oh, by the way, what's your consumer prefetch? I guess it's more than 1?
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>> On Fri, Mar 1, 2024 at 4:52 PM Frédéric Curvat <fcur...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hello!
>> >>>>>
>> >>>>> At my company we are using Apache ActiveMQ 5.18.3.
>> >>>>> We suspect that in some rare cases a queue message is read twice by
>> >>>>> different consumers.
>> >>>>> For more context:
>> >>>>> - The broker is a classic primary/secondary setup (the secondary is
>> >>>>> started but not active; this is not a network of brokers).
>> >>>>> - We are using persistent queues with a PostgreSQL backend.
>> >>>>> - A single queue is read by several consumers: 10 consumers for a single
>> >>>>> Java app deployed in HA over several k8s pods.
>> >>>>> - We use SESSION_TRANSACTED sessions for both consumers and producers.
>> >>>>> - We use a PooledConnectionFactory with 1 connection,
>> >>>>> maximumActiveSessionPerConnection 500, and expiryTimeout 10000 (sketched
>> >>>>> below).
>> >>>>> We see no transaction or other error at the time of the "double read",
>> >>>>> either in the service logs or in the ActiveMQ broker logs.
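>> >>>>>
>> >>>>> A minimal sketch of that setup (illustrative values and broker URL, not
>> >>>>> our exact wiring):
>> >>>>>
>> >>>>> import javax.jms.Connection;
>> >>>>> import javax.jms.Session;
>> >>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>> >>>>> import org.apache.activemq.jms.pool.PooledConnectionFactory;
>> >>>>>
>> >>>>> ActiveMQConnectionFactory amqFactory =
>> >>>>>         new ActiveMQConnectionFactory("tcp://broker:61616");
>> >>>>> PooledConnectionFactory pooled = new PooledConnectionFactory();
>> >>>>> pooled.setConnectionFactory(amqFactory);
>> >>>>> pooled.setMaxConnections(1);
>> >>>>> pooled.setMaximumActiveSessionPerConnection(500);
>> >>>>> pooled.setExpiryTimeout(10000L);
>> >>>>>
>> >>>>> Connection connection = pooled.createConnection();
>> >>>>> // transacted session: messages are acked only on session.commit()
>> >>>>> Session session = connection.createSession(true, Session.SESSION_TRANSACTED);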
>> >>>>>
>> >>>>> Has something like this been seen before? Could it be a bug or a
>> >>>>> misconfiguration somewhere?
>> >>>>>
>> >>>>> Best,
>> >>>>>
>> >>>>> Fred
>> >>>>
>> >>>