lucasbru commented on code in PR #19359: URL: https://github.com/apache/kafka/pull/19359#discussion_r2028446403
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -16233,6 +16233,97 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testStreamsGroupMemberRequestingShutdownApplication() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withStreamsGroup(new StreamsGroupBuilder(groupId, 10) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId1) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .build()) + .withMember(streamsGroupMemberBuilderWithDefaults(memberId2) + .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5))) + .build()) + .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))) + .withTargetAssignment(memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5))) + .withTargetAssignmentEpoch(10) + .withTopology(StreamsTopology.fromHeartbeatRequest(topology)) + .withPartitionMetadata(Map.of( + fooTopicName, new org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, fooTopicName, 6) + )) + ) + .build(); + + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result1 = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setShutdownApplication(true) + ); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setStatus(List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) + .setStatusDetail("A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.") + )), + result1.response().data() + ); + assertRecordsEquals(List.of(), result1.records()); + + CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result2 = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setShutdownApplication(true) Review Comment: Good catch, that was a copy/paste error. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ########## @@ -197,6 +197,13 @@ public static class DeadlineAndEpoch { */ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + /** + * A flag to indicate whether a shutdown has been requested for this group. + * This has no direct effect inside the group coordinator, but is propagated to all members of the group. + * This is not persisted in the log. Review Comment: I extended the comment a bit. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ########## @@ -1049,4 +1057,21 @@ public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup( return describedGroup; } + public void maybeSetShutdownRequested(final String memberId, final boolean shutdownApplication) { Review Comment: Good idea. Done ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java: ########## @@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() { assertTrue(streamsGroup.isSubscribedToTopic("test-topic2")); assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic")); } + + @Test + public void testShutdownRequestedMethods() { + String memberId1 = "test-member-id1"; + String memberId2 = "test-member-id2"; + LogContext logContext = new LogContext(); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); + StreamsGroup streamsGroup = new StreamsGroup(logContext, snapshotRegistry, "test-group", metricsShard); + + streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1)); + streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2)); + + // Initially, shutdown should not be requested + assertFalse(streamsGroup.isShutdownRequested()); + + // Set shutdown requested + streamsGroup.maybeSetShutdownRequested(memberId1, true); + assertTrue(streamsGroup.isShutdownRequested()); + + // As long as group not empty, remain in shutdown requested state + streamsGroup.removeMember(memberId1); + assertTrue(streamsGroup.isShutdownRequested()); + + // As soon as the group is empty, clear the shutdown requested state + streamsGroup.removeMember(memberId2); + assertFalse(streamsGroup.isShutdownRequested()); + } Review Comment: Done -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org