This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9e832b0428cf7f0101e9be2cb6b9c19f4958b5fd Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Aug 10 16:50:25 2022 +0200 CAMEL-18356: consolidated the Kafka resume strategies --- .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 64 +--------------------- .../kafka/SingleNodeKafkaResumeStrategy.java | 60 ++++++++++++++++---- 2 files changed, 50 insertions(+), 74 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java index 4cfe256fb3d..82580200f03 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java @@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory; * * @param <K> the type of key */ +@Deprecated public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> { private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class); - private final ExecutorService executorService; /** * Create a new instance of this class @@ -64,68 +64,8 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService) { - super(resumeStrategyConfiguration); - - this.executorService = executorService; - } - - protected void poll() { - poll(getConsumer()); - } - - protected void poll(Consumer<byte[], byte[]> consumer) { - Deserializable deserializable = (Deserializable) getAdapter(); - - ConsumerRecords<byte[], byte[]> records; - do { - records = consume(10, consumer); - - if (records.isEmpty()) { - break; - } - - for (ConsumerRecord<byte[], byte[]> record : records) { - byte[] value = record.value(); - - LOG.trace("Read from Kafka: {}", value); - - deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value())); - } - } while (true); + super(resumeStrategyConfiguration, executorService); } - @Override - public void loadCache() throws Exception { - super.loadCache(); - executorService.submit(() -> refresh()); - } - - /** - * Launch a thread to refresh the offsets periodically - */ - private void refresh() { - LOG.trace("Creating a offset cache refresher"); - try { - Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone(); - prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - - try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) { - consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); - - poll(consumer); - } - } catch (Exception e) { - LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e); - } - } - - @Override - public void stop() { - try { - executorService.shutdown(); - } finally { - super.stop(); - } - } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 870185127de..db87f65074a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -23,9 +23,13 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Objects; +import java.util.Properties; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.camel.RuntimeCamelException; import org.apache.camel.resume.Cacheable; @@ -37,6 +41,7 @@ import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.cache.ResumeCache; import org.apache.camel.util.IOHelper; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -50,9 +55,7 @@ import org.slf4j.LoggerFactory; /** * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node - * integrations. For multi-node integrations (i.e: using clusters with the master component check - * {@link MultiNodeKafkaResumeStrategy}. - * + * integrations. */ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> { private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class); @@ -66,15 +69,29 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka private boolean subscribed; private ResumeAdapter adapter; private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration; + private final ExecutorService executorService; /** * Builds an instance of this class - * + * * @param resumeStrategyConfiguration the configuration to use for this strategy instance * */ public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) { this.resumeStrategyConfiguration = resumeStrategyConfiguration; + executorService = Executors.newSingleThreadExecutor(); + } + + /** + * Builds an instance of this class + * + * @param resumeStrategyConfiguration the configuration to use for this strategy instance + * + */ + public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, + ExecutorService executorService) { + this.resumeStrategyConfiguration = resumeStrategyConfiguration; + this.executorService = executorService; } /** @@ -125,7 +142,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka /** * Loads the existing data into the cache - * + * * @throws Exception */ public void loadCache() throws Exception { @@ -138,9 +155,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka if (!(adapter instanceof Deserializable)) { throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable"); } - poll(); - unsubscribe(); + executorService.submit(() -> refresh()); } protected void poll() { @@ -160,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka LOG.trace("Read from Kafka: {}", value); if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) { - break; + LOG.warn("Deserializar indicates that this is the last record to deserialize"); } } } while (true); @@ -168,7 +184,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka /** * Subscribe to the topic if not subscribed yet - * + * * @param topic the topic to consume the messages from */ protected void checkAndSubscribe(String topic) { @@ -181,7 +197,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka /** * Subscribe to the topic if not subscribed yet - * + * * @param topic the topic to consume the messages from * @param remaining the number of messages to rewind from the last offset position (used to fill the cache) */ @@ -196,7 +212,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka /** * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs. - * + * * @param remaining the number of remaining messages on the topic to try to collect * @return */ @@ -252,7 +268,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka /** * Consumes message from the topic previously setup - * + * * @param retries how many times to retry consuming data from the topic * @return An instance of the consumer records */ @@ -349,6 +365,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka LOG.info("Closing the Kafka consumer"); IOHelper.close(producer, "Kafka consumer", LOG); + + executorService.shutdown(); } @Override @@ -388,4 +406,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka return resumeStrategyConfiguration; } + /** + * Launch a thread to refresh the offsets periodically + */ + private void refresh() { + LOG.trace("Creating a offset cache refresher"); + try { + Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone(); + prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + + try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) { + consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); + + poll(); + } + } catch (Exception e) { + LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e); + } + } }
