[ https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen reassigned CAMEL-21114: ----------------------------------- Assignee: Claus Ibsen > camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all > splits > ---------------------------------------------------------------------------------- > > Key: CAMEL-21114 > URL: https://issues.apache.org/jira/browse/CAMEL-21114 > Project: Camel > Issue Type: Bug > Components: camel-zipfile > Affects Versions: 3.14.10, 3.22.2 > Reporter: Andre Weickel > Assignee: Claus Ibsen > Priority: Minor > Fix For: 4.x > > > A transacted route with ZipSplitter and Aggregation Strategy does not > aggregate the last zip file entry. The issue only occurs for transacted > routes. > > Example: > > _Zip Archive_ > * _A.xml_ > * _B.xml_ > > Both splits are processed but only for the first exchange (A.xml) the > aggregate method is called. > For a zip archive with two entries the doRun() method of > MulticastTransactedTask is called three times. The third time iterator.next() > returns null although hasNext() was true. As a result the doDone() method is > called but there is still a task in the queue (with the second exchange). > This task is processed after doDone() was executed but it’s not aggregated > because of a done check in aggregate() of MulticastTransactedTask. > > We found the problem in Camel 3.14, but it is still present in Camel 3.22. > > It can be reproduced with the following test (it works if you remove the > transacted tag from the route) > {code:java} > import org.apache.camel.AggregationStrategy; > import org.apache.camel.Exchange; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.component.mock.MockEndpoint; > import org.apache.camel.dataformat.zipfile.ZipSplitter; > import org.apache.camel.spring.spi.SpringTransactionPolicy; > import org.apache.camel.test.junit4.CamelTestSupport; > import org.h2.jdbcx.JdbcDataSource; > import org.junit.Test; > import org.springframework.jdbc.datasource.DataSourceTransactionManager; > import org.springframework.transaction.support.TransactionTemplate; > public class ZipSplitterTest extends CamelTestSupport { > String zipArchiveWithTwoFiles = > "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA="; > > @Test > public void testIfAllSplitsAggregated() throws Exception { > MockEndpoint mock = getMockEndpoint("mock:result"); > template.sendBody("direct:start", ""); > > // Check if second file was processed in aggregate() method of > AggregationStrategy > assertEquals("Orders2.xml", > mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", > String.class)); > } > @Override > protected RouteBuilder createRouteBuilder() throws Exception { > return new RouteBuilder() { > @Override > public void configure() throws Exception { > > JdbcDataSource dataSource = new JdbcDataSource(); > dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); > dataSource.setUser("sa"); > dataSource.setPassword(""); > DataSourceTransactionManager txManager = new > DataSourceTransactionManager(dataSource); > > TransactionTemplate transactionTemplate = new > TransactionTemplate(txManager); > > transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); > > transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); > transactionTemplate.setTimeout(1800); > SpringTransactionPolicy springTransactionPolicy = new > SpringTransactionPolicy(); > springTransactionPolicy.setTransactionManager(txManager); > > springTransactionPolicy.setTransactionTemplate(transactionTemplate); > > getContext().getRegistry().bind("transacted", > springTransactionPolicy); > getContext().getRegistry().bind("zipSplitter", new > ZipSplitter()); > from("direct:start") > .transacted("transacted") > .setBody().simple(zipArchiveWithTwoFiles) > .unmarshal().base64() > > .split().ref("zipSplitter").streaming().aggregationStrategy(new > StringAggregationStrategy()) > .log("Splitted") > .end() > .to("mock:result"); > } > }; > } > > private static class StringAggregationStrategy implements > AggregationStrategy { > @Override > public Exchange aggregate(Exchange oldExchange, Exchange newExchange) > { > > System.out.println(newExchange.getMessage().getHeader("CamelFileName")); > return newExchange; > } > } > }{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)