This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 58c074d  CAMEL-17045: refactored Kafka's consumer strategy
58c074d is described below

commit 58c074d8f9098ac354fda52923785b08f2048e86
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Oct 4 15:13:50 2021 +0200

    CAMEL-17045: refactored Kafka's consumer strategy
    
    This allows it to be reusable for other components
---
 .../org/apache/camel/catalog/components/kafka.json   |  4 ++--
 .../component/kafka/KafkaComponentConfigurer.java    |  4 ++--
 .../component/kafka/KafkaEndpointConfigurer.java     |  4 ++--
 .../org/apache/camel/component/kafka/kafka.json      |  4 ++--
 .../camel/component/kafka/KafkaConfiguration.java    | 10 +++++-----
 ...trategy.java => KafkaConsumerResumeStrategy.java} |  3 ++-
 ...y.java => OffsetKafkaConsumerResumeStrategy.java} |  6 +++---
 .../support/PartitionAssignmentListener.java         |  2 +-
 .../consumer/support/ResumeStrategyFactory.java      | 12 ++++++------
 ...va => SeekPolicyKafkaConsumerResumeStrategy.java} |  6 +++---
 ...T.java => KafkaConsumerWithResumeStrategyIT.java} | 12 ++++++------
 .../main/java/org/apache/camel}/ResumeStrategy.java  | 16 +++++++---------
 .../component/dsl/KafkaComponentBuilderFactory.java  | 14 +++++++-------
 .../endpoint/dsl/KafkaEndpointBuilderFactory.java    | 20 ++++++++++----------
 14 files changed, 58 insertions(+), 59 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index c044147..473b8d8 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -55,7 +55,7 @@
     "partitionAssignor": { "kind": "property", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignme [...]
     "pollOnError": { "kind": "property", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The  [...]
+    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
 "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "This option allows the 
user to set a custom resume s [...]
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning  [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
@@ -161,7 +161,7 @@
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignm [...]
     "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The [...]
+    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
 "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "This option allows the 
user to set a custom resume  [...]
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Plat [...]
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 53926fa..3353bdd 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -161,7 +161,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "requesttimeoutms":
         case "requestTimeoutMs": 
getOrCreateConfiguration(target).setRequestTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "resumestrategy":
-        case "resumeStrategy": 
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class, 
value)); return true;
+        case "resumeStrategy": 
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class,
 value)); return true;
         case "retries": 
getOrCreateConfiguration(target).setRetries(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "retrybackoffms":
         case "retryBackoffMs": 
getOrCreateConfiguration(target).setRetryBackoffMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
@@ -375,7 +375,7 @@ public class KafkaComponentConfigurer extends 
PropertyConfigurerSupport implemen
         case "requesttimeoutms":
         case "requestTimeoutMs": return java.lang.Integer.class;
         case "resumestrategy":
-        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
+        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class;
         case "retries": return java.lang.Integer.class;
         case "retrybackoffms":
         case "retryBackoffMs": return java.lang.Integer.class;
diff --git 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 467b99b..911bc7c 100644
--- 
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++ 
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -149,7 +149,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "requesttimeoutms":
         case "requestTimeoutMs": 
target.getConfiguration().setRequestTimeoutMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "resumestrategy":
-        case "resumeStrategy": 
target.getConfiguration().setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class, 
value)); return true;
+        case "resumeStrategy": 
target.getConfiguration().setResumeStrategy(property(camelContext, 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class,
 value)); return true;
         case "retries": 
target.getConfiguration().setRetries(property(camelContext, 
java.lang.Integer.class, value)); return true;
         case "retrybackoffms":
         case "retryBackoffMs": 
target.getConfiguration().setRetryBackoffMs(property(camelContext, 
java.lang.Integer.class, value)); return true;
@@ -351,7 +351,7 @@ public class KafkaEndpointConfigurer extends 
PropertyConfigurerSupport implement
         case "requesttimeoutms":
         case "requestTimeoutMs": return java.lang.Integer.class;
         case "resumestrategy":
-        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.ResumeStrategy.class;
+        case "resumeStrategy": return 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class;
         case "retries": return java.lang.Integer.class;
         case "retrybackoffms":
         case "retryBackoffMs": return java.lang.Integer.class;
