[ 
https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen reassigned CAMEL-21114:
-----------------------------------

    Assignee: Claus Ibsen

> camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all 
> splits
> ----------------------------------------------------------------------------------
>
>                 Key: CAMEL-21114
>                 URL: https://issues.apache.org/jira/browse/CAMEL-21114
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-zipfile
>    Affects Versions: 3.14.10, 3.22.2
>            Reporter: Andre Weickel
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 4.x
>
>
> A transacted route with ZipSplitter and Aggregation Strategy does not 
> aggregate the last zip file entry. The issue only occurs for transacted 
> routes.
>  
> Example:
>  
> _Zip Archive_
>  * _A.xml_
>  * _B.xml_
>  
> Both splits are processed but only for the first exchange (A.xml) the 
> aggregate method is called.
> For a zip archive with two entries the doRun() method of 
> MulticastTransactedTask is called three times. The third time iterator.next() 
> returns null although hasNext() was true. As a result the doDone() method is 
> called but there is still a task in the queue (with the second exchange). 
> This task is processed after doDone() was executed but it’s not aggregated 
> because of a done check in aggregate() of MulticastTransactedTask.
>  
> We found the problem in Camel 3.14, but it is still present in Camel 3.22.
>  
> It can be reproduced with the following test (it works if you remove the 
> transacted tag from the route)
> {code:java}
> import org.apache.camel.AggregationStrategy;
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.dataformat.zipfile.ZipSplitter;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.h2.jdbcx.JdbcDataSource;
> import org.junit.Test;
> import org.springframework.jdbc.datasource.DataSourceTransactionManager;
> import org.springframework.transaction.support.TransactionTemplate;
> public class ZipSplitterTest extends CamelTestSupport  {    
> String zipArchiveWithTwoFiles = 
> "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
>     
>     @Test
>     public void testIfAllSplitsAggregated() throws Exception {
>         MockEndpoint mock = getMockEndpoint("mock:result");
>         template.sendBody("direct:start", "");
>       
>         // Check if second file was processed in aggregate() method of 
> AggregationStrategy
>         assertEquals("Orders2.xml", 
> mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
> String.class));
>     }    
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>   
>                 JdbcDataSource dataSource = new JdbcDataSource();
>                 dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
>                 dataSource.setUser("sa");
>                 dataSource.setPassword("");                
>                 DataSourceTransactionManager txManager = new 
> DataSourceTransactionManager(dataSource);
>                 
>                 TransactionTemplate transactionTemplate = new 
> TransactionTemplate(txManager);
>                 
> transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
>                 
> transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
>                 transactionTemplate.setTimeout(1800);
>                 SpringTransactionPolicy springTransactionPolicy = new 
> SpringTransactionPolicy();
>                 springTransactionPolicy.setTransactionManager(txManager);
>                 
> springTransactionPolicy.setTransactionTemplate(transactionTemplate);
>                 
>                 getContext().getRegistry().bind("transacted", 
> springTransactionPolicy);
>                 getContext().getRegistry().bind("zipSplitter", new 
> ZipSplitter());
>                 from("direct:start")
>                     .transacted("transacted")
>                     .setBody().simple(zipArchiveWithTwoFiles)
>                     .unmarshal().base64()
>                     
> .split().ref("zipSplitter").streaming().aggregationStrategy(new 
> StringAggregationStrategy())
>                         .log("Splitted")
>                     .end()
>                     .to("mock:result");
>             }
>         };
>     }
>     
>     private static class StringAggregationStrategy implements 
> AggregationStrategy {
>         @Override
>         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
> {
>            
> System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
>            return newExchange;
>         }
>     }
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to