[ 
https://issues.apache.org/jira/browse/CAMEL-17489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476075#comment-17476075
 ] 

Rafał Gała commented on CAMEL-17489:
------------------------------------

There you go

 
{noformat}
2022-01-14 11:59:27.612 [Camel (camel-1) thread #778 - 
KafkaConsumer[***masked***]] WARN  
org.apache.camel.component.kafka.KafkaConsumer.log:214 - Error unsubscribing 
***masked***-Thread 0 from kafka topic ***masked***. Caused by: 
[java.lang.IllegalStateException - This consumer has already been closed.]
java.lang.IllegalStateException: This consumer has already been closed.
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2437)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1062)
    at 
org.apache.camel.component.kafka.KafkaFetchRecords.safeUnsubscribe(KafkaFetchRecords.java:239)
    at 
org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:105)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834){noformat}

> camel-kafka - Unsubscribing fails due to already closed consumer
> ----------------------------------------------------------------
>
>                 Key: CAMEL-17489
>                 URL: https://issues.apache.org/jira/browse/CAMEL-17489
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.14.0
>            Reporter: Rafał Gała
>            Priority: Minor
>
> In {*}KafkaFetchRecords{*}, when an exception occurs inside *startPolling* 
> method, the consumer is closed in finally block:
> {code:java}
> finally {
>     lock.unlock();
>     // only close if not retry
>     if (!isRetrying()) {
>         LOG.debug("Closing consumer {}", threadId);
>         IOHelper.close(consumer);
>     }
> } {code}
>  and then unsibscribing in *run* method fails with "Consumer already closed 
> error"
>  
> {code:java}
> LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", 
> threadId, topicName);
> safeUnsubscribe();
> IOHelper.close(consumer); {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to