[
https://issues.apache.org/jira/browse/CAMEL-17489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476149#comment-17476149
]
Rafał Gała commented on CAMEL-17489:
------------------------------------
My solution will only make sure the unsubscribe is called before a consumer
instance is called so it will most probably not prevent the error from
occuring. I think that to prevent this error from occuring a boolean variable
should be created and set to true after a consumer has been closed. The
variable would then be checked inside *safeUnsubscribe* to prevent calls to
unsubscribe on an already closed consumer.
What I could also try is to introduce a check in *safeUnsubscribe* method that
would check a result of *consumer.subscription()* and call unsubscribe only in
a case where non empty set has been returned. I could also introduce a boolean
variable that I would set to true after first call to unsubscribe to prevent
multiple calls.
Let me know what you think and I'll make changes in the code :)
> camel-kafka - Unsubscribing fails due to already closed consumer
> ----------------------------------------------------------------
>
> Key: CAMEL-17489
> URL: https://issues.apache.org/jira/browse/CAMEL-17489
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.14.0
> Reporter: Rafał Gała
> Priority: Minor
> Fix For: 3.15.0
>
>
> In {*}KafkaFetchRecords{*}, when an exception occurs inside *startPolling*
> method, the consumer is closed in finally block:
> {code:java}
> finally {
> lock.unlock();
> // only close if not retry
> if (!isRetrying()) {
> LOG.debug("Closing consumer {}", threadId);
> IOHelper.close(consumer);
> }
> } {code}
> and then unsubscribing in *run* method fails with "This consumer has already
> been closed"
> {code:java}
> LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}",
> threadId, topicName);
> safeUnsubscribe();
> IOHelper.close(consumer); {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)