jeffkbkim commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2027417821

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() {
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
     }
+
+    @Test
+    public void testShutdownRequestedMethods() {
+        String memberId1 = "test-member-id1";
+        String memberId2 = "test-member-id2";
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard);
+
+        streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+        streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+        // Initially, shutdown should not be requested
+        assertFalse(streamsGroup.isShutdownRequested());
+
+        // Set shutdown requested
+        streamsGroup.maybeSetShutdownRequested(memberId1, true);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As long as group not empty, remain in shutdown requested state
+        streamsGroup.removeMember(memberId1);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As soon as the group is empty, clear the shutdown requested state
+        streamsGroup.removeMember(memberId2);
+        assertFalse(streamsGroup.isShutdownRequested());
+    }

Review Comment:
   nit: newline

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2070,8 +2067,6 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         String processId,
         Endpoint userEndpoint,
         List<KeyValue> clientTags,
-        List<TaskOffset> taskOffsets,
-        List<TaskOffset> taskEndOffsets,

Review Comment:
   these are unrelated to this PR correct?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1049,4 +1057,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         return describedGroup;
     }
+    public void maybeSetShutdownRequested(final String memberId, final boolean shutdownApplication) {

Review Comment:
   should we just have a setShutdownRequested method and do the if check on the caller's side?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * A flag to indicate whether a shutdown has been requested for this group.
+     * This has no direct effect inside the group coordinator, but is propagated to all members of the group.
+     * This is not persisted in the log.

Review Comment:
   to understand this flag:
   - we only clear it when the group goes empty; this is when we assume all members have acked
   - since this is only in soft state, we have no guarantee that all members will eventually shut down. it is a best-effort nudge.

   is this correct?
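
To make the question about the flag concrete, here is a minimal sketch of the semantics as I read them from the javadoc and the new test. `SoftShutdownFlagSketch` and `addMember` are hypothetical names for illustration only, not the real `StreamsGroup` API, and the clearing rule (reset once the last member is removed) is my assumption based on the test, not on the implementation in this PR.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the group; NOT the real StreamsGroup. */
class SoftShutdownFlagSketch {
    private final Set<String> members = new HashSet<>();

    // Soft state only: never written to the log, so a coordinator
    // failover would lose it. That is why it is a best-effort nudge.
    private boolean shutdownRequested = false;

    void addMember(String memberId) {
        members.add(memberId);
    }

    // memberId is kept only to mirror the signature shown in the diff;
    // this sketch does not use it.
    void maybeSetShutdownRequested(String memberId, boolean shutdownApplication) {
        if (shutdownApplication) {
            shutdownRequested = true;
        }
    }

    void removeMember(String memberId) {
        members.remove(memberId);
        // Assumption: an empty group is taken to mean every member has
        // seen the request, so the flag is cleared.
        if (members.isEmpty()) {
            shutdownRequested = false;
        }
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }
}
```

If this reading is right, the flag can only go from true back to false through the group becoming empty, and nothing guarantees that it survives long enough for every member to observe it.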