frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2062833316
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions
options) throws IllegalArgu
return close(Optional.of(timeoutMs), options.leaveGroup);
}
- private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long
remainingTimeMs) {
Review Comment:
Hi @ableegoldman
Thanks for the insight!
I have dug into the code and found that the reason might be the calling of
`consumer#close` twice.
During the close process, we will first enter
`StreamThread#completeShutdown`[0] with `leaveGroup=false`, which will invoke
close for the first time. This is the key point: after the first invoke, the
close flag of the consumer has been set to true.[1][2]
The second close invoke is `KafkaStreams#shutdownHelper`.[3] At this time,
we will pass `leaveGroup=true` to the method.[4] Unfortunately, the close flag
of the consumer is set to true[5], so the consumer will not execute the leave
group operation as expected.
[0]
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865
[1]
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129
[2]
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1135
[3]
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1536
[4] Note that here, we want to call `consumer#close(CloseOptions)` instead
of `processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));`, so it
will be as follows.
```java
processStreamThread(t -> t.closeConsumer(true));
```
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1462
[5]
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129-L1133
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]