This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c9c1c1e85c58a021eb76002d08e6cc16ace879a4
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Aug 26 11:08:19 2020 +0200

    CAMEL-15468: camel-redis - Polished and rebuild
---
 apache-camel/src/main/descriptors/common-bin.xml   |   1 +
 .../org/apache/camel/catalog/docs.properties       |   1 +
 .../org/apache/camel/catalog/docs/redis.adoc       |  11 +
 .../org/apache/camel/catalog/others.properties     |   1 +
 .../org/apache/camel/catalog/others/redis.json     |  15 ++
 components/camel-redis/pom.xml                     |  42 ++--
 .../services/org/apache/camel/other.properties     |   7 +
 .../camel-redis/src/generated/resources/redis.json |  15 ++
 components/camel-redis/src/main/docs/redis.adoc    |  11 +
 .../aggregate/RedisAggregationRepository.java      | 254 ++++++++++++---------
 .../camel-redis/src/test/AggregateRedisTest.java   |  51 -----
 .../processor/aggregate/AggregateRedisTest.java    |  67 ++++++
 .../aggregate}/MyAggregationStrategy.java          |  21 +-
 .../src/test/resources/log4j2.properties           |  28 +++
 docs/components/modules/others/nav.adoc            |   1 +
 docs/components/modules/others/pages/redis.adoc    |  13 ++
 parent/pom.xml                                     |   2 +-
 17 files changed, 353 insertions(+), 188 deletions(-)

diff --git a/apache-camel/src/main/descriptors/common-bin.xml 
b/apache-camel/src/main/descriptors/common-bin.xml
index 827a5f9..a69243c 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -307,6 +307,7 @@
         <include>org.apache.camel:camel-reactive-executor-vertx</include>
         <include>org.apache.camel:camel-reactive-streams</include>
         <include>org.apache.camel:camel-reactor</include>
+        <include>org.apache.camel:camel-redis</include>
         <include>org.apache.camel:camel-ref</include>
         <include>org.apache.camel:camel-resilience4j</include>
         <include>org.apache.camel:camel-rest</include>
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
index 88166b3..b70630c 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties
@@ -395,6 +395,7 @@ reactive-executor-vertx
 reactive-streams-component
 reactor
 recipientList-eip
+redis
 ref-component
 ref-language
 removeHeader-eip
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc
new file mode 100644
index 0000000..1e81d98
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc
@@ -0,0 +1,11 @@
+[[redis-component]]
+= Redis Component
+:docTitle: Redis
+:artifactId: camel-redis
+:description: Aggregation repository using Redis as datastore
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The Redis component provides an `AggregationStrategy` to use Redis as the 
backend datastore.
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
index f4a9fc2..93ceff4 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties
@@ -17,6 +17,7 @@ opentracing
 platform-http-vertx
 reactive-executor-vertx
 reactor
+redis
 resilience4j
 ribbon
 rxjava
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json
new file mode 100644
index 0000000..6bbfb91
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "redis",
+    "title": "Redis",
+    "description": "Aggregation repository using Redis as datastore",
+    "deprecated": false,
+    "firstVersion": "3.5.0",
+    "label": "database",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-redis",
+    "version": "3.5.0-SNAPSHOT"
+  }
+}
diff --git a/components/camel-redis/pom.xml b/components/camel-redis/pom.xml
index 700269e..50d73cb 100644
--- a/components/camel-redis/pom.xml
+++ b/components/camel-redis/pom.xml
@@ -17,7 +17,8 @@
     limitations under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
 
     <modelVersion>4.0.0</modelVersion>
 
@@ -30,35 +31,38 @@
     <artifactId>camel-redis</artifactId>
     <packaging>jar</packaging>
     <name>Camel :: Redis</name>
-    <description>Camel Redis support</description>
+    <description>Aggregation repository using Redis as datastore</description>
+
+    <properties>
+        <firstVersion>3.5.0</firstVersion>
+        <label>database</label>
+    </properties>
 
     <dependencies>
 
         <dependency>
-           <groupId>org.redisson</groupId>
-           <artifactId>redisson</artifactId>
-           <version>${redisson-version}</version>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.redisson</groupId>
+            <artifactId>redisson</artifactId>
+            <version>${redisson-version}</version>
         </dependency>
 
+        <!-- testing -->
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-test-spring-junit5</artifactId>
+            <artifactId>camel-test-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
 
     </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <systemPropertyVariables>
