frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2062833316


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions 
options) throws IllegalArgu
         return close(Optional.of(timeoutMs), options.leaveGroup);
     }
 
-    private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long 
remainingTimeMs) {

Review Comment:
   Hi @ableegoldman 
   Thanks for the insight!
   
   I have dug into the code and found that the reason might be the calling of 
`consumer#close` twice.
   During the close process, we will first enter 
`StreamThread#completeShutdown`[0] with `leaveGroup=false`, which will invoke 
close for the first time. This is the key point: after the first invoke, the 
close flag of the consumer has been set to true.[1][2]
   The second close invoke is `KafkaStreams#shutdownHelper`.[3] At this time, 
we will pass `leaveGroup=true` to the method.[4] Unfortunately, the close flag 
of the consumer is set to true[5], so the consumer will not execute the leave 
group operation as expected. 
   
   [0] 
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865
   
   [1]
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129
   
   [2]
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1135
   
   [3]
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1536
   
   [4] Note that here, we want to call `consumer#close(CloseOptions)` instead 
of `processStreamThread(streamThreadLeaveConsumerGroup(timeoutMs));`, so it 
will be as follows.
   ```java
                   processStreamThread(t -> t.closeConsumer(true));
   ```
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1462
   
   [5]
   
https://github.com/frankvicky/kafka/blob/ffb74cb5a4673696c4b2517d898e4e06e8223b73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L1129



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to