lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028522022
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());

Review Comment:
   It's from `GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT`. I now refer to that constant instead of the literal.
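   A minimal sketch of what asserting against the constant could look like (illustrative only; it assumes the default constant is accessible from this test class):

       // Hypothetical variant of the assertion above; assumes the constant is visible here.
       assertEquals(
           GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
           result.response().data().heartbeatIntervalMs()
       );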
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());

Review Comment:
   Sure, this just shows that a new target assignment was calculated - if the config were missing from the next line, one cause could be that the assignor was not called at all. So there is some overlap with the next line, but this check gives context and rules out possible errors if the next line ever fails. I would keep it, but let me know if you want me to remove it.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17848,6 +17921,104 @@ public void testStreamsRebalanceTimeoutExpiration() {
         context.assertNoRebalanceTimeout(groupId, memberId1);
     }
 
+    @Test
+    public void testStreamsOnNewMetadataImage() {

Review Comment:
   As written in the commit message: "Piggy-backed is another test that streams groups react to changes in the topic metadata." This is basically a copy of the corresponding test for consumer groups. It ensures that the metadata refresh deadline is marked as expired for every group whose topology uses a topic that changed in the `MetadataDelta`. Once that deadline has expired, topology configuration runs again to check that all required topics have the right number of partitions.
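   To sketch the mechanism described above (illustrative only, not the actual coordinator code; `StreamsGroupView`, `topologyTopics` and `requestMetadataRefresh` are hypothetical stand-ins):

       import java.util.Collection;
       import java.util.Set;

       interface StreamsGroupView {
           Set<String> topologyTopics();      // topics referenced by the group's topology
           void requestMetadataRefresh();     // marks the metadata refresh deadline as expired
       }

       final class OnNewMetadataImageSketch {
           // Expire the refresh deadline of every group whose topology uses a changed topic.
           static void onTopicsChanged(Collection<StreamsGroupView> groups, Set<String> changedTopics) {
               for (StreamsGroupView group : groups) {
                   boolean affected = group.topologyTopics().stream().anyMatch(changedTopics::contains);
                   if (affected) {
                       // On the next heartbeat the coordinator re-runs topology configuration and
                       // re-checks that all required topics have the expected partition counts.
                       group.requestMetadataRefresh();
                   }
               }
           }
       }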
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),

Review Comment:
   True. Simplified.

##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,

Review Comment:
   True. Simplified.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org