[ 
https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andre Weickel updated CAMEL-21114:
----------------------------------
    Description: 
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate 
the last zip file entry. The issue only occurs for transacted routes.

 

Example:

 

_Zip Archive_
 * _A.xml_
 * _B.xml_

 

Both splits are processed but only for the first exchange (A.xml) the aggregate 
method is called.

For a zip archive with two entries the doRun() method of 
MulticastTransactedTask is called three times. The third time iterator.next() 
returns null although hasNext() was true. As a result the doDone() method is 
called but there is still a task in the queue (with the second exchange). This 
task is processed after doDone() was executed but it’s not aggregated because 
of a done check in aggregate() of MulticastTransactedTask.

 

We found the problem in Camel 3.14, but it is still present in Camel 3.22.

 

It can be reproduced with the following test (it works if you remove the 
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class ZipSplitterTest extends CamelTestSupport  {    

String zipArchiveWithTwoFiles = 
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
    
    @Test
    public void testIfAllSplitsAggregated() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        template.sendBody("direct:start", "");
      
        // Check if second file was processed in aggregate() method of 
AggregationStrategy
        assertEquals("Orders2.xml", 
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
String.class));
    }    

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
  
                JdbcDataSource dataSource = new JdbcDataSource();
                dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
                dataSource.setUser("sa");
                dataSource.setPassword("");                

                DataSourceTransactionManager txManager = new 
DataSourceTransactionManager(dataSource);
                TransactionTemplate transactionTemplate = new 
TransactionTemplate(txManager);
                
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
                
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
                transactionTemplate.setTimeout(1800);                
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy();
                springTransactionPolicy.setTransactionManager(txManager);
                
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
                
                getContext().getRegistry().bind("transacted", 
springTransactionPolicy);
                getContext().getRegistry().bind("zipSplitter", new 
ZipSplitter());

                from("direct:start")
                    .transacted("transacted")
                    .setBody().simple(zipArchiveWithTwoFiles)
                    .unmarshal().base64()
                    
.split().ref("zipSplitter").streaming().aggregationStrategy(new 
StringAggregationStrategy())
                        .log("Splitted")
                    .end()
                    .to("mock:result");
            }
        };
    }
    
    private static class StringAggregationStrategy implements 
AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
           
System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
           return newExchange;
        }
    }
}{code}
 

 

  was:
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate 
the last zip file entry. The issue only occurs for transacted routes.

 

Example:

 

_Zip Archive_
 * _A.xml_
 * _B.xml_

 

Both splits are processed but only for the first exchange (A.xml) the aggregate 
method is called.

For a zip archive with two entries the doRun() method of 
MulticastTransactedTask is called three times. The third time iterator.next() 
returns null although hasNext() was true. As a result the doDone() method is 
called but there is still a task in the queue (with the second exchange). This 
task is processed after doDone() was executed but it’s not aggregated because 
of a done check in aggregate() of MulticastTransactedTask.

 

We found the problem in Camel 3.14, but it is still present in Camel 3.22.

 

It can be reproduced with the following test (it works if you remove the 
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class ZipSplitterTest extends CamelTestSupport  {    

String zipArchiveWithTwoFiles 
="UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
    
    @Test
    public void testIfAllSplitsAggregated() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        template.sendBody("direct:start", "");        
        // Check if second file was processed in aggregate() method of 
AggregationStrategy
        assertEquals("Orders2.xml", 
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
String.class));
    }    

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
  
                JdbcDataSource dataSource = new JdbcDataSource();
                dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
                dataSource.setUser("sa");
                dataSource.setPassword("");                

                DataSourceTransactionManager txManager = new 
DataSourceTransactionManager(dataSource);
                TransactionTemplate transactionTemplate = new 
TransactionTemplate(txManager);
                  
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
                
transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
                transactionTemplate.setTimeout(1800);                
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy();
                springTransactionPolicy.setTransactionManager(txManager);
                
