jeffkbkim commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2027417821


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() {
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
     }
+
+    @Test
+    public void testShutdownRequestedMethods() {
+        String memberId1 = "test-member-id1";
+        String memberId2 = "test-member-id2";
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+        // Initially, shutdown should not be requested
+        assertFalse(streamsGroup.isShutdownRequested());
+
+        // Set shutdown requested
+        streamsGroup.maybeSetShutdownRequested(memberId1, true);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As long as group not empty, remain in shutdown requested state
+        streamsGroup.removeMember(memberId1);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As soon as the group is empty, clear the shutdown requested state
+        streamsGroup.removeMember(memberId2);
+        assertFalse(streamsGroup.isShutdownRequested());
+    }

Review Comment:
   nit: newline



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2070,8 +2067,6 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         String processId,
         Endpoint userEndpoint,
         List<KeyValue> clientTags,
-        List<TaskOffset> taskOffsets,
-        List<TaskOffset> taskEndOffsets,

Review Comment:
   these are unrelated to this PR correct?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1049,4 +1057,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         return describedGroup;
     }
 
+    public void maybeSetShutdownRequested(final String memberId, final boolean 
shutdownApplication) {

Review Comment:
   should we just have a setShutdownRequested method and do the if check on the 
calller's side?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * A flag to indicate whether a shutdown has been requested for this group.
+     * This has no direct effect inside the group coordinator, but is 
propagated to all members of the group.
+     * This is not persisted in the log.

Review Comment:
   to understand this flag:
   - we only clear when a group goes empty, this is when we assume all members 
acked
   - since this is only in soft state, we have no guarantee that all members 
will eventually shutdown. it is a best effort nudge.
   
   is this correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to