Hello,

Here is a PoC: https://github.com/fcurvat/amqpoc

Just start an ActiveMQ broker (5.18.3), run the 'PocAmqApplication' Spring Boot app, and stop it gracefully (the app generates 1000 messages at startup and runs 5 consumers with prefetch 100).
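For reference, the consumer setup I describe in my first message (at the bottom of this thread) looks roughly like this. It is only a simplified sketch: the broker URL, the queue name and the listener body are placeholders, not the exact PoC code, and the real application creates several of these consumers.

======
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnectionFactory;

public class TransactedConsumerSetup {

    public static void main(String[] args) throws Exception {
        // Prefetch of 100 over TCP / OpenWire, set on the connection URI.
        ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=100");

        // Pool with a single connection, up to 500 sessions per connection,
        // expiryTimeout 10000 ms (the settings from the original report below).
        PooledConnectionFactory pooled = new PooledConnectionFactory();
        pooled.setConnectionFactory(amqFactory);
        pooled.setMaxConnections(1);
        pooled.setMaximumActiveSessionPerConnection(500);
        pooled.setExpiryTimeout(10000);

        Connection connection = pooled.createConnection();
        connection.start();

        // One transacted session and consumer per reader (the PoC starts 5 of these,
        // production runs 10 across several pods).
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer consumer = session.createConsumer(session.createQueue("poc.queue"));
        consumer.setMessageListener(message -> {
            try {
                // ... process the message ...
                session.commit();       // the ack only reaches the broker on commit
            } catch (Exception e) {
                try {
                    session.rollback(); // the message goes back to the queue for redelivery
                } catch (JMSException ignored) {
                    // nothing useful to do here
                }
            }
        });
    }
}
======

With SESSION_TRANSACTED, the ack only reaches the broker when session.commit() is called, so a message whose transaction is rolled back (or never committed) stays on the queue and can be redelivered to another consumer.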
The 'ShutdownEventSource' class hooks into the application shutdown and stops the AMQ consumers gracefully.
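It does roughly the following (a minimal sketch, not the exact class from the PoC or from our product; it assumes the javax.jms client that appears in the stack traces below and leaves out how the consumers, sessions and connection are collected):

======
import java.util.List;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class ShutdownEventSource {

    private final List<MessageConsumer> consumers;
    private final List<Session> sessions;
    private final Connection connection;

    public ShutdownEventSource(List<MessageConsumer> consumers, List<Session> sessions, Connection connection) {
        this.consumers = consumers;
        this.sessions = sessions;
        this.connection = connection;
    }

    @EventListener
    public void onApplicationEvent(ContextClosedEvent event) {
        // Stop dispatching first: close every consumer, then every (transacted)
        // session, then the connection itself.
        for (MessageConsumer consumer : consumers) {
            try {
                consumer.close();
            } catch (JMSException e) {
                // log and keep closing the others
            }
        }
        for (Session session : sessions) {
            try {
                session.close();
            } catch (JMSException e) {
                // closing a pooled transacted session is where the rollback
                // errors shown later in this thread surface
            }
        }
        try {
            connection.close();
        } catch (JMSException e) {
            // ignore, we are shutting down anyway
        }
    }
}
======

Closing the consumers before the sessions is deliberate, so that nothing is dispatched while the transacted sessions (and then the connection) are being closed.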
Often I get one message that is not acked (checking the AMQ console at http://localhost:8161/admin/queues.jsp).
Sometimes it takes a while to shut down because one consumer is still consuming for a minute or more (that is not the big issue for me, as we can set a bigger 'terminationGracePeriodSeconds' on Kubernetes to let the app stop).

Best,

Fred

On Tue, Mar 19, 2024 at 17:35, Frédéric Curvat <fcur...@gmail.com> wrote:

> Hi,
>
> I will work on setting up a simple project; for now it is tangled with
> our code.
>
> Fred
>
> On Tue, Mar 19, 2024 at 17:09, Matt Pavlovich <mattr...@gmail.com> wrote:
>
>> Hi Frédéric-
>>
>> Do you have a small sample project that is able to reproduce the issue
>> that you can share (preferably a simple GitHub project)?
>>
>> Thanks,
>> Matt Pavlovich
>>
>>> On Mar 19, 2024, at 10:47 AM, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> Checking back on the case, I played with the shutdown of the consumers
>>> (and without using the Brave instrumentation).
>>> With 5 consumers reading, if I close the consumers, then the sessions,
>>> then the connection, I almost always get acks for only 4 messages even
>>> though 5 messages were read.
>>> I almost always get one error of this kind:
>>>
>>> java.lang.NullPointerException: Cannot invoke "java.util.List.get(int)" because "this.synchronizations" is null
>>>     at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168) ~[activemq-client-5.18.3.jar:5.18.3]
>>>     at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291) ~[activemq-client-5.18.3.jar:5.18.3]
>>>     at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606) ~[activemq-client-5.18.3.jar:5.18.3]
>>>     at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118) ~[activemq-jms-pool-5.18.3.jar:5.18.3]
>>>
>>> I get the same error if I close the connection first (because the
>>> connection cleans up the sessions).
>>> I wonder if that is a bug or a misuse on our side.
>>> The client version is 5.18.3.
>>>
>>> Any help / thoughts welcome :)
>>>
>>> Best,
>>>
>>> Fred
>>>
>>> On Mon, Mar 11, 2024 at 16:27, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> More news about our issue.
>>>>
>>>> We checked the case again and I have some news:
>>>> - Nothing bad in the broker logs (no poison ack).
>>>> - The application logs show that the issue appears on graceful shutdown
>>>>   of the application (however not on every shutdown).
>>>>
>>>> Our shutdown consists of calling the .close() method on all
>>>> MessageConsumers and then calling .close() on all Sessions.
>>>> That seems fair, but reading the javadoc it seems we could have just
>>>> called .close() on the Connection.
>>>>
>>>> We checked a couple of occurrences of the issue:
>>>> - In one case we saw the stack trace below: all consumer .close() calls
>>>>   are OK, but one session fails to close properly.
>>>> - In the other case, all consumers and sessions are closed without
>>>>   errors.
>>>>
>>>> ======
>>>> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>>>>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>>>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>>>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>>>>     at java.base/java.util.Objects.checkIndex(Objects.java:361)
>>>>     at java.base/java.util.ArrayList.get(ArrayList.java:427)
>>>>     at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>     at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>     at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>     at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>     at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>     at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>     at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unregisterAllEventCallbacks(AMQEventSourceEngine.java:297)
>>>>     at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unbindAll(AMQEventSourceEngine.java:203)
>>>>     at org.talend.ipaas.rt.springboot.common.shutdown.ShutdownEventSource.onApplicationEvent(ShutdownEventSource.java:38)
>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>>>>     at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:343)
>>>>     at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:228)
>>>>     at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:165)
>>>>     at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
>>>>     at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
>>>>     at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
>>>>     at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:437)
>>>>     at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
>>>>     at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1058)
>>>>     at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:173)
>>>>     at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1026)
>>>>     at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:139)
>>>>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>>>>     at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:108)
>>>>     at java.base/java.lang.Thread.run(Thread.java:833)
>>>> ====
>>>>
>>>> More specifically:
>>>>
>>>>>     at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>>     at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>>     at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>>     at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>>     at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>>     at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>
>>>> I wonder whether it is the Brave implementation that triggers the
>>>> rollback of the message read. That would explain why the message ends up
>>>> being read by another consumer.
>>>> I don't know if there is a regression in Brave tracing (or if it has
>>>> been failing silently for some time), but we have been using this tracing
>>>> for quite a while (and we are not idempotent if the message is replayed).
>>>>
>>>> We will dig into the possible Brave changes, but any comments or
>>>> thoughts are still welcome.
>>>>
>>>> Best,
>>>>
>>>> Fred
>>>>
>>>> On Mon, Mar 4, 2024 at 10:04, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>
>>>>> Hello JB!
>>>>>
>>>>> Hope you are well!
>>>>>
>>>>>> 1. The message goes in redelivery (because it expired or the client
>>>>>> rolled back the transaction) and so it can be taken by another
>>>>>> consumer. As you use SESSION_TRANSACTED, the "first" client has to
>>>>>> deal with the rollback.
>>>>> For me that was not so likely, because the messages are read with a
>>>>> 1-second interval on two different service pods. How could I confirm
>>>>> that? Also, logging the messageId would help, right?
>>>>>
>>>>>> 2. Do you see "poison ack" in the log?
>>>>> No trace in the logs, but we will double-check.
>>>>>
>>>>>> Oh by the way, what's your consumer prefetch? I guess it's more than 1?
>>>>> Yes, we use a prefetch of 100 (over TCP OpenWire).
>>>>>
>>>>> Thanks for the support! I am off this week, but I will probably ping
>>>>> you directly in the coming weeks since you proposed it.
>>>>>
>>>>> Best,
>>>>>
>>>>> Fred
>>>>>
>>>>> On Mon, Mar 4, 2024 at 07:28, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>>> Oh by the way, what's your consumer prefetch? I guess it's more than 1?
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On Fri, Mar 1, 2024 at 4:52 PM Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello!
>>>>>>>
>>>>>>> At my company we are using Apache ActiveMQ 5.18.3.
>>>>>>> We suspect that in some rare cases a queue message is read twice by
>>>>>>> different consumers.
>>>>>>> For more context:
>>>>>>> - The broker is a classic primary/secondary pair (secondary started
>>>>>>>   but not active - not a network of brokers).
>>>>>>> - We are using persistent queues with a PostgreSQL backend.
>>>>>>> - A single queue is read by several consumers: 10 consumers for a
>>>>>>>   single Java app deployed in HA over several k8s pods.
>>>>>>> - We use SESSION_TRANSACTED sessions for both consumers and producers.
>>>>>>> - We use a PooledConnectionFactory with 1 connection,
>>>>>>>   maximumActiveSessionPerConnection 500, and expiryTimeout 10000.
>>>>>>>
>>>>>>> We see no transaction or other errors in the logs, on either the
>>>>>>> service or the ActiveMQ broker side, at the time of the "double read".
>>>>>>>
>>>>>>> Has something like this been seen before? Could it be a bug or a
>>>>>>> misconfiguration somewhere?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Fred