[
https://issues.apache.org/jira/browse/CAMEL-17489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476075#comment-17476075
]
Rafał Gała commented on CAMEL-17489:
------------------------------------
There you go
{noformat}
2022-01-14 11:59:27.612 [Camel (camel-1) thread #778 - KafkaConsumer[***masked***]] WARN org.apache.camel.component.kafka.KafkaConsumer.log:214 - Error unsubscribing ***masked***-Thread 0 from kafka topic ***masked***. Caused by: [java.lang.IllegalStateException - This consumer has already been closed.]
java.lang.IllegalStateException: This consumer has already been closed.
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2437)
    at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1062)
    at org.apache.camel.component.kafka.KafkaFetchRecords.safeUnsubscribe(KafkaFetchRecords.java:239)
    at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:105)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
{noformat}
> camel-kafka - Unsubscribing fails due to already closed consumer
> ----------------------------------------------------------------
>
> Key: CAMEL-17489
> URL: https://issues.apache.org/jira/browse/CAMEL-17489
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.14.0
> Reporter: Rafał Gała
> Priority: Minor
>
> In {*}KafkaFetchRecords{*}, when an exception occurs inside the *startPolling*
> method, the consumer is closed in the finally block:
> {code:java}
> finally {
>     lock.unlock();
>     // only close if not retry
>     if (!isRetrying()) {
>         LOG.debug("Closing consumer {}", threadId);
>         IOHelper.close(consumer);
>     }
> }
> {code}
> and then unsubscribing in the *run* method fails with a "Consumer already
> closed" error:
>
> {code:java}
> LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}",
>         threadId, topicName);
> safeUnsubscribe();
> IOHelper.close(consumer);
> {code}
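
The sequence described above can be reproduced without Kafka. What follows is a minimal sketch under stated assumptions, not the fix applied in Camel: FakeConsumer, FetchTask, closeAfterPolling and terminate are hypothetical names, and the fake consumer only imitates the IllegalStateException seen in the trace. It shows one possible guard, a flag that records whether the consumer was already closed, so that the termination path skips the unsubscribe call in that case.

```java
public class SafeUnsubscribeSketch {

    /** Hypothetical stand-in for a Kafka consumer: once closed, unsubscribe throws. */
    static final class FakeConsumer {
        private boolean closed;

        void unsubscribe() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical fetch task that remembers whether it already closed its consumer. */
    static final class FetchTask {
        private final FakeConsumer consumer = new FakeConsumer();
        // Plain field: this sketch assumes both methods run on the same thread.
        private boolean consumerClosed;

        /** Imitates the finally block in startPolling: close unless retrying. */
        void closeAfterPolling(boolean retrying) {
            if (!retrying) {
                closeOnce();
            }
        }

        /**
         * Imitates the end of run(): unsubscribe only while the consumer is
         * still open, then close. Returns true if unsubscribe was attempted.
         */
        boolean terminate() {
            boolean attempted = false;
            if (!consumerClosed) {
                consumer.unsubscribe();
                attempted = true;
            }
            closeOnce();
            return attempted;
        }

        private void closeOnce() {
            if (!consumerClosed) {
                consumer.close();
                consumerClosed = true;
            }
        }
    }

    public static void main(String[] args) {
        // Polling failed without retry: the consumer is already closed.
        FetchTask failed = new FetchTask();
        failed.closeAfterPolling(false);
        System.out.println("unsubscribe attempted after failure: " + failed.terminate());

        // Retrying: the consumer stays open, so unsubscribe is still safe.
        FetchTask retrying = new FetchTask();
        retrying.closeAfterPolling(true);
        System.out.println("unsubscribe attempted while retrying: " + retrying.terminate());
    }
}
```

Without the consumerClosed check, the first terminate() call would throw the same IllegalStateException as in the log above. Whether a flag, a reordering of the close calls, or some other approach suits KafkaFetchRecords best is left open here.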