This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fe095bc545b6212ea3e8e72265e7c4df3b9c36a7
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon Aug 8 16:31:58 2022 +0200

    CAMEL-18362: rework the consumer/producer
    
    - Ensure the consumer is mostly isolated in its own thread
    - Prevent closing the producer while an offset update is happening
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 190 +++++++++------------
 1 file changed, 84 insertions(+), 106 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index b731e9b0c1c..a527b66c9d7 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,13 +23,13 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Cacheable;
@@ -41,7 +41,6 @@ import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,6 +49,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,12 +70,12 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
     private final ExecutorService executorService;
+    private final ReentrantLock lock = new ReentrantLock();
 
     /**
      * Builds an instance of this class
      *
      * @param resumeStrategyConfiguration the configuration to use for this 
strategy instance
-     *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration 
resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
@@ -86,7 +86,6 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
      * Builds an instance of this class
      *
      * @param resumeStrategyConfiguration the configuration to use for this 
strategy instance
-     *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration 
resumeStrategyConfiguration,
                                          ExecutorService executorService) {
@@ -98,11 +97,10 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
      * Sends data to a topic. The records will always be sent asynchronously. 
If there's an error, a producer error
      * counter will be increased.
      *
-     * @see                         
SingleNodeKafkaResumeStrategy#getProducerErrors()
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
-     *
+     * @see                         
SingleNodeKafkaResumeStrategy#getProducerErrors()
      */
     protected void produce(byte[] key, byte[] message) throws 
ExecutionException, InterruptedException {
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
@@ -125,8 +123,6 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
 
     @Override
     public void updateLastOffset(T offset) throws Exception {
-        createProducer();
-
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
@@ -137,39 +133,63 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         ByteBuffer keyBuffer = key.serialize();
         ByteBuffer valueBuffer = offsetValue.serialize();
 
-        produce(keyBuffer.array(), valueBuffer.array());
+        try {
+            lock.lock();
+            produce(keyBuffer.array(), valueBuffer.array());
+        } finally {
+            lock.unlock();
+        }
 
         doAdd(key, offsetValue);
     }
 
     /**
      * Loads the existing data into the cache
-     *
-     * @throws Exception
      */
-    public void loadCache() throws Exception {
-        createConsumer();
-
-        subscribe();
-
-        LOG.debug("Loading records from topic {}", 
resumeStrategyConfiguration.getTopic());
-
+    @Override
+    public void loadCache() {
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter 
that is not deserializable");
         }
 
-        executorService.submit(() -> refresh());
+        executorService.submit(this::refresh);
+    }
+
+    /**
+     * Launch a thread to refresh the offsets periodically
+     */
+    private void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+
+        try {
+            consumer = createConsumer();
+
+            subscribe(consumer);
+
+            LOG.debug("Loading records from topic {}", 
resumeStrategyConfiguration.getTopic());
+            
consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+            poll(consumer);
+        } catch (WakeupException e) {
+            LOG.info("Kafka consumer was interrupted during a blocking call");
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", 
e.getMessage(), e);
+        } finally {
+            if (consumer != null) {
+                consumer.unsubscribe();
+                consumer.close(Duration.ofSeconds(5));
+            }
+        }
     }
 