-                       <endpoint>${endpoint}</endpoint>
-                    </systemPropertyVariables>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>
diff --git 
a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties
 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties
new file mode 100644
index 0000000..fc3636e
--- /dev/null
+++ 
b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+name=redis
+groupId=org.apache.camel
+artifactId=camel-redis
+version=3.5.0-SNAPSHOT
+projectName=Camel :: Redis
+projectDescription=Aggregation repository using Redis as datastore
diff --git a/components/camel-redis/src/generated/resources/redis.json 
b/components/camel-redis/src/generated/resources/redis.json
new file mode 100644
index 0000000..6bbfb91
--- /dev/null
+++ b/components/camel-redis/src/generated/resources/redis.json
@@ -0,0 +1,15 @@
+{
+  "other": {
+    "kind": "other",
+    "name": "redis",
+    "title": "Redis",
+    "description": "Aggregation repository using Redis as datastore",
+    "deprecated": false,
+    "firstVersion": "3.5.0",
+    "label": "database",
+    "supportLevel": "Preview",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-redis",
+    "version": "3.5.0-SNAPSHOT"
+  }
+}
diff --git a/components/camel-redis/src/main/docs/redis.adoc 
b/components/camel-redis/src/main/docs/redis.adoc
new file mode 100644
index 0000000..1e81d98
--- /dev/null
+++ b/components/camel-redis/src/main/docs/redis.adoc
@@ -0,0 +1,11 @@
+[[redis-component]]
+= Redis Component
+:docTitle: Redis
+:artifactId: camel-redis
+:description: Aggregation repository using Redis as datastore
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The Redis component provides an `AggregationStrategy` to use Redis as the 
backend datastore.
diff --git 
a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
 
b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
index 4b5ab34..49934b4 100644
--- 
a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
+++ 
b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java
@@ -14,27 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.redis.processor.aggregate;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
-import 
org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.DefaultExchangeHolder;
 import org.apache.camel.support.service.ServiceSupport;
@@ -49,52 +39,57 @@ import org.redisson.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * {@link org.apache.camel.spi.AggregationRepository} using Redis as store.
+ */
 public class RedisAggregationRepository extends ServiceSupport
