Quick update.
I figured out how to implement the idempotent consumer for a local ActiveMQ
instance.
The route should be configured as follows:
<route id="IBMMQ.RESPONSE_QUEUE_1">
<description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
<from
uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
<onException>
<exception>java.io.IOException</exception>
<exception>org.springframework.jms.IllegalStateException</exception>
<exception>com.ibm.mq.MQException</exception>
<handled>
<constant>true</constant>
</handled>
<log message="Recoverable exception occurred: ${exception.message}"
loggingLevel="ERROR"/>
<to
uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
</onException>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<to
uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
</route>
The idempotentConsumer block had to be placed below the onException entries.
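The header named inside idempotentConsumer is simply whatever uniquely
identifies a reply. As an untested sketch, here is a variant keyed on
JMSCorrelationID instead of messageId. It assumes every reply coming back from
IBM MQ carries the correlation ID that the request route validates, and it
reuses the same myRepo bean. I have not confirmed that this is a better key for
my case.

```xml
<!-- Sketch only: same response route body, keyed on the JMS correlation ID.
     Assumes each reply from IBM MQ carries a unique JMSCorrelationID. -->
<idempotentConsumer messageIdRepositoryRef="myRepo">
  <header>JMSCorrelationID</header>
  <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
```

With this key, two replies to the same request would count as duplicates even
if IBM MQ assigned them different message IDs.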
I think what I need to understand now is how to implement the idempotent
consumer for the entire cluster, and whether there is a plan to implement
something in ActiveMQ Classic similar to what Artemis has for detecting
duplicate messages.
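For the cluster case, one direction I am considering is a repository that all
instances share, for example the JDBC-backed repository from camel-sql. The
following is only a sketch under assumptions: it presumes camel-sql and
spring-jdbc are on the classpath, the bean names sharedRepo and
idempotentDataSource are hypothetical, and the data source would have to point
at a database that every broker can reach. I have not tested it.

```xml
<!-- Sketch only: a shared idempotent repository backed by a database table,
     so every Camel instance in the cluster checks the same set of IDs. -->
<bean id="sharedRepo"
      class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
  <!-- "idempotentDataSource" is a hypothetical javax.sql.DataSource bean -->
  <constructor-arg ref="idempotentDataSource"/>
  <!-- processor name: keeps this route's IDs separate from other routes -->
  <constructor-arg value="IBMMQ.RESPONSE_QUEUE_1"/>
</bean>

<!-- Inside the response route, below the onException entries -->
<idempotentConsumer messageIdRepositoryRef="sharedRepo">
  <header>messageId</header>
  <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
```

The only change to the route itself would be the repository reference; the
in-memory myRepo bean is local to one instance, so it cannot see IDs recorded
on another broker.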
Jason
________________________________
From: Jason Jackson <[email protected]>
Sent: Wednesday, July 10, 2024 9:16 AM
To: [email protected] <[email protected]>
Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages
For this specific issue there is a two-way process. Messages are placed on an
ActiveMQ queue; using Camel/Spring I convert/transform the messages and then
move them to IBM MQ. Another application picks each message up and processes
it, a reply is then returned to IBM MQ, and I pick that reply up from IBM MQ
and place it back on ActiveMQ. Everything works correctly and there are no
issues unless there is a broker failure; that is when a duplicate reply message
is created.
We are testing/verifying how the product works/functions before moving into
production and are doing mock fail-overs/failures.
I have three or more ActiveMQ brokers setup/configured as a network of brokers
using the following:
<networkConnectors>
<networkConnector
name="AMQ_Cluster"
duplex="true"
networkTTL="50"
decreaseNetworkConsumerPriority="true"
dynamicOnly="true"
uri="static:(nio+ssl://host1:1111,nio+ssl://host2:1111,nio+ssl://host3:1111)"
userName="activemq"
password="activemq"
/>
</networkConnectors>
The communication and moving of messages between each of the brokers works
correctly and there are no issues. There are no issues with the Camel routing
that is taking place while there is no broker failure.
The duplicate messages occur when a broker fails. I suspect a message is sent,
no acknowledgement is received, and the message is then resent.
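If the duplex bridge turns out to be where the resend happens, one setting that
may be worth testing is the checkDuplicateMessagesOnDuplex attribute on the
network connector. The sketch below is my existing connector with that one
attribute added. I have not verified that it changes the behaviour described
here, so treat it as something to try, not a confirmed fix.

```xml
<!-- Sketch only: existing connector plus checkDuplicateMessagesOnDuplex.
     Whether this suppresses the duplicates seen on fail-over is untested. -->
<networkConnectors>
  <networkConnector
      name="AMQ_Cluster"
      duplex="true"
      checkDuplicateMessagesOnDuplex="true"
      networkTTL="50"
      decreaseNetworkConsumerPriority="true"
      dynamicOnly="true"
      uri="static:(nio+ssl://host1:1111,nio+ssl://host2:1111,nio+ssl://host3:1111)"
      userName="activemq"
      password="activemq"/>
</networkConnectors>
```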
Here is an example of my route from ActiveMQ to IBM MQ.
<route id="IBMMQ.AMQ_QUEUE_1">
<description>"activemq to IBMMQ queue AMQ_QUEUE_1"</description>
<from
uri="activemq:AMQ_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
<onException>
<exception>org.apache.camel.TypeConversionException</exception>
<exception>org.apache.camel.ValidationException</exception>
<handled>
<constant>true</constant>
</handled>
<log message="Unrecoverable exception occurred:
${exception.message}" loggingLevel="ERROR"/>
<to uri="activemq:DLQ.AMQ_QUEUE_1?disableReplyTo=true"/>
</onException>
<onException>
<exception>java.io.IOException</exception>
<exception>org.springframework.jms.IllegalStateException</exception>
<exception>com.ibm.mq.MQException</exception>
<handled>
<constant>true</constant>
</handled>
<log message="Recoverable exception occurred:
${exception.message}" loggingLevel="ERROR"/>
<to
uri="activemq:AMQ_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
</onException>
<validate>
<simple
resultType="java.lang.Boolean">${in.headers.JMSCorrelationID} != '' &amp;&amp;
${in.headers.JMSCorrelationID} regex '\d+'</simple>
</validate>
<setHeader headerName="JMS_IBM_Character_Set">
<constant>UTF-8</constant>
</setHeader>
<setHeader headerName="JMS_IBM_MsgType">
<simple resultType="java.lang.Integer">1</simple>
</setHeader>
<setHeader headerName="JMS_IBM_PutApplType">
<simple resultType="java.lang.Integer">28</simple>
</setHeader>
<setHeader headerName="JMS_IBM_Report_Pass_Correl_ID">
<simple resultType="java.lang.Integer">64</simple>
</setHeader>
<setHeader headerName="JMS_IBM_MQMD_ReplyToQMgr">
<constant>REPLY_QMGR</constant>
</setHeader>
<setHeader headerName="JMSXAppID">
<constant>pax.Application</constant>
</setHeader>
<setHeader headerName="JMSXDeliveryCount">
<constant>1</constant>
</setHeader>
<setHeader headerName="CamelJmsDestinationName">
<constant>queue:///AMQ_QUEUE_1?targetClient=1&amp;mdWriteEnabled=true</constant>
</setHeader>
<setHeader headerName="JMS_IBM_MQMD_ReplyToQ">
<constant>RESPONSE_QUEUE_1</constant>
</setHeader>
<to pattern="InOnly"
uri="IBMMQ:AMQ_QUEUE_1?jmsMessageType=Bytes&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;preserveMessageQos=true&amp;disableReplyTo=true&amp;includeSentJMSMessageID=true&amp;useMessageIDAsCorrelationID=false"/>
</route>
Here is an example of my route from IBM MQ to ActiveMQ.
<route id="IBMMQ.RESPONSE_QUEUE_1">
<description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
<from
uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
<onException>
<exception>java.io.IOException</exception>
<exception>org.springframework.jms.IllegalStateException</exception>
<exception>com.ibm.mq.MQException</exception>
<handled>
<constant>true</constant>
</handled>
<log message="Recoverable exception occurred: ${exception.message}"
loggingLevel="ERROR"/>
<to
uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
</onException>
<to
uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</route>
When you ask whether I am using a transaction of any kind, are you thinking of
something like idempotency?
I have used ActiveMQ and Camel in the past but not to the extent/degree I am
now so I am still learning a lot about how Camel functions and the options that
are available.
From what I am able to read and understand, it would appear I need to implement
the idempotent repository as a cluster-wide service because I am using ActiveMQ
in a cluster environment.
I have tried implementing just a local memory-based idempotent repository, but
that also fails with ActiveMQ. I know I am off somewhere, but I just do not
know where.
Here is what I have tried that fails. If I could get this to work it would only
be valid for the local ActiveMQ instance and not the cluster. The changes in
the response route (highlighted in yellow in the original message) are the
myRepo bean and the idempotentConsumer element. Once I understood this I was
hoping to implement idempotent, or something similar, for the ActiveMQ cluster.
<bean id="myRepo"
class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<route id="IBMMQ.RESPONSE_QUEUE_1">
<description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
<from
uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<onException>
<exception>java.io.IOException</exception>
<exception>org.springframework.jms.IllegalStateException</exception>
<exception>com.ibm.mq.MQException</exception>
<handled>
<constant>true</constant>
</handled>
<log message="Recoverable exception occurred: ${exception.message}"
loggingLevel="ERROR"/>
<to
uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
</onException>
<to
uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
</route>
Jason
________________________________
From: Justin Bertram <[email protected]>
Sent: Tuesday, July 9, 2024 3:43 PM
To: [email protected] <[email protected]>
Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages
Are the duplicate messages already in the queue or are they being created
during the process of moving them from ActiveMQ Classic to IBM MQ? If the
latter, do you know why the duplicates are being created (e.g. the network
connection fails)? Are you mitigating duplicates by using a transaction of
any kind? How is your existing Camel route configured?
Justin
On Tue, Jul 9, 2024 at 2:17 PM Jason Jackson
<[email protected]> wrote:
> Good afternoon.
>
> I was wondering if anyone had any good links of information regarding how
> to use Idempotent within ActiveMQ Classic Camel routes.
>
> I am having an issue with duplicate messages within ActiveMQ Classic
> 5.18.4 when messages are moved from ActiveMQ to IBM MQ using Camel.
>
> From the reading I have done I believe that if I can configure an
> Idempotent repository I can use this and ignore duplicate messages.
>
> On this same note, the brokers are configured as a network of brokers with
> duplex=true, does anyone know if this could possibly cause the duplicate
> messages?
>
>
>
> Jason
>