Does no one have any ideas on this? It would be really great if I could find the formula to get this to work.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: http://www.linkedin.com/pub/robert-simmons/40/852/a39*

On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <[email protected]> wrote:

> So I think there is a problem with the way rollback is implemented in
> relation to Camel. As far as I can tell there is no way to get the
> following working.
>
> package com.ea.wwce.camel.test.utilities;
>
> import com.ea.wwce.camel.utilities.data.RecordList;
> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.builder.AdviceWithRouteBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.testng.annotations.Test;
> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> import static org.apache.camel.ExchangePattern.InOnly;
>
> /** This test suite validates the transaction configuration in the test suite. */
> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>   private static final String QUEUE_DEAD = "dead";
>   private static final String QUEUE_INBOX = "inbox";
>   private static final String QUEUE_OUTBOX = "outbox";
>   private static final String ROUTE_ID_FEED = "Feed";
>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
>
>   private static final String MOCK_END = "mock:end";
>   private static final String MOCK_BEFORE_TO_QUEUE = "mock:before_to_queue";
>   private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";
>
>   /** Mock endpoints. */
>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue, mockAfterToQueue;
>
>   /** Helper to initialize mocks in the test. */
>   private void initMocks() {
>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>     mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>   }
>
>   @Override
>   protected RouteBuilder createRouteBuilder() {
>     System.out.println("createRouteBuilder");
>     return new RouteBuilder(this.context) {
>       @Override
>       public void configure() {
>         getContext().setTracing(true);
>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_END);
>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>             .onException(RuntimeException.class).handled(true).useOriginalMessage()
>                 .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_BEFORE_TO_QUEUE)
>             .marshal(dfCaseRecord)
>             .to(endpointAMQ(QUEUE_OUTBOX))
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_AFTER_TO_QUEUE);
>       }
>     };
>   }
>
>   /** Advice the route, mocking ActiveMQ endpoints. */
>   protected void adviceRoute() throws Exception {
>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>         this.context, new AdviceWithRouteBuilder() {
>           @Override
>           public void configure() throws Exception {
>             mockEndpoints("activemq:queue:*");
>           }
>         }
>     );
>   }
>
>   @Test
>   public void testNormalRouting() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceived(cases);
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>   }
>
>   @Test
>   public void testRollbackBeforeEnqueue() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>     mockDead.expectedBodiesReceived(cases.get(0));
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases.get(1), cases.get(2)));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>   }
>
>   @Test
>   public void testRollbackAfterEnqueue() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>   }
> }
>
> I have tried dozens of combinations in the onException clause and nothing
> works. Adding markRollbackOnly() or rollback() only succeeds in rolling
> back the dead letter channel as well. Setting handled(false) causes AMQ
> to resubmit the transaction and also rolls back the dead letter channel.
> If anyone has a combination that works I would be grateful.
>
> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <[email protected]> wrote:
>
>> So, in the ongoing search for the perfect transaction configuration, we
>> have an interesting use case. Consider the following route in a test:
>>
>> @Override
>> protected RouteBuilder createRouteBuilder() {
>>   System.out.println("createRouteBuilder");
>>   return new RouteBuilder(this.context) {
>>     @Override
>>     public void configure() {
>>       getContext().setTracing(true);
>>       from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>>           .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>           .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>>       from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>>           .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>           .unmarshal(dfCaseRecord).to(MOCK_END);
>>       from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>>           .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>           .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>>       from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>>           .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>>           .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>           .unmarshal(dfCaseRecord)
>>           .to(MOCK_BEFORE_TO_QUEUE)
>>           .marshal(dfCaseRecord)
>>           .to(endpointAMQ(QUEUE_OUTBOX))
>>           .unmarshal(dfCaseRecord)
>>           .to(MOCK_AFTER_TO_QUEUE);
>>     }
>>   };
>> }
>>
>> Note that transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks up
>> the transaction policy by name, as it is just a string key for our JNDI
>> registry. Our test case looks like the following.
>>
>> @Test
>> public void testRollbackAfterEnqueue() throws Exception {
>>   adviceRoute();
>>   startCamelContext();
>>   initMocks();
>>   final RecordList cases = casesA();
>>
>>   mockEnd.expectedMessageCount(2);
>>   mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>>   mockDead.expectedMessageCount(1);
>>   mockDead.expectedBodiesReceived(cases.get(0));
>>   mockBeforeToQueue.expectedMessageCount(3);
>>   mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>   mockOutbox.expectedMessageCount(3);
>>   mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
>>   mockAfterToQueue.expectedMessageCount(3);
>>   mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>   mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>>
>>   template.sendBody(DIRECT_FEED_INBOX, cases);
>>
>>   mockBeforeToQueue.assertIsSatisfied();
>>   mockAfterToQueue.assertIsSatisfied();
>>   mockEnd.assertIsSatisfied();
>>   mockDead.assertIsSatisfied();
>>   mockOutbox.assertIsSatisfied();
>> }
>>
>> In this route the goal is that if any exception is thrown, even after
>> the message is enqueued, the work should be rolled back: the message
>> that caused the exception should go to the DLQ and the messages in the
>> outbox should be rolled back, but the message from the inbox should not
>> be put back on the queue. This is proving to be a bit of a juggling act.
>>
>> I tried putting markRollbackOnly() in the exception handler after the
>> to(dead), but that rolled back the dead letter queue and the outbox and
>> then redelivered the inbox message. Removing the markRollbackOnly()
>> means that the message arrives at dead and is off the inbox, but it
>> doesn't get removed from the outbox.
>>
>> So I am looking for the right combination of calls to accomplish what I
>> want to do. Any suggestions?
>>
>> Thanks in advance.
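
To make the goal stated in the thread concrete, here is a minimal plain-Java sketch of the outcome the rollback tests expect. It is only an in-memory model, not Camel or ActiveMQ code: the names DesiredRollbackModel, UnitOfWork and run are hypothetical and invented for illustration, and the sketch assumes the dead-letter send commits in its own unit of work. It says nothing about which Camel calls would produce this behaviour.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the desired outcome; none of this is Camel or ActiveMQ API. */
final class DesiredRollbackModel {

    /** One unit of work: sends are buffered until commit() and discarded by rollback(). */
    static final class UnitOfWork {
        private final List<Runnable> pendingSends = new ArrayList<>();

        void send(List<String> queue, String body) {
            pendingSends.add(() -> queue.add(body));
        }

        void commit() {
            pendingSends.forEach(Runnable::run);
            pendingSends.clear();
        }

        void rollback() {
            pendingSends.clear();
        }
    }

    /** Processes every case, failing after the outbox enqueue for failingCase. */
    static Map<String, List<String>> run(List<String> cases, String failingCase) {
        Deque<String> inbox = new ArrayDeque<>(cases);
        List<String> outbox = new ArrayList<>();
        List<String> dead = new ArrayList<>();

        String body;
        while ((body = inbox.poll()) != null) {       // the inbox receive is never undone
            UnitOfWork main = new UnitOfWork();
            try {
                main.send(outbox, body);               // enqueue to the outbox first
                if (body.equals(failingCase)) {        // then fail after the enqueue
                    throw new IllegalStateException("simulated failure for " + body);
                }
                main.commit();
            } catch (RuntimeException e) {
                main.rollback();                       // the pending outbox send is discarded
                UnitOfWork deadLetter = new UnitOfWork();
                deadLetter.send(dead, body);           // assumption: DLQ send commits separately
                deadLetter.commit();
            }
        }

        Map<String, List<String>> result = new LinkedHashMap<>();
        result.put("inbox", new ArrayList<>(inbox));
        result.put("outbox", outbox);
        result.put("dead", dead);
        return result;
    }

    public static void main(String[] args) {
        // prints {inbox=[], outbox=[case1, case2], dead=[case0]}
        System.out.println(run(Arrays.asList("case0", "case1", "case2"), "case0"));
    }
}
```

The failing case ends up only on the dead queue, its outbox send never becomes visible, and nothing is put back on the inbox, which is the combination the thread is trying to get out of the real route.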
