Hi,

I suggest diving into the Spring JMS code, as it is the library the Camel JMS component is built on. It contains the recovery logic.
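As a rough illustration of detecting recovery without waiting for a new message, here is a minimal plain-Java sketch that polls a "recovering" flag and fires a callback once the flag has been seen as true and then drops back to false. The class name RecoveryWatcher and its methods are hypothetical and not part of Camel or Spring. The BooleanSupplier is only a stand-in for something like the container's isRecovering() method, and the sketch assumes that flag actually turns true during an outage, which is exactly what was not observed in the setup described below.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper (not a Camel or Spring class): reports the transition
 * from "recovering" back to "not recovering" by polling a boolean flag.
 */
final class RecoveryWatcher {
    // Stand-in for a real flag source, e.g. a container's isRecovering() method (assumption).
    private final BooleanSupplier recovering;
    // Invoked once per completed recovery.
    private final Runnable onRecovered;
    // True once the flag has been observed as "recovering" at least once.
    private boolean sawRecovering;

    RecoveryWatcher(BooleanSupplier recovering, Runnable onRecovered) {
        this.recovering = recovering;
        this.onRecovered = onRecovered;
    }

    /**
     * Call periodically, e.g. from a scheduled task.
     * Returns true only on the poll that detects a completed recovery.
     */
    synchronized boolean poll() {
        if (recovering.getAsBoolean()) {
            sawRecovering = true;   // outage in progress, remember it
            return false;
        }
        if (sawRecovering) {
            sawRecovering = false;  // reset so the next outage is detected again
            onRecovered.run();
            return true;
        }
        return false;               // flag was never true, nothing to report
    }
}
```

Because the callback only fires after the flag has been observed as true, a flag that never leaves false (the behaviour reported in the question) produces no "restored" event at all, rather than a premature one.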
On Mon, Jan 21, 2019 at 3:59 PM Geldhof, Kristof <[email protected]> wrote:
>
> Hi,
>
> I'm using Camel 2.23.0 and activemq-camel. I'm consuming messages from a
> virtual topic. I want to detect when there are problems in the connection
> with the broker and when (automatic) recovery is successful. For the initial
> detection of a connection error I use an exception listener. This one is
> triggered as expected. However, I don't succeed in detecting when the
> auto-recovery has succeeded (I need to be able to detect the recovery without
> a new message being consumed). I tried to use an instance of
> DefaultMessageListenerContainer and call its isRecovering() method to see if
> recovery has succeeded.
>
> from("imq:queue:Consumer.app.VirtualTopic.Update?messageListenerContainerFactoryRef=#myContainerFactory")
>     .log("Message received from virtual topic 'VirtualTopic.Update': ${body}")
>     ...
>
> An exception listener "JmsExceptionListener" listens for exceptions and
> triggers a route:
>
> public class JmsExceptionListener implements ExceptionListener {
>
>     @Autowired
>     private ProducerTemplate producerTemplate;
>
>     @Override
>     public void onException(JMSException exception) {
>         producerTemplate.sendBody("direct:error", "");
>     }
> }
>
> Error route:
>
> from("direct:error")
>     .onException(Exception.class)
>         ... (infinite retries with fixed delay)
>     .end()
>     .filter(method("myMessageListenerContainer", "isRecovering"))
>         .throwException(new Exception("still recovering"))
>     .end()
>     .log("Connection restored")
>     ...
>
> Bean configuration:
>
> <bean id="imq" class="org.apache.activemq.camel.component.ActiveMQComponent"
>       p:acknowledgementModeName="CLIENT_ACKNOWLEDGE"
>       p:receiveTimeout="30000"
>       p:requestTimeout="30000"
>       p:concurrentConsumers="1"
>       p:cacheLevelName="CACHE_CONSUMER"
>       p:testConnectionOnStartup="true"
>       p:connectionFactory-ref="imqConnectionFactory"
>       p:recoveryInterval="2000"/>
>
> <amq:connectionFactory id="imqConnectionFactory"
>       consumerExpiryCheckEnabled="false" exceptionListener="#amqExceptionListener"
>       brokerURL="${imq.broker.url}"/>
> <amq:prefetchPolicy all="1"/>
>
> <bean id="amqExceptionListener"
>       class="be.myapp.listener.JmsExceptionListener"/>
>
> <bean id="myMessageListenerContainer"
>       class="org.springframework.jms.listener.DefaultMessageListenerContainer"
>       p:connectionFactory-ref="imqConnectionFactory"
>       p:destination-name="queue:Consumer.app.VirtualTopic.Update"/>
>
> Classes:
>
> public class MyMessageListenerContainerFactory implements
>         MessageListenerContainerFactory {
>
>     @Autowired
>     private DefaultMessageListenerContainer myMessageListenerContainer;
>
>     @Override
>     public AbstractMessageListenerContainer
>             createMessageListenerContainer(JmsEndpoint endpoint) {
>         return myMessageListenerContainer;
>     }
> }
>
> When manually bringing down my locally-running ActiveMQ broker, the exception
> handler is correctly triggered, but the isRecovering() method of my
> MyMessageListenerContainer returns false, while I had expected it to return
> true. What can be the cause of this? Is there a problem in my configuration?
> Is there any other way to easily detect the recovery state?
>
> Best regards,
> Kristof Geldhof

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
