This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit c9c1c1e85c58a021eb76002d08e6cc16ace879a4 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Aug 26 11:08:19 2020 +0200 CAMEL-15468: camel-redis - Polished and rebuild --- apache-camel/src/main/descriptors/common-bin.xml | 1 + .../org/apache/camel/catalog/docs.properties | 1 + .../org/apache/camel/catalog/docs/redis.adoc | 11 + .../org/apache/camel/catalog/others.properties | 1 + .../org/apache/camel/catalog/others/redis.json | 15 ++ components/camel-redis/pom.xml | 42 ++-- .../services/org/apache/camel/other.properties | 7 + .../camel-redis/src/generated/resources/redis.json | 15 ++ components/camel-redis/src/main/docs/redis.adoc | 11 + .../aggregate/RedisAggregationRepository.java | 254 ++++++++++++--------- .../camel-redis/src/test/AggregateRedisTest.java | 51 ----- .../processor/aggregate/AggregateRedisTest.java | 67 ++++++ .../aggregate}/MyAggregationStrategy.java | 21 +- .../src/test/resources/log4j2.properties | 28 +++ docs/components/modules/others/nav.adoc | 1 + docs/components/modules/others/pages/redis.adoc | 13 ++ parent/pom.xml | 2 +- 17 files changed, 353 insertions(+), 188 deletions(-) diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml index 827a5f9..a69243c 100644 --- a/apache-camel/src/main/descriptors/common-bin.xml +++ b/apache-camel/src/main/descriptors/common-bin.xml @@ -307,6 +307,7 @@ <include>org.apache.camel:camel-reactive-executor-vertx</include> <include>org.apache.camel:camel-reactive-streams</include> <include>org.apache.camel:camel-reactor</include> + <include>org.apache.camel:camel-redis</include> <include>org.apache.camel:camel-ref</include> <include>org.apache.camel:camel-resilience4j</include> <include>org.apache.camel:camel-rest</include> diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties index 88166b3..b70630c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs.properties @@ -395,6 +395,7 @@ reactive-executor-vertx reactive-streams-component reactor recipientList-eip +redis ref-component ref-language removeHeader-eip diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc new file mode 100644 index 0000000..1e81d98 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/redis.adoc @@ -0,0 +1,11 @@ +[[redis-component]] += Redis Component +:docTitle: Redis +:artifactId: camel-redis +:description: Aggregation repository using Redis as datastore +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The Redis component provides an `AggregationStrategy` to use Redis as the backend datastore. diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties index f4a9fc2..93ceff4 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others.properties @@ -17,6 +17,7 @@ opentracing platform-http-vertx reactive-executor-vertx reactor +redis resilience4j ribbon rxjava diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json new file mode 100644 index 0000000..6bbfb91 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/others/redis.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "redis", + "title": "Redis", + "description": "Aggregation repository using Redis as datastore", + "deprecated": false, + "firstVersion": "3.5.0", + "label": "database", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-redis", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-redis/pom.xml b/components/camel-redis/pom.xml index 700269e..50d73cb 100644 --- a/components/camel-redis/pom.xml +++ b/components/camel-redis/pom.xml @@ -17,7 +17,8 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -30,35 +31,38 @@ <artifactId>camel-redis</artifactId> <packaging>jar</packaging> <name>Camel :: Redis</name> - <description>Camel Redis support</description> + <description>Aggregation repository using Redis as datastore</description> + + <properties> + <firstVersion>3.5.0</firstVersion> + <label>database</label> + </properties> <dependencies> <dependency> - <groupId>org.redisson</groupId> - <artifactId>redisson</artifactId> - <version>${redisson-version}</version> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson</artifactId> + <version>${redisson-version}</version> </dependency> + <!-- testing --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring-junit5</artifactId> + <artifactId>camel-test-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> </dependency> </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemPropertyVariables> - <endpoint>${endpoint}</endpoint> - </systemPropertyVariables> - </configuration> - </plugin> - </plugins> - </build> - </project> diff --git a/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties new file mode 100644 index 0000000..fc3636e --- /dev/null +++ b/components/camel-redis/src/generated/resources/META-INF/services/org/apache/camel/other.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +name=redis +groupId=org.apache.camel +artifactId=camel-redis +version=3.5.0-SNAPSHOT +projectName=Camel :: Redis +projectDescription=Aggregation repository using Redis as datastore diff --git a/components/camel-redis/src/generated/resources/redis.json b/components/camel-redis/src/generated/resources/redis.json new file mode 100644 index 0000000..6bbfb91 --- /dev/null +++ b/components/camel-redis/src/generated/resources/redis.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "redis", + "title": "Redis", + "description": "Aggregation repository using Redis as datastore", + "deprecated": false, + "firstVersion": "3.5.0", + "label": "database", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-redis", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-redis/src/main/docs/redis.adoc b/components/camel-redis/src/main/docs/redis.adoc new file mode 100644 index 0000000..1e81d98 --- /dev/null +++ b/components/camel-redis/src/main/docs/redis.adoc @@ -0,0 +1,11 @@ +[[redis-component]] += Redis Component +:docTitle: Redis +:artifactId: camel-redis +:description: Aggregation repository using Redis as datastore +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The Redis component provides an `AggregationStrategy` to use Redis as the backend datastore. diff --git a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java index 4b5ab34..49934b4 100644 --- a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java +++ b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java @@ -14,27 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.redis.processor.aggregate; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.spi.OptimisticLockingAggregationRepository; import org.apache.camel.spi.RecoverableAggregationRepository; -import org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.DefaultExchangeHolder; import org.apache.camel.support.service.ServiceSupport; @@ -49,52 +39,57 @@ import org.redisson.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * {@link org.apache.camel.spi.AggregationRepository} using Redis as store. + */ public class RedisAggregationRepository extends ServiceSupport - implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { - private static final Logger LOG = LoggerFactory.getLogger(RedisAggregationRepository.class); - private static final String COMPLETED_SUFFIX = "-completed"; + implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(RedisAggregationRepository.class); + private static final String COMPLETED_SUFFIX = "-completed"; - private boolean optimistic; - private boolean useRecovery = true; - private Map<String, DefaultExchangeHolder> cache; + private boolean optimistic; + private boolean useRecovery = true; + private Map<String, DefaultExchangeHolder> cache; private Map<String, DefaultExchangeHolder> persistedCache; - private String endpoint; - private String mapName; - private String persistenceMapName; - private RedissonClient redisson; - private String deadLetterChannel; - private long recoveryInterval = 5000; - private int maximumRedeliveries = 3; - private boolean allowSerializedHeaders; - - public RedisAggregationRepository(final String mapName, final String endpoint) { - this.mapName = mapName; - this.persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX); - this.optimistic = false; - this.endpoint = endpoint; - } - - public RedisAggregationRepository(final String mapName, final String persistenceMapName, - final String endpoint) { - this.mapName = mapName; - this.persistenceMapName = persistenceMapName; - this.optimistic = false; - this.endpoint = endpoint; - } - - public RedisAggregationRepository(final String mapName, final String endpoint, boolean optimistic) { - this(mapName, endpoint); - this.optimistic = optimistic; - } - - public RedisAggregationRepository(final String mapName, final String persistenceMapName, - final String endpoint, boolean optimistic) { - this(mapName, persistenceMapName, endpoint); - this.optimistic = optimistic; - } - - @Override - public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException { + private String endpoint; + private String mapName; + private String persistenceMapName; + private RedissonClient redisson; + private boolean shutdownRedisson; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + + public RedisAggregationRepository(final String mapName, final String endpoint) { + this.mapName = mapName; + this.persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX); + this.optimistic = false; + this.endpoint = endpoint; + } + + public RedisAggregationRepository(final String mapName, final String persistenceMapName, + final String endpoint) { + this.mapName = mapName; + this.persistenceMapName = persistenceMapName; + this.optimistic = false; + this.endpoint = endpoint; + } + + public RedisAggregationRepository(final String mapName, final String endpoint, boolean optimistic) { + this(mapName, endpoint); + this.optimistic = optimistic; + } + + public RedisAggregationRepository(final String mapName, final String persistenceMapName, + final String endpoint, boolean optimistic) { + this(mapName, persistenceMapName, endpoint); + this.optimistic = optimistic; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) + throws OptimisticLockingException { if (!optimistic) { throw new UnsupportedOperationException(); } @@ -104,15 +99,17 @@ public class RedisAggregationRepository extends ServiceSupport final DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, holder); if (misbehaviorHolder != null) { Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); - LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", + LOG.warn( + "Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); - throw new OptimisticLockingException(); + throw new OptimisticLockingException(); } } else { DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders); DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); if (!cache.replace(key, oldHolder, newHolder)) { - LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", + LOG.warn( + "Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", key); throw new OptimisticLockingException(); } @@ -121,14 +118,14 @@ public class RedisAggregationRepository extends ServiceSupport return oldExchange; } - @Override - public Exchange add(CamelContext camelContext, String key, Exchange exchange) { - if (optimistic) { - throw new UnsupportedOperationException(); - } - LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); - RLock lock = redisson.getLock("aggregationLock"); - try { + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + if (optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + RLock lock = redisson.getLock("aggregationLock"); + try { lock.lock(); DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); DefaultExchangeHolder oldHolder = cache.put(key, newHolder); @@ -137,9 +134,9 @@ public class RedisAggregationRepository extends ServiceSupport LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); lock.unlock(); } - } + } - @Override + @Override public Set<String> scan(CamelContext camelContext) { if (useRecovery) { LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); @@ -147,7 +144,8 @@ public class RedisAggregationRepository extends ServiceSupport LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); return scanned; } else { - LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", + LOG.warn( + "What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", camelContext.getName(), mapName); return Collections.emptySet(); } @@ -159,6 +157,46 @@ public class RedisAggregationRepository extends ServiceSupport return useRecovery ? unmarshallExchange(camelContext, persistedCache.get(exchangeId)) : null; } + public boolean isOptimistic() { + return optimistic; + } + + public void setOptimistic(boolean optimistic) { + this.optimistic = optimistic; + } + + public String getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public String getMapName() { + return mapName; + } + + public void setMapName(String mapName) { + this.mapName = mapName; + } + + public String getPersistenceMapName() { + return persistenceMapName; + } + + public void setPersistenceMapName(String persistenceMapName) { + this.persistenceMapName = persistenceMapName; + } + + public RedissonClient getRedisson() { + return redisson; + } + + public void setRedisson(RedissonClient redisson) { + this.redisson = redisson; + } + @Override public void setRecoveryInterval(long interval, TimeUnit timeUnit) { this.recoveryInterval = timeUnit.toMillis(interval); @@ -208,7 +246,7 @@ public class RedisAggregationRepository extends ServiceSupport public Exchange get(CamelContext camelContext, String key) { return unmarshallExchange(camelContext, cache.get(key)); } - + /** * Checks if the key in question is in the repository. * @@ -221,7 +259,7 @@ public class RedisAggregationRepository extends ServiceSupport return false; } } - + public boolean isAllowSerializedHeaders() { return allowSerializedHeaders; } @@ -236,7 +274,8 @@ public class RedisAggregationRepository extends ServiceSupport if (optimistic) { LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key); if (!cache.remove(key, holder)) { - LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", + LOG.warn( + "Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", key); throw new OptimisticLockingException(); } @@ -255,8 +294,8 @@ public class RedisAggregationRepository extends ServiceSupport RTransaction transaction = redisson.createTransaction(tOpts); try { - RMap<String, DefaultExchangeHolder> tCache = transaction.getMap(mapName); - RMap<String, DefaultExchangeHolder> tPersistentCache = transaction.getMap(persistenceMapName); + RMap<String, DefaultExchangeHolder> tCache = transaction.getMap(mapName); + RMap<String, DefaultExchangeHolder> tPersistentCache = transaction.getMap(persistenceMapName); DefaultExchangeHolder removedHolder = tCache.remove(key); LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", @@ -264,14 +303,16 @@ public class RedisAggregationRepository extends ServiceSupport tPersistentCache.put(exchange.getExchangeId(), removedHolder); transaction.commit(); - LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), + key); LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", exchange.getExchangeId(), key); } catch (Throwable throwable) { - transaction.rollback(); + transaction.rollback(); - final String msg = String.format("Transaction was rolled back for remove operation with a key %s and an Exchange ID %s.", - key, exchange.getExchangeId()); + final String msg = String.format( + "Transaction was rolled back for remove operation with a key %s and an Exchange ID %s.", + key, exchange.getExchangeId()); LOG.warn(msg, throwable); throw new RuntimeException(msg, throwable); } @@ -294,42 +335,41 @@ public class RedisAggregationRepository extends ServiceSupport return Collections.unmodifiableSet(cache.keySet()); } - /** - * @return Persistent repository {@link IMap} name; - */ public String getPersistentRepositoryName() { return persistenceMapName; } - - @PostConstruct - void init() throws Exception { - if (maximumRedeliveries < 0) { - throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); - } - if (recoveryInterval < 0) { - throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); - } - - } - - @Override - protected void doStart() throws Exception { - StringHelper.notEmpty(mapName, "repositoryName"); - Config config = new Config(); - config.useSingleServer().setAddress(String.format("redis://%s",endpoint)); - redisson = Redisson.create(config); - cache = redisson.getMap(mapName); - if (useRecovery) { + + @Override + protected void doInit() throws Exception { + StringHelper.notEmpty(mapName, "repositoryName"); + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + } + + @Override + protected void doStart() throws Exception { + if (redisson == null) { + Config config = new Config(); + config.useSingleServer().setAddress(String.format("redis://%s", endpoint)); + redisson = Redisson.create(config); + shutdownRedisson = true; + } + cache = redisson.getMap(mapName); + if (useRecovery) { persistedCache = redisson.getMap(persistenceMapName); } - } - - @Override - protected void doStop() throws Exception { - if(redisson!=null){ - redisson.shutdown(); - } - } + } + + @Override + protected void doStop() throws Exception { + if (redisson != null && shutdownRedisson) { + redisson.shutdown(); + } + } protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { Exchange exchange = null; diff --git a/components/camel-redis/src/test/AggregateRedisTest.java b/components/camel-redis/src/test/AggregateRedisTest.java deleted file mode 100644 index 9867708..0000000 --- a/components/camel-redis/src/test/AggregateRedisTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.camel.component.redis; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository; -import org.junit.Test; - -/** - * The ABC example for using the Aggregator EIP. - * <p/> - * This example have 4 messages send to the aggregator, by which one - * message is published which contains the aggregation of message 1,2 and 4 - * as they use the same correlation key. - * <p/> - * See the class {@link camelinaction.MyAggregationStrategy} for how the messages - * are actually aggregated together. - * - * @see MyAggregationStrategy - */ -public class AggregateRedisTest extends CamelTestSupport { - - private String endpoint = System.getProperty("endpoint"); //ip:port - - @Test - public void testABC() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("ABC"); - template.sendBodyAndHeader("direct:start", "A", "myId", 1); - template.sendBodyAndHeader("direct:start", "B", "myId", 1); - template.sendBodyAndHeader("direct:start", "F", "myId", 2); - template.sendBodyAndHeader("direct:start", "C", "myId", 1); - assertMockEndpointsSatisfied(); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:start") - .log("Sending ${body} with correlation key ${header.myId}") - .aggregate(header("myId"), new MyAggregationStrategy()) - .aggregationRepository(new RedisAggregationRepository("aggregation", endpoint)) - .completionSize(3) - .log("Sending out ${body}") - .to("mock:result"); - } - }; - } -} \ No newline at end of file diff --git a/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java new file mode 100644 index 0000000..5629c8d --- /dev/null +++ b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/AggregateRedisTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.redis.processor.aggregate; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +/** + * The ABC example for using the Aggregator EIP. + * <p/> + * This example have 4 messages send to the aggregator, by which one message is published which contains the aggregation + * of message 1,2 and 4 as they use the same correlation key. + * <p/> + */ +@Disabled("Requires manually testing") +public class AggregateRedisTest extends CamelTestSupport { + + // TODO: use docker test-containers for testing + + private String endpoint = System.getProperty("endpoint"); //ip:port + + @Test + public void testABC() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("ABC"); + + template.sendBodyAndHeader("direct:start", "A", "myId", 1); + template.sendBodyAndHeader("direct:start", "B", "myId", 1); + template.sendBodyAndHeader("direct:start", "F", "myId", 2); + template.sendBodyAndHeader("direct:start", "C", "myId", 1); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .log("Sending ${body} with correlation key ${header.myId}") + .aggregate(header("myId"), new MyAggregationStrategy()) + .aggregationRepository(new RedisAggregationRepository("aggregation", endpoint)) + .completionSize(3) + .log("Sending out ${body}") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-redis/src/test/MyAggregationStrategy.java b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java similarity index 68% rename from components/camel-redis/src/test/MyAggregationStrategy.java rename to components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java index ed650e2..4de8c75 100644 --- a/components/camel-redis/src/test/MyAggregationStrategy.java +++ b/components/camel-redis/src/test/java/org/apache/camel/component/redis/processor/aggregate/MyAggregationStrategy.java @@ -1,20 +1,21 @@ +package org.apache.camel.component.redis.processor.aggregate; + +import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; -import org.apache.camel.processor.aggregate.AggregationStrategy; /** - * This is the aggregation strategy which is java code for <i>aggregating</i> - * incoming messages with the existing aggregated message. In other words - * you use this strategy to <i>merge</i> the messages together. + * This is the aggregation strategy which is java code for <i>aggregating</i> incoming messages with the existing + * aggregated message. In other words you use this strategy to <i>merge</i> the messages together. */ public class MyAggregationStrategy implements AggregationStrategy { /** * Aggregates the messages. * - * @param oldExchange the existing aggregated message. Is <tt>null</tt> the - * very first time as there are no existing message. - * @param newExchange the incoming message. This is never <tt>null</tt>. - * @return the aggregated message. + * @param oldExchange the existing aggregated message. Is <tt>null</tt> the very first time as there are no + * existing message. + * @param newExchange the incoming message. This is never <tt>null</tt>. + * @return the aggregated message. */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // the first time there are no existing message and therefore @@ -40,5 +41,5 @@ public class MyAggregationStrategy implements AggregationStrategy { // and return it return oldExchange; } - -} \ No newline at end of file + +} diff --git a/components/camel-redis/src/test/resources/log4j2.properties b/components/camel-redis/src/test/resources/log4j2.properties new file mode 100644 index 0000000..8728a5d --- /dev/null +++ b/components/camel-redis/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-redis-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/docs/components/modules/others/nav.adoc b/docs/components/modules/others/nav.adoc index 3a4fed6..514144c 100644 --- a/docs/components/modules/others/nav.adoc +++ b/docs/components/modules/others/nav.adoc @@ -22,6 +22,7 @@ ** xref:platform-http-vertx.adoc[Platform Http Vertx] ** xref:reactive-executor-vertx.adoc[Reactive Executor Vert.x] ** xref:reactor.adoc[Reactor] +** xref:redis.adoc[Redis] ** xref:resilience4j.adoc[Resilience4j] ** xref:ribbon.adoc[Ribbon] ** xref:rxjava.adoc[RxJava] diff --git a/docs/components/modules/others/pages/redis.adoc b/docs/components/modules/others/pages/redis.adoc new file mode 100644 index 0000000..a24c476 --- /dev/null +++ b/docs/components/modules/others/pages/redis.adoc @@ -0,0 +1,13 @@ +[[redis-component]] += Redis Component +//THIS FILE IS COPIED: EDIT THE SOURCE FILE: +:page-source: components/camel-redis/src/main/docs/redis.adoc +:docTitle: Redis +:artifactId: camel-redis +:description: Aggregation repository using Redis as datastore +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The Redis component provides an `AggregationStrategy` to use Redis as the backend datastore. diff --git a/parent/pom.xml b/parent/pom.xml index 50ab1ff..444086d 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -2066,7 +2066,7 @@ <artifactId>camel-reactor</artifactId> <version>${project.version}</version> </dependency> - <dependency> + <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-redis</artifactId> <version>${project.version}</version>
