lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028446403


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16233,6 +16233,97 @@ public void 
testStreamsGroupMemberJoiningWithStaleTopology() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberRequestingShutdownApplication() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                .withTargetAssignment(memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
+                ))
+            )
+            .build();
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result1 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setShutdownApplication(true)
+        );
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of(
+                    new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+                        .setStatusDetail("A KafkaStreams instance encountered 
a fatal error and requested a shutdown for the entire application.")
+                )),
+            result1.response().data()
+        );
+        assertRecordsEquals(List.of(), result1.records());
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result2 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setShutdownApplication(true)

Review Comment:
   Good catch, that was a copy/paste error. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * A flag to indicate whether a shutdown has been requested for this group.
+     * This has no direct effect inside the group coordinator, but is 
propagated to all members of the group.
+     * This is not persisted in the log.

Review Comment:
   I extended the comment a bit.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1049,4 +1057,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
         return describedGroup;
     }
 
+    public void maybeSetShutdownRequested(final String memberId, final boolean 
shutdownApplication) {

Review Comment:
   Good idea. Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##########
@@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() {
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
     }
+
+    @Test
+    public void testShutdownRequestedMethods() {
+        String memberId1 = "test-member-id1";
+        String memberId2 = "test-member-id2";
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+        // Initially, shutdown should not be requested
+        assertFalse(streamsGroup.isShutdownRequested());
+
+        // Set shutdown requested
+        streamsGroup.maybeSetShutdownRequested(memberId1, true);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As long as group not empty, remain in shutdown requested state
+        streamsGroup.removeMember(memberId1);
+        assertTrue(streamsGroup.isShutdownRequested());
+
+        // As soon as the group is empty, clear the shutdown requested state
+        streamsGroup.removeMember(memberId2);
+        assertFalse(streamsGroup.isShutdownRequested());
+    }

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to