lucasbru commented on code in PR #21799:
URL: https://github.com/apache/kafka/pull/21799#discussion_r3189330883


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18968,11 +18994,126 @@ public void 
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setTaskOffsetIntervalMs(60_000)
+                .setAcceptableRecoveryLag(10_000),
             result.response().data()
         );
     }
 
+    @Test
+    public void testStreamsGroupHeartbeatResponseVersion0() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .buildCoordinatorMetadataImage())
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 0)
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        // Send version 0 heartbeat request
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            (short) 0 // Version 0
+        );
+
+        StreamsGroupHeartbeatResult response = result.response();
+        StreamsGroupHeartbeatResponseData data = response.data();
+
+        // If ok for version0 to set `acceptableRecoveryLag` because the field 
is marked as `ignorable`
+        assertEquals(0, data.acceptableRecoveryLagLegacy(),
+            "Version 0 response should NOT include acceptableRecoveryLagLegacy 
(should be default 0)");
+        assertEquals(10_000L, data.acceptableRecoveryLag(),
+            "Version 0 response should NOT include acceptableRecoveryLag 
(should be default -1)");

Review Comment:
   should be default -1, but checking for 10_000L?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18968,11 +18994,126 @@ public void 
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setTaskOffsetIntervalMs(60_000)
+                .setAcceptableRecoveryLag(10_000),
             result.response().data()
         );
     }
 
+    @Test
+    public void testStreamsGroupHeartbeatResponseVersion0() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .buildCoordinatorMetadataImage())
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 0)
+            .build();
+
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        // Send version 0 heartbeat request
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            (short) 0 // Version 0
+        );
+
+        StreamsGroupHeartbeatResult response = result.response();
+        StreamsGroupHeartbeatResponseData data = response.data();
+
+        // If ok for version0 to set `acceptableRecoveryLag` because the field 
is marked as `ignorable`

Review Comment:
   ```suggestion
           // It's ok for version0 to set `acceptableRecoveryLag` because the 
field is marked as `ignorable`
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -439,4 +443,26 @@ public int taskOffsetIntervalMs() {
         return taskOffsetIntervalMs.get();
     }
 
+    /**
+     * Updated whenever a heartbeat response is received from the broker.
+     *
+     * <p>If the broker does not support warmup tasks, this field should be 
set to {@code -1}.
+     * For this case, the Kafka Streams client is not required to populate 
{@code TaskOffsets} or
+     * {@code TaskEndOffsets} fields in {@link 
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest}.
+     */
+    public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+        this.acceptableRecoveryLag.set(acceptableRecoveryLag);
+    }
+
+    /**
+     * Returns the acceptable recovery lag.
+     *
+     * <p>If acceptable recovery lag is set to {@code -1}, it means the broker 
doesn't support warmup tasks,
+     * and the Kafka Streams client it's not required to populate {@code 
TaskOffsets} or {@code TaskEndOffsets} fields

Review Comment:
   ```suggestion
        * and the Kafka Streams client is not required to populate {@code 
TaskOffsets} or {@code TaskEndOffsets} fields
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to