This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 58c074d CAMEL-17045: refactored Kafka's consumer strategy
58c074d is described below
commit 58c074d8f9098ac354fda52923785b08f2048e86
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Oct 4 15:13:50 2021 +0200
CAMEL-17045: refactored Kafka's consumer strategy
This allows it to be reusable for other components
---
.../org/apache/camel/catalog/components/kafka.json | 4 ++--
.../component/kafka/KafkaComponentConfigurer.java | 4 ++--
.../component/kafka/KafkaEndpointConfigurer.java | 4 ++--
.../org/apache/camel/component/kafka/kafka.json | 4 ++--
.../camel/component/kafka/KafkaConfiguration.java | 10 +++++-----
...trategy.java => KafkaConsumerResumeStrategy.java} | 3 ++-
...y.java => OffsetKafkaConsumerResumeStrategy.java} | 6 +++---
.../support/PartitionAssignmentListener.java | 2 +-
.../consumer/support/ResumeStrategyFactory.java | 12 ++++++------
...va => SeekPolicyKafkaConsumerResumeStrategy.java} | 6 +++---
...T.java => KafkaConsumerWithResumeStrategyIT.java} | 12 ++++++------
.../main/java/org/apache/camel}/ResumeStrategy.java | 16 +++++++---------
.../component/dsl/KafkaComponentBuilderFactory.java | 14 +++++++-------
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 20 ++++++++++----------
14 files changed, 58 insertions(+), 59 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index c044147..473b8d8 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -55,7 +55,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignme [...]
"pollOnError": { "kind": "property", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "de [...]
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
+ "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
"deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "This option allows the
user to set a custom resume s [...]
"seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Platf [...]
@@ -161,7 +161,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignm [...]
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "d [...]
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
+ "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
"deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "This option allows the
user to set a custom resume [...]
"seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Plat [...]
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 53926fa..3353bdd 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -161,7 +161,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "requesttimeoutms":
case "requestTimeoutMs":
getOrCreateConfiguration(target).setRequestTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
case "resumestrategy":
- case "resumeStrategy":
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class,
value)); return true;
+ case "resumeStrategy":
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class,
value)); return true;
case "retries":
getOrCreateConfiguration(target).setRetries(property(camelContext,
java.lang.Integer.class, value)); return true;
case "retrybackoffms":
case "retryBackoffMs":
getOrCreateConfiguration(target).setRetryBackoffMs(property(camelContext,
java.lang.Integer.class, value)); return true;
@@ -375,7 +375,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "requesttimeoutms":
case "requestTimeoutMs": return java.lang.Integer.class;
case "resumestrategy":
- case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
+ case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class;
case "retries": return java.lang.Integer.class;
case "retrybackoffms":
case "retryBackoffMs": return java.lang.Integer.class;
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 467b99b..911bc7c 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -149,7 +149,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "requesttimeoutms":
case "requestTimeoutMs":
target.getConfiguration().setRequestTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
case "resumestrategy":
- case "resumeStrategy":
target.getConfiguration().setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class,
value)); return true;
+ case "resumeStrategy":
target.getConfiguration().setResumeStrategy(property(camelContext,
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class,
value)); return true;
case "retries":
target.getConfiguration().setRetries(property(camelContext,
java.lang.Integer.class, value)); return true;
case "retrybackoffms":
case "retryBackoffMs":
target.getConfiguration().setRetryBackoffMs(property(camelContext,
java.lang.Integer.class, value)); return true;
@@ -351,7 +351,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "requesttimeoutms":
case "requestTimeoutMs": return java.lang.Integer.class;
case "resumestrategy":
- case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
+ case "resumeStrategy": return
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class;
case "retries": return java.lang.Integer.class;
case "retrybackoffms":
case "retryBackoffMs": return java.lang.Integer.class;
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index c044147..473b8d8 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -55,7 +55,7 @@
"partitionAssignor": { "kind": "property", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignme [...]
"pollOnError": { "kind": "property", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "de [...]
"pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
+ "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
"deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "This option allows the
user to set a custom resume s [...]
"seekTo": { "kind": "property", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "property", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Platf [...]
@@ -161,7 +161,7 @@
"partitionAssignor": { "kind": "parameter", "displayName": "Partition
Assignor", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue":
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The class name of the partition assignm [...]
"pollOnError": { "kind": "parameter", "displayName": "Poll On Error",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "d [...]
"pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms",
"group": "consumer", "label": "consumer", "required": false, "type":
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used when polling the
KafkaConsumer." },
- "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy",
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This option allows the user to set a custom
resume strategy. The [...]
+ "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
"deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "This option allows the
user to set a custom resume [...]
"seekTo": { "kind": "parameter", "displayName": "Seek To", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Set if KafkaConsumer will read from beginning
or end on startup: beginning : read from beginning [...]
"sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout
Ms", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "10000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The timeout used to detect failures when using
Kafka's group management facilities." },
"specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro
Reader", "group": "consumer", "label": "confluent,consumer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "This enables the use of a specific Avro reader
for use with the Confluent Plat [...]
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b357ed9..16f35e0 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -26,7 +26,7 @@ import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -149,7 +149,7 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
private Long commitTimeoutMs = 5000L;
@UriParam(label = "consumer")
- private ResumeStrategy resumeStrategy;
+ private KafkaConsumerResumeStrategy resumeStrategy;
// Producer configuration properties
@UriParam(label = "producer", defaultValue =
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -801,7 +801,7 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
this.breakOnFirstError = breakOnFirstError;
}
- public ResumeStrategy getResumeStrategy() {
+ public KafkaConsumerResumeStrategy getResumeStrategy() {
return resumeStrategy;
}
@@ -810,14 +810,14 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
* assigned (i.e.: when connecting or reconnecting). It allows
implementations to customize how to resume operations
* and serve as more flexible alternative to the seekTo and the
offsetRepository mechanisms.
*
- * See the {@link ResumeStrategy} for implementation details.
+ * See the {@link KafkaConsumerResumeStrategy} for implementation details.
*
* This option does not affect the auto commit setting. It is likely that
implementations using this setting will
* also want to evaluate using the manual commit option along with this.
*
* @param resumeStrategy An instance of the resume strategy
*/
- public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+ public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
this.resumeStrategy = resumeStrategy;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
similarity index 92%
copy from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
copy to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index ebe7ed6..d85579b 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -17,13 +17,14 @@
package org.apache.camel.component.kafka.consumer.support;
+import org.apache.camel.ResumeStrategy;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* Defines a strategy for handling resume operations. Implementations can
define different ways to handle how to resume
* processing records.
*/
-public interface ResumeStrategy {
+public interface KafkaConsumerResumeStrategy extends
ResumeStrategy<KafkaConsumer<?, ?>> {
/**
* Perform the resume operation. This runs in the scope of the Kafka
Consumer thread and may run concurrently with
* other consumer instances when the component is set up to use more than
one of them. As such, implementations are
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
similarity index 91%
rename from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
rename to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 9788f27..7c13eb1 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -31,12 +31,12 @@ import static
org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
/**
* A resume strategy that uses Kafka's offset for resuming
*/
-public class OffsetResumeStrategy implements ResumeStrategy {
- private static final Logger LOG =
LoggerFactory.getLogger(OffsetResumeStrategy.class);
+public class OffsetKafkaConsumerResumeStrategy implements
KafkaConsumerResumeStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
private final StateRepository<String, String> offsetRepository;
- public OffsetResumeStrategy(StateRepository<String, String>
offsetRepository) {
+ public OffsetKafkaConsumerResumeStrategy(StateRepository<String, String>
offsetRepository) {
this.offsetRepository = offsetRepository;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index e07db10..0420041 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -38,7 +38,7 @@ public class PartitionAssignmentListener implements
ConsumerRebalanceListener {
private final KafkaConfiguration configuration;
private final KafkaConsumer consumer;
private final Map<String, Long> lastProcessedOffset;
- private final ResumeStrategy resumeStrategy;
+ private final KafkaConsumerResumeStrategy resumeStrategy;
private Supplier<Boolean> stopStateSupplier;
public PartitionAssignmentListener(String threadId, String topicName,
KafkaConfiguration configuration,
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 44bd588..272f467 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -27,7 +27,7 @@ public final class ResumeStrategyFactory {
/**
* A NO-OP resume strategy that does nothing (i.e.: no resume)
*/
- private static class NoOpResumeStrategy implements ResumeStrategy {
+ private static class NoOpKafkaConsumerResumeStrategy implements
KafkaConsumerResumeStrategy {
@SuppressWarnings("unused")
@Override
public void resume(KafkaConsumer<?, ?> consumer) {
@@ -35,13 +35,13 @@ public final class ResumeStrategyFactory {
}
}
- private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new
NoOpResumeStrategy();
+ private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY
= new NoOpKafkaConsumerResumeStrategy();
private static final Logger LOG =
LoggerFactory.getLogger(ResumeStrategyFactory.class);
private ResumeStrategyFactory() {
}
- public static ResumeStrategy newResumeStrategy(KafkaConfiguration
configuration) {
+ public static KafkaConsumerResumeStrategy
newResumeStrategy(KafkaConfiguration configuration) {
if (configuration.getResumeStrategy() != null) {
return configuration.getResumeStrategy();
@@ -50,16 +50,16 @@ public final class ResumeStrategyFactory {
return builtinResumeStrategies(configuration);
}
- private static ResumeStrategy builtinResumeStrategies(KafkaConfiguration
configuration) {
+ private static KafkaConsumerResumeStrategy
builtinResumeStrategies(KafkaConfiguration configuration) {
StateRepository<String, String> offsetRepository =
configuration.getOffsetRepository();
String seekTo = configuration.getSeekTo();
if (offsetRepository != null) {
LOG.info("Using resume from offset strategy");
- return new OffsetResumeStrategy(offsetRepository);
+ return new OffsetKafkaConsumerResumeStrategy(offsetRepository);
} else if (seekTo != null) {
LOG.info("Using resume from seek policy strategy with seeking from
{}", seekTo);
- return new SeekPolicyResumeStrategy(seekTo);
+ return new SeekPolicyKafkaConsumerResumeStrategy(seekTo);
}
LOG.info("Using NO-OP resume strategy");
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
similarity index 88%
rename from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
rename to
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index d58a23a..969f0df 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -24,12 +24,12 @@ import org.slf4j.LoggerFactory;
/**
* A resume strategy that uses Camel's seekTo configuration for resuming
*/
-public class SeekPolicyResumeStrategy implements ResumeStrategy {
- private static final Logger LOG =
LoggerFactory.getLogger(SeekPolicyResumeStrategy.class);
+public class SeekPolicyKafkaConsumerResumeStrategy implements
KafkaConsumerResumeStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
private final String seekPolicy;
- public SeekPolicyResumeStrategy(String seekPolicy) {
+ public SeekPolicyKafkaConsumerResumeStrategy(String seekPolicy) {
this.seekPolicy = seekPolicy;
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
similarity index 88%
rename from
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
rename to
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index 21040d0..192a1a6 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.BindToRegistry;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.AfterEach;
@@ -33,22 +33,22 @@ import org.junit.jupiter.api.Timeout;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class KafkaConsumerWithCustomResumeStrategyIT extends
BaseEmbeddedKafkaTestSupport {
+public class KafkaConsumerWithResumeStrategyIT extends
BaseEmbeddedKafkaTestSupport {
private static final String TOPIC = "custom-resume";
@EndpointInject("mock:result")
private MockEndpoint result;
@BindToRegistry("resumeStrategy")
- private TestResumeStrategy resumeStrategy;
+ private TestKafkaConsumerResumeStrategy resumeStrategy;
private CountDownLatch messagesLatch;
- private static class TestResumeStrategy implements ResumeStrategy {
+ private static class TestKafkaConsumerResumeStrategy implements
KafkaConsumerResumeStrategy {
private final CountDownLatch messagesLatch;
private boolean resumeCalled;
private boolean consumerIsNull = true;
- public TestResumeStrategy(CountDownLatch messagesLatch) {
+ public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
this.messagesLatch = messagesLatch;
}
@@ -75,7 +75,7 @@ public class KafkaConsumerWithCustomResumeStrategyIT extends
BaseEmbeddedKafkaTe
@Override
protected void doPreSetup() {
messagesLatch = new CountDownLatch(1);
- resumeStrategy = new TestResumeStrategy(messagesLatch);
+ resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
}
@Test
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
similarity index 61%
rename from
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
index ebe7ed6..17f9828 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+package org.apache.camel;
/**
* Defines a strategy for handling resume operations. Implementations can
define different ways to handle how to resume
* processing records.
*/
-public interface ResumeStrategy {
+public interface ResumeStrategy<T> {
+
/**
- * Perform the resume operation. This runs in the scope of the Kafka
Consumer thread and may run concurrently with
- * other consumer instances when the component is set up to use more than
one of them. As such, implementations are
- * responsible for ensuring the thread-safety of the operations within the
resume method.
+ * A consumer, iterator or value class that can be used to set the index
position from which to resume from. The
+ * type is specific to the component.
*
- * @param consumer an instance of the KafkaConsumer which is resuming the
operation
+ * @param resumable A resumable object.
*/
- void resume(KafkaConsumer<?, ?> consumer);
+ void resume(T resumable);
}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 941ee86..a6f2815 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -679,13 +679,13 @@ public interface KafkaComponentBuilderFactory {
* connecting or reconnecting). It allows implementations to customize
* how to resume operations and serve as more flexible alternative to
* the seekTo and the offsetRepository mechanisms. See the
- * ResumeStrategy for implementation details. This option does not
- * affect the auto commit setting. It is likely that implementations
- * using this setting will also want to evaluate using the manual
commit
- * option along with this.
+ * KafkaConsumerResumeStrategy for implementation details. This option
+ * does not affect the auto commit setting. It is likely that
+ * implementations using this setting will also want to evaluate using
+ * the manual commit option along with this.
*
* The option is a:
- *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
<code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code>
type.
*
* Group: consumer
*
@@ -693,7 +693,7 @@ public interface KafkaComponentBuilderFactory {
* @return the dsl builder
*/
default KafkaComponentBuilder resumeStrategy(
-
org.apache.camel.component.kafka.consumer.support.ResumeStrategy
resumeStrategy) {
+
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy
resumeStrategy) {
doSetProperty("resumeStrategy", resumeStrategy);
return this;
}
@@ -2041,7 +2041,7 @@ public interface KafkaComponentBuilderFactory {
case "partitionAssignor":
getOrCreateConfiguration((KafkaComponent)
component).setPartitionAssignor((java.lang.String) value); return true;
case "pollOnError": getOrCreateConfiguration((KafkaComponent)
component).setPollOnError((org.apache.camel.component.kafka.PollOnError)
value); return true;
case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setPollTimeoutMs((java.lang.Long) value); return true;
- case "resumeStrategy": getOrCreateConfiguration((KafkaComponent)
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.ResumeStrategy)
value); return true;
+ case "resumeStrategy": getOrCreateConfiguration((KafkaComponent)
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy)
value); return true;
case "seekTo": getOrCreateConfiguration((KafkaComponent)
component).setSeekTo((java.lang.String) value); return true;
case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent)
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
case "specificAvroReader":
getOrCreateConfiguration((KafkaComponent)
component).setSpecificAvroReader((boolean) value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 3eaddbd..c79a913 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1142,13 +1142,13 @@ public interface KafkaEndpointBuilderFactory {
* connecting or reconnecting). It allows implementations to customize
* how to resume operations and serve as more flexible alternative to
* the seekTo and the offsetRepository mechanisms. See the
- * ResumeStrategy for implementation details. This option does not
- * affect the auto commit setting. It is likely that implementations
- * using this setting will also want to evaluate using the manual
commit
- * option along with this.
+ * KafkaConsumerResumeStrategy for implementation details. This option
+ * does not affect the auto commit setting. It is likely that
+ * implementations using this setting will also want to evaluate using
+ * the manual commit option along with this.
*
* The option is a:
- *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
<code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code>
type.
*
* Group: consumer
*
@@ -1166,13 +1166,13 @@ public interface KafkaEndpointBuilderFactory {
* connecting or reconnecting). It allows implementations to customize
* how to resume operations and serve as more flexible alternative to
* the seekTo and the offsetRepository mechanisms. See the
- * ResumeStrategy for implementation details. This option does not
- * affect the auto commit setting. It is likely that implementations
- * using this setting will also want to evaluate using the manual
commit
- * option along with this.
+ * KafkaConsumerResumeStrategy for implementation details. This option
+ * does not affect the auto commit setting. It is likely that
+ * implementations using this setting will also want to evaluate using
+ * the manual commit option along with this.
*
* The option will be converted to a
- *
<code>org.apache.camel.component.kafka.consumer.support.ResumeStrategy</code>
type.
+ *
<code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code>
type.
*
* Group: consumer
*