This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 82681c5 [CAMEL-4271] JDBCAggregateRepository and Recovery in a
Cluster (#6812)
82681c5 is described below
commit 82681c5505ddb7c8fcb2dad59d7fdcf5eb78d437
Author: Benjamin BONNET <[email protected]>
AuthorDate: Wed Jan 26 06:53:22 2022 +0100
[CAMEL-4271] JDBCAggregateRepository and Recovery in a Cluster (#6812)
* CAMEL-4271 : recover task in cluster
* Specializing JdbcAggregationRepository for cluster
---
.../camel-sql/src/main/docs/sql-component.adoc | 11 ++
.../jdbc/ClusteredJdbcAggregationRepository.java | 203 +++++++++++++++++++++
...=> ClusteredPostgresAggregationRepository.java} | 14 +-
.../aggregate/jdbc/JdbcAggregationRepository.java | 24 ++-
.../jdbc/PostgresAggregationRepository.java | 4 +-
...bstractClusteredJdbcAggregationTestSupport.java | 65 +++++++
.../jdbc/ClusteredJdbcAggregateRecoverTest.java | 87 +++++++++
.../aggregate/jdbc/JdbcSpringDataSource.xml | 22 ++-
.../camel-sql/src/test/resources/sql/init5.sql | 30 +++
9 files changed, 442 insertions(+), 18 deletions(-)
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc
b/components/camel-sql/src/main/docs/sql-component.adoc
index 82ba8c0..6988467 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -747,6 +747,17 @@
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
Propagation is specified by constants of
`org.springframework.transaction.TransactionDefinition` interface,
so `propagationBehaviorName` is convenient setter that allows to use names of
the constants.
+=== Clustering
+JdbcAggregationRepository does not provide recovery in a clustered environment.
+
+You may use ClusteredJdbcAggregationRepository, which provides limited support
for recovery in a clustered environment: the recovery mechanism is handled
separately by each member of the cluster, i.e. a member may only recover exchanges
that it completed itself.
+
+To enable this behaviour, property `recoveryByInstance` must be set to true,
and `instanceId` property must be defined using a unique identifier (a string)
for each member of the cluster.
+
+Besides, the completed table must have an `instance_id VARCHAR(255)` column.
+
+N.B.: since each member is solely responsible for the recovery of its own
completed exchanges, if a member is stopped, its completed exchanges will not
be recovered until it is restarted, unless you update the completed table to
reassign them to another member (by changing `instance_id` for those completed
exchanges).
+
=== PostgreSQL case
There's special database that may cause problems with optimistic locking used
by `JdbcAggregationRepository`.
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
new file mode 100644
index 0000000..b65f35d
--- /dev/null
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
+import
org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+
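+/**
+ * JDBC based {@link org.apache.camel.spi.AggregationRepository} that extends JdbcAggregationRepository with limited
+ * support for recovery in a cluster: when recoveryByInstance is enabled, completed exchanges are stored together with
+ * this repository's instanceId, and scanning for exchanges to recover only returns the keys of completed exchanges
+ * carrying that same instanceId.
+ *
+ * JdbcAggregationRepository will only preserve any Serializable compatible data types. If a data type is not such a
+ * type it's dropped and a WARN is logged. And it only persists the Message body and the Message headers. The Exchange
+ * properties are not persisted.
+ */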
+public class ClusteredJdbcAggregationRepository extends
JdbcAggregationRepository {
+
+ private static final String INSTANCE_ID = "instance_id";
+ private static final Logger LOG =
LoggerFactory.getLogger(ClusteredJdbcAggregationRepository.class);
+
+ private String instanceId = "DEFAULT";
+ private boolean recoveryByInstance;
+
+ /**
+ * Creates an aggregation repository
+ */
+ public ClusteredJdbcAggregationRepository() {
+ }
+
+ /**
+ * Creates an aggregation repository with the three mandatory parameters
+ */
+ public ClusteredJdbcAggregationRepository(PlatformTransactionManager
transactionManager, String repositoryName,
+ DataSource dataSource) {
+ this.setRepositoryName(repositoryName);
+ this.setTransactionManager(transactionManager);
+ this.setDataSource(dataSource);
+ }
+
+ @Override
+ public void remove(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus
status) {
+ final String key = correlationId;
+ final String confirmKey = exchange.getExchangeId();
+ final long version = exchange.getProperty(VERSION_PROPERTY,
Long.class);
+ try {
+ LOG.debug("Removing key {}", key);
+
+ jdbcTemplate.update("DELETE FROM " + getRepositoryName() +
" WHERE " + ID + " = ? AND " + VERSION + " = ?",
+ key, version);
+
+ insert(camelContext, confirmKey, exchange,
getRepositoryNameCompleted(), version, true);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Error removing key " + key + "
from repository " + getRepositoryName(), e);
+ }
+ }
+ });
+ }
+
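+ /**
+ * Inserts a new record into the given repository table. Note: the
exchange properties are NOT persisted.
+ *
+ * @param camelContext Current CamelContext
+ * @param correlationId Correlation key
+ * @param exchange Aggregated exchange to insert
+ * @param repositoryName Table's name
+ * @param version Version identifier
+ * @param completed Whether the record is a completed exchange; when true
and recoveryByInstance is enabled, the instance_id column is also populated
with this repository's instanceId
+ */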
+ protected void insert(
+ final CamelContext camelContext, final String correlationId, final
Exchange exchange,
+ final String repositoryName, final Long version, final boolean
completed)
+ throws Exception {
+ // The default totalParameterIndex is 3 for ID, Exchange and version.
Depending
+ // on logic this will be increased.
+ int totalParameterIndex = 3;
+ StringBuilder queryBuilder = new StringBuilder().append("INSERT INTO
").append(repositoryName).append('(')
+ .append(EXCHANGE).append(", ").append(ID).append(",
").append(VERSION);
+
+ if (isStoreBodyAsText()) {
+ queryBuilder.append(", ").append(BODY);
+ totalParameterIndex++;
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : getHeadersToStoreAsText()) {
+ queryBuilder.append(", ").append(headerName);
+ totalParameterIndex++;
+ }
+ }
+ if (completed && isRecoveryByInstance()) {
+ queryBuilder.append(", ").append(INSTANCE_ID);
+ totalParameterIndex++;
+ }
+ queryBuilder.append(") VALUES (");
+
+ for (int i = 0; i < totalParameterIndex - 1; i++) {
+ queryBuilder.append("?, ");
+ }
+ queryBuilder.append("?)");
+
+ String sql = queryBuilder.toString();
+
+ insertHelper(camelContext, correlationId, exchange, sql, version,
completed);
+ }
+
+ protected int insertHelper(
+ final CamelContext camelContext, final String key, final Exchange
exchange,
+ final String sql, final Long version, final boolean completed)
+ throws Exception {
+ final byte[] data = codec.marshallExchange(camelContext, exchange,
allowSerializedHeaders);
+ Integer insertCount = super.jdbcTemplate.execute(sql,
+ new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+ @Override
+ protected void setValues(PreparedStatement ps, LobCreator
lobCreator) throws SQLException {
+ int totalParameterIndex = 0;
+ lobCreator.setBlobAsBytes(ps, ++totalParameterIndex,
data);
+ ps.setString(++totalParameterIndex, key);
+ ps.setLong(++totalParameterIndex, version);
+ if (isStoreBodyAsText()) {
+ ps.setString(++totalParameterIndex,
exchange.getIn().getBody(String.class));
+ }
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName :
getHeadersToStoreAsText()) {
+ String headerValue =
exchange.getIn().getHeader(headerName, String.class);
+ ps.setString(++totalParameterIndex,
headerValue);
+ }
+ }
+ if (completed && isRecoveryByInstance()) {
+ ps.setString(++totalParameterIndex, instanceId);
+ }
+ }
+ });
+ return insertCount == null ? 0 : insertCount;
+ }
+
+ @Override
+ public Set<String> scan(final CamelContext camelContext) {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
+ public LinkedHashSet<String> doInTransaction(final
TransactionStatus status) {
+ final List<String> keys = jdbcTemplate.query(
+ "SELECT " + ID + " FROM " +
getRepositoryNameCompleted()
+ +
(isRecoveryByInstance()
+ ? " WHERE
INSTANCE_ID='" + instanceId + "'" : ""),
+ new RowMapper<String>() {
+ public String mapRow(final ResultSet rs, final int
rowNum) throws SQLException {
+ final String id = rs.getString(ID);
+ LOG.trace("getKey {}", id);
+ return id;
+ }
+ });
+ return new LinkedHashSet<>(keys);
+ }
+ });
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(final String instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ public boolean isRecoveryByInstance() {
+ return recoveryByInstance;
+ }
+
+ public void setRecoveryByInstance(final boolean recoveryByInstance) {
+ this.recoveryByInstance = recoveryByInstance;
+ }
+
+}
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
similarity index 86%
copy from
components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
copy to
components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
index 28b48f3..017fa0d 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
@@ -27,19 +27,19 @@ import
org.springframework.transaction.PlatformTransactionManager;
* PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL
Violation Exceptions using special
* {@code INSERT INTO .. ON CONFLICT DO NOTHING} clause.
*/
-public class PostgresAggregationRepository extends JdbcAggregationRepository {
+public class ClusteredPostgresAggregationRepository extends
ClusteredJdbcAggregationRepository {
/**
* Creates an aggregation repository
*/
- public PostgresAggregationRepository() {
+ public ClusteredPostgresAggregationRepository() {
}
/**
* Creates an aggregation repository with the three mandatory parameters
*/
- public PostgresAggregationRepository(PlatformTransactionManager
transactionManager, String repositoryName,
- DataSource dataSource) {
+ public ClusteredPostgresAggregationRepository(PlatformTransactionManager
transactionManager, String repositoryName,
+ DataSource dataSource) {
super(transactionManager, repositoryName, dataSource);
}
@@ -51,8 +51,10 @@ public class PostgresAggregationRepository extends
JdbcAggregationRepository {
* @param exchange the aggregated exchange
* @param repositoryName The name of the table
*/
+ @Override
protected void insert(
- final CamelContext camelContext, final String correlationId, final
Exchange exchange, String repositoryName)
+ final CamelContext camelContext, final String correlationId, final
Exchange exchange, String repositoryName,
+ final Long version, final boolean completed)
throws Exception {
// The default totalParameterIndex is 2 for ID and Exchange. Depending
on logic this will be increased
int totalParameterIndex = 2;
@@ -85,7 +87,7 @@ public class PostgresAggregationRepository extends
JdbcAggregationRepository {
String sql = queryBuilder.toString();
- int updateCount = insertHelper(camelContext, correlationId, exchange,
sql, 1L);
+ int updateCount = insertHelper(camelContext, correlationId, exchange,
sql, 1L, completed);
if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
throw new DataIntegrityViolationException("No row was inserted due
to data violation");
}
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 5cc88a6..1629fb6 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -69,31 +69,32 @@ public class JdbcAggregationRepository extends
ServiceSupport
protected static final String BODY = "body";
// optimistic locking: version identifier needed to avoid the lost update
problem
- private static final String VERSION = "version";
- private static final String VERSION_PROPERTY =
"CamelOptimisticLockVersion";
+ protected static final String VERSION = "version";
+ protected static final String VERSION_PROPERTY =
"CamelOptimisticLockVersion";
private static final Logger LOG =
LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final Constants PROPAGATION_CONSTANTS = new
Constants(TransactionDefinition.class);
+ protected JdbcCamelCodec codec = new JdbcCamelCodec();
+ protected JdbcTemplate jdbcTemplate;
+ protected TransactionTemplate transactionTemplate;
+ protected TransactionTemplate transactionTemplateReadOnly;
+ protected boolean allowSerializedHeaders;
+
private JdbcOptimisticLockingExceptionMapper
jdbcOptimisticLockingExceptionMapper
= new DefaultJdbcOptimisticLockingExceptionMapper();
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
- private TransactionTemplate transactionTemplate;
- private TransactionTemplate transactionTemplateReadOnly;
private int propagationBehavior =
TransactionDefinition.PROPAGATION_REQUIRED;
- private JdbcTemplate jdbcTemplate;
private LobHandler lobHandler = new DefaultLobHandler();
private String repositoryName;
private boolean returnOldExchange;
- private JdbcCamelCodec codec = new JdbcCamelCodec();
private long recoveryInterval = 5000;
private boolean useRecovery = true;
private int maximumRedeliveries;
private String deadLetterUri;
private List<String> headersToStoreAsText;
private boolean storeBodyAsText;
- private boolean allowSerializedHeaders;
/**
* Creates an aggregation repository
@@ -402,9 +403,12 @@ public class JdbcAggregationRepository extends
ServiceSupport
protected void doInTransactionWithoutResult(TransactionStatus
status) {
LOG.debug("Confirming exchangeId {}", exchangeId);
final String confirmKey = exchangeId;
-
- jdbcTemplate.update("DELETE FROM " +
getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
- confirmKey);
+ final int mustBeOne = jdbcTemplate
+ .update("DELETE FROM " + getRepositoryNameCompleted()
+ " WHERE " + ID + " = ?", confirmKey);
+ if (mustBeOne != 1) {
+ LOG.error("problem removing row " + confirmKey + " from "
+ getRepositoryNameCompleted()
+ + " - DELETE statement did not return 1 but " +
mustBeOne);
+ }
}
});
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
index 28b48f3..4f1da5b 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
@@ -51,8 +51,10 @@ public class PostgresAggregationRepository extends
JdbcAggregationRepository {
* @param exchange the aggregated exchange
* @param repositoryName The name of the table
*/
+ @Override
protected void insert(
- final CamelContext camelContext, final String correlationId, final
Exchange exchange, String repositoryName)
+ final CamelContext camelContext, final String correlationId, final
Exchange exchange, String repositoryName,
+ final Long version)
throws Exception {
// The default totalParameterIndex is 2 for ID and Exchange. Depending
on logic this will be increased
int totalParameterIndex = 2;
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
new file mode 100644
index 0000000..649c3ea
--- /dev/null
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.test.spring.junit5.CamelSpringTestSupport;
+import org.springframework.context.support.AbstractApplicationContext;
+
+public abstract class AbstractClusteredJdbcAggregationTestSupport extends
CamelSpringTestSupport {
+
+ ClusteredJdbcAggregationRepository repo;
+ ClusteredJdbcAggregationRepository repobis;
+
+ @Override
+ public void postProcessTest() throws Exception {
+ super.postProcessTest();
+
+ repo = applicationContext.getBean("repo5",
ClusteredJdbcAggregationRepository.class);
+ repobis = applicationContext.getBean("repo6",
ClusteredJdbcAggregationRepository.class);
+ configureJdbcAggregationRepository();
+ }
+
+ void configureJdbcAggregationRepository() {
+ }
+
+ long getCompletionInterval() {
+ return 5000;
+ }
+
+ @Override
+ protected AbstractApplicationContext createApplicationContext() {
+ return newAppContext(
+ "JdbcSpringDataSource.xml", "JdbcSpringDataSource.xml");
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
new file mode 100644
index 0000000..4df1263
--- /dev/null
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class ClusteredJdbcAggregateRecoverTest extends
AbstractClusteredJdbcAggregationTestSupport {
+
+ private static AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ void configureJdbcAggregationRepository() {
+ // enable recovery
+ repo.setUseRecovery(true);
+ // check faster
+ repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+ repo.setRecoveryByInstance(true);
+ repo.setInstanceId("INSTANCE1");
+ repobis.setUseRecovery(true);
+ repobis.setRecoveryInterval(50, TimeUnit.MILLISECONDS);
+ repobis.setRecoveryByInstance(true);
+ repobis.setInstanceId("INSTANCE2");
+
+ }
+
+ @Test
+ public void testJdbcAggregateRecover() throws Exception {
+ // should fail the first 2 times and then recover
+ getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+ // should be marked as redelivered
+
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+ // on the 2nd redelivery attempt we succeed
+
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+ template.sendBodyAndHeader("direct:start", "A", "id", 123);
+ template.sendBodyAndHeader("direct:start", "B", "id", 123);
+ template.sendBodyAndHeader("direct:start", "C", "id", 123);
+ template.sendBodyAndHeader("direct:start", "D", "id", 123);
+ template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+ assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").aggregate(header("id"), new
MyAggregationStrategy()).completionSize(5)
+ .aggregationRepository(repo)
+ .log("aggregated exchange id ${exchangeId} with
${body}").to("mock:aggregated").delay(1000)
+ // simulate errors the first two times
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ int count = counter.incrementAndGet();
+ if (count <= 2) {
+ throw new IllegalArgumentException("Damn");
+ }
+ }
+ }).to("mock:result").end();
+ from("direct:tutu").aggregate(header("id"), new
MyAggregationStrategy()).completionSize(5).aggregationRepository(repobis)
+ .log("aggregated exchange id ${exchangeId} with
${body}").log("recover bis!!!!!!!!!!!!!!!!!").end();
+ }
+ };
+ }
+}
diff --git
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
index a04a404..45648cb 100644
---
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
+++
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
@@ -55,12 +55,17 @@
<jdbc:embedded-database id="{{testClassSimpleName}}-dataSource4"
type="DERBY">
<jdbc:script location="classpath:/sql/init4.sql"/>
</jdbc:embedded-database>
+
+ <!-- In Memory Database #5 -->
+ <jdbc:embedded-database id="{{testClassSimpleName}}-dataSource5"
type="DERBY">
+ <jdbc:script location="classpath:/sql/init5.sql"/>
+ </jdbc:embedded-database>
<bean id="repo1"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo1"/>
<property name="transactionManager" ref="txManager1"/>
<property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/>
- </bean>
+ </bean>
<bean id="repo2"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="repositoryName" value="aggregationRepo2"/>
@@ -97,6 +102,17 @@
</list>
</property>
</bean>
+
+ <bean id="repo5"
class="org.apache.camel.processor.aggregate.jdbc.ClusteredJdbcAggregationRepository">
+ <property name="repositoryName" value="aggregationRepo5"/>
+ <property name="transactionManager" ref="txManager5"/>
+ <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+ </bean>
+ <bean id="repo6"
class="org.apache.camel.processor.aggregate.jdbc.ClusteredJdbcAggregationRepository">
+ <property name="repositoryName" value="aggregationRepo5"/>
+ <property name="transactionManager" ref="txManager5"/>
+ <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+ </bean>
<bean id="txManager1"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/>
@@ -113,5 +129,9 @@
<bean id="txManager4"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="{{testClassSimpleName}}-dataSource4"/>
</bean>
+
+ <bean id="txManager5"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+ <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+ </bean>
</beans>
diff --git a/components/camel-sql/src/test/resources/sql/init5.sql
b/components/camel-sql/src/test/resources/sql/init5.sql
new file mode 100644
index 0000000..02c264d
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/init5.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE TABLE aggregationRepo5 (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version bigint NOT NULL,
+ constraint aggregationRepo1_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo5_completed (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version bigint NOT NULL,
+ instance_id varchar(255),
+ constraint aggregationRepo1_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file