Sorry, I've never used them nor looked for examples, but maybe someone else has.

On Apr 29, 2016 7:02 AM, "Quinn Stevenson" <qu...@pronoia-solutions.com> wrote:
> Thank you, Tim -
>
> Is there an example of an interceptor somewhere in the code base I can use
> as a reference to get started on this?
>
> BTW - I did get it working with the Camel Broker Component by switching from
> <recipientList> to <routingSlip>. I’m not sure why that made a difference,
> but it did.
>
> On Apr 28, 2016, at 10:53 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
> >
> > I think you want to be using interceptors
> > (http://activemq.apache.org/interceptors.html), not consuming from a queue
> > and then publishing back to the same queue. I've always believed that
> > embedded Camel routes couldn't be inserted into the middle of accepting a
> > message (which is what you really want), but interceptors should be able to
> > do that.
> >
> > Tim
> >
> > On Thu, Apr 28, 2016 at 12:49 PM, Quinn Stevenson <qu...@pronoia-solutions.com> wrote:
> >
> >> I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user
> >> properties to messages as they arrive at the broker. I’m using a wildcard
> >> on the from so I can apply the same logic to a set of topics or queues, but
> >> I can’t seem to send the message back to the original queue.
> >>
> >> Without wildcards, it works fine - something like this:
> >>
> >> <beans xmlns="http://www.springframework.org/schema/beans"
> >>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >>        xsi:schemaLocation="
> >>          http://camel.apache.org/schema/spring
> >>          http://camel.apache.org/schema/spring/camel-spring.xsd
> >>          http://www.springframework.org/schema/beans
> >>          http://www.springframework.org/schema/beans/spring-beans.xsd">
> >>
> >>   <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
> >>     <route id="in-audit-to-file">
> >>       <from uri="broker://queue:in.adt.epic"/>
> >>       <setHeader headerName="MyCustomHeader">
> >>         <constant>MyHeaderValue</constant>
> >>       </setHeader>
> >>       <to uri="broker://queue:in.adt.epic" />
> >>     </route>
> >>
> >>   </camelContext>
> >>
> >> </beans>
> >>
> >> However, when I put in the wildcards, I get an IllegalStateException. The
> >> configuration I’m trying looks like this:
> >>
> >> <beans xmlns="http://www.springframework.org/schema/beans"
> >>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >>        xsi:schemaLocation="
> >>          http://camel.apache.org/schema/spring
> >>          http://camel.apache.org/schema/spring/camel-spring.xsd
> >>          http://www.springframework.org/schema/beans
> >>          http://www.springframework.org/schema/beans/spring-beans.xsd">
> >>
> >>   <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
> >>     <route id="in-audit-to-file">
> >>       <from uri="broker://queue:in.adt.*"/>
> >>       <setHeader headerName="MyCustomHeader">
> >>         <constant>MyHeaderValue</constant>
> >>       </setHeader>
> >>       <recipientList>
> >>         <simple>broker://${header[JMSDestination]}</simple>
> >>       </recipientList>
> >>     </route>
> >>
> >>   </camelContext>
> >>
> >> </beans>
> >>
> >> And here’s the exception I get:
> >>
> >> 2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId:
> >> ID-macpro-local-53588-1461869253207-0-2 on ExchangeId:
> >> ID-macpro-local-53588-1461869253207-0-3).
> >> Exhausted after delivery attempt: 1 caught:
> >> java.lang.IllegalStateException: Not the original message from the broker Message: Enter some text here for the message body...
> >>
> >> Message History
> >> ---------------------------------------------------------------------------------------------------------------------------------------
> >> RouteId              ProcessorId          Processor                                                     Elapsed (ms)
> >> [in-audit-to-file  ] [in-audit-to-file  ] [broker://queue:in.adt.*                                    ] [        23]
> >> [in-audit-to-file  ] [setHeader1        ] [setHeader[MyCustomHeader]                                  ] [         3]
> >> [in-audit-to-file  ] [recipientList1    ] [recipientList[simple{broker://${header[JMSDestination]}}]  ] [        18]
> >>
> >> Exchange
> >> ---------------------------------------------------------------------------------------------------------------------------------------
> >> Exchange[
> >>   Id                  ID-macpro-local-53588-1461869253207-0-3
> >>   ExchangePattern     InOnly
> >>   Headers             {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1,
> >>                       CamelRedelivered=false, CamelRedeliveryCounter=0, JMSCorrelationID=,
> >>                       JMSCorrelationIDAsBytes=, JMSDeliveryMode=1,
> >>                       JMSDestination=queue://in.adt.epic, JMSExpiration=0,
> >>                       JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1, JMSPriority=0,
> >>                       JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684,
> >>                       JMSType=, JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
> >>   BodyType            String
> >>   Body                Enter some text here for the message body...
> >> ]
> >>
> >> Stacktrace
> >> ---------------------------------------------------------------------------------------------------------------------------------------
> >> | org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport: vm://localhost#1
> >> java.lang.IllegalStateException: Not the original message from the broker Message: Enter some text here for the message body...
> >>     at org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
> >>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
> >>     at org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
> >>     at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]
> >> 2016-04-28 12:47:43,727 | WARN | Error processing intercepted message:
> >> ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
> >> ID:macpro.local-53587-1461869252118-4:1:1:1:1, originalDestination = null,
> >> originalTransactionId = null, producerId =
> >> ID:macpro.local-53587-1461869252118-4:1:1:1, destination =
> >> queue://in.adt.epic, transactionId = null, expiration = 0, timestamp =
> >> 1461869263684, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> >> correlationId = , replyTo = null, persistent = false, type = , priority =
> >> 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =
> >> false, userID = null, content = null, marshalledProperties = null,
> >> dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
> >> readOnlyProperties = true, readOnlyBody = true, droppable = false,
> >> jmsXGroupFirstForConsumer = false, text = Enter some text here for the
> >> message body...}.
> >> Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID:
> >> ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by:
> >> [java.lang.IllegalStateException - Not the original message from the broker
> >> Message: Enter some text here for the message body...] |
> >> org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ
> >> VMTransport: vm://localhost#1
> >>
> >> Is there a way to accomplish what I’m after?
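
For anyone finding this thread later: the follow-up reports that the wildcard route started working once <recipientList> was replaced with <routingSlip>. The thread does not include the final configuration, so the following is only a sketch of what that change might look like, assuming nothing else in the failing route was altered. The thread also does not explain why the routing slip avoids the IllegalStateException, so treat it as a reported workaround rather than a documented fix.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

  <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
    <route id="in-audit-to-file">
      <!-- Intercept every queue matching the wildcard, as in the original post -->
      <from uri="broker://queue:in.adt.*"/>
      <setHeader headerName="MyCustomHeader">
        <constant>MyHeaderValue</constant>
      </setHeader>
      <!-- Assumption: routingSlip is a drop-in replacement for the recipientList,
           reusing the same expression to send back to the original destination -->
      <routingSlip>
        <simple>broker://${header[JMSDestination]}</simple>
      </routingSlip>
    </route>
  </camelContext>

</beans>
```

The interceptor approach suggested earlier in the thread remains the alternative if this workaround does not hold up in your ActiveMQ or Camel version.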