frankvicky commented on code in PR #19400: URL: https://github.com/apache/kafka/pull/19400#discussion_r2062833316
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu
         return close(Optional.of(timeoutMs), options.leaveGroup);
     }

-    private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {

Review Comment:

Hi @ableegoldman, thanks for the insight!
I dug into the code and found that the likely cause is that `consumer#close` is called twice.

During the close process, we first enter `StreamThread#completeShutdown`[0] with `leaveGroup=false`, which invokes close for the first time.
This is the key point: after the first invocation, the close flag of the consumer has been set to true.[1][2]

The second close invocation comes from `KafkaStreams#shutdownHelper`.[3]
This time, we pass `leaveGroup=true` to the method.[4]
Unfortunately, the close flag of the consumer is already set to true[5], so the consumer skips the leave-group operation that we expect it to perform.

[0] https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865

[1] https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129

[2] https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1135

[3] https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1536

[4] Note that here we want to call `consumer#close(CloseOptions)` instead of `processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));`, so it would look as follows:
```java
processStreamThread(t -> t.closeConsumer(true));
```
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1462

[5] https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129
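
To make the double-close sequence easier to follow, here is a minimal, self-contained sketch of the behavior as I understand it. `SketchConsumer`, `DoubleCloseSketch`, `leftGroup()` and `events()` are hypothetical names made up for illustration; this is not the actual `ClassicKafkaConsumer` code, and it only assumes that close is guarded by a flag, so that any call after the first one returns early.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumer whose close is guarded by a flag.
// This is NOT the real ClassicKafkaConsumer; it only models the early return.
class SketchConsumer {
    private boolean closed = false;
    private final List<String> events = new ArrayList<>();

    // Only the first call does any work; later calls return early.
    synchronized void close(final boolean leaveGroup) {
        if (closed) {
            events.add("close(leaveGroup=" + leaveGroup + ") ignored: already closed");
            return;
        }
        if (leaveGroup) {
            events.add("sent leave-group request");
        }
        events.add("released resources");
        closed = true;
    }

    // True only if some close call actually reached the leave-group step.
    synchronized boolean leftGroup() {
        return events.contains("sent leave-group request");
    }

    // Defensive copy of the recorded events, in call order.
    synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}

class DoubleCloseSketch {
    public static void main(final String[] args) {
        final SketchConsumer consumer = new SketchConsumer();
        consumer.close(false); // analogous to the close in StreamThread#completeShutdown
        consumer.close(true);  // analogous to the later close from KafkaStreams#shutdownHelper
        consumer.events().forEach(System.out::println);
        System.out.println("left group: " + consumer.leftGroup());
    }
}
```

In this sketch the second call is ignored, so no leave-group request is ever recorded, even though `leaveGroup=true` was passed. If the model matches the real code path, the first close would need to carry the intended leave-group choice, or the second close would have to be the only one.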