[
https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen reassigned CAMEL-21114:
-----------------------------------
Assignee: Claus Ibsen
> camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all
> splits
> ----------------------------------------------------------------------------------
>
> Key: CAMEL-21114
> URL: https://issues.apache.org/jira/browse/CAMEL-21114
> Project: Camel
> Issue Type: Bug
> Components: camel-zipfile
> Affects Versions: 3.14.10, 3.22.2
> Reporter: Andre Weickel
> Assignee: Claus Ibsen
> Priority: Minor
> Fix For: 4.x
>
>
> A transacted route with ZipSplitter and Aggregation Strategy does not
> aggregate the last zip file entry. The issue only occurs for transacted
> routes.
>
> Example:
>
> _Zip Archive_
> * _A.xml_
> * _B.xml_
>
> Both splits are processed but only for the first exchange (A.xml) the
> aggregate method is called.
> For a zip archive with two entries the doRun() method of
> MulticastTransactedTask is called three times. The third time iterator.next()
> returns null although hasNext() was true. As a result the doDone() method is
> called but there is still a task in the queue (with the second exchange).
> This task is processed after doDone() was executed but it’s not aggregated
> because of a done check in aggregate() of MulticastTransactedTask.
>
> We found the problem in Camel 3.14, but it is still present in Camel 3.22.
>
> It can be reproduced with the following test (it works if you remove the
> transacted tag from the route)
> {code:java}
> import org.apache.camel.AggregationStrategy;
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.dataformat.zipfile.ZipSplitter;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.h2.jdbcx.JdbcDataSource;
> import org.junit.Test;
> import org.springframework.jdbc.datasource.DataSourceTransactionManager;
> import org.springframework.transaction.support.TransactionTemplate;
> public class ZipSplitterTest extends CamelTestSupport {
> String zipArchiveWithTwoFiles =
> "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
>
> @Test
> public void testIfAllSplitsAggregated() throws Exception {
> MockEndpoint mock = getMockEndpoint("mock:result");
> template.sendBody("direct:start", "");
>
> // Check if second file was processed in aggregate() method of
> AggregationStrategy
> assertEquals("Orders2.xml",
> mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
> String.class));
> }
> @Override
> protected RouteBuilder createRouteBuilder() throws Exception {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
>
> JdbcDataSource dataSource = new JdbcDataSource();
> dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
> dataSource.setUser("sa");
> dataSource.setPassword("");
> DataSourceTransactionManager txManager = new
> DataSourceTransactionManager(dataSource);
>
> TransactionTemplate transactionTemplate = new
> TransactionTemplate(txManager);
>
> transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
>
> transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
> transactionTemplate.setTimeout(1800);
> SpringTransactionPolicy springTransactionPolicy = new
> SpringTransactionPolicy();
> springTransactionPolicy.setTransactionManager(txManager);
>
> springTransactionPolicy.setTransactionTemplate(transactionTemplate);
>
> getContext().getRegistry().bind("transacted",
> springTransactionPolicy);
> getContext().getRegistry().bind("zipSplitter", new
> ZipSplitter());
> from("direct:start")
> .transacted("transacted")
> .setBody().simple(zipArchiveWithTwoFiles)
> .unmarshal().base64()
>
> .split().ref("zipSplitter").streaming().aggregationStrategy(new
> StringAggregationStrategy())
> .log("Splitted")
> .end()
> .to("mock:result");
> }
> };
> }
>
> private static class StringAggregationStrategy implements
> AggregationStrategy {
> @Override
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
> {
>
> System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
> return newExchange;
> }
> }
> }{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)