Your use of an exclusive reply queue with more than 1 producer is broken. The point of an exclusive reply queue is that your publisher knows it's the only one publishing messages for which the replies will go on that reply queue, because all other publishers will use different reply queues for their reply messages. So what you're doing in #2 and #3 isn't valid; each of those producers needs a unique reply queue, which means it's not possible to reuse the URL without changing the replyTo parameter.
I'm not familiar with what you mean by dynamically updating the URL, but if what you're doing is creating a new producer without stopping the previous one, then that's broken too if you aren't changing the replyTo parameter. You need to make sure that you're shutting down the existing consumers before you start the new route. Maybe Camel is supposed to be doing that and isn't (which would be a bug in Camel that you should pursue with them if you haven't already), and maybe it's not supposed to stop the consumers and you're wrongly assuming it will do it for you. Either way, you're going to get better feedback on the Camel mailing list (or on StackOverflow) if you haven't already done so. Good luck. On Tue, Mar 17, 2015 at 10:03 AM, agentalpha <yogesh.l...@gmail.com> wrote: > Hi Everyone, > > We are using Camel 2.13.2 and ActiveMQ 5.9.0. > The configuration of activemq broker is as follows: > <bean id="activemq" > class="org.apache.activemq.camel.component.ActiveMQComponent"> > <property name="configuration" ref="jmsConfig" /> > <property name="transacted" value="false" /> > <property name="acceptMessagesWhileStopping" value="false" > /> > <property name="cacheLevelName" value="CACHE_CONSUMER" /> > </bean> > > <bean id="jmsConnectionFactory" > class="org.apache.activemq.ActiveMQConnectionFactory"> > <property name="brokerURL" value="$[ren.brokerUrl]" /> > <property name="useAsyncSend" value="true" /> > </bean> > > <bean id="pooledConnectionFactory" > class="org.apache.activemq.pool.PooledConnectionFactory" > init-method="start" destroy-method="stop"> > <property name="maxConnections" value="10" /> > <property name="maximumActiveSessionPerConnection" > value="-1" /> > <property name="expiryTimeout" value="0" /> > <property name="idleTimeout" value="0" /> > <property name="connectionFactory" > ref="jmsConnectionFactory" /> > </bean> > > <bean id="jmsConfig" > class="org.apache.camel.component.jms.JmsConfiguration"> > <property name="connectionFactory" > ref="pooledConnectionFactory" /> > <property name="concurrentConsumers" value="15" /> > <property name="maxConcurrentConsumers" value="15" /> > <property name="maxMessagesPerTask" value="5" /> > <property name="idleTaskExecutionLimit" value="0" /> > <property name="idleConsumerLimit" value="0" /> > </bean> > > > As mentioned, we have concurrent and max concurrent consumers having value > 15. > > We have following route sample: > <route id="openApiRoute" trace="true" autoStartup="false" > xmlns="http://camel.apache.org/schema/blueprint"> > <from uri="openAPI" /> > <onException> > <exception>java.lang.Throwable</exception> > <handled> > <constant>true</constant> > </handled> > <bean ref="errorHandlerBean" /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </onException> > <onException> > <exception>java.lang.Exception</exception> > <handled> > <constant>true</constant> > </handled> > <bean ref="errorHandlerBean" /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </onException> > <onException> > > <exception>org.apache.camel.ExchangeTimedOutException</exception> > <redeliveryPolicy logRetryAttempted="true" > retryAttemptedLogLevel="WARN" > > > maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}" > > redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" /> > <handled> > <simple>${header.requestType} != > 'broadcast'</simple> > </handled> > <choice> > <when> > > <simple>${header.requestType} != 'broadcast'</simple> > <bean > ref="errorHandlerBean" > /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </when> > </choice> > </onException> > > <convertBodyTo type="java.util.Map" /> > <choice> > <when> > <simple>${header.requestType} == > 'broadcast'</simple> > <bean ref ="broadcasterBean"/> > <recipientList > parallelProcessing="true" strategyRef="aggregatorStrategy" streaming="true" > stopOnException="false" > prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}"> > > <header>recipientList</header> > </recipientList> > <process > ref="broadcastResultProcessor" /> > </when> > <when> > <simple>${header.requestType} == > 'forward'</simple> > <bean ref="dynamicRouterBean" > method="route" /> > </when> > <otherwise> > <bean > ref="operationTypeNotSupported" > method="throwOperationTypeNotSupportedException" /> > </otherwise> > </choice> > <convertBodyTo type="com.company.openapi.SoapMap" > /> > </route> > > > What we are essentially doing, is fetching a request on CXF and forwarding > it to a jms endpoint using IN/OUT messaging. We use dynamic router for > performing this operation dynamically. > A sample dynamic url for this jms endpoint is : > > > activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive > > Now few problematic behaviors of automatic consumer creation on > responseQueue are: > 1. Whenever the route starts and requests are sent, 15 consumers are > created > on the response queue. After that no matter how much the load is, these 15 > consumers are handling the requests properly (as per our tps expectations). > Now if we dynamically update the above sample jms url in our system (even > just the value of requestTimeout) and send another request, we see that the > reponseQ now has additional 15 (total of 30) consumers on it. And right > after this, most of the requests start failing for unable to map the > response to the request. > The message is sent in the request queue, picked up by the client, the > response is sent, which we can see being enqueued adn dequeued in the > response queue (from activeMQ web console), the response is fetched by our > application but QueueReplyManager fails to map it to original request, > mentioning Response received for unknown correlationId xyz. And just after > a > few seconds we see the exchangetimeout happening for the message with same > correlationid. > Is this because the new set of 15 consumers got created in different > session > than the original one and hence they are not able to share the > correlationId? > Even though I have maxConcurrentConsumers set to 15 for that specific > queue, > why is it still increasing beyond that value? (for every change in url, it > keeps on increasing by 15). > > 2. We have same behaviour when we use the same queue/endpoint url for > single > request and in receipient list. > The single forward request registers its own 15 consumers and when we > perform a receipient list url, it creates additional 15 consumers on the > queues (again maxconcurrentconsumers is 15 only). > > 3. When use the dynamic router (routing to the same jms in/out url) in > multiple routes, we always have 15 consumers on the response queue. > But we use receipient list in multiple routes (routing to same set of jms > in/out urls), it creates set of 15 consumers per route (meaning if we have > 3 > routes it creates 45 consumers). > And again we start getting the same issue of response not being mapped to > the request for unknown correlation id. > > Is there any book/blog from where I can understand how and why is this > happening? How does camel/activemq creates and manages consumers on queues, > specially when we are using the building blocks of dynamic router, > receipient list, activemq endpoint etc. > > I have been trying to understand and find a correct solution for this since > couple of weeks now. > Please help me in this. > Thanks. > > BR! > Yogesh > > > > -- > View this message in context: > http://activemq.2283324.n4.nabble.com/ActiveMQ-using-in-Camel-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-mors-tp4693377.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. >