I have a producer in a request handling thread that I am using to send messages
to a remote JMS (ActiveMQ) broker. I am using the asycSendBody method on the
producerTemplate. I discovered that if the broker goes down the producer starts
to block. I can't afford for the producer ever to block, so I have added an
ExecutorService using a short queue and the 'Discard' policy. Here is the root
that I have:
from("direct:tuneStart")
.threads(5,
20).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.Discard)
.to("jms:queue:myQueue?jmsMessageType=Text")
Now my producer never blocks. However, I do want to gather metrics or log
successful versus failed events. I don't want to do that in the producer though.
I seem to have two sorts of failure:
1. I use the ActiveMQ failover transport with a timeout of 1 second. When the
broker is down I see the messages that get onto the queue fail after one second
with an 'uncategorized' JMSException [1].
2. When the queues fill up the messages are silently discarded. I know I could
also use the Abort policy.
I want to have a handler for every successful message and for each type of
failure. Can anyone tell me how to do this please? Can I use Camel exception
clause (onException). Or do I somehow have to introduce a bean with an exchange
that can handle futures? Is 'onCompletion' useful here?
Any help would be very much appreciated. I have only just started using Camel.
Thank you.
James
org.apache.activemq.transport.failover.FailoverTransport - Failover timed out
after 29999ms
[1] Failed delivery for (MessageId: ID-james-xxxx-52220-1334830271190-0-5 on
ExchangeId: ID-james-xxxx-52220-1334830271190-0-6). Exhausted after delivery
attempt: 1 caught: org.springframework.jms.UncategorizedJmsException:
Uncategorized exception occured during JMS processing; nested exception is
javax.jms.JMSException: Failover timeout of 1000 ms reached.
IOException: Failover timeout of 1000 ms reached.
at
org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:449)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
at
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:74)
at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:79)
at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1244)
at
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1350)
at
org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:300)
at
org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:185)
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:359)
at
org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at
org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)
….