springTransactionPolicy.setTransactionTemplate(transactionTemplate);
                
                getContext().getRegistry().bind("transacted", 
springTransactionPolicy);
                getContext().getRegistry().bind("zipSplitter", new 
ZipSplitter());                from("direct:start")
                    .transacted("transacted")
                    .setBody().simple(zipArchiveWithTwoFiles)
                    .unmarshal().base64()
                    
.split().ref("zipSplitter").streaming().aggregationStrategy(new 
StringAggregationStrategy())
                        .log("Splitted")
                    .end()
                    .to("mock:result");
            }
        };
    }
    
    private static class StringAggregationStrategy implements 
AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
           
System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
           return newExchange;
        }
    }
}{code}
 

 


> ZipSplitter with AggregationStrategy does not aggregate all splits
> ------------------------------------------------------------------
>
>                 Key: CAMEL-21114
>                 URL: https://issues.apache.org/jira/browse/CAMEL-21114
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 3.14.10
>            Reporter: Andre Weickel
>            Priority: Major
>
> A transacted route with ZipSplitter and Aggregation Strategy does not 
> aggregate the last zip file entry. The issue only occurs for transacted 
> routes.
>  
> Example:
>  
> _Zip Archive_
>  * _A.xml_
>  * _B.xml_
>  
> Both splits are processed but only for the first exchange (A.xml) the 
> aggregate method is called.
> For a zip archive with two entries the doRun() method of 
> MulticastTransactedTask is called three times. The third time iterator.next() 
> returns null although hasNext() was true. As a result the doDone() method is 
> called but there is still a task in the queue (with the second exchange). 
> This task is processed after doDone() was executed but it’s not aggregated 
> because of a done check in aggregate() of MulticastTransactedTask.
>  
> We found the problem in Camel 3.14, but it is still present in Camel 3.22.
>  
> It can be reproduced with the following test (it works if you remove the 
> transacted tag from the route)
> {code:java}
> import org.apache.camel.AggregationStrategy;
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.dataformat.zipfile.ZipSplitter;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.h2.jdbcx.JdbcDataSource;
> import org.junit.Test;
> import org.springframework.jdbc.datasource.DataSourceTransactionManager;
> import org.springframework.transaction.support.TransactionTemplate;
> public class ZipSplitterTest extends CamelTestSupport  {    
> String zipArchiveWithTwoFiles = 
> "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
>     
>     @Test
>     public void testIfAllSplitsAggregated() throws Exception {
>         MockEndpoint mock = getMockEndpoint("mock:result");
>         template.sendBody("direct:start", "");
>       
>         // Check if second file was processed in aggregate() method of 
> AggregationStrategy
>         assertEquals("Orders2.xml", 
> mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", 
> String.class));
>     }    
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>   
>                 JdbcDataSource dataSource = new JdbcDataSource();
>                 dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
>                 dataSource.setUser("sa");
>                 dataSource.setPassword("");                
>                 DataSourceTransactionManager txManager = new 
> DataSourceTransactionManager(dataSource);
>                 TransactionTemplate transactionTemplate = new 
> TransactionTemplate(txManager);
>                 
> transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
>                 
> transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
>                 transactionTemplate.setTimeout(1800);                
> SpringTransactionPolicy springTransactionPolicy = new 
> SpringTransactionPolicy();
>                 springTransactionPolicy.setTransactionManager(txManager);
>                 
> springTransactionPolicy.setTransactionTemplate(transactionTemplate);
>                 
>                 getContext().getRegistry().bind("transacted", 
> springTransactionPolicy);
>                 getContext().getRegistry().bind("zipSplitter", new 
> ZipSplitter());
>                 from("direct:start")
>                     .transacted("transacted")
>                     .setBody().simple(zipArchiveWithTwoFiles)
>                     .unmarshal().base64()
>                     
> .split().ref("zipSplitter").streaming().aggregationStrategy(new 
> StringAggregationStrategy())
>                         .log("Splitted")
>                     .end()
>                     .to("mock:result");
>             }
>         };
>     }
>     
>     private static class StringAggregationStrategy implements 
> AggregationStrategy {
>         @Override
>         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
> {
>            
> System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
>            return newExchange;
>         }
>     }
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to