ableegoldman commented on code in PR #19400: URL: https://github.com/apache/kafka/pull/19400#discussion_r2071546806
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -2013,6 +2024,12 @@ public Map<String, KafkaFuture<Uuid>> clientInstanceIds(final Duration timeout) return result; } + public void closeConsumer(final boolean leaveGroup) { + final GroupMembershipOperation operation = leaveGroup ? LEAVE_GROUP : REMAIN_IN_GROUP; + final CloseOptions closeOptions = CloseOptions.groupMembershipOperation(operation); + mainConsumer.close(closeOptions); Review Comment: we should be able to get rid of this method so it doesn't matter, but just fyi this wouldn't be thread safe. For future reference, the way we invoke things on the StreamThread from outside (eg from `KafkaStreams`) is usually to just set a flag and then check it in the main loop which starts from the `runLoop()` method 🙂 ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1645,6 +1606,7 @@ private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remaini if (groupInstanceId.isPresent()) { log.debug("Sending leave group trigger to removing instance from consumer group: {}.", groupInstanceId.get()); + System.err.println(groupInstanceId.get()); Review Comment: leftover debugging code I'm guessing? 😄 ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1824,12 +1834,13 @@ private void completeShutdown(final boolean cleanRun) { log.error("Failed to unsubscribe due to the following error: ", e); } try { - mainConsumer.close(); + final GroupMembershipOperation membershipOperation = leaveGroup ? LEAVE_GROUP : REMAIN_IN_GROUP; + mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation)); } catch (final Throwable e) { log.error("Failed to close consumer due to the following error:", e); } try { - restoreConsumer.close(); + restoreConsumer.close(CloseOptions.groupMembershipOperation(REMAIN_IN_GROUP)); Review Comment: can you just add a comment here to explain why we use this for the restore consumer, eg ``` // restore consumer isn't part of a group so we use REMAIN_IN_GROUP to skip any leaveGroup checks ``` ########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1645,6 +1606,7 @@ private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remaini if (groupInstanceId.isPresent()) { log.debug("Sending leave group trigger to removing instance from consumer group: {}.", groupInstanceId.get()); + System.err.println(groupInstanceId.get()); Review Comment: actually we should be able to get rid of this entire method now, right? the consumer#close should take care of leaving the group so we don't have to rely on the admin client anymore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org