[
https://issues.apache.org/jira/browse/CAMEL-21114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andre Weickel updated CAMEL-21114:
----------------------------------
Description:
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate
the last zip file entry. The issue only occurs for transacted routes.
Example:
_Zip Archive_
* _A.xml_
* _B.xml_
Both splits are processed, but the aggregate method is called only for the
first exchange (A.xml).
For a zip archive with two entries the doRun() method of
MulticastTransactedTask is called three times. The third time iterator.next()
returns null although hasNext() was true. As a result the doDone() method is
called but there is still a task in the queue (with the second exchange). This
task is processed after doDone() was executed but it’s not aggregated because
of a done check in aggregate() of MulticastTransactedTask.
We found the problem in Camel 3.14, but it is still present in Camel 3.22.
It can be reproduced with the following test (it works if you remove the
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public class ZipSplitterTest extends CamelTestSupport {
// Base64-encoded zip archive used as the input body of the route under test.
// The archive holds two entries, Orders1.xml and Orders2.xml; the route decodes
// it with unmarshal().base64() before handing it to the ZipSplitter.
String zipArchiveWithTwoFiles =
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
@Test
public void testIfAllSplitsAggregated() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
template.sendBody("direct:start", "");
// Check if second file was processed in aggregate() method of
AggregationStrategy
assertEquals("Orders2.xml",
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
String.class));
}
/**
 * Builds the route under test. The route is transacted, replaces the body
 * with the Base64 zip archive, decodes it, splits it in streaming mode with
 * the registered ZipSplitter using {@code StringAggregationStrategy}, logs
 * every split and finally forwards the result to {@code mock:result}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // In-memory H2 database that backs the transaction manager.
            JdbcDataSource h2 = new JdbcDataSource();
            h2.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
            h2.setUser("sa");
            h2.setPassword("");
            DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(h2);

            TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
            txTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
            txTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
            txTemplate.setTimeout(1800);

            SpringTransactionPolicy policy = new SpringTransactionPolicy();
            policy.setTransactionManager(transactionManager);
            policy.setTransactionTemplate(txTemplate);

            // Register the beans the route refers to by name.
            getContext().getRegistry().bind("transacted", policy);
            getContext().getRegistry().bind("zipSplitter", new ZipSplitter());

            from("direct:start")
                    .transacted("transacted")
                    .setBody().simple(zipArchiveWithTwoFiles)
                    .unmarshal().base64()
                    .split().ref("zipSplitter").streaming()
                            .aggregationStrategy(new StringAggregationStrategy())
                        .log("Splitted")
                    .end()
                    .to("mock:result");
        }
    };
}
/**
 * Aggregation strategy that prints the {@code CamelFileName} header of every
 * incoming split exchange and always returns the newest exchange as the
 * aggregate.
 */
private static class StringAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // oldExchange is not consulted: the latest split replaces the aggregate.
        Object fileName = newExchange.getMessage().getHeader("CamelFileName");
        System.out.println(fileName);
        return newExchange;
    }
}
}{code}
was:
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate
the last zip file entry. The issue only occurs for transacted routes.
Example:
_Zip Archive_
* _A.xml_
* _B.xml_
Both splits are processed, but the aggregate method is called only for the
first exchange (A.xml).
For a zip archive with two entries the doRun() method of
MulticastTransactedTask is called three times. The third time iterator.next()
returns null although hasNext() was true. As a result the doDone() method is
called but there is still a task in the queue (with the second exchange). This
task is processed after doDone() was executed but it’s not aggregated because
of a done check in aggregate() of MulticastTransactedTask.
We found the problem in Camel 3.14, but it is still present in Camel 3.22.
It can be reproduced with the following test (it works if you remove the
transacted tag from the route)
{code:java}
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.dataformat.zipfile.ZipSplitter;
import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.h2.jdbcx.JdbcDataSource;
import org.junit.Test;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
public class ZipSplitterTest extends CamelTestSupport {
// Base64-encoded zip archive used as the input body of the route under test.
// The archive holds two entries, Orders1.xml and Orders2.xml; the route decodes
// it with unmarshal().base64() before handing it to the ZipSplitter.
String zipArchiveWithTwoFiles =
"UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
@Test
public void testIfAllSplitsAggregated() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
template.sendBody("direct:start", "");
// Check if second file was processed in aggregate() method of
AggregationStrategy
assertEquals("Orders2.xml",
mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
String.class));
}
/**
 * Builds the route under test. The route is transacted, replaces the body
 * with the Base64 zip archive, decodes it, splits it in streaming mode with
 * the registered ZipSplitter using {@code StringAggregationStrategy}, logs
 * every split and finally forwards the result to {@code mock:result}.
 */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // In-memory H2 database that backs the transaction manager.
            JdbcDataSource h2 = new JdbcDataSource();
            h2.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
            h2.setUser("sa");
            h2.setPassword("");
            DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(h2);

            TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
            txTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
            txTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
            txTemplate.setTimeout(1800);

            SpringTransactionPolicy policy = new SpringTransactionPolicy();
            policy.setTransactionManager(transactionManager);
            policy.setTransactionTemplate(txTemplate);

            // Register the beans the route refers to by name.
            getContext().getRegistry().bind("transacted", policy);
            getContext().getRegistry().bind("zipSplitter", new ZipSplitter());

            from("direct:start")
                    .transacted("transacted")
                    .setBody().simple(zipArchiveWithTwoFiles)
                    .unmarshal().base64()
                    .split().ref("zipSplitter").streaming()
                            .aggregationStrategy(new StringAggregationStrategy())
                        .log("Splitted")
                    .end()
                    .to("mock:result");
        }
    };
}
/**
 * Aggregation strategy that prints the {@code CamelFileName} header of every
 * incoming split exchange and always returns the newest exchange as the
 * aggregate.
 */
