Hi there,
I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
ExchangePattern.InOnly messages from a JMS queue, and I want any message
that is not processed within a given time to expire and be moved to a
named dead letter queue. The problem is that I can't get anything to expire.
I have the following route:
public class FulfillmentRequestRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}
And the following ActiveMQ config:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
        <broker:transportConnectors>
            <broker:transportConnector name="vm" uri="vm://myBroker" />
            <broker:transportConnector name="tcp"
                                       uri="tcp://localhost:${tcp.port}" />
        </broker:transportConnectors>
        <broker:persistenceAdapter>
            <broker:kahaPersistenceAdapter
                directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
        </broker:persistenceAdapter>
        <broker:destinationPolicy>
            <broker:policyMap>
                <broker:policyEntries>
                    <broker:policyEntry queue=">">
                        <broker:deadLetterStrategy>
                            <broker:sharedDeadLetterStrategy processExpired="true"
                                                             processNonPersistent="true" />
                        </broker:deadLetterStrategy>
                    </broker:policyEntry>
                </broker:policyEntries>
            </broker:policyMap>
        </broker:destinationPolicy>
    </broker:broker>

    <bean id="jms"
          class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="vm://myBroker" />
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="acceptMessagesWhileStopping" value="false"/>
    </bean>

    <bean id="jmsTransactionManager"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

    <bean id="jmsConnectionFactory"
          class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://myBroker" />
    </bean>
</beans>
Finally, I have a unit test which creates two messages: one which will be
processed, and the other which should time out.
@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {
        // Given
        consumer = context.createConsumerTemplate();
        int expectedValidMessageCount = 3;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);

        // When
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest",
                ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest",
                ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        // This fails, but it sees two messages rather than just one
        mockEndpoint.assertIsSatisfied();
        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String)
                list.get(0).getIn().getBody(String.class);
        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
        // This also fails if I comment out the assert above
        assertNotNull("Should not lose the message", dlqBody);
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {
        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}
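For reference, the expiry arithmetic I am relying on can be sketched in
plain Java. This is only my understanding of the JMS convention (the
expiration is the send time plus the time-to-live, and an expiration of 0
means "never expires"), not a statement of what Camel or ActiveMQ actually
do internally. The class and method names (ExpirationSketch, expirationFor,
isExpired) are made up for illustration and are not part of either library.

```java
public class ExpirationSketch {

    // Hypothetical helper: JMS convention says a time-to-live of 0 means the
    // message never expires, which is represented by an expiration of 0.
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // Hypothetical helper: a message counts as expired once the current time
    // has passed a non-zero expiration timestamp.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sendTime = System.currentTimeMillis();

        // timeToLive=1, the value used in the route URI
        long expiration = expirationFor(sendTime, 1);

        // Same millisecond as the send: not yet expired
        System.out.println(isExpired(expiration, sendTime));
        // Five seconds later (the sleep in the test): expired
        System.out.println(isExpired(expiration, sendTime + 5000));
        // A time-to-live of 0 never expires
        System.out.println(isExpired(expirationFor(sendTime, 0), sendTime + 5000));
    }
}
```

If that model is right, the value that matters is an absolute timestamp
fixed at send time, so what I would need to confirm is which time-to-live
the producer actually applies when the test sends the second message.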
I've been staring at this for a while, and while I think I've only been
changing one thing at a time, I may have made an error or left behind some
config which is shooting me in the foot.
One final thing to note: I have this pattern working elsewhere in tests
which explicitly throw exceptions from within transactions in Camel, but I'd
prefer not to have to start inspecting headers manually when this
all seems to be handled already.
I hope you can help.
TIA
Cheers, Andrew
--
View this message in context:
http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
Sent from the Camel - Users mailing list archive at Nabble.com.