Quick update.

I figured out how to implement the idempotent consumer for a local ActiveMQ 
instance.

The route should be configured as follows:

<route id="IBMMQ.RESPONSE_QUEUE_1">
   <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
   <from uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
   <onException>
      <exception>java.io.IOException</exception>
      <exception>org.springframework.jms.IllegalStateException</exception>
      <exception>com.ibm.mq.MQException</exception>
      <handled>
         <constant>true</constant>
      </handled>
      <log message="Recoverable exception occurred: ${exception.message}" loggingLevel="ERROR"/>
      <to uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
   </onException>
   <idempotentConsumer messageIdRepositoryRef="myRepo">
      <header>messageId</header>
      <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
   </idempotentConsumer>
</route>

The idempotentConsumer element had to come after the onException entries 
rather than wrap them.
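
As a side note, here is a minimal sketch of the same idempotentConsumer keyed 
on a standard JMS header instead of messageId.  It assumes the reply arrives 
with a usable JMSMessageID (JMSCorrelationID would be another candidate).  I 
have not verified which header stays stable across a redelivery in this setup, 
so treat the header choice as an assumption.

```xml
<!-- In-memory repository: only protects the local instance -->
<bean id="myRepo"
      class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<!-- Fragment of the IBMMQ.RESPONSE_QUEUE_1 route, placed after the onException block -->
<idempotentConsumer messageIdRepositoryRef="myRepo">
   <!-- Assumption: JMSMessageID is populated by the JMS component on consume -->
   <header>JMSMessageID</header>
   <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
```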

I think what I need to understand now is how to implement an idempotent 
consumer for the entire cluster, and/or whether there is a plan to implement 
something in ActiveMQ Classic similar to what exists in Artemis for detecting 
duplicate messages.
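
One possible direction for the cluster case, as a rough sketch only: replace 
the in-memory repository with one backed by a database that every broker can 
reach, for example the JdbcMessageIdRepository from camel-sql.  The DataSource 
bean, its driver, URL, and credentials below are hypothetical placeholders, 
and the camel-sql and commons-dbcp2 jars are assumed to be on the classpath.  
I have not tested this against the cluster.

```xml
<!-- Hypothetical shared database; driver, URL, and credentials are placeholders -->
<bean id="idempotentDataSource"
      class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
   <property name="driverClassName" value="org.postgresql.Driver"/>
   <property name="url" value="jdbc:postgresql://dbhost:5432/camel"/>
   <property name="username" value="camel"/>
   <property name="password" value="changeit"/>
</bean>

<!-- JDBC-backed repository; the second argument is a processor name used to
     scope the stored message ids (here the route id, chosen for illustration) -->
<bean id="clusterRepo"
      class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
   <constructor-arg ref="idempotentDataSource"/>
   <constructor-arg value="IBMMQ.RESPONSE_QUEUE_1"/>
</bean>

<!-- Route fragment: same idempotentConsumer as above, pointing at the shared repository -->
<idempotentConsumer messageIdRepositoryRef="clusterRepo">
   <header>messageId</header>
   <to uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</idempotentConsumer>
```

Because every broker's Camel context would check the same table, a duplicate 
seen on any broker should be filtered, at the cost of a database round trip 
per message.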

Jason



________________________________
From: Jason Jackson <jason.jack...@itechag.com>
Sent: Wednesday, July 10, 2024 9:16 AM
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages

For this specific issue there is a two-way process.  Messages are placed on an 
ActiveMQ queue; using Camel/Spring I convert/transform the messages and then 
move them to IBM MQ.  Another application picks each message up and processes 
it, and a reply is returned to IBM MQ.  I then pick that reply up from IBM MQ 
and place it back on ActiveMQ.  Everything works correctly and there are no 
issues unless there is a broker failure; that is when a duplicate reply 
message is created.

We are testing/verifying how the product works before moving into production 
and are doing mock failovers/failures.

I have three or more ActiveMQ brokers set up as a network of brokers using the 
following:

<networkConnectors>
    <networkConnector
        name="AMQ_Cluster"
        duplex="true"
        networkTTL="50"
        decreaseNetworkConsumerPriority="true"
        dynamicOnly="true"
        uri="static:(nio+ssl://host1:1111,nio+ssl://host2:1111,nio+ssl://host3:1111)"
        userName="activemq"
        password="activemq"
    />
</networkConnectors>

The communication and moving of messages between each of the brokers works 
correctly and there are no issues.  There are no issues with the Camel routing 
that is taking place while there is no broker failure.

The duplicate messages occur when a broker fails.  I suspect a message is 
sent, no acknowledgement is received, and the message is then resent.

Here is an example of my route from ActiveMQ to IBM MQ.

<route id="IBMMQ.AMQ_QUEUE_1">
         <description>"activemq to IBMMQ queue AMQ_QUEUE_1"</description>
         <from 
uri="activemq:AMQ_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
            <onException>
               <exception>org.apache.camel.TypeConversionException</exception>
               <exception>org.apache.camel.ValidationException</exception>
               <handled>
                  <constant>true</constant>
               </handled>
               <log message="Unrecoverable exception occurred: 
${exception.message}" loggingLevel="ERROR"/>
               <to uri="activemq:DLQ.AMQ_QUEUE_1?disableReplyTo=true"/>
            </onException>
            <onException>
               <exception>java.io.IOException</exception>
               
<exception>org.springframework.jms.IllegalStateException</exception>
               <exception>com.ibm.mq.MQException</exception>
               <handled>
                  <constant>true</constant>
               </handled>
               <log message="Recoverable exception occurred: 
${exception.message}" loggingLevel="ERROR"/>
               <to 
uri="activemq:AMQ_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
            </onException>
            <validate>
               <simple 
resultType="java.lang.Boolean">${in.headers.JMSCorrelationID} != '' &amp;&amp; 
${in.headers.JMSCorrelationID} regex '\d+'</simple>
            </validate>
            <setHeader headerName="JMS_IBM_Character_Set">
               <constant>UTF-8</constant>
            </setHeader>
            <setHeader headerName="JMS_IBM_MsgType">
               <simple resultType="java.lang.Integer">1</simple>
            </setHeader>
            <setHeader headerName="JMS_IBM_PutApplType">
               <simple resultType="java.lang.Integer">28</simple>
            </setHeader>
            <setHeader headerName="JMS_IBM_Report_Pass_Correl_ID">
               <simple resultType="java.lang.Integer">64</simple>
            </setHeader>
            <setHeader headerName="JMS_IBM_MQMD_ReplyToQMgr">
               <constant>REPLY_QMGR</constant>
            </setHeader>
            <setHeader headerName="JMSXAppID">
               <constant>pax.Application</constant>
            </setHeader>
            <setHeader headerName="JMSXDeliveryCount">
               <constant>1</constant>
            </setHeader>
            <setHeader headerName="CamelJmsDestinationName">
               
<constant>queue:///AMQ_QUEUE_1?targetClient=1&amp;mdWriteEnabled=true</constant>
            </setHeader>
            <setHeader headerName="JMS_IBM_MQMD_ReplyToQ">
               <constant>RESPONSE_QUEUE_1</constant>
            </setHeader>
            <to pattern="InOnly" 
uri="IBMMQ:AMQ_QUEUE_1?jmsMessageType=Bytes&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;preserveMessageQos=true&amp;disableReplyTo=true&amp;includeSentJMSMessageID=true&amp;useMessageIDAsCorrelationID=false"/>
      </route>


Here is an example of my route from IBM MQ to ActiveMQ.

<route id="IBMMQ.RESPONSE_QUEUE_1">
   <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
   <from 
uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
      <onException>
         <exception>java.io.IOException</exception>
         <exception>org.springframework.jms.IllegalStateException</exception>
         <exception>com.ibm.mq.MQException</exception>
         <handled>
            <constant>true</constant>
         </handled>
         <log message="Recoverable exception occurred: ${exception.message}" 
loggingLevel="ERROR"/>
         <to 
uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
      </onException>
   <to 
uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
</route>

When you ask whether I am using a transaction of any kind, are you thinking of 
something like idempotency?

I have used ActiveMQ and Camel in the past but not to the extent/degree I am 
now so I am still learning a lot about how Camel functions and the options that 
are available.

From what I am able to read and understand it would appear I need to implement 
idempotent as a cluster service due to the fact that I am using ActiveMQ in a 
cluster environment.

I have tried implementing just a local, memory-based idempotent repository, but 
that also fails with ActiveMQ.  I know I am off somewhere, but I do not know 
where.

Here is what I have tried that fails.  If I could get this to work, it would 
only be valid for the local ActiveMQ instance and not the cluster.  The changes 
in the response route (highlighted in yellow in the original message) are the 
myRepo bean and the idempotentConsumer element.  Once I understood this, I was 
hoping to implement idempotency, or something similar, for the ActiveMQ 
cluster.

<bean id="myRepo" 
class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

<route id="IBMMQ.RESPONSE_QUEUE_1">
   <description>"RESPONSE_QUEUE_1 from IBMMQ to activemq"</description>
   <from 
uri="IBMMQ:RESPONSE_QUEUE_1?cacheLevelName=CACHE_CONSUMER&amp;concurrentConsumers=4&amp;maxConcurrentConsumers=10&amp;maxMessagesPerTask=200&amp;transacted=true&amp;lazyCreateTransactionManager=false&amp;disableReplyTo=true"/>
   <idempotentConsumer messageIdRepositoryRef="myRepo">
   <header>messageId</header>
      <onException>
         <exception>java.io.IOException</exception>
         <exception>org.springframework.jms.IllegalStateException</exception>
         <exception>com.ibm.mq.MQException</exception>
         <handled>
            <constant>true</constant>
         </handled>
         <log message="Recoverable exception occurred: ${exception.message}" 
loggingLevel="ERROR"/>
         <to 
uri="activemq:RESPONSE_QUEUE_1?disableReplyTo=true&amp;deliveryPersistent=false"/>
      </onException>
   <to 
uri="activemq:RESPONSE_QUEUE_1?jmsMessageType=Bytes&amp;preserveMessageQos=true&amp;disableReplyTo=true"/>
   </idempotentConsumer>
</route>

Jason


________________________________
From: Justin Bertram <jbert...@apache.org>
Sent: Tuesday, July 9, 2024 3:43 PM
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: ActiveMQ Classic Idempotent Settings and Duplicate Messages



Are the duplicate messages already in the queue or are they being created
during the process of moving them from ActiveMQ Classic to IBM MQ? If the
latter, do you know why the duplicates are being created (e.g. the network
connection fails)? Are you mitigating duplicates by using a transaction of
any kind? How is your existing Camel route configured?


Justin

On Tue, Jul 9, 2024 at 2:17 PM Jason Jackson
<jason.jack...@itechag.com.invalid> wrote:

> Good afternoon.
>
> I was wondering if anyone had any good links of information regarding how
> to use Idempotent within ActiveMQ Classic Camel routes.
>
> I am having an issue with duplicate messages within ActiveMQ Classic
> 5.18.4 when messages are moved from ActiveMQ to IBM MQ using Camel.
>
> From the reading I have done I believe that if I can configure an
> Idempotent repository I can use this and ignore duplicate messages.
>
> On this same note, the brokers are configured as a network of brokers with
> duplex=true, does anyone know if this could possibly cause the duplicate
> messages?
>
>
>
> Jason
>
