lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028522022


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());

Review Comment:
   It's from `GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT`. I now reference that constant instead.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());

Review Comment:
   Sure, this just shows that a new target assignment was calculated. If the config were missing in the next line's assertion, one possible cause would be that the assignor was not called at all. So there is some overlap with the next line, but this check gives context and rules out possible errors if the next line ever fails. I would keep it, but let me know if you want me to remove it.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17848,6 +17921,104 @@ public void testStreamsRebalanceTimeoutExpiration() {
         context.assertNoRebalanceTimeout(groupId, memberId1);
     }
 
+    @Test
+    public void testStreamsOnNewMetadataImage() {

Review Comment:
   As written in the commit message: "Piggy-backed is another test that streams 
groups react to changes in the topic metadata."
   
   This is basically a copy of the corresponding test for consumer groups. It ensures that the metadata refresh deadline is set to expired for every group whose topology uses a topic that is changed in the `MetadataDelta`.
   
   When the metadata deadline is expired, we run topology configuration again 
to check if all the required topics have the right number of partitions.
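
   To illustrate the behavior under test, here is a minimal sketch. It is a simplified model, not the actual `GroupMetadataManager` code: the class `MetadataRefreshSketch`, the `EXPIRED` sentinel, and all method names are hypothetical, and topics are tracked by name only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the behavior described above.
// None of these names come from the Kafka code base.
class MetadataRefreshSketch {
    // Sentinel deadline meaning "already expired, refresh on the next check".
    static final long EXPIRED = Long.MIN_VALUE;

    // Group id -> topics referenced by the group's topology.
    private final Map<String, Set<String>> topicsByGroup = new HashMap<>();
    // Group id -> time (ms) after which topic metadata must be re-checked.
    private final Map<String, Long> refreshDeadlineMs = new HashMap<>();

    public void addGroup(String groupId, Set<String> topologyTopics, long deadlineMs) {
        topicsByGroup.put(groupId, Set.copyOf(topologyTopics));
        refreshDeadlineMs.put(groupId, deadlineMs);
    }

    // Called with the names of the topics that changed between two metadata images.
    public void onNewMetadataImage(Set<String> changedTopics) {
        topicsByGroup.forEach((groupId, topics) -> {
            boolean affected = topics.stream().anyMatch(changedTopics::contains);
            if (affected) {
                // Force the group to re-validate its topology against the new metadata.
                refreshDeadlineMs.put(groupId, EXPIRED);
            }
        });
    }

    // Unknown groups are treated as expired in this sketch.
    public boolean hasMetadataExpired(String groupId, long nowMs) {
        return nowMs >= refreshDeadlineMs.getOrDefault(groupId, EXPIRED);
    }
}
```

   In this model, only groups whose topology references a changed topic are marked as expired; groups that use unrelated topics keep their original deadline.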



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),

Review Comment:
   True. Simplified.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,

Review Comment:
   True. Simplified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.