DL1231 commented on code in PR #15067: URL: https://github.com/apache/kafka/pull/15067#discussion_r1667579599
########## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ########## @@ -393,6 +398,81 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.consumer.heartbeat.interval.ms", value = "5000") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) Review Comment: Done ########## core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala: ########## @@ -393,6 +398,81 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + new ClusterConfigProperty(key = "group.consumer.heartbeat.interval.ms", value = "5000") + ), + features = Array( + new ClusterFeature(feature = Features.GROUP_VERSION, version = 1) + ) + ) + def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = { + val raftCluster = cluster.asInstanceOf[RaftClusterInstance] + val admin = cluster.createAdminClient() + val newHeartbeatIntervalMs = 10000 + val instanceId = "instanceId" + val consumerGroupId = "grp" + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this 
test because it does not use FindCoordinator API. + TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = raftCluster.brokers.values().asScala.toSeq, + controllers = raftCluster.controllers().values().asScala.toSeq + ) + + // Heartbeat request to join the group. Note that the member subscribes + // to a nonexistent topic. + var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(consumerGroupId) + .setInstanceId(instanceId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + // Send the request until receiving a successful response. There is a delay + // here because the group coordinator is loaded in the background. + var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code + }, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertNotNull(consumerGroupHeartbeatResponse.data.memberId) + assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(5000, consumerGroupHeartbeatResponse.data.heartbeatIntervalMs) + + // Alter consumer heartbeat interval config + val resource = new ConfigResource(ConfigResource.Type.GROUP, consumerGroupId) + val op = new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, newHeartbeatIntervalMs.toString), + OpType.SET) Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org