-               implements RecoverableAggregationRepository, 
OptimisticLockingAggregationRepository {
-       private static final Logger LOG = 
LoggerFactory.getLogger(RedisAggregationRepository.class);
-       private static final String COMPLETED_SUFFIX = "-completed";
+        implements RecoverableAggregationRepository, 
OptimisticLockingAggregationRepository {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RedisAggregationRepository.class);
+    private static final String COMPLETED_SUFFIX = "-completed";
 
-       private boolean optimistic;
-       private boolean useRecovery = true;
-       private Map<String, DefaultExchangeHolder> cache;
+    private boolean optimistic;
+    private boolean useRecovery = true;
+    private Map<String, DefaultExchangeHolder> cache;
     private Map<String, DefaultExchangeHolder> persistedCache;
-       private String endpoint;
-       private String mapName;
-       private String persistenceMapName;
-       private RedissonClient redisson;
-       private String deadLetterChannel;
-       private long recoveryInterval = 5000;
-       private int maximumRedeliveries = 3;
-       private boolean allowSerializedHeaders;
-
-       public RedisAggregationRepository(final String mapName, final String 
endpoint) {
-               this.mapName = mapName;
-               this.persistenceMapName = String.format("%s%s", mapName, 
COMPLETED_SUFFIX);
-               this.optimistic = false;
-               this.endpoint = endpoint;
-       }
-
-       public RedisAggregationRepository(final String mapName, final String 
persistenceMapName,
-                       final String endpoint) {
-               this.mapName = mapName;
-               this.persistenceMapName = persistenceMapName;
-               this.optimistic = false;
-               this.endpoint = endpoint;
-       }
-
-       public RedisAggregationRepository(final String mapName, final String 
endpoint, boolean optimistic) {
-               this(mapName, endpoint);
-               this.optimistic = optimistic;
-       }
-
-       public RedisAggregationRepository(final String mapName, final String 
persistenceMapName,
-                       final String endpoint, boolean optimistic) {
-               this(mapName, persistenceMapName, endpoint);
-               this.optimistic = optimistic;
-       }
-
-       @Override
-    public Exchange add(CamelContext camelContext, String key, Exchange 
oldExchange, Exchange newExchange) throws OptimisticLockingException {
+    private String endpoint;
+    private String mapName;
+    private String persistenceMapName;
+    private RedissonClient redisson;
+    private boolean shutdownRedisson;
+    private String deadLetterChannel;
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+    private boolean allowSerializedHeaders;
+
+    public RedisAggregationRepository(final String mapName, final String 
endpoint) {
+        this.mapName = mapName;
+        this.persistenceMapName = String.format("%s%s", mapName, 
COMPLETED_SUFFIX);
+        this.optimistic = false;
+        this.endpoint = endpoint;
+    }
+
+    public RedisAggregationRepository(final String mapName, final String 
persistenceMapName,
+                                      final String endpoint) {
+        this.mapName = mapName;
+        this.persistenceMapName = persistenceMapName;
+        this.optimistic = false;
+        this.endpoint = endpoint;
+    }
+
+    public RedisAggregationRepository(final String mapName, final String 
endpoint, boolean optimistic) {
+        this(mapName, endpoint);
+        this.optimistic = optimistic;
+    }
+
+    public RedisAggregationRepository(final String mapName, final String 
persistenceMapName,
+                                      final String endpoint, boolean 
optimistic) {
+        this(mapName, persistenceMapName, endpoint);
+        this.optimistic = optimistic;
+    }
+
+    @Override
+    public Exchange add(CamelContext camelContext, String key, Exchange 
oldExchange, Exchange newExchange)
+            throws OptimisticLockingException {
         if (!optimistic) {
             throw new UnsupportedOperationException();
         }
@@ -104,15 +99,17 @@ public class RedisAggregationRepository extends 
ServiceSupport
             final DefaultExchangeHolder misbehaviorHolder = 
cache.putIfAbsent(key, holder);
             if (misbehaviorHolder != null) {
                 Exchange misbehaviorEx = unmarshallExchange(camelContext, 
misbehaviorHolder);
-                LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges 
to be returned",
+                LOG.warn(
+                        "Optimistic locking failed for exchange with key {}: 
IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges 
to be returned",
                         key, misbehaviorEx != null ? 
misbehaviorEx.getExchangeId() : "<null>");
-                throw  new OptimisticLockingException();
+                throw new OptimisticLockingException();
             }
         } else {
             DefaultExchangeHolder oldHolder = 
DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders);
             DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders);
             if (!cache.replace(key, oldHolder, newHolder)) {
-                LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#replace returned no Exchanges, while it's expected to replace one",
+                LOG.warn(
+                        "Optimistic locking failed for exchange with key {}: 
IMap#replace returned no Exchanges, while it's expected to replace one",
                         key);
                 throw new OptimisticLockingException();
             }
@@ -121,14 +118,14 @@ public class RedisAggregationRepository extends 
ServiceSupport
         return oldExchange;
     }
 
