This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5fedf91e806a394635e2d3837d45c0d849f2b0fe Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Tue Aug 9 16:58:49 2022 +0200 CAMEL-18362: wait for the initial cache loading --- .../kafka/KafkaResumeStrategyConfiguration.java | 23 ++++++++++++++ .../KafkaResumeStrategyConfigurationBuilder.java | 17 ++++++++++ .../kafka/SingleNodeKafkaResumeStrategy.java | 37 ++++++++++++++++------ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java index 2e314079223..94aede6fadd 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java @@ -17,6 +17,7 @@ package org.apache.camel.processor.resume.kafka; +import java.time.Duration; import java.util.Properties; import org.apache.camel.resume.ResumeStrategyConfiguration; @@ -28,6 +29,8 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio private Properties producerProperties; private Properties consumerProperties; private String topic; + private Duration maxInitializationDuration; + private int maxInitializationRetries; public Properties getProducerProperties() { return producerProperties; @@ -58,4 +61,24 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio this.topic = topic; } + + public Duration getMaxInitializationDuration() { + return maxInitializationDuration; + } + + public void setMaxInitializationDuration(Duration maxInitializationDuration) { + this.maxInitializationDuration = maxInitializationDuration; + } + + public int getMaxInitializationRetries() { + return maxInitializationRetries; + } + + public void setMaxInitializationRetries(int maxInitializationRetries) { + if (maxInitializationRetries < 1) { + throw new IllegalArgumentException("The maximum number of initialization retries must be equal or bigger than 1"); + } + + this.maxInitializationRetries = maxInitializationRetries; + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java index 150936148d0..3c4e6a21ca5 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java @@ -17,6 +17,7 @@ package org.apache.camel.processor.resume.kafka; +import java.time.Duration; import java.util.Properties; import java.util.UUID; @@ -43,6 +44,8 @@ public class KafkaResumeStrategyConfigurationBuilder private Properties producerProperties; private Properties consumerProperties; private String topic; + private Duration maxInitializationDuration = Duration.ofSeconds(10); + private int maxInitializationRetries = 5; private KafkaResumeStrategyConfigurationBuilder() { } @@ -103,6 +106,18 @@ public class KafkaResumeStrategyConfigurationBuilder return this; } + public KafkaResumeStrategyConfigurationBuilder withMaxInitializationDuration(Duration duration) { + this.maxInitializationDuration = duration; + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withMaxInitializationRetries(int retries) { + this.maxInitializationRetries = retries; + + return this; + } + /** * Creates a basic consumer * @@ -138,6 +153,8 @@ public class KafkaResumeStrategyConfigurationBuilder resumeStrategyConfiguration.setConsumerProperties(consumerProperties); resumeStrategyConfiguration.setProducerProperties(producerProperties); resumeStrategyConfiguration.setTopic(topic); + resumeStrategyConfiguration.setMaxInitializationDuration(maxInitializationDuration); + resumeStrategyConfiguration.setMaxInitializationRetries(maxInitializationRetries); return resumeStrategyConfiguration; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index a527b66c9d7..9a60cce760f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -20,11 +20,13 @@ package org.apache.camel.processor.resume.kafka; import java.io.IOException; import java.nio.ByteBuffer; import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -152,13 +154,24 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable"); } - executorService.submit(this::refresh); + CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries()); + executorService.submit(() -> refresh(latch)); + + try { + LOG.trace("Waiting for kafka resume strategy async initialization"); + if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) { + LOG.debug("The initialization timed out"); + } + LOG.trace("Kafka resume strategy initialization complete"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** * Launch a thread to refresh the offsets periodically */ - private void refresh() { + private void refresh(CountDownLatch latch) { LOG.trace("Creating a offset cache refresher"); try { @@ -169,7 +182,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic()); consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); - poll(consumer); + poll(consumer, latch); } catch (WakeupException e) { LOG.info("Kafka consumer was interrupted during a blocking call"); } catch (Exception e) { @@ -182,27 +195,33 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka } } - protected void poll(Consumer<byte[], byte[]> consumer) { + protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) { Deserializable deserializable = (Deserializable) adapter; + boolean initialized = false; do { ConsumerRecords<byte[], byte[]> records = consume(consumer); - if (records.isEmpty()) { - continue; - } - for (ConsumerRecord<byte[], byte[]> record : records) { byte[] value = record.value(); if (LOG.isTraceEnabled()) { - LOG.trace("Read from Kafka: {}", value); + LOG.trace("Read from Kafka at {} ({}): {}", Instant.ofEpochMilli(record.timestamp()), + record.timestampType(), value); } if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) { LOG.warn("Deserializer indicates that this is the last record to deserialize"); } } + + if (!initialized) { + if (latch.getCount() == 1) { + initialized = true; + } + + latch.countDown(); + } } while (true); }