-    protected void poll() {
+    protected void poll(Consumer<byte[], byte[]> consumer) {
         Deserializable deserializable = (Deserializable) adapter;
 
-        ConsumerRecords<byte[], byte[]> records;
         do {
-            records = consume();
+            ConsumerRecords<byte[], byte[]> records = consume(consumer);
 
             if (records.isEmpty()) {
-                break;
+                continue;
             }
 
             for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -191,7 +211,7 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
      *
      * @param topic the topic to consume the messages from
      */
-    protected void checkAndSubscribe(String topic) {
+    protected void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String 
topic) {
         if (!subscribed) {
             consumer.subscribe(Collections.singletonList(topic));
 
@@ -205,26 +225,18 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
      * @param topic     the topic to consume the messages from
      * @param remaining the number of messages to rewind from the last offset 
position (used to fill the cache)
      */
-    public void checkAndSubscribe(String topic, long remaining) {
+    public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String 
topic, long remaining) {
         if (!subscribed) {
-            consumer.subscribe(Collections.singletonList(topic), 
getConsumerRebalanceListener(remaining));
-
+            consumer.subscribe(Collections.singletonList(topic), 
getConsumerRebalanceListener(consumer, remaining));
             subscribed = true;
         }
     }
 
-    /**
-     * Creates a new consumer rebalance listener. This can be useful for 
setting the exact Kafka offset when necessary
-     * to read a limited amount of messages or customize the resume strategy 
behavior when a rebalance occurs.
-     *
-     * @param  remaining the number of remaining messages on the topic to try 
to collect
-     * @return
-     */
-    protected ConsumerRebalanceListener getConsumerRebalanceListener(long 
remaining) {
+    private ConsumerRebalanceListener 
getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining) 
{
         return new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> 
collection) {
-
+                // NO-OP
             }
 
             @Override
@@ -244,45 +256,15 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         };
     }
 
-    /**
-     * Unsubscribe from the topic
-     */
-    protected void unsubscribe() {
-        try {
-            consumer.unsubscribe();
-        } catch (IllegalStateException e) {
-            LOG.warn("The consumer is likely already closed. Skipping 
unsubscribing from {}",
-                    resumeStrategyConfiguration.getTopic());
-        } catch (Exception e) {
-            LOG.error("Error unsubscribing from the Kafka topic {}: {}", 
resumeStrategyConfiguration.getTopic(), e.getMessage(),
-                    e);
-        }
-    }
-
     /**
      * Consumes message from the topic previously setup
      *
      * @return An instance of the consumer records
      */
-    protected ConsumerRecords<byte[], byte[]> consume() {
-        int retries = 10;
-
-        return consume(retries);
-    }
-
-    /**
-     * Consumes message from the topic previously setup
-     *
-     * @param  retries how many times to retry consuming data from the topic
-     * @return         An instance of the consumer records
-     */
-    protected ConsumerRecords<byte[], byte[]> consume(int retries) {
-        while (retries > 0) {
-            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(pollDuration);
-            if (!records.isEmpty()) {
-                return records;
-            }
-            retries--;
+    protected ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> 
consumer) {
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollDuration);
+        if (!records.isEmpty()) {
+            return records;
         }
 
         return ConsumerRecords.empty();
@@ -307,17 +289,17 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         return ConsumerRecords.empty();
     }
 
-    public void subscribe() throws Exception {
+    private void subscribe(Consumer<byte[], byte[]> consumer) {
         if (adapter instanceof Cacheable) {
             ResumeCache<?> cache = ((Cacheable) adapter).getCache();
 
             if (cache.capacity() >= 1) {
-                checkAndSubscribe(resumeStrategyConfiguration.getTopic(), 
cache.capacity());
+                checkAndSubscribe(consumer, 
resumeStrategyConfiguration.getTopic(), cache.capacity());
             } else {
-                checkAndSubscribe(resumeStrategyConfiguration.getTopic());
+                checkAndSubscribe(consumer, 
resumeStrategyConfiguration.getTopic());
             }
         } else {
-            checkAndSubscribe(resumeStrategyConfiguration.getTopic());
+            checkAndSubscribe(consumer, 
resumeStrategyConfiguration.getTopic());
         }
     }
 
@@ -356,21 +338,38 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         }
     }
 
-    private void createConsumer() {
-        if (consumer == null) {
-            consumer = new 
KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
-        }
+    private Consumer<byte[], byte[]> createConsumer() {
+        return new 
KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
     }
 
     @Override
     public void stop() {
-        LOG.info("Closing the Kafka producer");
-        IOHelper.close(producer, "Kafka producer", LOG);
+        try {
+            LOG.trace("Trying to obtain a lock for closing the producer");
+            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
+                LOG.warn("Failed to obtain a lock for closing the producer. 
Force closing the producer ...");
+            }
 
-        LOG.info("Closing the Kafka consumer");
-        IOHelper.close(producer, "Kafka consumer", LOG);
+            LOG.info("Closing the Kafka producer");
+            IOHelper.close(producer, "Kafka producer", LOG);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.unlock();
+        }
 
-        executorService.shutdown();
+        try {
+            LOG.info("Closing the Kafka consumer");
+            consumer.wakeup();
+            executorService.shutdown();
+
+            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+                LOG.warn("Kafka consumer did not shutdown within 2 seconds");
+                executorService.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
@@ -381,6 +380,7 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
     @Override
     public void start() {
         LOG.info("Starting the kafka resume strategy");
+        createProducer();
     }
 
     public Duration getPollDuration() {
@@ -391,10 +391,6 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         this.pollDuration = Objects.requireNonNull(pollDuration, "The poll 
duration cannot be null");
     }
 
-    protected Consumer<byte[], byte[]> getConsumer() {
-        return consumer;
-    }
-
     protected Producer<byte[], byte[]> getProducer() {
         return producer;
     }
@@ -410,22 +406,4 @@ public class SingleNodeKafkaResumeStrategy<T extends 
Resumable> implements Kafka
         return resumeStrategyConfiguration;
     }
 
-    /**
-     * Launch a thread to refresh the offsets periodically
-     */
-    private void refresh() {
-        LOG.trace("Creating a offset cache refresher");
-        try {
-            Properties prop = (Properties) 
getResumeStrategyConfiguration().getConsumerProperties().clone();
-            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-
-            try (Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(prop)) {
-                
consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
-                poll();
-            }
-        } catch (Exception e) {
-            LOG.error("Error while refreshing the local cache: {}", 
e.getMessage(), e);
-        }
-    }
 }

Reply via email to