This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2213f90 (chores) camel-kafka: fixes and cleanups
2213f90 is described below
commit 2213f90d42cf0a12092709f57fbae62834aa3a84
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Sep 16 18:39:06 2021 +0200
(chores) camel-kafka: fixes and cleanups
- consolidate common code
- replace the ConcurrentHashMap holding the last processed offset with a
plain HashMap (the instance is only accessed within the local consumer
thread, so a non-concurrent collection is safe)
---
.../main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 4 ++--
.../camel/component/kafka/consumer/support/KafkaRecordProcessor.java | 4 +---
2 files changed, 3 insertions(+), 5 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 6394770..086432e 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -18,12 +18,12 @@ package org.apache.camel.component.kafka;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -56,7 +56,7 @@ class KafkaFetchRecords implements Runnable {
private final Pattern topicPattern;
private final String threadId;
private final Properties kafkaProps;
- private final Map<String, Long> lastProcessedOffset = new
ConcurrentHashMap<>();
+ private final Map<String, Long> lastProcessedOffset = new HashMap<>();
private final PollExceptionStrategy pollExceptionStrategy;
private final BridgeExceptionHandlerToErrorHandler bridge;
private final ReentrantLock lock = new ReentrantLock();
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index e48f78c..f1a70b0 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -132,6 +132,7 @@ public class KafkaRecordProcessor {
// if not auto commit then we have additional information on the
exchange
if (!autoCommitEnabled) {
message.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
!recordHasNext);
+ message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext
&& !partitionHasNext);
}
if (configuration.isAllowManualCommit()) {
@@ -141,9 +142,6 @@ public class KafkaRecordProcessor {
KafkaManualCommit manual =
manualCommitFactory.newInstance(exchange, consumer, partition.topic(), threadId,
offsetRepository, partition, record.offset(),
configuration.getCommitTimeoutMs());
message.setHeader(KafkaConstants.MANUAL_COMMIT, manual);
- }
- // if commit management is on user side give additional info for the
end of poll loop
- if (!autoCommitEnabled || configuration.isAllowManualCommit()) {
message.setHeader(KafkaConstants.LAST_POLL_RECORD, !recordHasNext
&& !partitionHasNext);
}