This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 46e003c436c CAMEL-20840: fixed incomplete/incorrect implementation of
resume adapter
46e003c436c is described below
commit 46e003c436c6d90fa765403a3b512ed1c73c0168
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jun 11 11:26:40 2024 +0200
CAMEL-20840: fixed incomplete/incorrect implementation of resume adapter
- removed invalid test
- adjusted the Kinesis resume adapter to use ResumeActionAware
- simplified the Kinesis resume adapter
---
.../camel/catalog/components/aws2-kinesis.json | 3 +-
.../camel/component/aws2/kinesis/aws2-kinesis.json | 3 +-
.../services/org/apache/camel/adapter-factory | 2 -
.../component/aws2/kinesis/Kinesis2Constants.java | 3 +
.../component/aws2/kinesis/Kinesis2Consumer.java | 25 ++++-
.../consumer/KinesisDefaultResumeAdapter.java | 81 ----------------
.../aws2/kinesis/consumer/KinesisResumeAction.java | 68 +++++++++++++
.../kinesis/consumer/KinesisResumeAdapter.java | 33 -------
.../KinesisConsumerResumeAfterRestartIT.java | 106 ---------------------
.../integration/KinesisConsumerResumeIT.java | 37 +++----
.../support/resume/ResumeActionAwareAdapter.java | 4 +
.../dsl/Kinesis2EndpointBuilderFactory.java | 12 +++
12 files changed, 134 insertions(+), 243 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
index 8f1a0065ed7..2c45648f61e 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
@@ -58,7 +58,8 @@
"CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind":
"header", "displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The time AWS assigned as the arrival
time of the record.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME"
},
"CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Identifies which shard in the stream
the data record is assigned to.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
"CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "long",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
- "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The shard ID of the shard where the data record was
placed.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+ "CamelKinesisDbResumeAction": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The resume action to execute when
resuming.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
+ "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The shard ID of the shard where the data record was
placed.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
},
"properties": {
"streamName": { "index": 0, "kind": "path", "displayName": "Stream Name",
"group": "common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Name of the stream" },
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index 8f1a0065ed7..2c45648f61e 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -58,7 +58,8 @@
"CamelAwsKinesisApproximateArrivalTimestamp": { "index": 1, "kind":
"header", "displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The time AWS assigned as the arrival
time of the record.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#APPROX_ARRIVAL_TIME"
},
"CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header",
"displayName": "", "group": "common", "label": "", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Identifies which shard in the stream
the data record is assigned to.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
"CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "long",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The timestamp of the message", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
- "CamelAwsKinesisShardId": { "index": 4, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The shard ID of the shard where the data record was
placed.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+ "CamelKinesisDbResumeAction": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The resume action to execute when
resuming.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
+ "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName":
"", "group": "common", "label": "", "required": false, "javaType": "String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "The shard ID of the shard where the data record was
placed.", "constantName":
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
},
"properties": {
"streamName": { "index": 0, "kind": "path", "displayName": "Stream Name",
"group": "common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration",
"configurationField": "configuration", "description": "Name of the stream" },
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
deleted file mode 100644
index 0e7c6193aee..00000000000
---
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/services/org/apache/camel/adapter-factory
+++ /dev/null
@@ -1,2 +0,0 @@
-# Generated by camel build tools - do NOT edit this file!
-class=org.apache.camel.component.aws2.kinesis.consumer.KinesisDefaultResumeAdapter
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
index 62abfc4d5d3..78b6db63a1d 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
@@ -32,6 +32,9 @@ public interface Kinesis2Constants {
@Metadata(description = "The timestamp of the message", javaType = "long")
String MESSAGE_TIMESTAMP = Exchange.MESSAGE_TIMESTAMP;
+ @Metadata(label = "consumer", description = "The resume action to execute
when resuming.", javaType = "String")
+ String RESUME_ACTION = "CamelKinesisDbResumeAction";
+
/**
* in a Kinesis Record object, the shard ID is used on writes to indicate
where the data was stored
*/
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 9c5c86eb0d6..2b94b9da034 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -29,7 +29,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeActionAware;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
@@ -287,14 +289,31 @@ public class Kinesis2Consumer extends
ScheduledBatchPollingConsumer implements R
return;
}
- KinesisResumeAdapter adapter =
resumeStrategy.getAdapter(KinesisResumeAdapter.class);
+ ResumeActionAware adapter =
resumeStrategy.getAdapter(ResumeActionAware.class);
if (adapter == null) {
LOG.warn("There is a resume strategy setup, but no adapter
configured or the type is incorrect");
return;
}
- adapter.configureGetShardIteratorRequest(req,
getEndpoint().getConfiguration().getStreamName(), shardId);
+ final ResumeAction action = resolveResumeAction(shardId, req);
+ adapter.setResumeAction(action);
+ adapter.resume();
+ }
+
+ private KinesisResumeAction resolveResumeAction(String shardId,
GetShardIteratorRequest.Builder req) {
+ KinesisResumeAction action
+ =
getEndpoint().getCamelContext().getRegistry().lookupByNameAndType(Kinesis2Constants.RESUME_ACTION,
+ KinesisResumeAction.class);
+ if (action == null) {
+ action = new KinesisResumeAction(req);
+ } else {
+ action.setBuilder(req);
+ }
+
+ action.setShardId(shardId);
+ action.setStreamName(getEndpoint().getConfiguration().getStreamName());
+ return action;
}
private Queue<Exchange> createExchanges(Shard shard, List<Record> records)
{
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
deleted file mode 100644
index 8a1c47a2cf3..00000000000
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisDefaultResumeAdapter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.cache.ResumeCache;
-import org.apache.camel.spi.annotations.JdkService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-
-@JdkService(ResumeAdapter.RESUME_ADAPTER_FACTORY)
-public class KinesisDefaultResumeAdapter implements KinesisResumeAdapter,
Cacheable {
- private static final Logger LOG =
LoggerFactory.getLogger(KinesisDefaultResumeAdapter.class);
-
- private ResumeCache<String> cache;
-
- @Override
- public void resume() {
- throw new UnsupportedOperationException();
- }
-
- private void add(Object key, Object offset) {
- KinesisOffset ko = (KinesisOffset) cache.computeIfAbsent((String) key,
k -> new KinesisOffset());
-
- ko.update((String) offset);
- }
-
- @Override
- public boolean add(OffsetKey<?> key, Offset<?> offset) {
- add(key.getValue(), offset.getValue());
-
- return true;
- }
-
- @Override
- public void setCache(ResumeCache<?> cache) {
- this.cache = (ResumeCache<String>) cache;
- }
-
- @Override
- public ResumeCache<?> getCache() {
- return cache;
- }
-
- @Override
- public void
configureGetShardIteratorRequest(GetShardIteratorRequest.Builder builder,
String streamName, String shardId) {
- KinesisOffset offset = cache.get(shardId, KinesisOffset.class);
-
- if (offset == null) {
- LOG.info("There is no offset for the stream {}", streamName);
- return;
- }
-
- final String sequenceNumber = offset.getValue();
- LOG.info("Resuming from offset {} for key {}", sequenceNumber,
streamName);
-
- builder.shardId(shardId);
- builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
- builder.startingSequenceNumber(sequenceNumber);
- }
-}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
new file mode 100644
index 00000000000..11dcab57ae7
--- /dev/null
+++
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.resume.ResumeAction;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class KinesisResumeAction implements ResumeAction {
+
+ private GetShardIteratorRequest.Builder builder;
+ private String streamName;
+ private String shardId;
+
+ public KinesisResumeAction() {
+ }
+
+ public KinesisResumeAction(GetShardIteratorRequest.Builder builder) {
+ this.builder = builder;
+ }
+
+ public void setBuilder(GetShardIteratorRequest.Builder builder) {
+ this.builder = builder;
+ }
+
+ protected GetShardIteratorRequest.Builder getBuilder() {
+ return builder;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+
+ public void setShardId(String shardId) {
+ this.shardId = shardId;
+ }
+
+ @Override
+ public boolean evalEntry(Object shardId, Object sequenceNumber) {
+ builder.shardId((String) shardId);
+ builder.shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ builder.startingSequenceNumber((String) sequenceNumber);
+ return false;
+ }
+}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
deleted file mode 100644
index a9f031f1866..00000000000
---
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import org.apache.camel.resume.ResumeAdapter;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-
-/**
- * The resume adapter for Kinesis
- */
-public interface KinesisResumeAdapter extends ResumeAdapter {
- /*
- When consuming from multiple shards the KinesisResumeAdapter is
potentially accessed by multiple threads.
- To avoid any concurrency issues the configuration of the
GetShardIteratorRequest should be done in one operation
- and not using multiple calls like in the previous version of this
interface
- */
- void configureGetShardIteratorRequest(GetShardIteratorRequest.Builder
builder, String streamName, String shardId);
-}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
deleted file mode 100644
index 41a9b10efbf..00000000000
---
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeAfterRestartIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.integration;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.camel.builder.RouteBuilder;
-import
org.apache.camel.component.aws2.kinesis.consumer.KinesisConsumerOffsetProcessor;
-import
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.cache.ResumeCache;
-import org.apache.camel.test.infra.aws.common.services.AWSService;
-import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
-import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
-
-import static
org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class KinesisConsumerResumeAfterRestartIT extends CamelTestSupport {
-
- @RegisterExtension
- public static AWSService awsService =
AWSServiceFactory.createSingletonKinesisService();
- private KinesisClient client;
-
- String streamName = "my-stream";
-
- List<String> receivedMessages = new CopyOnWriteArrayList<>();
-
- ResumeCache resumeCache = TransientResumeStrategy.createSimpleCache();
-
- @Override
- protected RouteBuilder createRouteBuilder() {
- client = AWSSDKClientUtils.newKinesisClient();
-
- context.getRegistry().bind("amazonKinesisClient",
AWSSDKClientUtils.newKinesisClient());
-
- return new RouteBuilder() {
- @Override
- public void configure() {
- String kinesisEndpointUri =
"aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
-
- fromF(kinesisEndpointUri, streamName)
- // commenting out the strategy will cause the test to
fail as event "First" will be consumed twice
-
.resumable().configuration(KinesisResumeStrategyConfiguration.builder().withResumeCache(resumeCache))
- .process(new KinesisConsumerOffsetProcessor())
- .process(exchange ->
receivedMessages.add(exchange.getMessage().getBody(String.class)));
- }
- };
- }
-
- @BeforeEach
- public void prepareEnvironment() {
- createStream(client, streamName);
- }
-
- private void sendEvent(String payload) {
-
client.putRecord(PutRecordRequest.builder().streamName(streamName).partitionKey("my-key")
- .data(SdkBytes.fromUtf8String(payload)).build());
- }
-
- @Test
- void shouldResumeConsumptionAfterRestart() {
-
- sendEvent("First");
- Awaitility.await().until(() -> receivedMessages.contains("First"));
-
- restartContext();
-
- sendEvent("Second");
- Awaitility.await().until(() -> receivedMessages.contains("Second"));
-
- assertEquals(2, receivedMessages.size());
- }
-
- private void restartContext() {
- context.stop();
-
- // stop also seems to close the kinesis client, therefor we need to
provide a new one
- context.getRegistry().bind("amazonKinesisClient",
AWSSDKClientUtils.newKinesisClient());
-
- context.start();
- }
-}
diff --git
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 1868e7ad251..ce34c29bee4 100644
---
a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,9 +25,11 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAction;
+import
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategyConfiguration;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
@@ -76,29 +78,26 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
}
}
- private static final class TestKinesisResumeAdapter implements
KinesisResumeAdapter {
+ private static final class TestResumeAction extends KinesisResumeAction {
private List<PutRecordsResponse> previousRecords;
private final int expectedCount;
- private GetShardIteratorRequest.Builder builder;
- private TestKinesisResumeAdapter(int expectedCount) {
+ private TestResumeAction(int expectedCount) {
this.expectedCount = expectedCount;
}
- @Override
- public void resume() {
- }
-
public void setPreviousRecords(List<PutRecordsResponse>
previousRecords) {
this.previousRecords = previousRecords;
}
+ public int getExpectedCount() {
+ return expectedCount;
+ }
+
@Override
- public void configureGetShardIteratorRequest(
- GetShardIteratorRequest.Builder builder, String streamName,
String shardId) {
+ public boolean evalEntry(Object shardId, Object sequenceNumber) {
+ final GetShardIteratorRequest.Builder builder = super.getBuilder();
ObjectHelper.notNull(builder, "builder");
- ObjectHelper.notNull(streamName, "streamName");
- ObjectHelper.notNull(shardId, "shardId");
LOG.debug("Waiting for data");
Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() ->
!previousRecords.isEmpty());
@@ -109,6 +108,7 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
builder.startingSequenceNumber(putRecordsResultEntry.sequenceNumber());
builder.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ return false;
}
}
@@ -126,7 +126,7 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
private final int expectedCount = messageCount / 2;
private List<KinesisData> receivedMessages = new CopyOnWriteArrayList<>();
private List<PutRecordsResponse> previousRecords;
- private TestKinesisResumeAdapter adapter = new
TestKinesisResumeAdapter(expectedCount);
+ private TestResumeAction action = new TestResumeAction(expectedCount);
@Override
protected RouteBuilder createRouteBuilder() {
@@ -137,11 +137,17 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
- bindToRegistry("testResumeStrategy", new
TransientResumeStrategy(adapter));
+ final ResumeCache<Object> simpleCache =
TransientResumeStrategy.createSimpleCache();
+ final
KinesisResumeStrategyConfiguration.KinesisResumeStrategyConfigurationBuilder
resumeConfigurationBuilder
+ = KinesisResumeStrategyConfiguration.builder()
+ .withResumeCache(simpleCache);
+
+ bindToRegistry(Kinesis2Constants.RESUME_ACTION, action);
String kinesisEndpointUri =
"aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
fromF(kinesisEndpointUri, streamName)
+ .resumable().configuration(resumeConfigurationBuilder)
.process(exchange -> {
KinesisData data = new KinesisData();
final Message message = exchange.getMessage();
@@ -153,7 +159,6 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
receivedMessages.add(data);
})
- .resumable("testResumeStrategy")
.to("mock:result");
}
};
@@ -172,7 +177,7 @@ public class KinesisConsumerResumeIT extends
CamelTestSupport {
}
}
- adapter.setPreviousRecords(previousRecords);
+ action.setPreviousRecords(previousRecords);
}
@AfterEach
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
index 563f64c8889..a273c30ee1d 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/resume/ResumeActionAwareAdapter.java
@@ -78,4 +78,8 @@ public class ResumeActionAwareAdapter implements
ResumeActionAware, Cacheable, D
return add(key, value);
}
+
+ protected ResumeAction getResumeAction() {
+ return resumeAction;
+ }
}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 5188a698bd9..f11004307de 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -2332,6 +2332,18 @@ public interface Kinesis2EndpointBuilderFactory {
public String messageTimestamp() {
return "CamelMessageTimestamp";
}
+ /**
+ * The resume action to execute when resuming.
+ *
+ * The option is a: {@code String} type.
+ *
+ * Group: consumer
+ *
+ * @return the name of the header {@code KinesisDbResumeAction}.
+ */
+ public String kinesisDbResumeAction() {
+ return "CamelKinesisDbResumeAction";
+ }
/**
* The shard ID of the shard where the data record was placed.
*