This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit fe095bc545b6212ea3e8e72265e7c4df3b9c36a7 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Mon Aug 8 16:31:58 2022 +0200 CAMEL-18362: rework the consumer/producer - Ensure the consumer is mostly isolated in its own thread - Prevent closing the producer while an offset update is happening --- .../kafka/SingleNodeKafkaResumeStrategy.java | 190 +++++++++------------ 1 file changed, 84 insertions(+), 106 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index b731e9b0c1c..a527b66c9d7 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -23,13 +23,13 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Objects; -import java.util.Properties; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.RuntimeCamelException; import org.apache.camel.resume.Cacheable; @@ -41,7 +41,6 @@ import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.cache.ResumeCache; import org.apache.camel.util.IOHelper; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -50,6 +49,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,12 +70,12 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka private ResumeAdapter adapter; private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration; private final ExecutorService executorService; + private final ReentrantLock lock = new ReentrantLock(); /** * Builds an instance of this class * * @param resumeStrategyConfiguration the configuration to use for this strategy instance - * */ public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) { this.resumeStrategyConfiguration = resumeStrategyConfiguration; @@ -86,7 +86,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * Builds an instance of this class * * @param resumeStrategyConfiguration the configuration to use for this strategy instance - * */ public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService) { @@ -98,11 +97,10 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * Sends data to a topic. The records will always be sent asynchronously. If there's an error, a producer error * counter will be increased. * - * @see SingleNodeKafkaResumeStrategy#getProducerErrors() * @param message the message to send * @throws ExecutionException * @throws InterruptedException - * + * @see SingleNodeKafkaResumeStrategy#getProducerErrors() */ protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message); @@ -125,8 +123,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka @Override public void updateLastOffset(T offset) throws Exception { - createProducer(); - OffsetKey<?> key = offset.getOffsetKey(); Offset<?> offsetValue = offset.getLastOffset(); @@ -137,39 +133,63 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka ByteBuffer keyBuffer = key.serialize(); ByteBuffer valueBuffer = offsetValue.serialize(); - produce(keyBuffer.array(), valueBuffer.array()); + try { + lock.lock(); + produce(keyBuffer.array(), valueBuffer.array()); + } finally { + lock.unlock(); + } doAdd(key, offsetValue); } /** * Loads the existing data into the cache - * - * @throws Exception */ - public void loadCache() throws Exception { - createConsumer(); - - subscribe(); - - LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic()); - + @Override + public void loadCache() { if (!(adapter instanceof Deserializable)) { throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable"); } - executorService.submit(() -> refresh()); + executorService.submit(this::refresh); + } + + /** + * Launch a thread to refresh the offsets periodically + */ + private void refresh() { + LOG.trace("Creating a offset cache refresher"); + + try { + consumer = createConsumer(); + + subscribe(consumer); + + LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic()); + consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); + + poll(consumer); + } catch (WakeupException e) { + LOG.info("Kafka consumer was interrupted during a blocking call"); + } catch (Exception e) { + LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e); + } finally { + if (consumer != null) { + consumer.unsubscribe(); + consumer.close(Duration.ofSeconds(5)); + } + } } - protected void poll() { + protected void poll(Consumer<byte[], byte[]> consumer) { Deserializable deserializable = (Deserializable) adapter; - ConsumerRecords<byte[], byte[]> records; do { - records = consume(); + ConsumerRecords<byte[], byte[]> records = consume(consumer); if (records.isEmpty()) { - break; + continue; } for (ConsumerRecord<byte[], byte[]> record : records) { @@ -191,7 +211,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * * @param topic the topic to consume the messages from */ - protected void checkAndSubscribe(String topic) { + protected void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic) { if (!subscribed) { consumer.subscribe(Collections.singletonList(topic)); @@ -205,26 +225,18 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * @param topic the topic to consume the messages from * @param remaining the number of messages to rewind from the last offset position (used to fill the cache) */ - public void checkAndSubscribe(String topic, long remaining) { + public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining) { if (!subscribed) { - consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(remaining)); - + consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(consumer, remaining)); subscribed = true; } } - /** - * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary - * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs. - * - * @param remaining the number of remaining messages on the topic to try to collect - * @return - */ - protected ConsumerRebalanceListener getConsumerRebalanceListener(long remaining) { + private ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<byte[], byte[]> consumer, long remaining) { return new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { - + // NO-OP } @Override @@ -244,45 +256,15 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka }; } - /** - * Unsubscribe from the topic - */ - protected void unsubscribe() { - try { - consumer.unsubscribe(); - } catch (IllegalStateException e) { - LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", - resumeStrategyConfiguration.getTopic()); - } catch (Exception e) { - LOG.error("Error unsubscribing from the Kafka topic {}: {}", resumeStrategyConfiguration.getTopic(), e.getMessage(), - e); - } - } - /** * Consumes message from the topic previously setup * * @return An instance of the consumer records */ - protected ConsumerRecords<byte[], byte[]> consume() { - int retries = 10; - - return consume(retries); - } - - /** - * Consumes message from the topic previously setup - * - * @param retries how many times to retry consuming data from the topic - * @return An instance of the consumer records - */ - protected ConsumerRecords<byte[], byte[]> consume(int retries) { - while (retries > 0) { - ConsumerRecords<byte[], byte[]> records = consumer.poll(pollDuration); - if (!records.isEmpty()) { - return records; - } - retries--; + protected ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer) { + ConsumerRecords<byte[], byte[]> records = consumer.poll(pollDuration); + if (!records.isEmpty()) { + return records; } return ConsumerRecords.empty(); @@ -307,17 +289,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka return ConsumerRecords.empty(); } - public void subscribe() throws Exception { + private void subscribe(Consumer<byte[], byte[]> consumer) { if (adapter instanceof Cacheable) { ResumeCache<?> cache = ((Cacheable) adapter).getCache(); if (cache.capacity() >= 1) { - checkAndSubscribe(resumeStrategyConfiguration.getTopic(), cache.capacity()); + checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic(), cache.capacity()); } else { - checkAndSubscribe(resumeStrategyConfiguration.getTopic()); + checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic()); } } else { - checkAndSubscribe(resumeStrategyConfiguration.getTopic()); + checkAndSubscribe(consumer, resumeStrategyConfiguration.getTopic()); } } @@ -356,21 +338,38 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka } } - private void createConsumer() { - if (consumer == null) { - consumer = new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties()); - } + private Consumer<byte[], byte[]> createConsumer() { + return new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties()); } @Override public void stop() { - LOG.info("Closing the Kafka producer"); - IOHelper.close(producer, "Kafka producer", LOG); + try { + LOG.trace("Trying to obtain a lock for closing the producer"); + if (!lock.tryLock(1, TimeUnit.SECONDS)) { + LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ..."); + } - LOG.info("Closing the Kafka consumer"); - IOHelper.close(producer, "Kafka consumer", LOG); + LOG.info("Closing the Kafka producer"); + IOHelper.close(producer, "Kafka producer", LOG); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + lock.unlock(); + } - executorService.shutdown(); + try { + LOG.info("Closing the Kafka consumer"); + consumer.wakeup(); + executorService.shutdown(); + + if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) { + LOG.warn("Kafka consumer did not shutdown within 2 seconds"); + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } @Override @@ -381,6 +380,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka @Override public void start() { LOG.info("Starting the kafka resume strategy"); + createProducer(); } public Duration getPollDuration() { @@ -391,10 +391,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka this.pollDuration = Objects.requireNonNull(pollDuration, "The poll duration cannot be null"); } - protected Consumer<byte[], byte[]> getConsumer() { - return consumer; - } - protected Producer<byte[], byte[]> getProducer() { return producer; } @@ -410,22 +406,4 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka return resumeStrategyConfiguration; } - /** - * Launch a thread to refresh the offsets periodically - */ - private void refresh() { - LOG.trace("Creating a offset cache refresher"); - try { - Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone(); - prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - - try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) { - consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); - - poll(); - } - } catch (Exception e) { - LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e); - } - } }
