I think you want to be using interceptors (
http://activemq.apache.org/interceptors.html), not consuming from a queue
and then publishing back to the same queue.  I've always believed that
embedded Camel routes couldn't be inserted into the middle of accepting a
message (which is what you really want), but interceptors should be able to
do that.

Tim

On Thu, Apr 28, 2016 at 12:49 PM, Quinn Stevenson <
qu...@pronoia-solutions.com> wrote:

> I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user
> properties to messages as they arrive at the broker.  I’m using a wildcard
> on the from so I can apply the same logic to a set of topics or queues, but
> I can’t seem to send the message back to the original queue.
>
> Without wildcards, it works fine - something like this:
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xsi:schemaLocation="
>             http://camel.apache.org/schema/spring
> http://camel.apache.org/schema/spring/camel-spring.xsd
>             http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd";>
>
>     <camelContext id="audit-enrichment" xmlns="
> http://camel.apache.org/schema/spring";>
>         <route id="in-audit-to-file">
>             <from uri="broker://queue:in.adt.epic"/>
>             <setHeader headerName="MyCustomHeader">
>                 <constant>MyHeaderValue</constant>
>             </setHeader>
>             <to uri="broker://queue:in.adt.epic" />
>         </route>
>
>     </camelContext>
>
> </beans>
>
> However when I put in the wildcards, I get and IllegalStateException.  The
> configuration I’m trying looks like this
> <beans xmlns="http://www.springframework.org/schema/beans";
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>        xsi:schemaLocation="
>             http://camel.apache.org/schema/spring
> http://camel.apache.org/schema/spring/camel-spring.xsd
>             http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd";>
>
>     <camelContext id="audit-enrichment" xmlns="
> http://camel.apache.org/schema/spring";>
>         <route id="in-audit-to-file">
>             <from uri="broker://queue:in.adt.*"/>
>             <setHeader headerName="MyCustomHeader">
>                 <constant>MyHeaderValue</constant>
>             </setHeader>
>             <recipientList>
>                 <simple>broker://${header[JMSDestination]}</simple>
>             </recipientList>
>         </route>
>
>     </camelContext>
>
> </beans>
>
> And here’s the exception I get
>
> 2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId:
> ID-macpro-local-53588-1461869253207-0-2 on ExchangeId:
> ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt:
> 1 caught: java.lang.IllegalStateException: Not the original message from
> the broker Message: Enter some text here for the message body...
>
> Message History
>
> ---------------------------------------------------------------------------------------------------------------------------------------
> RouteId              ProcessorId          Processor
>                                                 Elapsed (ms)
> [in-audit-to-file  ] [in-audit-to-file  ] [broker://queue:in.adt.*
>                                                ] [        23]
> [in-audit-to-file  ] [setHeader1        ] [setHeader[MyCustomHeader]
>                                                ] [         3]
> [in-audit-to-file  ] [recipientList1    ]
> [recipientList[simple{broker://${header[JMSDestination]}}]
>    ] [        18]
>
> Exchange
>
> ---------------------------------------------------------------------------------------------------------------------------------------
> Exchange[
>         Id                  ID-macpro-local-53588-1461869253207-0-3
>         ExchangePattern     InOnly
>         Headers
>  {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1,
> CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=,
> JMSCorrelationIDAsBytes=, JMSDeliveryMode=1,
> JMSDestination=queue://in.adt.epic, JMSExpiration=0,
> JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0,
> JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684,
> JMSType=, JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
>         BodyType            String
>         Body                Enter some text here for the message body...
> ]
>
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> | org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport:
> vm://localhost#1
> java.lang.IllegalStateException: Not the original message from the broker
> Message: Enter some text here for the message body...
>         at
> org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
>         at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
>         at
> org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
>         at
> org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]
> 2016-04-28 12:47:43,727 | WARN  | Error processing intercepted message:
> ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
> ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null,
> originalTransactionId = null, producerId =
> ID:macpro.local-53587-1461869252118-4:1:1:1, destination =
> queue://in.adt.epic, transactionId = null, expiration = 0, timestamp =
> 1461869263684, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> correlationId = , replyTo = null, persistent = false, type = , priority =
> 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =
> false, userID = null, content = null, marshalledProperties = null,
> dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
> readOnlyProperties = true, readOnlyBody = true, droppable = false,
> jmsXGroupFirstForConsumer = false, text = Enter some text here for the
> message body...}.
> Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID:
> ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by:
> [java.lang.IllegalStateException - Not the original message from the broker
> Message: Enter some text here for the message body...] |
> org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ
> VMTransport: vm://localhost#1
>
> Is there a way to accomplish what I’m after?
>
>
>
>

Reply via email to