diff --git 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index c044147..473b8d8 100644
--- 
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ 
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -55,7 +55,7 @@
     "partitionAssignor": { "kind": "property", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignme [...]
     "pollOnError": { "kind": "property", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "de [...]
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The  [...]
+    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
 "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "This option allows the 
user to set a custom resume s [...]
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning  [...]
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "property", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Platf [...]
@@ -161,7 +161,7 @@
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition 
Assignor", "group": "consumer", "label": "consumer", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": 
"org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The class name of the partition assignm [...]
     "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ 
"DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": 
false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "d [...]
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", 
"group": "consumer", "label": "consumer", "required": false, "type": 
"duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "5000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used when polling the 
KafkaConsumer." },
-    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.kafka.consumer.support.ResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This option allows the user to set a custom 
resume strategy. The [...]
+    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy",
 "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", 
"configurationField": "configuration", "description": "This option allows the 
user to set a custom resume  [...]
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": 
false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "Set if KafkaConsumer will read from beginning 
or end on startup: beginning : read from beginning [...]
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout 
Ms", "group": "consumer", "label": "consumer", "required": false, "type": 
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "10000", "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "The timeout used to detect failures when using 
Kafka's group management facilities." },
     "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro 
Reader", "group": "consumer", "label": "confluent,consumer", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": 
"configuration", "description": "This enables the use of a specific Avro reader 
for use with the Confluent Plat [...]
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index b357ed9..16f35e0 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -26,7 +26,7 @@ import java.util.stream.Collectors;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -149,7 +149,7 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
     private Long commitTimeoutMs = 5000L;
 
     @UriParam(label = "consumer")
-    private ResumeStrategy resumeStrategy;
+    private KafkaConsumerResumeStrategy resumeStrategy;
 
     // Producer configuration properties
     @UriParam(label = "producer", defaultValue = 
KafkaConstants.KAFKA_DEFAULT_PARTITIONER)
@@ -801,7 +801,7 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
         this.breakOnFirstError = breakOnFirstError;
     }
 
-    public ResumeStrategy getResumeStrategy() {
+    public KafkaConsumerResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
@@ -810,14 +810,14 @@ public class KafkaConfiguration implements Cloneable, 
HeaderFilterStrategyAware
      * assigned (i.e.: when connecting or reconnecting). It allows 
implementations to customize how to resume operations
      * and serve as more flexible alternative to the seekTo and the 
offsetRepository mechanisms.
      *
-     * See the {@link ResumeStrategy} for implementation details.
+     * See the {@link KafkaConsumerResumeStrategy} for implementation details.
      *
      * This option does not affect the auto commit setting. It is likely that 
implementations using this setting will
      * also want to evaluate using the manual commit option along with this.
      *
      * @param resumeStrategy An instance of the resume strategy
      */
-    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
similarity index 92%
copy from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
copy to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index ebe7ed6..d85579b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -17,13 +17,14 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
+import org.apache.camel.ResumeStrategy;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
  * processing records.
  */
-public interface ResumeStrategy {
+public interface KafkaConsumerResumeStrategy extends 
ResumeStrategy<KafkaConsumer<?, ?>> {
     /**
      * Perform the resume operation. This runs in the scope of the Kafka 
Consumer thread and may run concurrently with
      * other consumer instances when the component is set up to use more than 
one of them. As such, implementations are
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
similarity index 91%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 9788f27..7c13eb1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -31,12 +31,12 @@ import static 
org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
 /**
  * A resume strategy that uses Kafka's offset for resuming
  */
-public class OffsetResumeStrategy implements ResumeStrategy {
-    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetResumeStrategy.class);
+public class OffsetKafkaConsumerResumeStrategy implements 
KafkaConsumerResumeStrategy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
 
     private final StateRepository<String, String> offsetRepository;
 
-    public OffsetResumeStrategy(StateRepository<String, String> 
offsetRepository) {
+    public OffsetKafkaConsumerResumeStrategy(StateRepository<String, String> 
offsetRepository) {
         this.offsetRepository = offsetRepository;
     }
 
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index e07db10..0420041 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -38,7 +38,7 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     private final KafkaConfiguration configuration;
     private final KafkaConsumer consumer;
     private final Map<String, Long> lastProcessedOffset;
-    private final ResumeStrategy resumeStrategy;
+    private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
     public PartitionAssignmentListener(String threadId, String topicName, 
KafkaConfiguration configuration,
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 44bd588..272f467 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -27,7 +27,7 @@ public final class ResumeStrategyFactory {
     /**
      * A NO-OP resume strategy that does nothing (i.e.: no resume)
      */
-    private static class NoOpResumeStrategy implements ResumeStrategy {
+    private static class NoOpKafkaConsumerResumeStrategy implements 
KafkaConsumerResumeStrategy {
         @SuppressWarnings("unused")
         @Override
         public void resume(KafkaConsumer<?, ?> consumer) {
@@ -35,13 +35,13 @@ public final class ResumeStrategyFactory {
         }
     }
 
-    private static final NoOpResumeStrategy NO_OP_RESUME_STRATEGY = new 
NoOpResumeStrategy();
+    private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY 
= new NoOpKafkaConsumerResumeStrategy();
     private static final Logger LOG = 
LoggerFactory.getLogger(ResumeStrategyFactory.class);
 
     private ResumeStrategyFactory() {
     }
 
-    public static ResumeStrategy newResumeStrategy(KafkaConfiguration 
configuration) {
+    public static KafkaConsumerResumeStrategy 
newResumeStrategy(KafkaConfiguration configuration) {
 
         if (configuration.getResumeStrategy() != null) {
             return configuration.getResumeStrategy();
@@ -50,16 +50,16 @@ public final class ResumeStrategyFactory {
         return builtinResumeStrategies(configuration);
     }
 
-    private static ResumeStrategy builtinResumeStrategies(KafkaConfiguration 
configuration) {
+    private static KafkaConsumerResumeStrategy 
builtinResumeStrategies(KafkaConfiguration configuration) {
         StateRepository<String, String> offsetRepository = 
configuration.getOffsetRepository();
         String seekTo = configuration.getSeekTo();
 
         if (offsetRepository != null) {
             LOG.info("Using resume from offset strategy");
-            return new OffsetResumeStrategy(offsetRepository);
+            return new OffsetKafkaConsumerResumeStrategy(offsetRepository);
         } else if (seekTo != null) {
             LOG.info("Using resume from seek policy strategy with seeking from 
{}", seekTo);
-            return new SeekPolicyResumeStrategy(seekTo);
+            return new SeekPolicyKafkaConsumerResumeStrategy(seekTo);
         }
 
         LOG.info("Using NO-OP resume strategy");
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
similarity index 88%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
rename to 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index d58a23a..969f0df 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -24,12 +24,12 @@ import org.slf4j.LoggerFactory;
 /**
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
-public class SeekPolicyResumeStrategy implements ResumeStrategy {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SeekPolicyResumeStrategy.class);
+public class SeekPolicyKafkaConsumerResumeStrategy implements 
KafkaConsumerResumeStrategy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
 
     private final String seekPolicy;
 
-    public SeekPolicyResumeStrategy(String seekPolicy) {
+    public SeekPolicyKafkaConsumerResumeStrategy(String seekPolicy) {
         this.seekPolicy = seekPolicy;
     }
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
similarity index 88%
rename from 
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
rename to 
components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index 21040d0..192a1a6 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithCustomResumeStrategyIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategy;
+import 
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.jupiter.api.AfterEach;
@@ -33,22 +33,22 @@ import org.junit.jupiter.api.Timeout;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class KafkaConsumerWithCustomResumeStrategyIT extends 
BaseEmbeddedKafkaTestSupport {
+public class KafkaConsumerWithResumeStrategyIT extends 
BaseEmbeddedKafkaTestSupport {
     private static final String TOPIC = "custom-resume";
 
     @EndpointInject("mock:result")
     private MockEndpoint result;
 
     @BindToRegistry("resumeStrategy")
-    private TestResumeStrategy resumeStrategy;
+    private TestKafkaConsumerResumeStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
 
-    private static class TestResumeStrategy implements ResumeStrategy {
+    private static class TestKafkaConsumerResumeStrategy implements 
KafkaConsumerResumeStrategy {
         private final CountDownLatch messagesLatch;
         private boolean resumeCalled;
         private boolean consumerIsNull = true;
 
-        public TestResumeStrategy(CountDownLatch messagesLatch) {
+        public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
             this.messagesLatch = messagesLatch;
         }
 
@@ -75,7 +75,7 @@ public class KafkaConsumerWithCustomResumeStrategyIT extends 
BaseEmbeddedKafkaTe
     @Override
     protected void doPreSetup() {
         messagesLatch = new CountDownLatch(1);
-        resumeStrategy = new TestResumeStrategy(messagesLatch);
+        resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
     }
 
     @Test
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
 b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
similarity index 61%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
index ebe7ed6..17f9828 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+package org.apache.camel;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
  * processing records.
  */
-public interface ResumeStrategy {
+public interface ResumeStrategy<T> {
+
     /**
-     * Perform the resume operation. This runs in the scope of the Kafka 
Consumer thread and may run concurrently with
-     * other consumer instances when the component is set up to use more than 
one of them. As such, implementations are
-     * responsible for ensuring the thread-safety of the operations within the 
resume method.
+     * A consumer, iterator or value class that can be used to set the index 
position from which to resume. The
+     * type is specific to the component.
      *
-     * @param consumer an instance of the KafkaConsumer which is resuming the 
operation
+     * @param resumable A resumable object.
      */
-    void resume(KafkaConsumer<?, ?> consumer);
+    void resume(T resumable);
 }
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 941ee86..a6f2815 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -679,13 +679,13 @@ public interface KafkaComponentBuilderFactory {
          * connecting or reconnecting). It allows implementations to customize
          * how to resume operations and serve as more flexible alternative to
          * the seekTo and the offsetRepository mechanisms. See the
-         * ResumeStrategy for implementation details. This option does not
-         * affect the auto commit setting. It is likely that implementations
-         * using this setting will also want to evaluate using the manual 
commit
-         * option along with this.
+         * KafkaConsumerResumeStrategy for implementation details. This option
+         * does not affect the auto commit setting. It is likely that
+         * implementations using this setting will also want to evaluate using
+         * the manual commit option along with this.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy&lt;/code&gt;
 type.
          * 
          * Group: consumer
          * 
@@ -693,7 +693,7 @@ public interface KafkaComponentBuilderFactory {
          * @return the dsl builder
          */
         default KafkaComponentBuilder resumeStrategy(
-                
org.apache.camel.component.kafka.consumer.support.ResumeStrategy 
resumeStrategy) {
+                
org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy 
resumeStrategy) {
             doSetProperty("resumeStrategy", resumeStrategy);
             return this;
         }
@@ -2041,7 +2041,7 @@ public interface KafkaComponentBuilderFactory {
             case "partitionAssignor": 
getOrCreateConfiguration((KafkaComponent) 
component).setPartitionAssignor((java.lang.String) value); return true;
             case "pollOnError": getOrCreateConfiguration((KafkaComponent) 
component).setPollOnError((org.apache.camel.component.kafka.PollOnError) 
value); return true;
             case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setPollTimeoutMs((java.lang.Long) value); return true;
-            case "resumeStrategy": getOrCreateConfiguration((KafkaComponent) 
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.ResumeStrategy)
 value); return true;
+            case "resumeStrategy": getOrCreateConfiguration((KafkaComponent) 
component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy)
 value); return true;
             case "seekTo": getOrCreateConfiguration((KafkaComponent) 
component).setSeekTo((java.lang.String) value); return true;
             case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent) 
component).setSessionTimeoutMs((java.lang.Integer) value); return true;
             case "specificAvroReader": 
getOrCreateConfiguration((KafkaComponent) 
component).setSpecificAvroReader((boolean) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 3eaddbd..c79a913 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -1142,13 +1142,13 @@ public interface KafkaEndpointBuilderFactory {
          * connecting or reconnecting). It allows implementations to customize
          * how to resume operations and serve as more flexible alternative to
          * the seekTo and the offsetRepository mechanisms. See the
-         * ResumeStrategy for implementation details. This option does not
-         * affect the auto commit setting. It is likely that implementations
-         * using this setting will also want to evaluate using the manual 
commit
-         * option along with this.
+         * KafkaConsumerResumeStrategy for implementation details. This option
+         * does not affect the auto commit setting. It is likely that
+         * implementations using this setting will also want to evaluate using
+         * the manual commit option along with this.
          * 
          * The option is a:
-         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy&lt;/code&gt;
 type.
          * 
          * Group: consumer
          * 
@@ -1166,13 +1166,13 @@ public interface KafkaEndpointBuilderFactory {
          * connecting or reconnecting). It allows implementations to customize
          * how to resume operations and serve as more flexible alternative to
          * the seekTo and the offsetRepository mechanisms. See the
-         * ResumeStrategy for implementation details. This option does not
-         * affect the auto commit setting. It is likely that implementations
-         * using this setting will also want to evaluate using the manual 
commit
-         * option along with this.
+         * KafkaConsumerResumeStrategy for implementation details. This option
+         * does not affect the auto commit setting. It is likely that
+         * implementations using this setting will also want to evaluate using
+         * the manual commit option along with this.
          * 
          * The option will be converted to a
-         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.ResumeStrategy&lt;/code&gt;
 type.
+         * 
&lt;code&gt;org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy&lt;/code&gt;
 type.
          * 
          * Group: consumer
          * 

Reply via email to