ableegoldman commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2076630475


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions 
options) throws IllegalArgu
         return close(Optional.of(timeoutMs), options.leaveGroup);
     }
 
-    private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long 
remainingTimeMs) {

Review Comment:
   yeah, so we should basically just leave everything up to the StreamThread's 
`completeShutdown` method. It's true that right now the shutdown helper calls 
this `#streamThreadLeaveConsumerGroup` after the thread has shut down which 
makes it seem like we want the shutdown helper to be responsible for managing 
the group membership after the thread has died, but that's really just an 
artifact of the lack of a `consumer#close(CloseOptions)` before now. We had to 
wait for the thread to shut down and close the consumer, and then we could use 
the admin client to explicitly remove it from the group. But that's really not 
ideal for a number of reasons (a big one being that it only works for static 
members) and was essentially just a workaround since the consumer couldn't do 
this itself.
   
   Now that we do have consumers which can handle their own group membership 
via `#close`, we don't need this workaround and can leave everything up to the 
consumer which the StreamThread owns. In short, instead of refactoring the way 
this `#streamThreadLeaveConsumerGroup` method works, we should just remove this 
call entirely, and leave it up to the StreamThread to properly close the 
consumer with the right leavegroup option in its `#completeShutdown`
   
   Lmk if that makes sense!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to