-       @Override
-       public Exchange add(CamelContext camelContext, String key, Exchange 
exchange) {
-               if (optimistic) {
-                       throw new UnsupportedOperationException();
-               }
-               LOG.trace("Adding an Exchange with ID {} for key {} in a 
thread-safe manner.", exchange.getExchangeId(), key);
-               RLock lock = redisson.getLock("aggregationLock");
-               try {
+    @Override
+    public Exchange add(CamelContext camelContext, String key, Exchange 
exchange) {
+        if (optimistic) {
+            throw new UnsupportedOperationException();
+        }
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe 
manner.", exchange.getExchangeId(), key);
+        RLock lock = redisson.getLock("aggregationLock");
+        try {
             lock.lock();
             DefaultExchangeHolder newHolder = 
DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
             DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
@@ -137,9 +134,9 @@ public class RedisAggregationRepository extends 
ServiceSupport
             LOG.trace("Added an Exchange with ID {} for key {} in a 
thread-safe manner.", exchange.getExchangeId(), key);
             lock.unlock();
         }
-       }
+    }
 
-       @Override
+    @Override
     public Set<String> scan(CamelContext camelContext) {
         if (useRecovery) {
             LOG.trace("Scanning for exchanges to recover in {} context", 
camelContext.getName());
@@ -147,7 +144,8 @@ public class RedisAggregationRepository extends 
ServiceSupport
             LOG.trace("Found {} keys for exchanges to recover in {} context", 
scanned.size(), camelContext.getName());
             return scanned;
         } else {
-            LOG.warn("What for to run recovery scans in {} context while 
repository {} is running in non-recoverable aggregation repository mode?!",
+            LOG.warn(
+                    "What for to run recovery scans in {} context while 
repository {} is running in non-recoverable aggregation repository mode?!",
                     camelContext.getName(), mapName);
             return Collections.emptySet();
         }
@@ -159,6 +157,46 @@ public class RedisAggregationRepository extends 
ServiceSupport
         return useRecovery ? unmarshallExchange(camelContext, 
persistedCache.get(exchangeId)) : null;
     }
 
+    public boolean isOptimistic() {
+        return optimistic;
+    }
+
+    public void setOptimistic(boolean optimistic) {
+        this.optimistic = optimistic;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public String getMapName() {
+        return mapName;
+    }
+
+    public void setMapName(String mapName) {
+        this.mapName = mapName;
+    }
+
+    public String getPersistenceMapName() {
+        return persistenceMapName;
+    }
+
+    public void setPersistenceMapName(String persistenceMapName) {
+        this.persistenceMapName = persistenceMapName;
+    }
+
+    public RedissonClient getRedisson() {
+        return redisson;
+    }
+
+    public void setRedisson(RedissonClient redisson) {
+        this.redisson = redisson;
+    }
+
     @Override
     public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
         this.recoveryInterval = timeUnit.toMillis(interval);
@@ -208,7 +246,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
     public Exchange get(CamelContext camelContext, String key) {
         return unmarshallExchange(camelContext, cache.get(key));
     }
-    
+
     /**
      * Checks if the key in question is in the repository.
      * 
@@ -221,7 +259,7 @@ public class RedisAggregationRepository extends 
ServiceSupport
             return false;
         }
     }
-    
+
     public boolean isAllowSerializedHeaders() {
         return allowSerializedHeaders;
     }
@@ -236,7 +274,8 @@ public class RedisAggregationRepository extends 
ServiceSupport
         if (optimistic) {
             LOG.trace("Removing an exchange with ID {} for key {} in an 
optimistic manner.", exchange.getExchangeId(), key);
             if (!cache.remove(key, holder)) {
-                LOG.error("Optimistic locking failed for exchange with key {}: 
IMap#remove removed no Exchanges, while it's expected to remove one.",
+                LOG.warn(
+                        "Optimistic locking failed for exchange with key {}: 
IMap#remove removed no Exchanges, while it's expected to remove one.",
                         key);
                 throw new OptimisticLockingException();
             }
@@ -255,8 +294,8 @@ public class RedisAggregationRepository extends 
ServiceSupport
                 RTransaction transaction = redisson.createTransaction(tOpts);
 
                 try {
-                       RMap<String, DefaultExchangeHolder> tCache = 
transaction.getMap(mapName);
-                       RMap<String, DefaultExchangeHolder> tPersistentCache = 
transaction.getMap(persistenceMapName);
+                    RMap<String, DefaultExchangeHolder> tCache = 
transaction.getMap(mapName);
+                    RMap<String, DefaultExchangeHolder> tPersistentCache = 
transaction.getMap(persistenceMapName);
 
                     DefaultExchangeHolder removedHolder = tCache.remove(key);
                     LOG.trace("Putting an exchange with ID {} for key {} into 
a recoverable storage in a thread-safe manner.",
@@ -264,14 +303,16 @@ public class RedisAggregationRepository extends 
ServiceSupport
                     tPersistentCache.put(exchange.getExchangeId(), 
removedHolder);
 
                     transaction.commit();
-                    LOG.trace("Removed an exchange with ID {} for key {} in a 
thread-safe manner.", exchange.getExchangeId(), key);
+                    LOG.trace("Removed an exchange with ID {} for key {} in a 
thread-safe manner.", exchange.getExchangeId(),
+                            key);
                     LOG.trace("Put an exchange with ID {} for key {} into a 
recoverable storage in a thread-safe manner.",
                             exchange.getExchangeId(), key);
                 } catch (Throwable throwable) {
-                       transaction.rollback();
+                    transaction.rollback();
 
-                    final String msg = String.format("Transaction was rolled 
back for remove operation with a key %s and an Exchange ID %s.",
-                               key, exchange.getExchangeId());
+                    final String msg = String.format(
+                            "Transaction was rolled back for remove operation 
with a key %s and an Exchange ID %s.",
+                            key, exchange.getExchangeId());
                     LOG.warn(msg, throwable);
                     throw new RuntimeException(msg, throwable);
                 }
@@ -294,42 +335,41 @@ public class RedisAggregationRepository extends 
ServiceSupport
         return Collections.unmodifiableSet(cache.keySet());
     }
 
-    /**
-     * @return Persistent repository {@link IMap} name;
-     */
     public String getPersistentRepositoryName() {
         return persistenceMapName;
     }
-       
-       @PostConstruct
-       void init() throws Exception {
-               if (maximumRedeliveries < 0) {
-                       throw new IllegalArgumentException("Maximum redelivery 
retries must be zero or a positive integer.");
-               }
-               if (recoveryInterval < 0) {
-                       throw new IllegalArgumentException("Recovery interval 
must be zero or a positive integer.");
-               }
-               
-       }
-
-       @Override
-       protected void doStart() throws Exception {
-               StringHelper.notEmpty(mapName, "repositoryName");
-               Config config = new Config();
-               
config.useSingleServer().setAddress(String.format("redis://%s",endpoint));
-               redisson = Redisson.create(config);
-               cache = redisson.getMap(mapName);
-               if (useRecovery) {
+
+    @Override
+    protected void doInit() throws Exception {
+        StringHelper.notEmpty(mapName, "repositoryName");
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries 
must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero 
or a positive integer.");
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (redisson == null) {
+            Config config = new Config();
+            config.useSingleServer().setAddress(String.format("redis://%s", 
endpoint));
+            redisson = Redisson.create(config);
+            shutdownRedisson = true;
+        }
+        cache = redisson.getMap(mapName);
+        if (useRecovery) {
             persistedCache = redisson.getMap(persistenceMapName);
         }
-       }
-
-       @Override
-       protected void doStop() throws Exception {
-           if(redisson!=null){
-                       redisson.shutdown();
-           }
-       }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (redisson != null && shutdownRedisson) {
+            redisson.shutdown();
+        }
+    }
 
     protected Exchange unmarshallExchange(CamelContext camelContext, 
DefaultExchangeHolder holder) {
         Exchange exchange = null;
diff --git a/components/camel-redis/src/test/AggregateRedisTest.java 
b/components/camel-redis/src/test/AggregateRedisTest.java
deleted file mode 100644
index 9867708..0000000
--- a/components/camel-redis/src/test/AggregateRedisTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.camel.component.redis;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import 
org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository;
-import org.junit.Test;
-
-/**
- * The ABC example for using the Aggregator EIP.
- * <p/>
- * This example have 4 messages send to the aggregator, by which one
- * message is published which contains the aggregation of message 1,2 and 4
- * as they use the same correlation key.
- * <p/>
- * See the class {@link camelinaction.MyAggregationStrategy} for how the 
messages
- * are actually aggregated together.
- *
- * @see MyAggregationStrategy
- */
-public class AggregateRedisTest extends CamelTestSupport {
-       
-       private String endpoint = System.getProperty("endpoint"); //ip:port
-       
-       @Test
-    public void testABC() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("ABC");
-        template.sendBodyAndHeader("direct:start", "A", "myId", 1);
-        template.sendBodyAndHeader("direct:start", "B", "myId", 1);
-        template.sendBodyAndHeader("direct:start", "F", "myId", 2);
-        template.sendBodyAndHeader("direct:start", "C", "myId", 1);
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-               return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start")
-                    .log("Sending ${body} with correlation key ${header.myId}")
-                    .aggregate(header("myId"), new MyAggregationStrategy())
-                                       .aggregationRepository(new 
RedisAggregationRepository("aggregation", endpoint))
-                                       .completionSize(3)
-                        .log("Sending out ${body}")
-                        .to("mock:result");
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git 
a/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java
 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java
new file mode 100644
index 0000000..5629c8d
--- /dev/null
+++ 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.redis.processor.aggregate;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The ABC example for using the Aggregator EIP.
+ * <p/>
+ * This example have 4 messages send to the aggregator, by which one message 
is published which contains the aggregation
+ * of message 1,2 and 4 as they use the same correlation key.
+ * <p/>
+ */
+@Disabled("Requires manually testing")
+public class AggregateRedisTest extends CamelTestSupport {
+
+    // TODO: use docker test-containers for testing
+
+    private String endpoint = System.getProperty("endpoint"); //ip:port
+
+    @Test
+    public void testABC() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("ABC");
+
+        template.sendBodyAndHeader("direct:start", "A", "myId", 1);
+        template.sendBodyAndHeader("direct:start", "B", "myId", 1);
+        template.sendBodyAndHeader("direct:start", "F", "myId", 2);
+        template.sendBodyAndHeader("direct:start", "C", "myId", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .log("Sending ${body} with correlation key 
${header.myId}")
+                        .aggregate(header("myId"), new MyAggregationStrategy())
+                        .aggregationRepository(new 
RedisAggregationRepository("aggregation", endpoint))
+                        .completionSize(3)
+                        .log("Sending out ${body}")
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-redis/src/test/MyAggregationStrategy.java 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java
similarity index 68%
rename from components/camel-redis/src/test/MyAggregationStrategy.java
rename to 
components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java
index ed650e2..4de8c75 100644
--- a/components/camel-redis/src/test/MyAggregationStrategy.java
+++ 
b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java
@@ -1,20 +1,21 @@
+package org.apache.camel.component.redis.processor.aggregate;
+
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
 
 /**
- * This is the aggregation strategy which is java code for <i>aggregating</i>
- * incoming messages with the existing aggregated message. In other words
- * you use this strategy to <i>merge</i> the messages together.
+ * This is the aggregation strategy which is java code for <i>aggregating</i> 
incoming messages with the existing
+ * aggregated message. In other words you use this strategy to <i>merge</i> 
the messages together.
  */
 public class MyAggregationStrategy implements AggregationStrategy {
 
     /**
      * Aggregates the messages.
      *
-     * @param oldExchange  the existing aggregated message. Is <tt>null</tt> 
the
-     *                     very first time as there are no existing message.
-     * @param newExchange  the incoming message. This is never <tt>null</tt>.
-     * @return the aggregated message.
+     * @param  oldExchange the existing aggregated message. Is <tt>null</tt> 
the very first time as there are no
+     *                     existing message.
+     * @param  newExchange the incoming message. This is never <tt>null</tt>.
+     * @return             the aggregated message.
      */
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         // the first time there are no existing message and therefore
@@ -40,5 +41,5 @@ public class MyAggregationStrategy implements 
AggregationStrategy {
         // and return it
         return oldExchange;
     }
-    
-}
\ No newline at end of file
+
+}
diff --git a/components/camel-redis/src/test/resources/log4j2.properties 
b/components/camel-redis/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..8728a5d
--- /dev/null
+++ b/components/camel-redis/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-redis-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/docs/components/modules/others/nav.adoc 
b/docs/components/modules/others/nav.adoc
index 3a4fed6..514144c 100644
--- a/docs/components/modules/others/nav.adoc
+++ b/docs/components/modules/others/nav.adoc
@@ -22,6 +22,7 @@
 ** xref:platform-http-vertx.adoc[Platform Http Vertx]
 ** xref:reactive-executor-vertx.adoc[Reactive Executor Vert.x]
 ** xref:reactor.adoc[Reactor]
+** xref:redis.adoc[Redis]
 ** xref:resilience4j.adoc[Resilience4j]
 ** xref:ribbon.adoc[Ribbon]
 ** xref:rxjava.adoc[RxJava]
diff --git a/docs/components/modules/others/pages/redis.adoc 
b/docs/components/modules/others/pages/redis.adoc
new file mode 100644
index 0000000..a24c476
--- /dev/null
+++ b/docs/components/modules/others/pages/redis.adoc
@@ -0,0 +1,13 @@
+[[redis-component]]
+= Redis Component
+//THIS FILE IS COPIED: EDIT THE SOURCE FILE:
+:page-source: components/camel-redis/src/main/docs/redis.adoc
+:docTitle: Redis
+:artifactId: camel-redis
+:description: Aggregation repository using Redis as datastore
+:since: 3.5
+:supportLevel: Preview
+
+*Since Camel {since}*
+
+The Redis component provides an `AggregationStrategy` to use Redis as the 
backend datastore.
diff --git a/parent/pom.xml b/parent/pom.xml
index 50ab1ff..444086d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -2066,7 +2066,7 @@
         <artifactId>camel-reactor</artifactId>
         <version>${project.version}</version>
       </dependency>
-         <dependency>
+      <dependency>
         <groupId>org.apache.camel</groupId>
         <artifactId>camel-redis</artifactId>
         <version>${project.version}</version>

Reply via email to