This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push:
new 23787071 CAMEL-18677: allow setting the cache on the configuration builder
23787071 is described below
commit 237870719af4b790f5875156e94b16d76e0e2df9
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Nov 3 18:22:07 2022 +0100
CAMEL-18677: allow setting the cache on the configuration builder
---
.../example/resume/aws/kinesis/main/KinesisRoute.java | 3 +--
.../example/resume/cassandra/main/CassandraRoute.java | 15 +++++----------
.../camel/example/resume/cassandra/main/MainApp.java | 6 ++++--
.../strategies/kafka/file/LargeFileRouteBuilder.java | 6 +++---
.../kafka/fileset/LargeDirectoryRouteBuilder.java | 4 +---
.../strategies/ClusterizedLargeDirectoryRouteBuilder.java | 6 +++---
6 files changed, 17 insertions(+), 23 deletions(-)
diff --git
a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
index cd097042..a7e05c26 100644
---
a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
+++
b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
@@ -55,13 +55,12 @@ public class KinesisRoute extends RouteBuilder {
@Override
public void configure() {
- bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache);
bindToRegistry("amazonKinesisClient", client);
String kinesisEndpointUri =
"aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
fromF(kinesisEndpointUri, streamName)
- .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder())
+ .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder().withResumeCache(resumeCache))
.process(this::addResumeOffset);
}
}
diff --git
a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
index 3f90c7b4..1bf1c25e 100644
---
a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
+++
b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
@@ -23,9 +23,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
import org.apache.camel.resume.ResumeAction;
-import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,15 +33,13 @@ public class CassandraRoute extends RouteBuilder {
private static final Logger LOG =
LoggerFactory.getLogger(CassandraRoute.class);
private final CountDownLatch latch;
private final int batchSize;
- private final ResumeStrategy resumeStrategy;
- private final ResumeCache<?> resumeCache;
private final CassandraClient client;
+ private final KafkaResumeStrategyConfigurationBuilder configurationBuilder;
- public CassandraRoute(CountDownLatch latch, int batchSize, ResumeStrategy resumeStrategy, ResumeCache<?> resumeCache, CassandraClient client) {
+ public CassandraRoute(CountDownLatch latch, int batchSize, KafkaResumeStrategyConfigurationBuilder configurationBuilder, CassandraClient client) {
this.latch = latch;
this.batchSize = batchSize;
- this.resumeStrategy = resumeStrategy;
- this.resumeCache = resumeCache;
+ this.configurationBuilder = configurationBuilder;
this.client = client;
}
@@ -78,15 +75,13 @@ public class CassandraRoute extends RouteBuilder {
@Override
public void configure() {
- bindToRegistry(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
- bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache);
bindToRegistry(CassandraConstants.CASSANDRA_RESUME_ACTION, new
CustomResumeAction(client.newExampleDao()));
fromF("cql:{{cassandra.host}}:{{cassandra.cql3.port}}/camel_ks?cql=%s&resultSetConversionStrategy=#class:%s",
ExampleDao.getSelectStatement(batchSize),
ExampleResultSetConversionStrategy.class.getName())
.split(body()) // We receive a list of records so, for each
.resumable()
- .resumeStrategy(ResumeStrategy.DEFAULT_NAME)
+ .configuration(configurationBuilder)
.intermittent(true) // Set to ignore empty data sets that will generate exchanges w/ no offset information
.process(this::addResumeInfo);
diff --git
a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
index 6606b3c6..6718d38f 100644
---
a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
+++
b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import org.apache.camel.component.caffeine.resume.CaffeineCache;
import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.main.Main;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
public class MainApp {
@@ -47,7 +48,7 @@ public class MainApp {
}
// Normal code path for consuming from Cassandra
- SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
+ final KafkaResumeStrategyConfigurationBuilder configurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
Main main = new Main();
@@ -57,7 +58,8 @@ public class MainApp {
try (CassandraClient client = new CassandraClient(host, port)) {
- main.configure().addRoutesBuilder(new CassandraRoute(latch, batchSize, resumeStrategy, new CaffeineCache<>(10240), client));
+ configurationBuilder.withResumeCache(new CaffeineCache<>(10240));
+ main.configure().addRoutesBuilder(new CassandraRoute(latch, batchSize, configurationBuilder, client));
main.start();
}
}
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 0ad7e275..b3c24e40 100644
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -85,9 +85,9 @@ public class LargeFileRouteBuilder extends RouteBuilder {
public void configure() {
producerTemplate = getContext().createProducerTemplate();
- getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
-
- final KafkaResumeStrategyConfigurationBuilder
defaultKafkaResumeStrategyConfigurationBuilder =
KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+ final KafkaResumeStrategyConfigurationBuilder
defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil
+ .getDefaultKafkaResumeStrategyConfigurationBuilder()
+ .withResumeCache(cache);
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
.routeId("largeFileRoute")
diff --git
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index c85b2ca3..4482171e 100644
---
a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++
b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -63,10 +63,8 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder
{
* Let's configure the Camel routing rules using Java code...
*/
public void configure() {
- getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
-
from("file:{{input.dir}}?noop=true&recursive=true")
- .resumable().configuration(resumeStrategyConfigurationBuilder)
+ .resumable().configuration(resumeStrategyConfigurationBuilder.withResumeCache(cache))
.process(this::process)
.to("file:{{output.dir}}");
}
diff --git
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
index 407c9105..4c57da1f 100644
---
a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
+++
b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
@@ -49,9 +49,9 @@ public class ClusterizedLargeDirectoryRouteBuilder extends
RouteBuilder {
* Let's configure the Camel routing rules using Java code...
*/
public void configure() {
- getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, new
CaffeineCache<>(10000));
-
- final KafkaResumeStrategyConfigurationBuilder
defaultKafkaResumeStrategyConfigurationBuilder =
KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+ final KafkaResumeStrategyConfigurationBuilder
defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil
+ .getDefaultKafkaResumeStrategyConfigurationBuilder()
+ .withResumeCache(new CaffeineCache<>(10000));
from("timer:heartbeat?period=10000")
.routeId("heartbeat")