junrao commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r939547038
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
metadataCache match {
// In KRaft mode, only replicas which are not fenced nor in controlled
shutdown are
- // allowed to join the ISR. This does not apply to ZK mode.
+ // allowed to join the ISR. In ZK mode, we just ensure the broker is
alive and not shutting down.
Review Comment:
In ControllerChannelManager.sendUpdateMetadataRequests(), it seems that we
include brokers that are shutting down in liveBrokers. So, we won't know
whether a remote broker is shutting down.
##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
assertEquals(expectedAlterPartitionResponse, future.get(10,
TimeUnit.SECONDS))
}
+ @Test
+ def testShutdownBrokerNotAddedToIsr(): Unit = {
+ servers = makeServers(2)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val otherBroker = servers.find(_.config.brokerId != controllerId).get
+ val brokerId = otherBroker.config.brokerId
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+ val fullIsr = List(controllerId, brokerId)
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+
+ // Shut down follower.
+ servers(brokerId).shutdown()
+ servers(brokerId).awaitShutdown()
+
+ val controller = getController().kafkaController
+ val leaderIsrAndControllerEpochMap =
zkClient.getTopicPartitionStates(Seq(tp))
+ val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+ val topicId = controller.controllerContext.topicIds(tp.topic)
+ val controllerEpoch =
controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+ // We expect only the controller (online broker) to be in ISR
+ assertEquals(List(controllerId), newLeaderAndIsr.isr)
+
+ // Try to update ISR to contain the offline broker.
+ val alterPartitionRequest = new AlterPartitionRequestData()
+ .setBrokerId(controllerId)
+ .setBrokerEpoch(controllerEpoch)
+ .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+ .setTopicId(topicId)
+ .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
+ .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
+ .setNewIsr(fullIsr.map(Int.box).asJava)
+ .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+ ).asJava)
+ ).asJava)
+
+ val future = new CompletableFuture[AlterPartitionResponseData]()
+ controller.eventManager.put(AlterPartitionReceived(
+ alterPartitionRequest,
+ AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+ future.complete
+ ))
+
+ // We expect an ineligible replica error response for the partition.
+ val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+ .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+ .setTopicId(topicId)
+ .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+ .setPartitionIndex(tp.partition)
+ .setErrorCode(Errors.INELIGIBLE_REPLICA.code())
+ .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+ ).asJava)
+ ).asJava)
+
+ assertEquals(expectedAlterPartitionResponse, future.get(10,
TimeUnit.SECONDS))
+ assertEquals(List(controllerId), newLeaderAndIsr.isr)
Review Comment:
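Should we read leaderAndIsr from the controller again? newLeaderAndIsr was
captured before the AlterPartition request was sent, so this assertion only
re-checks the earlier snapshot.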
##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
assertEquals(expectedAlterPartitionResponse, future.get(10,
TimeUnit.SECONDS))
}
+ @Test
+ def testShutdownBrokerNotAddedToIsr(): Unit = {
+ servers = makeServers(2)
+ val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+ val otherBroker = servers.find(_.config.brokerId != controllerId).get
+ val brokerId = otherBroker.config.brokerId
+ val tp = new TopicPartition("t", 0)
+ val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+ val fullIsr = List(controllerId, brokerId)
+ TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
+
+ // Shut down follower.
+ servers(brokerId).shutdown()
+ servers(brokerId).awaitShutdown()
+
+ val controller = getController().kafkaController
+ val leaderIsrAndControllerEpochMap =
zkClient.getTopicPartitionStates(Seq(tp))
Review Comment:
Could we just get the leaderAndIsr from the controller? It's cheaper than
reading from ZK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]