ableegoldman commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2071546806


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -2013,6 +2024,12 @@ public Map<String, KafkaFuture<Uuid>> 
clientInstanceIds(final Duration timeout)
         return result;
     }
 
+    public void closeConsumer(final boolean leaveGroup) {
+        final GroupMembershipOperation operation = leaveGroup ? LEAVE_GROUP : 
REMAIN_IN_GROUP;
+        final CloseOptions closeOptions = 
CloseOptions.groupMembershipOperation(operation);
+        mainConsumer.close(closeOptions);

Review Comment:
   we should be able to get rid of this method so it doesn't matter, but just 
fyi this wouldn't be thread safe. For future reference, the way we invoke 
things on the StreamThread from outside (eg from `KafkaStreams`) is usually to 
just set a flag and then check it in the main loop which starts from the 
`runLoop()` method 🙂 



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1645,6 +1606,7 @@ private Consumer<StreamThread> 
streamThreadLeaveConsumerGroup(final long remaini
             if (groupInstanceId.isPresent()) {
                 log.debug("Sending leave group trigger to removing instance 
from consumer group: {}.",
                     groupInstanceId.get());
+                System.err.println(groupInstanceId.get());

Review Comment:
   leftover debugging code I'm guessing? 😄 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1824,12 +1834,13 @@ private void completeShutdown(final boolean cleanRun) {
             log.error("Failed to unsubscribe due to the following error: ", e);
         }
         try {
-            mainConsumer.close();
+            final GroupMembershipOperation membershipOperation = leaveGroup ? 
LEAVE_GROUP : REMAIN_IN_GROUP;
+            
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
         } catch (final Throwable e) {
             log.error("Failed to close consumer due to the following error:", 
e);
         }
         try {
-            restoreConsumer.close();
+            
restoreConsumer.close(CloseOptions.groupMembershipOperation(REMAIN_IN_GROUP));

Review Comment:
   can you just add a comment here to explain why we use this for the restore 
consumer, eg
   ```
   // restore consumer isn't part of a group so we use REMAIN_IN_GROUP to skip 
any leaveGroup checks
   ```



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1645,6 +1606,7 @@ private Consumer<StreamThread> 
streamThreadLeaveConsumerGroup(final long remaini
             if (groupInstanceId.isPresent()) {
                 log.debug("Sending leave group trigger to removing instance 
from consumer group: {}.",
                     groupInstanceId.get());
+                System.err.println(groupInstanceId.get());

Review Comment:
   actually we should be able to get rid of this entire method now, right? the 
consumer#close should take care of leaving the group so we don't have to rely 
on the admin client anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to