Hi Frédéric-

A sample project is a big help. The first step is to tidy up the dependencies and align them with the Jakarta-based namespace. Using ActiveMQ 6.1.0 instead of 5.18.3 is the best next step.

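To illustrate how mechanical the namespace part is, here is a minimal sketch in plain Java with no JMS dependency. The class name JakartaImportRewrite and the sample source lines are hypothetical and not taken from the POC repo. It only rewrites import lines that start with javax.jms; dependency versions in the build file still have to be changed separately.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only: rewrites javax.jms imports to jakarta.jms.
public class JakartaImportRewrite {

    // Matches the "javax.jms." prefix of an import (or static import) statement.
    // (?m) makes ^ match at the start of every line of the source text.
    private static final Pattern JAVAX_JMS_IMPORT =
            Pattern.compile("(?m)^(\\s*import\\s+(?:static\\s+)?)javax\\.jms\\.");

    // Returns the source text with javax.jms imports switched to jakarta.jms.
    // Imports from other javax packages (for example javax.sql) are left alone.
    public static String rewrite(String source) {
        return JAVAX_JMS_IMPORT.matcher(source).replaceAll("$1jakarta.jms.");
    }

    public static void main(String[] args) {
        // Made-up sample input, not copied from any real project.
        String before = String.join("\n",
                "import javax.jms.Connection;",
                "import javax.jms.MessageConsumer;",
                "import javax.jms.Session;",
                "import javax.sql.DataSource;");
        System.out.println(rewrite(before));
    }
}
```

Running main prints the same four lines with the three JMS imports moved to jakarta.jms and the javax.sql import unchanged. In practice an IDE-wide find and replace does the same job; the point is only that the source change is limited to the package prefix.
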
I opened a GH issue on your POC repo so that further discussion can happen there, with less back-and-forth between your code tree and this mailing list.

Summary for future lurkers of this email thread: Spring Boot 3.x is based on Jakarta EE rather than Java EE. Basically, it means you need to change your POC to use:

    import jakarta.jms. ..

instead of

    import javax.jms. ..

Let’s get that all cleaned up and aligned properly before looking further at any other issues.

Thanks!
Matt Pavlovich

> On Mar 20, 2024, at 9:09 AM, Frédéric Curvat <fcur...@gmail.com> wrote:
>
> Hello,
>
> Here is a poc : https://github.com/fcurvat/amqpoc
>
> Just start an activemq broker (5.18.3) run the class 'PocAmqApplication'
> spring boot app and stop it gracefully (app generate 1000 messages at start
> / got 5 consumers with prefetch 100).
> Class 'ShutdownEventSource' allow to hook and stop gracefully the amq
> consumers.
>
> Often I get one message that is not ack (checking the amq console on
> http://localhost:8161/admin/queues.jsp).
> Sometimes it takes some time to shutdown because one consumer is still
> consuming for a minute or more (that's not the big issue for me, as we can
> manage to set a bigger 'terminationGracePeriodSeconds' on kubernetes to let
> the app stop).
>
> Best,
>
> Fred
>
> On Tue, Mar 19, 2024 at 17:35, Frédéric Curvat <fcur...@gmail.com> wrote:
>
>> Hi,
>>
>> I will work on setting up a simple project, for now that's tangled with
>> our code.
>>
>> Fred
>>
>> On Tue, Mar 19, 2024 at 17:09, Matt Pavlovich <mattr...@gmail.com> wrote:
>>
>>> Hi Frédéric-
>>>
>>> Do you have a small sample project that is able to reproduce the issue
>>> that you can share (preferably a simple GitHub project)?
>>>
>>> Thanks,
>>> Matt Pavlovich
>>>
>>>> On Mar 19, 2024, at 10:47 AM, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> Checking back on the case, i played with shutdown of consumers (and not
>>>> using brave instrumentation).
>>>> With 5 consumers reading, if i close consumers, then sessions, then
>>>> connection, i almost always only ack on 4 messages despite 5 messages are
>>>> read.
>>>> I almost always get one error of this kind :
>>>>
>>>> java.lang.NullPointerException: Cannot invoke "java.util.List.get(int)" because "this.synchronizations" is null
>>>> at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168) ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291) ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606) ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118) ~[activemq-jms-pool-5.18.3.jar:5.18.3]
>>>>
>>>> I got the same error if close the connection first (because the connection
>>>> cleans up the sessions).
>>>> I wonder if that's a bug or other misusage on our side.
>>>> Client version is 5.18.3
>>>>
>>>> Any help / thoughts welcome :)
>>>>
>>>> Best,
>>>>
>>>> Fred
>>>>
>>>> On Mon, Mar 11, 2024 at 16:27, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> More news about our issue.
>>>>>
>>>>> We did check again the case and i have some news :
>>>>> - Nothing bad in broker logs (no poison ack).
>>>>> - Application logs shows that issue appears on graceful shutdown of the
>>>>> application (however not on all shutdowns)
>>>>>
>>>>> Our shutdown consists in calling .close() method on all MessageConsumers
>>>>> and then call .close() on all Sessions.
>>>>> Seems fair to do it like this but reading the javadoc seems we could have
>>>>> just called .close() on the Connection.
>>>>>
>>>>> We checked a couple of issues of reading :
>>>>> - In one case, we saw the stacktrace below, all consumers .close() are ok
>>>>> but one session is failing to close properly.
>>>>> - In the other case, all consumers and sessions are closed without errors.
>>>>>
>>>>> ======
>>>>> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>>>>> at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>>>> at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>>>> at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>>>>> at java.base/java.util.Objects.checkIndex(Objects.java:361)
>>>>> at java.base/java.util.ArrayList.get(ArrayList.java:427)
>>>>> at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>> at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>> at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>> at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>> at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>> at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unregisterAllEventCallbacks(AMQEventSourceEngine.java:297)
>>>>> at org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unbindAll(AMQEventSourceEngine.java:203)
>>>>> at org.talend.ipaas.rt.springboot.common.shutdown.ShutdownEventSource.onApplicationEvent(ShutdownEventSource.java:38)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>>>>> at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:343)
>>>>> at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:228)
>>>>> at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:165)
>>>>> at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
>>>>> at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
>>>>> at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
>>>>> at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:437)
>>>>> at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
>>>>> at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1058)
>>>>> at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:173)
>>>>> at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1026)
>>>>> at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:139)
>>>>> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>>>>> at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:108)
>>>>> at java.base/java.lang.Thread.run(Thread.java:833)
>>>>> ====
>>>>>
>>>>> More specifically
>>>>>> at org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>>> at org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>>> at org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>>> at org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>>> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>>> at org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>> I wonder if that's not the brave implementation that triggers the rollback
>>>>> of the message reading. That would explain that the message is finally read
>>>>> by another consumer.
>>>>> I don't know if there is a regression there in brave tracing (or if it is
>>>>> silently failing for some time), but we are using this tracing for quite
>>>>> some time (and we are not idempotent if replaying the message).
>>>>>
>>>>> We will dig on the possible brave changes, but still any comment or
>>>>> thoughts are welcome.
>>>>>
>>>>> Best,
>>>>>
>>>>> Fred
>>>>>
>>>>> On Mon, Mar 4, 2024 at 10:04, Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>>
>>>>>> Hello JB !
>>>>>>
>>>>>> Hope you are well !
>>>>>>
>>>>>>> 1. The message goes in redelivery (because it expired or client
>>>>>>> rollback transaction) and so it can be taken by another consumer. As
>>>>>>> you use session_transacted, the "first" client has to deal with the
>>>>>>> rollback
>>>>>> For me that was not so likely because the message are read with a 1
>>>>>> second interval on two different service pods. How could i confirm that ?
>>>>>> Also logging the messageId would help right ?
>>>>>>
>>>>>>> 2. Do you see "poison ack" in the log ?
>>>>>> No trace in logs but we will double check.
>>>>>>
>>>>>>> Oh by the way, what's your consumer prefetch ? I guess it's more than 1 ?
>>>>>> Yes, we use prefetch to 100 (over tcp openwire).
>>>>>>
>>>>>> Thanks for support ! i am off this week but probably i will ping you
>>>>>> directly in coming weeks since you proposed it.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Fred
>>>>>>
>>>>>> On Mon, Mar 4, 2024 at 07:28, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>
>>>>>>> Oh by the way, what's your consumer prefetch ? I guess it's more than 1 ?
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On Fri, Mar 1, 2024 at 4:52 PM Frédéric Curvat <fcur...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hello !
>>>>>>>>
>>>>>>>> At my company we are using Apache ActiveMQ 5.18.3.
>>>>>>>> We suspect that in some rare cases, a queue message is read twice by
>>>>>>>> different consumers.
>>>>>>>> For more context :
>>>>>>>> - broker is classic primary/secondary (secondary started but not active -
>>>>>>>> not a network of brokers).
>>>>>>>> - we are using persisted queues with PostgreSQL backend.
>>>>>>>> - A single queue is being read by several consumers : 10 consumers for a
>>>>>>>> single java app deployed in HA other several k8s pods.
>>>>>>>> - We use SESSION_TRANSACTED session for either consumers and producers.
>>>>>>>> - We use PooledConnectionFactory with 1 connection,
>>>>>>>> maximumActiveSessionPerConnection 500, expiryTimeout 10000
>>>>>>>> We see no transaction or other error in logs, either service of activemq
>>>>>>>> broker at the time of the "double read".
>>>>>>>>
>>>>>>>> Has something like this already been seen ? Can it be a bug or a
>>>>>>>> misconfiguration somewhere ?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Fred
>>>>>>>
>>>>>>
>>>
>>>