This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0d754837c6d531b7f350ca1d789685e6a4da6522 Author: malogan82 <[email protected]> AuthorDate: Sat Jul 11 22:26:56 2020 +0200 Added camel-redis component and RedisAggregationRepository using Redisson Added camel-redis component and RedisAggregationRepository using Redisson Added test class and fixed some bugs Modified test class Modified test class Modified RedissonAggregationRepository --- bom/camel-bom/pom.xml | 5 + camel-dependencies/pom.xml | 1 + components/camel-redis/pom.xml | 64 ++++ .../aggregate/RedisAggregationRepository.java | 342 +++++++++++++++++++++ .../camel-redis/src/test/AggregateRedisTest.java | 51 +++ .../src/test/MyAggregationStrategy.java | 44 +++ components/pom.xml | 1 + parent/pom.xml | 6 + 8 files changed, 514 insertions(+) diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index 0118577..21320c9 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -1460,6 +1460,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-redis</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-ref</artifactId> <version>${project.version}</version> </dependency> diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index 92859c5..e66d598 100644 --- a/camel-dependencies/pom.xml +++ b/camel-dependencies/pom.xml @@ -489,6 +489,7 @@ <rdf4j-rio-version>2.4.4</rdf4j-rio-version> <reactive-streams-version>1.0.3</reactive-streams-version> <reactor-version>3.2.16.RELEASE</reactor-version> + <redisson-version>3.13.2</redisson-version> <rescu-version>2.0.2</rescu-version> <resilience4j-version>1.5.0</resilience4j-version> <rest-assured-version>4.3.1</rest-assured-version> diff --git a/components/camel-redis/pom.xml b/components/camel-redis/pom.xml new file mode 100644 index 0000000..700269e --- /dev/null +++ b/components/camel-redis/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.5.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-redis</artifactId> + <packaging>jar</packaging> + <name>Camel :: Redis</name> + <description>Camel Redis support</description> + + <dependencies> + + <dependency> + <groupId>org.redisson</groupId> + <artifactId>redisson</artifactId> + <version>${redisson-version}</version> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring-junit5</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <endpoint>${endpoint}</endpoint> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java new file mode 100644 index 0000000..4b5ab34 --- /dev/null +++ b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.redis.processor.aggregate; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.spi.OptimisticLockingAggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.DefaultExchangeHolder; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.StringHelper; +import org.redisson.Redisson; +import org.redisson.api.RLock; +import org.redisson.api.RMap; +import org.redisson.api.RTransaction; +import org.redisson.api.RedissonClient; +import org.redisson.api.TransactionOptions; +import org.redisson.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedisAggregationRepository extends ServiceSupport + implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(RedisAggregationRepository.class); + private static final String COMPLETED_SUFFIX = "-completed"; + + private boolean optimistic; + private boolean useRecovery = true; + private Map<String, DefaultExchangeHolder> cache; + private Map<String, DefaultExchangeHolder> persistedCache; + private String endpoint; + private String mapName; + private String persistenceMapName; + private RedissonClient redisson; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + + public RedisAggregationRepository(final String mapName, final String endpoint) { + this.mapName = mapName; + this.persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX); + this.optimistic = false; + this.endpoint = endpoint; + } + + public RedisAggregationRepository(final String mapName, final String persistenceMapName, + final String endpoint) { + this.mapName = mapName; + this.persistenceMapName = persistenceMapName; + this.optimistic = false; + this.endpoint = endpoint; + } + + public RedisAggregationRepository(final String mapName, final String endpoint, boolean optimistic) { + this(mapName, endpoint); + this.optimistic = optimistic; + } + + public RedisAggregationRepository(final String mapName, final String persistenceMapName, + final String endpoint, boolean optimistic) { + this(mapName, persistenceMapName, endpoint); + this.optimistic = optimistic; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException { + if (!optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key); + if (oldExchange == null) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); + final DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, holder); + if (misbehaviorHolder != null) { + Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); + LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", + key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); + throw new OptimisticLockingException(); + } + } else { + DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange, true, allowSerializedHeaders); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); + if (!cache.replace(key, oldHolder, newHolder)) { + LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", + key); + throw new OptimisticLockingException(); + } + } + LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key); + return oldExchange; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + if (optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + RLock lock = redisson.getLock("aggregationLock"); + try { + lock.lock(); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + DefaultExchangeHolder oldHolder = cache.put(key, newHolder); + return unmarshallExchange(camelContext, oldHolder); + } finally { + LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + lock.unlock(); + } + } + + @Override + public Set<String> scan(CamelContext camelContext) { + if (useRecovery) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet()); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName()); + return scanned; + } else { + LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", + camelContext.getName(), mapName); + return Collections.emptySet(); + } + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + return useRecovery ? unmarshallExchange(camelContext, persistedCache.get(exchangeId)) : null; + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + return unmarshallExchange(camelContext, cache.get(key)); + } + + /** + * Checks if the key in question is in the repository. + * + * @param key Object - key in question + */ + public boolean containsKey(Object key) { + if (cache != null) { + return cache.containsKey(key); + } else { + return false; + } + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + if (optimistic) { + LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key); + if (!cache.remove(key, holder)) { + LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", + key); + throw new OptimisticLockingException(); + } + LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key); + if (useRecovery) { + LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + persistedCache.put(exchange.getExchangeId(), holder); + LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + } + } else { + if (useRecovery) { + LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + TransactionOptions tOpts = TransactionOptions.defaults(); + RTransaction transaction = redisson.createTransaction(tOpts); + + try { + RMap<String, DefaultExchangeHolder> tCache = transaction.getMap(mapName); + RMap<String, DefaultExchangeHolder> tPersistentCache = transaction.getMap(persistenceMapName); + + DefaultExchangeHolder removedHolder = tCache.remove(key); + LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", + exchange.getExchangeId(), key); + tPersistentCache.put(exchange.getExchangeId(), removedHolder); + + transaction.commit(); + LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", + exchange.getExchangeId(), key); + } catch (Throwable throwable) { + transaction.rollback(); + + final String msg = String.format("Transaction was rolled back for remove operation with a key %s and an Exchange ID %s.", + key, exchange.getExchangeId()); + LOG.warn(msg, throwable); + throw new RuntimeException(msg, throwable); + } + } else { + cache.remove(key); + } + } + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + if (useRecovery) { + persistedCache.remove(exchangeId); + } + } + + @Override + public Set<String> getKeys() { + return Collections.unmodifiableSet(cache.keySet()); + } + + /** + * @return Persistent repository {@link IMap} name; + */ + public String getPersistentRepositoryName() { + return persistenceMapName; + } + + @PostConstruct + void init() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + + } + + @Override + protected void doStart() throws Exception { + StringHelper.notEmpty(mapName, "repositoryName"); + Config config = new Config(); + config.useSingleServer().setAddress(String.format("redis://%s",endpoint)); + redisson = Redisson.create(config); + cache = redisson.getMap(mapName); + if (useRecovery) { + persistedCache = redisson.getMap(persistenceMapName); + } + } + + @Override + protected void doStop() throws Exception { + if(redisson!=null){ + redisson.shutdown(); + } + } + + protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + return exchange; + } +} diff --git a/components/camel-redis/src/test/AggregateRedisTest.java b/components/camel-redis/src/test/AggregateRedisTest.java new file mode 100644 index 0000000..9867708 --- /dev/null +++ b/components/camel-redis/src/test/AggregateRedisTest.java @@ -0,0 +1,51 @@ +package org.apache.camel.component.redis; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.component.redis.processor.aggregate.RedisAggregationRepository; +import org.junit.Test; + +/** + * The ABC example for using the Aggregator EIP. + * <p/> + * This example have 4 messages send to the aggregator, by which one + * message is published which contains the aggregation of message 1,2 and 4 + * as they use the same correlation key. + * <p/> + * See the class {@link camelinaction.MyAggregationStrategy} for how the messages + * are actually aggregated together. + * + * @see MyAggregationStrategy + */ +public class AggregateRedisTest extends CamelTestSupport { + + private String endpoint = System.getProperty("endpoint"); //ip:port + + @Test + public void testABC() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("ABC"); + template.sendBodyAndHeader("direct:start", "A", "myId", 1); + template.sendBodyAndHeader("direct:start", "B", "myId", 1); + template.sendBodyAndHeader("direct:start", "F", "myId", 2); + template.sendBodyAndHeader("direct:start", "C", "myId", 1); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .log("Sending ${body} with correlation key ${header.myId}") + .aggregate(header("myId"), new MyAggregationStrategy()) + .aggregationRepository(new RedisAggregationRepository("aggregation", endpoint)) + .completionSize(3) + .log("Sending out ${body}") + .to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/components/camel-redis/src/test/MyAggregationStrategy.java b/components/camel-redis/src/test/MyAggregationStrategy.java new file mode 100644 index 0000000..ed650e2 --- /dev/null +++ b/components/camel-redis/src/test/MyAggregationStrategy.java @@ -0,0 +1,44 @@ +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * This is the aggregation strategy which is java code for <i>aggregating</i> + * incoming messages with the existing aggregated message. In other words + * you use this strategy to <i>merge</i> the messages together. + */ +public class MyAggregationStrategy implements AggregationStrategy { + + /** + * Aggregates the messages. + * + * @param oldExchange the existing aggregated message. Is <tt>null</tt> the + * very first time as there are no existing message. + * @param newExchange the incoming message. This is never <tt>null</tt>. + * @return the aggregated message. + */ + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // the first time there are no existing message and therefore + // the oldExchange is null. In these cases we just return + // the newExchange + if (oldExchange == null) { + return newExchange; + } + + // now we have both an existing message (oldExchange) + // and a incoming message (newExchange) + // we want to merge together. + + // in this example we add their bodies + String oldBody = oldExchange.getIn().getBody(String.class).trim(); + String newBody = newExchange.getIn().getBody(String.class).trim(); + + // the body should be the two bodies added together + String body = oldBody + newBody; + + // update the existing message with the added body + oldExchange.getIn().setBody(body); + // and return it + return oldExchange; + } + +} \ No newline at end of file diff --git a/components/pom.xml b/components/pom.xml index b438cb5..5d89fe7 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -315,6 +315,7 @@ <module>camel-reactive-executor-vertx</module> <module>camel-reactive-streams</module> <module>camel-reactor</module> + <module>camel-redis</module> <module>camel-resilience4j</module> <module>camel-resteasy</module> <module>camel-rest-swagger</module> diff --git a/parent/pom.xml b/parent/pom.xml index 3fb163b..50ab1ff 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -477,6 +477,7 @@ <rdf4j-model-version>2.4.4</rdf4j-model-version> <reactive-streams-version>1.0.3</reactive-streams-version> <reactor-version>3.2.16.RELEASE</reactor-version> + <redisson-version>3.13.2</redisson-version> <rescu-version>2.0.2</rescu-version> <resilience4j-version>1.5.0</resilience4j-version> <rest-assured-version>4.3.1</rest-assured-version> @@ -2065,6 +2066,11 @@ <artifactId>camel-reactor</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-redis</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ref</artifactId>
