Andre Weickel created CAMEL-21114:
-------------------------------------

             Summary: ZipSplitter with AggregationStrategy does not aggregate 
all splits
                 Key: CAMEL-21114
                 URL: https://issues.apache.org/jira/browse/CAMEL-21114
             Project: Camel
          Issue Type: Bug
    Affects Versions: 3.14.10
            Reporter: Andre Weickel


A transacted route with ZipSplitter and Aggregation Strategy does not aggregate 
the last zip file entry. The issue only occurs for transacted routes.

 

Example:

 

_Zip Archive_
 * _A.xml_
 * _B.xml_

 

Both splits are processed but only for the first exchange (A.xml) the aggregate 
method is called.

For a zip archive with two entries the doRun() method of 
MulticastTransactedTask is called three times. The third time iterator.next() 
returns null although hasNext() was true. As a result the doDone() method is 
called but there is still a task in the queue (with the second exchange). This 
task is processed after doDone() was executed but it’s not aggregated because 
of a done check in aggregate() of MulticastTransactedTask.

 

We found the problem in Camel 3.14, but it is still present in Camel 3.22.

 

It can be reproduced with the following test (it works if you remove the 
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class ZipSplitterTest extends CamelTestSupport  {    

String zipArchiveWithTwoFiles 
="UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
    
    @Test
    public void testIfAllSplitsAggregated() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        template.sendBody("direct:start", "");        // Check if second file 
was processed in aggregate() method of AggregationStrategy
        assertEquals("Orders2.xml", 
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
String.class));
    }    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
  
                JdbcDataSource dataSource = new JdbcDataSource();
                dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
                dataSource.setUser("sa");
                dataSource.setPassword("");                
DataSourceTransactionManager txManager = new 
DataSourceTransactionManager(dataSource);                TransactionTemplate 
transactionTemplate = new TransactionTemplate(txManager);
                
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
                
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
                transactionTemplate.setTimeout(1800);                
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy();
                springTransactionPolicy.setTransactionManager(txManager);
                
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
                
                getContext().getRegistry().bind("transacted", 
springTransactionPolicy);
                getContext().getRegistry().bind("zipSplitter", new 
ZipSplitter());                from("direct:start")
                    .transacted("transacted")
                    .setBody().simple(zipArchiveWithTwoFiles)
                    .unmarshal().base64()
                    
.split().ref("zipSplitter").streaming().aggregationStrategy(new 
StringAggregationStrategy())
                        .log("Splitted")
                    .end()
                    .to("mock:result");
            }
        };
    }
    
    private static class StringAggregationStrategy implements 
AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
           
System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
           return newExchange;
        }
    }
}{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to