This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 711ab31 Camel 14935 kafka exception (#3877)
711ab31 is described below
commit 711ab31c29615deffe5b7fe3dc4ba1235cbd9979
Author: Darius <[email protected]>
AuthorDate: Wed Jun 3 00:14:33 2020 -0400
Camel 14935 kafka exception (#3877)
* CAMEL-14935: Fix issue with failing commits on rebalance
* Log the exception on partion revoked, and rethrow
* Fixed, based on pull request feedback
---
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index df469b0..30f33fa 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -460,8 +460,15 @@ public class KafkaConsumer extends DefaultConsumer {
offset = -1L;
}
LOG.debug("Saving offset repository state {} from offsetKey {}
with offset: {}", threadId, offsetKey, offset);
- commitOffset(offsetRepository, partition, offset, true);
- lastProcessedOffset.remove(offsetKey);
+ try {
+ commitOffset(offsetRepository, partition, offset, true);
+ } catch (java.lang.Exception e) {
+ LOG.error("Error saving offset repository state {} from
offsetKey {} with offset: {}", threadId, offsetKey, offset);
+ throw e;
+ } finally {
+ lastProcessedOffset.remove(offsetKey);
+ }
+
}
}