I think you want to be using interceptors ( http://activemq.apache.org/interceptors.html), not consuming from a queue and then publishing back to the same queue. I've always believed that embedded Camel routes couldn't be inserted into the middle of accepting a message (which is what you really want), but interceptors should be able to do that.
Tim On Thu, Apr 28, 2016 at 12:49 PM, Quinn Stevenson < qu...@pronoia-solutions.com> wrote: > I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user > properties to messages as they arrive at the broker. I’m using a wildcard > on the from so I can apply the same logic to a set of topics or queues, but > I can’t seem to send the message back to the original queue. > > Without wildcards, it works fine - something like this: > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=" > http://camel.apache.org/schema/spring > http://camel.apache.org/schema/spring/camel-spring.xsd > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd"> > > <camelContext id="audit-enrichment" xmlns=" > http://camel.apache.org/schema/spring"> > <route id="in-audit-to-file"> > <from uri="broker://queue:in.adt.epic"/> > <setHeader headerName="MyCustomHeader"> > <constant>MyHeaderValue</constant> > </setHeader> > <to uri="broker://queue:in.adt.epic" /> > </route> > > </camelContext> > > </beans> > > However when I put in the wildcards, I get and IllegalStateException. The > configuration I’m trying looks like this > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=" > http://camel.apache.org/schema/spring > http://camel.apache.org/schema/spring/camel-spring.xsd > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd"> > > <camelContext id="audit-enrichment" xmlns=" > http://camel.apache.org/schema/spring"> > <route id="in-audit-to-file"> > <from uri="broker://queue:in.adt.*"/> > <setHeader headerName="MyCustomHeader"> > <constant>MyHeaderValue</constant> > </setHeader> > <recipientList> > <simple>broker://${header[JMSDestination]}</simple> > </recipientList> > </route> > > </camelContext> > > </beans> > > And here’s the exception I get > > 2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId: > ID-macpro-local-53588-1461869253207-0-2 on ExchangeId: > ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt: > 1 caught: java.lang.IllegalStateException: Not the original message from > the broker Message: Enter some text here for the message body... > > Message History > > --------------------------------------------------------------------------------------------------------------------------------------- > RouteId ProcessorId Processor > Elapsed (ms) > [in-audit-to-file ] [in-audit-to-file ] [broker://queue:in.adt.* > ] [ 23] > [in-audit-to-file ] [setHeader1 ] [setHeader[MyCustomHeader] > ] [ 3] > [in-audit-to-file ] [recipientList1 ] > [recipientList[simple{broker://${header[JMSDestination]}}] > ] [ 18] > > Exchange > > --------------------------------------------------------------------------------------------------------------------------------------- > Exchange[ > Id ID-macpro-local-53588-1461869253207-0-3 > ExchangePattern InOnly > Headers > {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1, > CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=, > JMSCorrelationIDAsBytes=, JMSDeliveryMode=1, > JMSDestination=queue://in.adt.epic, JMSExpiration=0, > JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0, > JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684, > JMSType=, JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue} > BodyType String > Body Enter some text here for the message body... > ] > > Stacktrace > --------------------------------------------------------------------------------------------------------------------------------------- > | org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport: > vm://localhost#1 > java.lang.IllegalStateException: Not the original message from the broker > Message: Enter some text here for the message body... > at > org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2] > at > org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2] > at > org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2] > at > org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2] > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2] > at > org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2] > at > org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2] > at > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2] > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2] > at > org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2] > at > org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2] > at > org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2] > 2016-04-28 12:47:43,727 | WARN | Error processing intercepted message: > ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = > ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null, > originalTransactionId = null, producerId = > ID:macpro.local-53587-1461869252118-4:1:1:1, destination = > queue://in.adt.epic, transactionId = null, expiration = 0, timestamp = > 1461869263684, arrival = 0, brokerInTime = 0, brokerOutTime = 0, > correlationId = , replyTo = null, persistent = false, type = , priority = > 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = > false, userID = null, content = null, marshalledProperties = null, > dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, > readOnlyProperties = true, readOnlyBody = true, droppable = false, > jmsXGroupFirstForConsumer = false, text = Enter some text here for the > message body...}. > Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID: > ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by: > [java.lang.IllegalStateException - Not the original message from the broker > Message: Enter some text here for the message body...] | > org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ > VMTransport: vm://localhost#1 > > Is there a way to accomplish what I’m after? > > > >