Quick update. I figured out how to implement the idempotent consumer for a local ActiveMQ instance.
The route should be configured as follows: <route id="IBMMQ.RESPONSE_QUEUE_1"> <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description> <from uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&concurrentConsumers=4&maxConcurrentConsumers=10&maxMessagesPerTask=200&transacted=true&lazyCreateTransactionManager=false&disableReplyTo=true"/> <onException> <exception>java.io.IOException</exception> <exception>org.springframework.jms.IllegalStateException</exception> <exception>com.ibm.mq.MQException</exception> <handled> <constant>true</constant> </handled> <log message="Recoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/> <to uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&deliveryPersistent=false"/> </onException> <idempotentConsumer messageIdRepositoryRef="myRepo"> <header>messageId</header> <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&preserveMessageQos=true&disableReplyTo=true"/> </idempotentConsumer> </route> The idempotent information had to be below the onException entries. I think what I need to understand now is how do I implement idempotent for the entire cluster and/or is there a plan to implement something similar in ActiveMQ Classic like there ids for Artemis that can detect duplicate messages, Jason ________________________________ From: Jason Jackson <jason.jack...@itechag.com> Sent: Wednesday, July 10, 2024 9:16 AM To: users@activemq.apache.org <users@activemq.apache.org> Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages For this specific issue there is a two-way process. Messages are placed on an ActiveMQ queue, using Camel/Spring I convert/transform the messages and then move the message to IBM MQ, another application picks the message up and process it, a message is then returned to IBM MQ and I then pick that message up from IBM MQ and place it back on ActiveMQ. Everything works correctly and there are know issues unless there is a broker failure, that is when a duplicate reply message is created. We are testing/verifying how the product works/functions before moving into product and are doing mock fail-over/failures. I have three or more ActiveMQ brokers setup/configured as a network of brokers using the following: <networkConnectors> <networkConnector name="AMQ_Cluster" duplex="true" networkTTL="50" decreaseNetworkConsumerPriority="true" dynamicOnly="true" uri="static:(nio+ssl://host1:1111,nio+ssl://host2:1111,nio+ssl://host3:1111)" userName="activemq" password="activemq" /> </networkConnectos> The communication and moving of messages between each of the brokers works correctly and there are no issues. There are no issues with the Camel routing that is taking place while there is no broker failure. The duplicate messages will occur when a broker fails, I suspect it is due to a message being sent but no acknowledgement being received and then the message is resent. Here is an example of my route from ActiveMQ to IBM MQ. <route id="IBMMQ.AMQ_QUEUE_1"> <description>"activemq to IBMMQ queue AMQ_QUEUE_1"</description> <from uri="activemq:AMQ_QUEUE_1?cacheLevelName=CACHE_CONSUMER&concurrentConsumers=4&maxConcurrentConsumers=10&maxMessagesPerTask=200&transacted=true&lazyCreateTransactionManager=false&disableReplyTo=true"/> <onException> <exception>org.apache.camel.TypeConversionException</exception> <exception>org.apache.camel.ValidationException</exception> <handled> <constant>true</constant> </handled> <log message="Unrecoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/> <to uri="activemq:DLQ.AMQ_QUEUE_1?disableReplyTo=true"/> </onException> <onException> <exception>java.io.IOException</exception> <exception>org.springframework.jms.IllegalStateException</exception> <exception>com.ibm.mq.MQException</exception> <handled> <constant>true</constant> </handled> <log message="Recoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/> <to uri="activemq:AMQ_QUEUE_1?disableReplyTo=true&deliveryPersistent=false"/> </onException> <validate> <simple resultType="java.lang.Boolean">${in.headers.JMSCorrelationID} != '' && ${in.headers.JMSCorrelationID} regex '\d+'</simple> </validate> <setHeader headerName="JMS_IBM_Character_Set"> <constant>UTF-8</constant> </setHeader> <setHeader headerName="JMS_IBM_MsgType"> <simple resultType="java.lang.Integer">1</simple> </setHeader> <setHeader headerName="JMS_IBM_PutApplType"> <simple resultType="java.lang.Integer">28</simple> </setHeader> <setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"> <simple resultType="java.lang.Integer">64</simple> </setHeader> <setHeader headerName="JMS_IBM_MQMD_ReplyToQMgr"> <constant>REPLY_QMGR</constant> </setHeader> <setHeader headerName="JMSXAppID"> <constant>pax.Application</constant> </setHeader> <setHeader headerName="JMSXDeliveryCount"> <constant>1</constant> </setHeader> <setHeader headerName="CamelJmsDestinationName"> <constant>queue:///AMQ_QUEUE_1?targetClient=1&mdWriteEnabled=true</constant> </setHeader> <setHeader headerName="JMS_IBM_MQMD_ReplyToQ"> <constant>RESPONSE_QUEUE_1</constant> </setHeader> <to pattern="InOnly" uri="IBMMQ:AMQ_QUEUE_1?jmsMessageType=Bytes&concurrentConsumers=4&maxConcurrentConsumers=10&maxMessagesPerTask=200&preserveMessageQos=true&disableReplyTo=true&includeSentJMSMessageID=true&useMessageIDAsCorrelationID=false"/> </route> Here is an example of my route from IBM MQ to ActiveMQ. <route id="IBMMQ.RESPONSE_QUEUE_1"> <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description> <from uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&concurrentConsumers=4&maxConcurrentConsumers=10&maxMessagesPerTask=200&transacted=true&lazyCreateTransactionManager=false&disableReplyTo=true"/> <onException> <exception>java.io.IOException</exception> <exception>org.springframework.jms.IllegalStateException</exception> <exception>com.ibm.mq.MQException</exception> <handled> <constant>true</constant> </handled> <log message="Recoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/> <to uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&deliveryPersistent=false"/> </onException> <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&preserveMessageQos=true&disableReplyTo=true"/> </route> When you mention am I using a transaction of any kind are you speaking/thinking of something like idempotentcy? I have used ActiveMQ and Camel in the past but not to the extent/degree I am now so I am still learning a lot about how Camel functions and the options that are available. From what I am able to read and understand it would appear I need to implement idempotent as a cluster service due to the fact that I am using ActiveMQ in a cluster environment. I have tried just implementing a local Memory base Idempotent instance but that fails with ActiveMQ also, I know I am off somewhere but I just do not know where. Here is what I have tried that fails, if I could get this to work it would only be valid for the local ActiveMQ instance and not the cluster, the changes in the response route are in yellow. Once I understood this I was hoping to implement idempotent, or something similar for the ActiveMQ cluster. <bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> <route id="IBMMQ.RESPONSE_QUEUE_1"> <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description> <from uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&concurrentConsumers=4&maxConcurrentConsumers=10&maxMessagesPerTask=200&transacted=true&lazyCreateTransactionManager=false&disableReplyTo=true"/> <idempotentConsumer messageIdRepositoryRef="myRepo"> <header>messageId</header> <onException> <exception>java.io.IOException</exception> <exception>org.springframework.jms.IllegalStateException</exception> <exception>com.ibm.mq.MQException</exception> <handled> <constant>true</constant> </handled> <log message="Recoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/> <to uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&deliveryPersistent=false"/> </onException> <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&preserveMessageQos=true&disableReplyTo=true"/> </idempotentConsumer> </route> Jason ________________________________ From: Justin Bertram <jbert...@apache.org> Sent: Tuesday, July 9, 2024 3:43 PM To: users@activemq.apache.org <users@activemq.apache.org> Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages CAUTION: This email originated from outside of the organization. Do not click links or open attachments unless you recognize the sender and know the content is safe. Are the duplicate messages already in the queue or are they being created during the process of moving them from ActiveMQ Classic to IBM MQ? If the latter, do you know why the duplicates are being created (e.g. the network connection fails)? Are you mitigating duplicates by using a transaction of any kind? How is your existing Camel route configured? Justin On Tue, Jul 9, 2024 at 2:17 PM Jason Jackson <jason.jack...@itechag.com.invalid> wrote: > Good afternoon. > > I was wondering if anyone had any good links of information regarding how > to use Idempotent within ActiveMQ Classic Camel routes. > > I am having an issue with duplicate messages within ActiveMQ Classic > 5.18.4 when messages are moved from ActiveMQ to IBM MQ using Camel. > > From the reading I have done I believe that if I can configure an > Idempotent repository I can use this and ignore duplicate messages. > > On this same note, the brokers are configured as a network of brokers with > duplex=true, does anyone know if this could possibly cause the duplicate > messages? > > > > Jason >