private static class StringAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // oldExchange is not consulted: the latest split replaces the aggregate.
        Object fileName = newExchange.getMessage().getHeader("CamelFileName");
        System.out.println(fileName);
        return newExchange;
    }
}
}{code}
> ZipSplitter with AggregationStrategy does not aggregate all splits
> ------------------------------------------------------------------
>
> Key: CAMEL-21114
> URL: https://issues.apache.org/jira/browse/CAMEL-21114
> Project: Camel
> Issue Type: Bug
> Affects Versions: 3.14.10
> Reporter: Andre Weickel
> Priority: Major
>
> A transacted route with ZipSplitter and Aggregation Strategy does not
> aggregate the last zip file entry. The issue only occurs for transacted
> routes.
>
> Example:
>
> _Zip Archive_
> * _A.xml_
> * _B.xml_
>
> Both splits are processed, but the aggregate method is called only for the
> first exchange (A.xml).
> For a zip archive with two entries the doRun() method of
> MulticastTransactedTask is called three times. The third time iterator.next()
> returns null although hasNext() was true. As a result the doDone() method is
> called but there is still a task in the queue (with the second exchange).
> This task is processed after doDone() was executed but it’s not aggregated
> because of a done check in aggregate() of MulticastTransactedTask.
>
> We found the problem in Camel 3.14, but it is still present in Camel 3.22.
>
> It can be reproduced with the following test (it works if you remove the
> transacted tag from the route)
> {code:java}
> import org.apache.camel.AggregationStrategy;
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.dataformat.zipfile.ZipSplitter;
> import org.apache.camel.spring.spi.SpringTransactionPolicy;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.h2.jdbcx.JdbcDataSource;
> import org.junit.Test;
> import org.springframework.jdbc.datasource.DataSourceTransactionManager;
> import org.springframework.transaction.support.TransactionTemplate;
> public class ZipSplitterTest extends CamelTestSupport {
> String zipArchiveWithTwoFiles =
> "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
>
> @Test
> public void testIfAllSplitsAggregated() throws Exception {
> MockEndpoint mock = getMockEndpoint("mock:result");
> template.sendBody("direct:start", "");
>
> // Check if second file was processed in aggregate() method of
> // AggregationStrategy
> assertEquals("Orders2.xml",
> mock.getExchanges().get(0).getMessage().getHeader("CamelFileName",
> String.class));
> }
> @Override
> protected RouteBuilder createRouteBuilder() throws Exception {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
>
> JdbcDataSource dataSource = new JdbcDataSource();
> dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
> dataSource.setUser("sa");
> dataSource.setPassword("");
> DataSourceTransactionManager txManager = new
> DataSourceTransactionManager(dataSource);
>
> TransactionTemplate transactionTemplate = new
> TransactionTemplate(txManager);
>
> transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
>
> transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
> transactionTemplate.setTimeout(1800);
> SpringTransactionPolicy springTransactionPolicy = new
> SpringTransactionPolicy();
> springTransactionPolicy.setTransactionManager(txManager);
>
> springTransactionPolicy.setTransactionTemplate(transactionTemplate);
>
> getContext().getRegistry().bind("transacted",
> springTransactionPolicy);
> getContext().getRegistry().bind("zipSplitter", new
> ZipSplitter());
> from("direct:start")
> .transacted("transacted")
> .setBody().simple(zipArchiveWithTwoFiles)
> .unmarshal().base64()
>
> .split().ref("zipSplitter").streaming().aggregationStrategy(new
> StringAggregationStrategy())
> .log("Splitted")
> .end()
> .to("mock:result");
> }
> };
> }
>
> private static class StringAggregationStrategy implements
> AggregationStrategy {
> @Override
> public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
> {
>
> System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
> return newExchange;
> }
> }
> }{code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)