Hi Frédéric-

A sample project is a big help. First step is to tidy up the dependencies and 
align with Jakarta-based namespace. Using ActiveMQ 6.1.0 instead is the best 
next step. 

I opened a GH issue on your poc repo where other discussions could happen with 
less async conversation between your code tree and this mailing list.

Summary for future lurkers of this email thread:

Spring Boot 3.x is based on Jakarta vs Java EE. Basically, it means you need to 
change your POC to use:

    import jakarta.jms. .. 

instead of 
     
    import javax.jms ..

Let’s get that all cleaned up and aligned properly before looking further at 
any other issues.

Thanks!
Matt Pavlovich

> On Mar 20, 2024, at 9:09 AM, Frédéric Curvat <fcur...@gmail.com> wrote:
> 
> Hello,
> 
> Here is a poc : https://github.com/fcurvat/amqpoc
> 
> Just start an activemq broker (5.18.3) run the class 'PocAmqApplication'
> spring boot app and stop it gracefully (app generate 1000 messages at start
> / got 5 consumers with prefetch 100).
> Class 'ShutdownEventSource' allow to hook and stop gracefully the amq
> consumers.
> 
> Often I get one message that is not ack (checking the amq console on
> http://localhost:8161/admin/queues.jsp).
> Sometimes it takes some time to shutdown because one consumer is still
> consuming for a minute or more (that's not the big issue for me, as we can
> manage to set a bigger 'terminationGracePeriodSeconds' on kubernetes to let
> the app stop).
> 
> Best,
> 
> Fred
> 
> Le mar. 19 mars 2024 à 17:35, Frédéric Curvat <fcur...@gmail.com> a écrit :
> 
>> Hi,
>> 
>> I will work on setting up a simple project, for now that's tangled with
>> our code.
>> 
>> Fred
>> 
>> Le mar. 19 mars 2024 à 17:09, Matt Pavlovich <mattr...@gmail.com> a
>> écrit :
>> 
>>> Hi Frédéric-
>>> 
>>> Do you have a small sample project that is able to reproduce the issue
>>> that you can share (preferably a simple GitHub project)?
>>> 
>>> Thanks,
>>> Matt Pavlovich
>>> 
>>>> On Mar 19, 2024, at 10:47 AM, Frédéric Curvat <fcur...@gmail.com>
>>> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> Checking back on the case, i played with shutdown of consumers (and not
>>>> using brave instrumentation).
>>>> With 5 consumers reading, if i close consumers, then sessions, then
>>>> connection, i almost always only ack on 4 messages despite 5 messages
>>> are
>>>> read.
>>>> I almost always get one error of this kind :
>>>> 
>>>> java.lang.NullPointerException: Cannot invoke "java.util.List.get(int)"
>>>> because "this.synchronizations" is null
>>>> at
>>>> 
>>> org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>> ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at
>>>> 
>>> org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>> ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at
>>> org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>> ~[activemq-client-5.18.3.jar:5.18.3]
>>>> at
>>> org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>> ~[activemq-jms-pool-5.18.3.jar:5.18.3]
>>>> 
>>>> I got the same error if close the connection first (because the
>>> connection
>>>> cleans up the sessions).
>>>> I wonder if that's a bug or other misusage on our side.
>>>> Client version is 5.18.3
>>>> 
>>>> Any help / thoughts welcome :)
>>>> 
>>>> Best,
>>>> 
>>>> Fred
>>>> 
>>>> Le lun. 11 mars 2024 à 16:27, Frédéric Curvat <fcur...@gmail.com> a
>>> écrit :
>>>> 
>>>>> Hello,
>>>>> 
>>>>> More news about our issue.
>>>>> 
>>>>> We did check again the case and i have some news :
>>>>> - Nothing bad in broker logs (no poison ack).
>>>>> - Application logs shows that issue appears on graceful shutdown of the
>>>>> application (however not on all shutdowns)
>>>>> 
>>>>> Our shutdown consists in calling .close() method on all
>>> MessageConsumers
>>>>> and then call .close() on all Sessions.
>>>>> Seems fair to do it like this but reading the javadoc seems we could
>>> have
>>>>> just called .close() on the Connection.
>>>>> 
>>>>> We checked a couple of issues of reading :
>>>>> - In one case, we saw the stacktrace below, all consumers .close() are
>>> ok
>>>>> but one session is failing to close properly.
>>>>> - In the other case, all consumers and sessions are closed without
>>> errors.
>>>>> 
>>>>> ======
>>>>> java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>>>>> at
>>>>> 
>>> java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>>>> at
>>>>> 
>>> java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>>>> at
>>>>> 
>>> java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
>>>>> at java.base/java.util.Objects.checkIndex(Objects.java:361)
>>>>> at java.base/java.util.ArrayList.get(ArrayList.java:427)
>>>>> at
>>>>> 
>>> org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>> at
>>>>> 
>>> org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>> at
>>> org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>> at
>>>>> 
>>> org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>> at
>>> org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>> at
>>>>> 
>>> org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unregisterAllEventCallbacks(AMQEventSourceEngine.java:297)
>>>>> at
>>>>> 
>>> org.talend.ipaas.rt.amqsource.impl.AMQEventSourceEngine.unbindAll(AMQEventSourceEngine.java:203)
>>>>> at
>>>>> 
>>> org.talend.ipaas.rt.springboot.common.shutdown.ShutdownEventSource.onApplicationEvent(ShutdownEventSource.java:38)
>>>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>>> Method)
>>>>> at
>>>>> 
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>>> at
>>>>> 
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>>>>> at
>>>>> 
>>> org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:343)
>>>>> at
>>>>> 
>>> org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:228)
>>>>> at
>>>>> 
>>> org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:165)
>>>>> at
>>>>> 
>>> org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
>>>>> at
>>>>> 
>>> org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
>>>>> at
>>>>> 
>>> org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
>>>>> at
>>>>> 
>>> org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:437)
>>>>> at
>>>>> 
>>> org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:370)
>>>>> at
>>>>> 
>>> org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1058)
>>>>> at
>>>>> 
>>> org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:173)
>>>>> at
>>>>> 
>>> org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1026)
>>>>> at
>>>>> 
>>> org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:139)
>>>>> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>>>>> at
>>>>> 
>>> org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:108)
>>>>> at java.base/java.lang.Thread.run(Thread.java:833)
>>>>> ====
>>>>> 
>>>>> More specifically
>>>>>> at
>>>>> 
>>> org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:168)
>>>>>> at
>>>>> 
>>> org.apache.activemq.TransactionContext.rollback(TransactionContext.java:291)
>>>>>> at
>>> org.apache.activemq.ActiveMQSession.rollback(ActiveMQSession.java:606)
>>>>>> at
>>>>> 
>>> org.talend.ipaas.rt.amqsource.tracing.micrometer.MicrometerSession.rollback(MicrometerSession.java:102)
>>>>>> at brave.jms.TracingSession.rollback(TracingSession.java:119)
>>>>>> at
>>>>> 
>>> org.apache.activemq.jms.pool.PooledSession.close(PooledSession.java:118)
>>>>> I wonder if that's not the brave implementation that triggers the
>>> rollback
>>>>> of the message reading. That would explain that the message is finally
>>> read
>>>>> by another consumer.
>>>>> I don't know if there is a regression there in brave tracing (or if it
>>> is
>>>>> silently failling for some time), but we are using this tracing for
>>> quite
>>>>> some time (and we are not idempotent if replaying the message).
>>>>> 
>>>>> We will dig on the possible brave changes, but still any comment or
>>>>> thoughts are welcome.
>>>>> 
>>>>> Best,
>>>>> 
>>>>> Fred
>>>>> 
>>>>> Le lun. 4 mars 2024 à 10:04, Frédéric Curvat <fcur...@gmail.com> a
>>> écrit :
>>>>> 
>>>>>> Hello JB !
>>>>>> 
>>>>>> Hope you are well !
>>>>>> 
>>>>>>> 1. The message goes in redelivery (because it expired or client
>>>>>>> rollback transaction) and so it can be taken by another consumer. As
>>>>>>> you use session_transacted, the "first" client has to deal with the
>>>>>>> rollback
>>>>>> For me that was not so likely because the message are read with a 1
>>>>>> second interval on two different service pods. How could i confirm
>>> that ?
>>>>>> Also logging the messageId would help right ?
>>>>>> 
>>>>>>> 2. Do you see "poison ack" in the log ?
>>>>>> No trace in logs but we will double check.
>>>>>> 
>>>>>>> Oh by the way, what's your consumer prefetch ? I guess it's more
>>> than 1 ?
>>>>>> Yes, we use prefetch to 100 (over tcp openwire).
>>>>>> 
>>>>>> Thanks for support ! i am off this week but probably i will ping you
>>>>>> directly in coming weeks since you proposed it.
>>>>>> 
>>>>>> Best,
>>>>>> 
>>>>>> Fred
>>>>>> 
>>>>>> Le lun. 4 mars 2024 à 07:28, Jean-Baptiste Onofré <j...@nanthrax.net> a
>>>>>> écrit :
>>>>>> 
>>>>>>> Oh by the way, what's your consumer prefetch ? I guess it's more
>>> than 1 ?
>>>>>>> 
>>>>>>> Regards
>>>>>>> JB
>>>>>>> 
>>>>>>> On Fri, Mar 1, 2024 at 4:52 PM Frédéric Curvat <fcur...@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hello !
>>>>>>>> 
>>>>>>>> At my company we are using Apache ActiveMQ 5.18.3.
>>>>>>>> We suspect that in some rare cases, a queue message is read twice by
>>>>>>>> different consumers.
>>>>>>>> For more context :
>>>>>>>> - broker is classic primary/secondary (secondary started but not
>>>>>>> active -
>>>>>>>> not a network of brokers).
>>>>>>>> - we are using persisted queues with PostgreSQL backend.
>>>>>>>> - A single queue is being read by several consumers : 10 consumers
>>> for
>>>>>>> a
>>>>>>>> single java app deployed in HA other several k8s pods.
>>>>>>>> - We use SESSION_TRANSACTED session for either consumers and
>>> producers.
>>>>>>>> - We use PooledConnectionFactory with 1 connection,
>>>>>>>> maximumActiveSessionPerConnection 500, expiryTimeout 10000
>>>>>>>> We see no transaction or other error in logs, either service of
>>>>>>> activemq
>>>>>>>> broker at the time of the "double read".
>>>>>>>> 
>>>>>>>> Has something like this already been seen ? Can it be a bug or a
>>>>>>>> misconfiguration somewhere ?
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> 
>>>>>>>> Fred
>>>>>>> 
>>>>>> 
>>> 
>>> 

Reply via email to