lucasbru commented on code in PR #21799:
URL: https://github.com/apache/kafka/pull/21799#discussion_r3189330883
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18968,11 +18994,126 @@ public void
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
+ @Test
+ public void testStreamsGroupHeartbeatResponseVersion0() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ // Send version 0 heartbeat request
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ (short) 0 // Version 0
+ );
+
+ StreamsGroupHeartbeatResult response = result.response();
+ StreamsGroupHeartbeatResponseData data = response.data();
+
+ // If ok for version0 to set `acceptableRecoveryLag` because the field
is marked as `ignorable`
+ assertEquals(0, data.acceptableRecoveryLagLegacy(),
+ "Version 0 response should NOT include acceptableRecoveryLagLegacy
(should be default 0)");
+ assertEquals(10_000L, data.acceptableRecoveryLag(),
+ "Version 0 response should NOT include acceptableRecoveryLag
(should be default -1)");
Review Comment:
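The assertion message says the response should NOT include `acceptableRecoveryLag` (default -1), but the assertion itself checks for 10_000L. Which one is intended? The message and the expected value should agree.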
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -18968,11 +18994,126 @@ public void
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
+ @Test
+ public void testStreamsGroupHeartbeatResponseVersion0() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ // Send version 0 heartbeat request
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ (short) 0 // Version 0
+ );
+
+ StreamsGroupHeartbeatResult response = result.response();
+ StreamsGroupHeartbeatResponseData data = response.data();
+
+ // If ok for version0 to set `acceptableRecoveryLag` because the field
is marked as `ignorable`
Review Comment:
```suggestion
// It's ok for version0 to set `acceptableRecoveryLag` because the
field is marked as `ignorable`
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java:
##########
@@ -439,4 +443,26 @@ public int taskOffsetIntervalMs() {
return taskOffsetIntervalMs.get();
}
+ /**
+ * Updated whenever a heartbeat response is received from the broker.
+ *
+ * <p>If the broker does not support warmup tasks, this field should be
set to {@code -1}.
+ * For this case, the Kafka Streams client is not required to populate
{@code TaskOffsets} or
+ * {@code TaskEndOffsets} fields in {@link
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest}.
+ */
+ public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+ this.acceptableRecoveryLag.set(acceptableRecoveryLag);
+ }
+
+ /**
+ * Returns the acceptable recovery lag.
+ *
+ * <p>If acceptable recovery lag is set to {@code -1}, it means the broker
doesn't support warmup tasks,
+ * and the Kafka Streams client it's not required to populate {@code
TaskOffsets} or {@code TaskEndOffsets} fields
Review Comment:
```suggestion
* and the Kafka Streams client is not required to populate {@code
TaskOffsets} or {@code TaskEndOffsets} fields
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org