[ https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andre Weickel updated CAMEL-21114: ---------------------------------- Description: A transacted route with a ZipSplitter and an AggregationStrategy does not aggregate the last zip file entry. The issue only occurs for transacted routes. Example: _Zip Archive_ * _A.xml_ * _B.xml_ Both splits are processed, but the aggregate method is called only for the first exchange (A.xml). For a zip archive with two entries, the doRun() method of MulticastTransactedTask is called three times. The third time, iterator.next() returns null although hasNext() returned true. As a result, the doDone() method is called while there is still a task in the queue (holding the second exchange). This task is processed after doDone() has executed, but it is not aggregated because of the done check in aggregate() of MulticastTransactedTask. We found the problem in Camel 3.14, but it is still present in Camel 3.22. It can be reproduced with the following test (the test passes if you remove the transacted() call from the route): {code:java} import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.dataformat.zipfile.ZipSplitter; import org.apache.camel.spring.spi.SpringTransactionPolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.h2.jdbcx.JdbcDataSource; import org.junit.Test; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.support.TransactionTemplate; public class ZipSplitterTest extends CamelTestSupport { String zipArchiveWithTwoFiles = 
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA="; @Test public void testIfAllSplitsAggregated() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); template.sendBody("direct:start", ""); // Check if second file was processed in aggregate() method of AggregationStrategy assertEquals("Orders2.xml", mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", String.class)); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { JdbcDataSource dataSource = new JdbcDataSource(); dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); dataSource.setUser("sa"); dataSource.setPassword(""); DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource); TransactionTemplate transactionTemplate = new TransactionTemplate(txManager); transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); transactionTemplate.setTimeout(1800); SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(); springTransactionPolicy.setTransactionManager(txManager); springTransactionPolicy.setTransactionTemplate(transactionTemplate); getContext().getRegistry().bind("transacted", springTransactionPolicy); getContext().getRegistry().bind("zipSplitter", new ZipSplitter()); from("direct:start") 
.transacted("transacted") .setBody().simple(zipArchiveWithTwoFiles) .unmarshal().base64() .split().ref("zipSplitter").streaming().aggregationStrategy(new StringAggregationStrategy()) .log("Splitted") .end() .to("mock:result"); } }; } private static class StringAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { System.out.println(newExchange.getMessage().getHeader("CamelFileName")); return newExchange; } } }{code} was: A transacted route with ZipSplitter and Aggregation Strategy does not aggregate the last zip file entry. The issue only occurs for transacted routes. Example: _Zip Archive_ * _A.xml_ * _B.xml_ Both splits are processed but only for the first exchange (A.xml) the aggregate method is called. For a zip archive with two entries the doRun() method of MulticastTransactedTask is called three times. The third time iterator.next() returns null although hasNext() was true. As a result the doDone() method is called but there is still a task in the queue (with the second exchange). This task is processed after doDone() was executed but it’s not aggregated because of a done check in aggregate() of MulticastTransactedTask. We found the problem in Camel 3.14, but it is still present in Camel 3.22. 
It can be reproduced with the following test (it works if you remove the transacted tag from the route) {code:java} import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.dataformat.zipfile.ZipSplitter; import org.apache.camel.spring.spi.SpringTransactionPolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.h2.jdbcx.JdbcDataSource; import org.junit.Test; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.support.TransactionTemplate; public class ZipSplitterTest extends CamelTestSupport { String zipArchiveWithTwoFiles ="UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA="; @Test public void testIfAllSplitsAggregated() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); template.sendBody("direct:start", ""); // Check if second file was processed in aggregate() method of AggregationStrategy assertEquals("Orders2.xml", mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", String.class)); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { JdbcDataSource dataSource = new JdbcDataSource(); dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); 
dataSource.setUser("sa"); dataSource.setPassword(""); DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource); TransactionTemplate transactionTemplate = new TransactionTemplate(txManager); transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); transactionTemplate.setTimeout(1800); SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(); springTransactionPolicy.setTransactionManager(txManager); springTransactionPolicy.setTransactionTemplate(transactionTemplate); getContext().getRegistry().bind("transacted", springTransactionPolicy); getContext().getRegistry().bind("zipSplitter", new ZipSplitter()); from("direct:start") .transacted("transacted") .setBody().simple(zipArchiveWithTwoFiles) .unmarshal().base64() .split().ref("zipSplitter").streaming().aggregationStrategy(new StringAggregationStrategy()) .log("Splitted") .end() .to("mock:result"); } }; } private static class StringAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { System.out.println(newExchange.getMessage().getHeader("CamelFileName")); return newExchange; } } }{code} > ZipSplitter with AggregationStrategy does not aggregate all splits > ------------------------------------------------------------------ > > Key: CAMEL-21114 > URL: https://issues.apache.org/jira/browse/CAMEL-21114 > Project: Camel > Issue Type: Bug > Affects Versions: 3.14.10 > Reporter: Andre Weickel > Priority: Major > > A transacted route with ZipSplitter and Aggregation Strategy does not > aggregate the last zip file entry. The issue only occurs for transacted > routes. > > Example: > > _Zip Archive_ > * _A.xml_ > * _B.xml_ > > Both splits are processed but only for the first exchange (A.xml) the > aggregate method is called. 
> For a zip archive with two entries the doRun() method of > MulticastTransactedTask is called three times. The third time iterator.next() > returns null although hasNext() was true. As a result the doDone() method is > called but there is still a task in the queue (with the second exchange). > This task is processed after doDone() was executed but it’s not aggregated > because of a done check in aggregate() of MulticastTransactedTask. > > We found the problem in Camel 3.14, but it is still present in Camel 3.22. > > It can be reproduced with the following test (it works if you remove the > transacted tag from the route) > {code:java} > import org.apache.camel.AggregationStrategy; > import org.apache.camel.Exchange; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.component.mock.MockEndpoint; > import org.apache.camel.dataformat.zipfile.ZipSplitter; > import org.apache.camel.spring.spi.SpringTransactionPolicy; > import org.apache.camel.test.junit4.CamelTestSupport; > import org.h2.jdbcx.JdbcDataSource; > import org.junit.Test; > import org.springframework.jdbc.datasource.DataSourceTransactionManager; > import org.springframework.transaction.support.TransactionTemplate; > public class ZipSplitterTest extends CamelTestSupport { > String zipArchiveWithTwoFiles = > "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA="; > > @Test > public void testIfAllSplitsAggregated() throws Exception { > 
MockEndpoint mock = getMockEndpoint("mock:result"); > template.sendBody("direct:start", ""); > > // Check if second file was processed in aggregate() method of > AggregationStrategy > assertEquals("Orders2.xml", > mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", > String.class)); > } > @Override > protected RouteBuilder createRouteBuilder() throws Exception { > return new RouteBuilder() { > @Override > public void configure() throws Exception { > > JdbcDataSource dataSource = new JdbcDataSource(); > dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); > dataSource.setUser("sa"); > dataSource.setPassword(""); > DataSourceTransactionManager txManager = new > DataSourceTransactionManager(dataSource); > TransactionTemplate transactionTemplate = new > TransactionTemplate(txManager); > > transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); > > transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); > transactionTemplate.setTimeout(1800); > SpringTransactionPolicy springTransactionPolicy = new > SpringTransactionPolicy(); > springTransactionPolicy.setTransactionManager(txManager); > > springTransactionPolicy.setTransactionTemplate(transactionTemplate); > > getContext().getRegistry().bind("transacted", > springTransactionPolicy); > getContext().getRegistry().bind("zipSplitter", new > ZipSplitter()); > from("direct:start") > .transacted("transacted") > .setBody().simple(zipArchiveWithTwoFiles) > .unmarshal().base64() > > .split().ref("zipSplitter").streaming().aggregationStrategy(new > StringAggregationStrategy()) > .log("Splitted") > .end() > .to("mock:result"); > } > }; > } > > private static class StringAggregationStrategy implements > AggregationStrategy { > @Override > public Exchange aggregate(Exchange oldExchange, Exchange newExchange) > { > > System.out.println(newExchange.getMessage().getHeader("CamelFileName")); > return newExchange; > } > } > }{code} > > -- This message was sent by Atlassian Jira 
(v8.20.10#820010)