[ https://issues.apache.org/jira/browse/KAFKA-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932778#comment-17932778 ]
A. Sophie Blee-Goldman commented on KAFKA-18067:
------------------------------------------------

thanks for catching this. [~frankvicky] since we had to revert the initial fix, want to revisit this issue and retry?

Seems like we should not piggyback on the existing #close method to set the "dont reinitialize" flag, since it's not only called during shutdown but also during the `#resetProducer` method. Basically we want a separate method that will "permanently" close the producer only for when the thread is shutting down, and set the flag in there.

> Kafka Streams can leak Producer client under EOS
> ------------------------------------------------
>
>                 Key: KAFKA-18067
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18067
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: TengYao Chi
>            Priority: Major
>              Labels: newbie, newbie++
>             Fix For: 4.1.0
>
>
> Under certain conditions Kafka Streams can end up closing a producer client
> twice and creating a new one that then is never closed.
> During a StreamThread's shutdown, the TaskManager is closed first, through
> which the thread's producer client is also closed. Later on we call
> #unsubscribe on the main consumer, which can result in the #onPartitionsLost
> callback being invoked and ultimately trying to reset/reinitialize the
> StreamsProducer if EOS is enabled. This in turn includes closing the current
> producer and creating a new one. And since the current producer was already
> closed, we end up closing that client twice and never closing the newly
> created producer.
> Ideally we would just skip the reset/reinitialize process entirely when
> invoked during shutdown. This solves the two problems here (leaked client and
> double close), while also removing the unnecessary overhead of creating an
> entirely new client just to throw it away
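For anyone picking this up, here is a rough sketch of the shape suggested in the comment: a separate shutdown-only method that sets the flag, with the regular close left alone so that the reset path keeps working. This is not the actual Kafka Streams code. `SketchClient`, `SketchStreamsProducer`, `closePermanently` and the `permanentlyClosed` field are hypothetical names made up for illustration, and the real StreamsProducer has more state to deal with.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a producer client; it only counts close() calls. */
final class SketchClient {
    int closeCalls = 0;

    void close() {
        closeCalls++;
    }
}

/** Hypothetical wrapper, loosely modeled on the StreamsProducer described in the issue. */
final class SketchStreamsProducer {
    // Every client ever created, kept so that leaks and double closes are easy to inspect.
    final List<SketchClient> allClients = new ArrayList<>();
    private SketchClient current;
    // The "don't reinitialize" flag; only closePermanently() sets it.
    private boolean permanentlyClosed = false;

    SketchStreamsProducer() {
        current = newClient();
    }

    private SketchClient newClient() {
        SketchClient client = new SketchClient();
        allClients.add(client);
        return client;
    }

    /** Ordinary close. resetProducer() also calls it, so it must not set the flag. */
    void close() {
        current.close();
    }

    /** Intended for thread shutdown only: closes the client once and blocks later resets. */
    void closePermanently() {
        if (permanentlyClosed) {
            return;
        }
        permanentlyClosed = true;
        close();
    }

    /** Close-and-recreate, e.g. when #onPartitionsLost fires under EOS. */
    void resetProducer() {
        if (permanentlyClosed) {
            return; // shutting down: no second close, and no new client to leak
        }
        close();
        current = newClient();
    }
}

final class ProducerShutdownSketch {
    public static void main(String[] args) {
        SketchStreamsProducer producer = new SketchStreamsProducer();
        producer.resetProducer();     // normal reset: old client closed, new one created
        producer.closePermanently();  // thread shutdown
        producer.resetProducer();     // late callback after shutdown: skipped
        for (SketchClient client : producer.allClients) {
            System.out.println("close calls: " + client.closeCalls);
        }
    }
}
```

Putting the flag in the regular close would make every `#resetProducer` call a no-op after its own close step, which is presumably why the first fix had to be reverted. With the flag set only in the shutdown-only method, a reset before shutdown still swaps clients, and a reset after shutdown does nothing.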