mjsax commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2027993145


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))

Review Comment:
   Why do we build a topology with two sub-topologies? Should it not be 
sufficient for this test to build a minimum topology?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8148,7 +8174,10 @@ private TaskAssignor streamsGroupAssignor(String 
groupId) {
      * Get the assignor of the provided streams group.
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {

Review Comment:
   For my own education: why does this method return a `Map`? In the end, we 
_know_ the config name, and I would assume the caller wants to lookup the 
config value. Wo why not just return an `int` ?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 
2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5),
+                        TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
+                    .build())
+                .withTargetAssignment(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2),

Review Comment:
   Similar question as above -- seems there is a lot of unrelated boilerplate 
code?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Uuid barTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        // Create a context with one streams group containing two members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 
2)
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,

Review Comment:
   Why we we need to actually assign tasks? Could this not just stay empty?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8148,7 +8174,10 @@ private TaskAssignor streamsGroupAssignor(String 
groupId) {
      * Get the assignor of the provided streams group.
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
-        return Map.of("group.streams.num.standby.replicas", "0");
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        final Integer numStandbyReplicas = 
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
+            .orElse(config.streamsGroupNumStandbyReplicas());
+        return Map.of("num.standby.replicas", numStandbyReplicas.toString());

Review Comment:
   Why we we return without `group.streams` prefix of the config name now?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 50000);
+        newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Session timer is rescheduled on second heartbeat, new assignment 
with new parameter is calculated.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch())
+                .setRackId("bla"));
+        assertEquals(2, result.response().data().memberEpoch());

Review Comment:
   As above: seems to be unrelated to this test?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);

Review Comment:
   Similar to above. Where does 45000 come from?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java:
##########
@@ -60,6 +65,7 @@ public String name() {
     @Override
     public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber)
         throws TaskAssignorException {
+        assignmentConfigs = groupSpec.assignmentConfigs();

Review Comment:
   There is no changes to `GroupSpec` in this PR, so not sure if understand the 
relationship to the rest of the code



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +2019,290 @@ private 
List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
             .toList();
     }
 
+    /**
+     * Handles a regular heartbeat from a Streams group member.
+     * It mainly consists of five parts:
+     * 1) Create or update the member.
+     *    The group epoch is bumped if the member has been created or updated.
+     * 2) Initialize or update the topology.
+     *    The group epoch is bumped if the topology has been created or 
updated.
+     * 3) Determine the partition metadata and any internal topics that need 
to be created.
+     * 4) Update the target assignment for the streams group if the group epoch
+     *    is larger than the current target assignment epoch.
+     * 5) Reconcile the member's assignment with the target assignment.
+     *
+     * @param groupId             The group ID from the request.
+     * @param memberId            The member ID from the request.
+     * @param memberEpoch         The member epoch from the request.
+     * @param instanceId          The instance ID from the request or null.
+     * @param rackId              The rack ID from the request or null.
+     * @param rebalanceTimeoutMs  The rebalance timeout from the request or -1.
+     * @param clientId            The client ID.
+     * @param clientHost          The client host.
+     * @param topology            The topology from the request or null.
+     * @param ownedActiveTasks    The list of owned active tasks from the 
request or null.
+     * @param ownedStandbyTasks   The list of owned standby tasks from the 
request or null.
+     * @param ownedWarmupTasks    The list of owned warmup tasks from the 
request or null.
+     * @param userEndpoint        User-defined endpoint for Interactive 
Queries, or null.
+     * @param clientTags          Used for rack-aware assignment algorithm, or 
null.
+     * @param taskEndOffsets      Cumulative changelog offsets for tasks, or 
null.
+     * @param taskOffsets         Cumulative changelog end-offsets for tasks, 
or null.
+     * @param shutdownApplication Whether all Streams clients in the group 
should shut down.
+     * @return A result containing the StreamsGroupHeartbeat response and a 
list of records to update the state machine.
+     */
+    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
streamsGroupHeartbeat(
+        String groupId,
+        String memberId,
+        int memberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        StreamsGroupHeartbeatRequestData.Topology topology,
+        List<TaskIds> ownedActiveTasks,
+        List<TaskIds> ownedStandbyTasks,
+        List<TaskIds> ownedWarmupTasks,
+        String processId,
+        Endpoint userEndpoint,
+        List<KeyValue> clientTags,
+        List<TaskOffset> taskOffsets,
+        List<TaskOffset> taskEndOffsets,
+        boolean shutdownApplication
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        final List<StreamsGroupHeartbeatResponseData.Status> returnedStatus = 
new ArrayList<>();
+
+        // Get or create the streams group.
+        boolean isJoining = memberEpoch == 0;
+        final StreamsGroup group = isJoining ? 
getOrCreateStreamsGroup(groupId) : getStreamsGroupOrThrow(groupId);
+        throwIfStreamsGroupIsFull(group, memberId);

Review Comment:
   Should we call this only if `isJoining` is true? And if yes, could we 
simplify the logic of `throwIfStreamsGroupIsFull` and only check its size? -- 
If `isJoining` is false, I would assume that `memberId` is never empty and that 
the `memberId` is never new (ie, always contained in `group`?
   



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());

Review Comment:
   Where was this set, ie, where does 5000 come from? Don't see it in the test 
setup.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17848,6 +17921,104 @@ public void testStreamsRebalanceTimeoutExpiration() {
         context.assertNoRebalanceTimeout(groupId, memberId1);
     }
 
+    @Test
+    public void testStreamsOnNewMetadataImage() {

Review Comment:
   I don't understand this test, ie, how is it related to test code changes 
above?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());

Review Comment:
   Why do we verify this? Seems unrelated? It's not a config.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(

Review Comment:
   For my own education. What are we asserting here and why? -- Also, why do we 
need to advance time for this test, that should verify if some configs are 
picked up (which we verify explicitly anyway)?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testStreamsGroupDynamicConfigs() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2))));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result =
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of()));
+        assertEquals(1, result.response().data().memberEpoch());
+        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Verify heartbeat interval
+        assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 50000);
+        newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2);
+        context.updateGroupConfig(groupId, newGroupConfig);
+
+        // Session timer is rescheduled on second heartbeat, new assignment 
with new parameter is calculated.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch())
+                .setRackId("bla"));
+        assertEquals(2, result.response().data().memberEpoch());
+
+        // Verify heartbeat interval
+        assertEquals(10000, result.response().data().heartbeatIntervalMs());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 50000);
+
+        // Verify that the new number of standby replicas is used
+        assertEquals(Map.of("num.standby.replicas", "2"), 
assignor.lastPassedAssignmentConfigs());
+
+        // Advance time.
+        assertEquals(
+            List.of(),
+            context.sleep(result.response().data().heartbeatIntervalMs())
+        );
+
+        // Session timer is cancelled on leave.

Review Comment:
   Seems to be unrelated to what we want to test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to