This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.20.x by this push:
new 4baceea CAMEL-12031: KafkaConsumer stops consuming messages when
exception occurs during offset commit
4baceea is described below
commit 4baceea76c9cf794a4bd65f1bd6568e65badd11b
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Dec 18 15:04:08 2017 +0100
CAMEL-12031: KafkaConsumer stops consuming messages when exception occurs
during offset commit
---
.../java/org/apache/camel/component/kafka/KafkaConsumer.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 19c9884..52611fc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
@@ -178,6 +179,7 @@ public class KafkaConsumer extends DefaultConsumer {
protected boolean doRun() {
// allow to re-connect thread in case we use that to retry failed messages
boolean reConnect = false;
+ boolean unsubscribing = false;
try {
log.info("Subscribing {} to topic {}", threadId, topicName);
@@ -298,12 +300,22 @@ public class KafkaConsumer extends DefaultConsumer {
}
log.info("Unsubscribing {} from topic {}", threadId, topicName);
+ // we are unsubscribing so do not re connect
+ unsubscribing = true;
consumer.unsubscribe();
} catch (InterruptException e) {
getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e);
log.info("Unsubscribing {} from topic {}", threadId, topicName);
consumer.unsubscribe();
Thread.currentThread().interrupt();
+ } catch (KafkaException e) {
+ // some kind of error in kafka, it may happen during unsubscribing or during normal processing
+ if (unsubscribing) {
+ getExceptionHandler().handleException("Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
+ } else {
+ log.warn("KafkaException consuming {} from topic {}. Will attempt to re-connect on next run", threadId, topicName);
+ reConnect = true;
+ }
} catch (Exception e) {
getExceptionHandler().handleException("Error consuming " + threadId + " from kafka topic", e);
} finally {
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.