lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2020717486


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15294,221 +15314,2860 @@ public void testShareGroupStates() {
     }
 
     @Test
-    public void testConsumerGroupDynamicConfigs() {
-        String groupId = "fooup";
-        // Use a static member id as it makes the test easier.
+    public void testStreamsHeartbeatRequestValidation() {
         String memberId = Uuid.randomUuid().toString();
-
-        Uuid fooTopicId = Uuid.randomUuid();
-        String fooTopicName = "foo";
-
-        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addRacks()
-                .build())
             .build();
+        Exception ex;
 
-        assignor.prepareGroupAssignment(new GroupAssignment(
-            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
-            )))
-        ));
+        // MemberId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()));
+        assertEquals("MemberId can't be empty.", ex.getMessage());
 
-        // Session timer is scheduled on first heartbeat.
-        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result =
-            context.consumerGroupHeartbeat(
-                new ConsumerGroupHeartbeatRequestData()
-                    .setGroupId(groupId)
-                    .setMemberId(memberId)
-                    .setMemberEpoch(0)
-                    .setRebalanceTimeoutMs(90000)
-                    .setSubscribedTopicNames(List.of("foo"))
-                    .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        // MemberId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId("   ")));
+        assertEquals("MemberId can't be empty.", ex.getMessage());
 
-        // Verify heartbeat interval
-        assertEquals(5000, result.response().heartbeatIntervalMs());
+        // GroupId must be present in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
 
-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 45000);
+        // GroupId can't be all whitespaces.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("   ")));
+        assertEquals("GroupId can't be empty.", ex.getMessage());
 
-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        // RebalanceTimeoutMs must be present in the first request (epoch == 
0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)));
+        assertEquals("RebalanceTimeoutMs must be provided in first request.", 
ex.getMessage());
 
-        // Dynamic update group config
-        Properties newGroupConfig = new Properties();
-        newGroupConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 50000);
-        newGroupConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
-        context.updateGroupConfig(groupId, newGroupConfig);
+        // ActiveTasks must be present and empty in the first request (epoch 
== 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)));
+        assertEquals("ActiveTasks must be empty when (re-)joining.", 
ex.getMessage());
 
-        // Session timer is rescheduled on second heartbeat.
-        result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId(groupId)
+        // StandbyTasks must be present and empty in the first request (epoch 
== 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
                 .setMemberId(memberId)
-                .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())));
+        assertEquals("StandbyTasks must be empty when (re-)joining.", 
ex.getMessage());
 
-        // Verify heartbeat interval
-        assertEquals(10000, result.response().heartbeatIntervalMs());
+        // WarmupTasks must be present and empty in the first request (epoch 
== 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())));
+        assertEquals("WarmupTasks must be empty when (re-)joining.", 
ex.getMessage());
+
+        // Topology must be present in the first request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals("Topology must be non-null when (re-)joining.", 
ex.getMessage());
 
-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 50000);
+        // InstanceId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setInstanceId("")));
+        assertEquals("InstanceId can't be empty.", ex.getMessage());
 
-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        // RackId must be non-empty if provided in all requests.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setRackId("")));
+        assertEquals("RackId can't be empty.", ex.getMessage());
 
-        // Session timer is cancelled on leave.
-        result = context.consumerGroupHeartbeat(
-            new ConsumerGroupHeartbeatRequestData()
-                .setGroupId(groupId)
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId("foo")
                 .setMemberId(memberId)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
-        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
result.response().memberEpoch());
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals("InstanceId can't be null.", ex.getMessage());
 
-        // Verify that there are no timers.
-        context.assertNoSessionTimeout(groupId, memberId);
-        context.assertNoRebalanceTimeout(groupId, memberId);
+        // Member epoch cannot be < -2
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(-3)
+                .setRebalanceTimeoutMs(1500)
+        ));
+        assertEquals("MemberEpoch is -3, but must be greater than or equal to 
-2.", ex.getMessage());
+
+        // Topology must not be present in the later requests (epoch != 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
+        ));
+        assertEquals("Topology can only be provided when (re-)joining.", 
ex.getMessage());
+
+        // Topology must not contain changelog topics with fixed partition 
numbers
+        StreamsInvalidTopologyException topoEx = 
assertThrows(StreamsInvalidTopologyException.class, () -> 
context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setMemberId(memberId)
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setTopology(new 
StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+                    List.of(
+                        new StreamsGroupHeartbeatRequestData.Subtopology()
+                            .setStateChangelogTopics(
+                                List.of(
+                                    new 
StreamsGroupHeartbeatRequestData.TopicInfo()
+                                        
.setName("changelog_topic_with_fixed_partition")
+                                        .setPartitions(3)
+                                )
+                            )
+                    )
+                ))
+        ));
+        assertEquals("Changelog topic changelog_topic_with_fixed_partition 
must have an undefined partition count, but it is set to 3.",
+            topoEx.getMessage());
     }
 
     @Test
-    public void testShareGroupDynamicConfigs() {
+    public void testUnknownStreamsGroupId() {
         String groupId = "fooup";
-        // Use a static member id as it makes the test easier.
         String memberId = Uuid.randomUuid().toString();
 
-        Uuid fooTopicId = Uuid.randomUuid();
-        String fooTopicName = "foo";
-
-        MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .withShareGroupAssignor(assignor)
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .addRacks()
-                .build())
             .build();
 
-        assignor.prepareGroupAssignment(new GroupAssignment(
-            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
-            )))
-        ));
-
-        // Session timer is scheduled on first heartbeat.
-        CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> 
result =
-            context.shareGroupHeartbeat(
-                new ShareGroupHeartbeatRequestData()
+        GroupIdNotFoundException e = 
assertThrows(GroupIdNotFoundException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
                     .setGroupId(groupId)
                     .setMemberId(memberId)
-                    .setMemberEpoch(0)
-                    .setSubscribedTopicNames(List.of("foo")));
-        assertEquals(1, result.response().memberEpoch());
-
-        // Verify heartbeat interval
-        assertEquals(5000, result.response().heartbeatIntervalMs());
-
-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 45000);
-
-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
-
-        // Dynamic update group config
-        Properties newGroupConfig = new Properties();
-        newGroupConfig.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 50000);
-        newGroupConfig.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
-        context.updateGroupConfig(groupId, newGroupConfig);
-
-        // Session timer is rescheduled on second heartbeat.
-        result = context.shareGroupHeartbeat(
-            new ShareGroupHeartbeatRequestData()
-                .setGroupId(groupId)
-                .setMemberId(memberId)
-                .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+                    .setMemberEpoch(100) // Epoch must be > 0.
+                    .setRebalanceTimeoutMs(1500)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())));
+        assertEquals("Streams group fooup not found.", e.getMessage());
+    }
 
-        // Verify heartbeat interval
-        assertEquals(10000, result.response().heartbeatIntervalMs());
+    @Test
+    public void testUnknownMemberIdJoinsStreamsGroup() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Topology topology = new Topology();
 
-        // Verify that there is a session time.
-        context.assertSessionTimeout(groupId, memberId, 50000);
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .build();
 
-        // Advance time.
-        assertEquals(
-            List.of(),
-            context.sleep(result.response().heartbeatIntervalMs())
-        );
+        assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));
 
-        // Session timer is cancelled on leave.
-        result = context.shareGroupHeartbeat(
-            new ShareGroupHeartbeatRequestData()
+        // A first member joins to create the group.
+        context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
-        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
result.response().memberEpoch());
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
 
-        // Verify that there are no timers.
-        context.assertNoSessionTimeout(groupId, memberId);
-        context.assertNoRebalanceTimeout(groupId, memberId);
+        // The second member is rejected because the member id is unknown and
+        // the member epoch is not zero.
+        final String memberId2 = Uuid.randomUuid().toString();
+        UnknownMemberIdException e = 
assertThrows(UnknownMemberIdException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId2)
+                    .setMemberEpoch(1)
+                    .setRebalanceTimeoutMs(1500)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())));
+        assertEquals(String.format("Member %s is not a member of group %s.", 
memberId2, groupId), e.getMessage());
     }
 
     @Test
-    public void testReplayConsumerGroupMemberMetadata() {
+    public void testStreamsGroupMemberEpochValidation() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
             .build();
+        assignor.prepareGroupAssignment(Map.of(memberId, TasksTuple.EMPTY));
 
-        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
-            .setClientId("clientid")
-            .setClientHost("clienthost")
-            .setServerAssignorName("range")
-            .setRackId("rackid")
-            .setSubscribedTopicNames(List.of("foo"))
+        StreamsGroupMember member = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)))
             .build();
 
-        // The group and the member are created if they do not exist.
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord("foo",
 member));
-        assertEquals(member, 
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("member",
 false));
-    }
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member));
 
-    @Test
-    public void testReplayConsumerGroupMemberMetadataTombstone() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .build();
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 100));
 
-        // The group still exists but the member is already gone. Replaying the
-        // ConsumerGroupMemberMetadata tombstone should be a no-op.
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 
10));
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "m1"));
-        assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", 
false));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
 
-        // The group may not exist at all. Replaying the 
ConsumerGroupMemberMetadata tombstone
-        // should a no-op.
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar",
 "m1"));
-        assertThrows(GroupIdNotFoundException.class, () -> 
context.groupMetadataManager.consumerGroup("bar"));
-    }
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
 memberId,
+            TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 1, 2, 3)
+            )));
 
-    @Test
-    public void testReplayConsumerGroupMetadata() {
-        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-            .build();
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 100));
 
-        // The group is created if it does not exist.
-        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 
10));
-        assertEquals(10, 
context.groupMetadataManager.consumerGroup("foo").groupEpoch());
-    }
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
 member));
+
+        // Member epoch is greater than the expected epoch.
+        FencedMemberEpochException e1 = 
assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(200)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a greater member epoch 
(200) than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", 
e1.getMessage());
+
+        // Member epoch is smaller than the expected epoch.
+        FencedMemberEpochException e2 = 
assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(50)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (50) 
than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", 
e2.getMessage());
+
+        // Member joins with previous epoch but without providing tasks.
+        FencedMemberEpochException e3 = 
assertThrows(FencedMemberEpochException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(99)
+                    .setRebalanceTimeoutMs(1500)));
+        assertEquals("The streams group member has a smaller member epoch (99) 
than the one known by the group coordinator (100). "
+            + "The member must abandon all its partitions and rejoin.", 
e3.getMessage());
+
+        // Member joins with previous epoch and has a subset of the owned 
tasks.
+        // This is accepted as the response with the bumped epoch may have 
been lost.
+        // In this case, we provide back the correct epoch to the member.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(99)
+                .setRebalanceTimeoutMs(1500)
+                .setActiveTasks(List.of(new 
StreamsGroupHeartbeatRequestData.TaskIds()
+                    .setSubtopologyId(subtopology1)
+                    .setPartitions(List.of(1, 2))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+        assertEquals(100, result.response().data().memberEpoch());
+    }
+
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 
2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                .withTargetAssignment(memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6),
+                    barTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(barTopicId, 
barTopicName, 3)
+                ))
+            )
+            .build();
+
+        assertThrows(GroupMaxSizeReachedException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId3)

Review Comment:
   Not sure what you mean. In this test, we set the max size to 2. We set up 
the group to already have 2 members. Then, we try to join the group with a new 
member, which would be `memberId3` in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to