[ https://issues.apache.org/jira/browse/KAFKA-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16607679#comment-16607679 ]
ASF GitHub Bot commented on KAFKA-6082: --------------------------------------- lindong28 closed pull request #5101: KAFKA-6082: Fence zookeeper updates with controller epoch zkVersion URL: https://github.com/apache/kafka/pull/5101 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a92340f2a4b..f3c81efb0b5 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -70,7 +70,7 @@ class Partition(val topic: String, * the controller sends it a start replica command containing the leader for each partition that the broker hosts. * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for * each partition. 
*/ - private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 + private var controllerEpoch: Int = KafkaController.InitialControllerEpoch this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] " private def isReplicaLocal(replicaId: Int) : Boolean = replicaId == localBrokerId || replicaId == Request.FutureLocalReplicaId diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index f4671cfaa1a..20c3de0c1e5 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -28,8 +28,8 @@ class ControllerContext { var controllerChannelManager: ControllerChannelManager = null var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty - var epoch: Int = KafkaController.InitialControllerEpoch - 1 - var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1 + var epoch: Int = KafkaController.InitialControllerEpoch + var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion var allTopics: Set[String] = Set.empty private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala index 13967e029ed..c93e9e79ec2 100644 --- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala @@ -24,6 +24,7 @@ import com.yammer.metrics.core.Gauge import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.utils.CoreUtils.inLock import kafka.utils.ShutdownableThread +import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.utils.Time import 
scala.collection._ @@ -32,12 +33,14 @@ object ControllerEventManager { val ControllerEventThreadName = "controller-event-thread" } class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer], - eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup { + eventProcessedListener: ControllerEvent => Unit, + controllerMovedListener: () => Unit) extends KafkaMetricsGroup { @volatile private var _state: ControllerState = ControllerState.Idle private val putLock = new ReentrantLock() private val queue = new LinkedBlockingQueue[ControllerEvent] - private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName) + // Visible for test + private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName) private val time = Time.SYSTEM private val eventQueueTimeHist = newHistogram("EventQueueTimeMs") @@ -86,6 +89,9 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll controllerEvent.process() } } catch { + case e: ControllerMovedException => + info(s"Controller moved to another broker when processing $controllerEvent.", e) + controllerMovedListener() case e: Throwable => error(s"Error processing event $controllerEvent", e) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 286768f595b..379e66da9e0 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -35,14 +35,14 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException -import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} +import org.apache.zookeeper.KeeperException.Code 
import scala.collection._ import scala.util.Try object KafkaController extends Logging { - val InitialControllerEpoch = 1 - val InitialControllerEpochZkVersion = 1 + val InitialControllerEpoch = 0 + val InitialControllerEpochZkVersion = 0 /** * ControllerEventThread will shutdown once it sees this event @@ -52,6 +52,12 @@ object KafkaController extends Logging { override def process(): Unit = () } + // Used only by test + private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent { + override def state: ControllerState = ControllerState.ControllerChange + override def process(): Unit = latch.await() + } + } class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo, @@ -70,7 +76,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // visible for testing private[controller] val eventManager = new ControllerEventManager(config.brokerId, - controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics()) + controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign()) val topicDeletionManager = new TopicDeletionManager(this, eventManager, zkClient) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger) @@ -214,21 +220,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti /** * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. * It does the following things on the become-controller state change - - * 1. Registers controller epoch changed listener - * 2. Increments the controller epoch - * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and + * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and * leaders for all existing partitions. - * 4. 
Starts the controller's channel manager - * 5. Starts the replica state machine - * 6. Starts the partition state machine + * 2. Starts the controller's channel manager + * 3. Starts the replica state machine + * 4. Starts the partition state machine * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller. * This ensures another controller election will be triggered and there will always be an actively serving controller */ private def onControllerFailover() { - info("Reading controller epoch from ZooKeeper") - readControllerEpochFromZooKeeper() - info("Incrementing controller epoch in ZooKeeper") - incrementControllerEpoch() info("Registering handlers") // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks @@ -239,9 +239,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence) info("Deleting log dir event notifications") - zkClient.deleteLogDirEventNotifications() + zkClient.deleteLogDirEventNotifications(controllerContext.epochZkVersion) info("Deleting isr change notifications") - zkClient.deleteIsrChangeNotifications() + zkClient.deleteIsrChangeNotifications(controllerContext.epochZkVersion) info("Initializing controller context") initializeControllerContext() info("Fetching topic deletions in progress") @@ -599,6 +599,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti topicDeletionManager.markTopicIneligibleForDeletion(Set(topic)) onPartitionReassignment(tp, reassignedPartitionContext) } catch { + case e: ControllerMovedException => + error(s"Error completing reassignment of partition $tp because controller has moved to another broker", e) + throw e case e: Throwable => error(s"Error completing reassignment of partition $tp", e) // remove the partition from the admin path to unblock the admin client @@ -619,41 
+622,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti try { partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy)) } catch { + case e: ControllerMovedException => + error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")} because controller has moved to another broker.", e) + throw e case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e) } finally { removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) } } - private def incrementControllerEpoch(): Unit = { - val newControllerEpoch = controllerContext.epoch + 1 - val setDataResponse = zkClient.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion) - setDataResponse.resultCode match { - case Code.OK => - controllerContext.epochZkVersion = setDataResponse.stat.getVersion - controllerContext.epoch = newControllerEpoch - case Code.NONODE => - // if path doesn't exist, this is the first controller whose epoch should be 1 - // the following call can still fail if another controller gets elected between checking if the path exists and - // trying to create the controller epoch path - val createResponse = zkClient.createControllerEpochRaw(KafkaController.InitialControllerEpoch) - createResponse.resultCode match { - case Code.OK => - controllerContext.epoch = KafkaController.InitialControllerEpoch - controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion - case Code.NODEEXISTS => - throw new ControllerMovedException("Controller moved to another broker. 
Aborting controller startup procedure") - case _ => - val exception = createResponse.resultException.get - error("Error while incrementing controller epoch", exception) - throw exception - } - case _ => - throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure") - } - info(s"Epoch incremented to ${controllerContext.epoch}") - } - private def initializeControllerContext() { // update controller cache with delete topic information controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet @@ -783,7 +760,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private def updateAssignedReplicasForPartition(partition: TopicPartition, replicas: Seq[Int]) { controllerContext.updatePartitionReplicaAssignment(partition, replicas) - val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic)) + val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic), controllerContext.epochZkVersion) setDataResponse.resultCode match { case Code.OK => info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}") @@ -844,16 +821,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.partitionsBeingReassigned.values.foreach(_.unregisterReassignIsrChangeHandler(zkClient)) } - private def readControllerEpochFromZooKeeper() { - // initialize the controller epoch and zk version by reading from zookeeper - val epochAndStatOpt = zkClient.getControllerEpoch - epochAndStatOpt.foreach { case (epoch, stat) => - controllerContext.epoch = epoch - controllerContext.epochZkVersion = stat.getVersion - info(s"Initialized controller epoch to ${controllerContext.epoch} and zk version ${controllerContext.epochZkVersion}") - } - } - /** * Remove partition from 
partitions being reassigned in ZooKeeper and ControllerContext. If the partition reassignment * is complete (i.e. there is no other partition with a reassignment in progress), the reassign_partitions znode @@ -874,12 +841,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // write the new list to zookeeper if (updatedPartitionsBeingReassigned.isEmpty) { info(s"No more partitions need to be reassigned. Deleting zk path ${ReassignPartitionsZNode.path}") - zkClient.deletePartitionReassignment() + zkClient.deletePartitionReassignment(controllerContext.epochZkVersion) // Ensure we detect future reassignments eventManager.put(PartitionReassignment) } else { val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas) - try zkClient.setOrCreatePartitionReassignment(reassignment) + try zkClient.setOrCreatePartitionReassignment(reassignment, controllerContext.epochZkVersion) catch { case e: KeeperException => throw new AdminOperationException(e) } @@ -902,7 +869,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } if (!isTriggeredByAutoRebalance) { - zkClient.deletePreferredReplicaElection() + zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion) // Ensure we detect future preferred replica leader elections eventManager.put(PreferredReplicaLeaderElection) } @@ -955,7 +922,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion // update the new leadership decision in zookeeper or retry val UpdateLeaderAndIsrResult(successfulUpdates, _, failedUpdates) = - zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch) + zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion) if (successfulUpdates.contains(partition)) { val finalLeaderAndIsr = successfulUpdates(partition) finalLeaderIsrAndControllerEpoch = 
Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch)) @@ -1204,13 +1171,32 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } private def triggerControllerMove(): Unit = { - onControllerResignation() - activeControllerId = -1 - zkClient.deleteController() + activeControllerId = zkClient.getControllerId.getOrElse(-1) + if (!isActive) { + warn("Controller has already moved when trying to trigger controller movement") + return + } + try { + val expectedControllerEpochZkVersion = controllerContext.epochZkVersion + activeControllerId = -1 + onControllerResignation() + zkClient.deleteController(expectedControllerEpochZkVersion) + } catch { + case _: ControllerMovedException => + warn("Controller has already moved when trying to trigger controller movement") + } + } + + private def maybeResign(): Unit = { + val wasActiveBeforeChange = isActive + zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) + activeControllerId = zkClient.getControllerId.getOrElse(-1) + if (wasActiveBeforeChange && !isActive) { + onControllerResignation() + } } private def elect(): Unit = { - val timestamp = time.milliseconds activeControllerId = zkClient.getControllerId.getOrElse(-1) /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, @@ -1223,22 +1209,27 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } try { - zkClient.registerController(config.brokerId, timestamp) - info(s"${config.brokerId} successfully elected as the controller") + val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId) + controllerContext.epoch = epoch + controllerContext.epochZkVersion = epochZkVersion activeControllerId = config.brokerId + + info(s"${config.brokerId} successfully elected as the controller. 
Epoch incremented to ${controllerContext.epoch} " + + s"and epoch zk version is now ${controllerContext.epochZkVersion}") + onControllerFailover() } catch { - case _: NodeExistsException => - // If someone else has written the path, then - activeControllerId = zkClient.getControllerId.getOrElse(-1) + case e: ControllerMovedException => + maybeResign() if (activeControllerId != -1) - debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}") + debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e) else - warn("A controller has been elected but just resigned, this will result in another round of election") + warn("A controller has been elected but just resigned, this will result in another round of election", e) - case e2: Throwable => - error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2) + case t: Throwable => + error(s"Error while electing or becoming controller on broker ${config.brokerId}. 
" + + s"Trigger controller movement immediately", t) triggerControllerMove() } } @@ -1321,7 +1312,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti onBrokerLogDirFailure(brokerIds) } finally { // delete processed children - zkClient.deleteLogDirEventNotifications(sequenceNumbers) + zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion) } } } @@ -1336,7 +1327,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter(p => existingPartitions.contains(p._1.partition.toString)) - zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment) + zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment, controllerContext.epochZkVersion) } override def process(): Unit = { @@ -1377,7 +1368,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics if (nonExistentTopics.nonEmpty) { warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}") - zkClient.deleteTopicDeletions(nonExistentTopics.toSeq) + zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion) } topicsToBeDeleted --= nonExistentTopics if (config.deleteTopicEnable) { @@ -1396,7 +1387,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } else { // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics info(s"Removing $topicsToBeDeleted since delete topic is disabled") - zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq) + zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion) } } } @@ -1469,7 +1460,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } finally { // delete the notifications - 
zkClient.deleteIsrChangeNotifications(sequenceNumbers) + zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion) } } @@ -1504,12 +1495,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti override def state = ControllerState.ControllerChange override def process(): Unit = { - val wasActiveBeforeChange = isActive - zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) - activeControllerId = zkClient.getControllerId.getOrElse(-1) - if (wasActiveBeforeChange && !isActive) { - onControllerResignation() - } + maybeResign() } } @@ -1517,12 +1503,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti override def state = ControllerState.ControllerChange override def process(): Unit = { - val wasActiveBeforeChange = isActive - zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) - activeControllerId = zkClient.getControllerId.getOrElse(-1) - if (wasActiveBeforeChange && !isActive) { - onControllerResignation() - } + maybeResign() elect() } } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 3a0ac190fc1..663ee8da9b4 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -23,6 +23,7 @@ import kafka.utils.Logging import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ControllerMovedException import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code @@ -132,6 +133,9 @@ class PartitionStateMachine(config: KafkaConfig, doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt) 
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) } catch { + case e: ControllerMovedException => + error(s"Controller moved to another broker when moving some partitions to $targetState state", e) + throw e case e: Throwable => error(s"Error while moving some partitions to $targetState state", e) } } @@ -250,8 +254,11 @@ class PartitionStateMachine(config: KafkaConfig, partition -> leaderIsrAndControllerEpoch }.toMap val createResponses = try { - zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs) + zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, controllerContext.epochZkVersion) } catch { + case e: ControllerMovedException => + error("Controller moved to another broker when trying to create the topic partition state znode", e) + throw e case e: Exception => partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) } Seq.empty @@ -361,7 +368,7 @@ class PartitionStateMachine(config: KafkaConfig, val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr( - adjustedLeaderAndIsrs, controllerContext.epoch) + adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) successfulUpdates.foreach { case (partition, leaderAndIsr) => val replicas = controllerContext.partitionReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 1ab8a43dfde..433ab566837 100644 --- 
a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -23,6 +23,7 @@ import kafka.utils.Logging import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ControllerMovedException import org.apache.zookeeper.KeeperException.Code import scala.collection.mutable @@ -106,6 +107,9 @@ class ReplicaStateMachine(config: KafkaConfig, } controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) } catch { + case e: ControllerMovedException => + error(s"Controller moved to another broker when moving some replicas to $targetState state", e) + throw e case e: Throwable => error(s"Error while moving some replicas to $targetState state", e) } } @@ -299,7 +303,7 @@ class ReplicaStateMachine(config: KafkaConfig, leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr) } val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr( - adjustedLeaderAndIsrs, controllerContext.epoch) + adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition => if (!topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) { val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty") diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 8d93ef2fa2d..1ef79be794f 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController, } else 
{ // if delete topic is disabled clean the topic entries under /admin/delete_topics info(s"Removing $initialTopicsToBeDeleted since delete topic is disabled") - zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq) + zkClient.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq, controllerContext.epochZkVersion) } } @@ -251,9 +251,9 @@ class TopicDeletionManager(controller: KafkaController, controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica) topicsToBeDeleted -= topic topicsWithDeletionStarted -= topic - zkClient.deleteTopicZNode(topic) - zkClient.deleteTopicConfigs(Seq(topic)) - zkClient.deleteTopicDeletions(Seq(topic)) + zkClient.deleteTopicZNode(topic, controllerContext.epochZkVersion) + zkClient.deleteTopicConfigs(Seq(topic), controllerContext.epochZkVersion) + zkClient.deleteTopicDeletions(Seq(topic), controllerContext.epochZkVersion) controllerContext.removeTopic(topic) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 14e537e63db..1a89e1d021b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -167,7 +167,7 @@ class ReplicaManager(val config: KafkaConfig, } /* epoch of the controller that last changed the leader */ - @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 + @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch private val localBrokerId = config.brokerId private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp => new Partition(tp.topic, tp.partition, time, this))) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index c8079654d69..a12abb43963 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -21,24 +21,27 @@ import 
java.util.Properties import com.yammer.metrics.core.MetricName import kafka.api.LeaderAndIsr import kafka.cluster.Broker -import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup -import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} +import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.ConfigType import kafka.utils.Logging import kafka.zookeeper._ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.resource.PatternType +import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} +import org.apache.zookeeper.KeeperException.{BadVersionException, Code, ConnectionLossException, NodeExistsException} +import org.apache.zookeeper.OpResult.{ErrorResult, SetDataResult} import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper} import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, mutable} +import scala.collection.JavaConverters._ /** * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]]. @@ -86,14 +89,75 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } /** - * Registers a given broker in zookeeper as the controller. + * Registers a given broker in zookeeper as the controller and increments controller epoch. + * @return the (updated controller epoch, epoch zkVersion) tuple * @param controllerId the id of the broker that is to be registered as the controller. 
- * @param timestamp the timestamp of the controller election. - * @throws KeeperException if an error is returned by ZooKeeper. - */ - def registerController(controllerId: Int, timestamp: Long): Unit = { - val path = ControllerZNode.path - checkedEphemeralCreate(path, ControllerZNode.encode(controllerId, timestamp)) + * @throws ControllerMovedException if fail to create /controller or fail to increment controller epoch. + */ + def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = { + val timestamp = time.milliseconds() + + // Read /controller_epoch to get the current controller epoch and zkVersion, + // create /controller_epoch with initial value if not exists + val (curEpoch, curEpochZkVersion) = getControllerEpoch + .map(e => (e._1, e._2.getVersion)) + .getOrElse(maybeCreateControllerEpochZNode()) + + // Create /controller and update /controller_epoch atomically + val newControllerEpoch = curEpoch + 1 + val expectedControllerEpochZkVersion = curEpochZkVersion + + debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion") + + def checkControllerAndEpoch(): (Int, Int) = { + val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException( + s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " + + s"Aborting controller startup procedure")) + if (controllerId == curControllerId) { + val (epoch, stat) = getControllerEpoch.getOrElse( + throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it")) + + // If the epoch is the same as newControllerEpoch, it is safe to infer that the returned epoch zkVersion + // is associated with the current broker during controller election because we already knew that the zk + // transaction succeeds based on the controller znode verification. 
Other rounds of controller + // election will result in larger epoch number written in zk. + if (epoch == newControllerEpoch) + return (newControllerEpoch, stat.getVersion) + } + throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure") + } + + def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = { + try { + val transaction = zooKeeperClient.createTransaction() + transaction.create(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), + acls(ControllerZNode.path).asJava, CreateMode.EPHEMERAL) + transaction.setData(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion) + val results = transaction.commit() + val setDataResult = results.get(1).asInstanceOf[SetDataResult] + (newControllerEpoch, setDataResult.getStat.getVersion) + } catch { + case _: NodeExistsException | _: BadVersionException => checkControllerAndEpoch() + case _: ConnectionLossException => + zooKeeperClient.waitUntilConnected() + tryCreateControllerZNodeAndIncrementEpoch() + } + } + + tryCreateControllerZNodeAndIncrementEpoch() + } + + private def maybeCreateControllerEpochZNode(): (Int, Int) = { + createControllerEpochRaw(KafkaController.InitialControllerEpoch).resultCode match { + case Code.OK => + info(s"Successfully created ${ControllerEpochZNode.path} with initial epoch ${KafkaController.InitialControllerEpoch}") + (KafkaController.InitialControllerEpoch, KafkaController.InitialControllerEpochZkVersion) + case Code.NODEEXISTS => + val (epoch, stat) = getControllerEpoch.getOrElse(throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it")) + (epoch, stat.getVersion) + case code => + throw KeeperException.create(code) + } } def updateBrokerInfo(brokerInfo: BrokerInfo): Unit = { @@ -119,13 +183,15 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Sets topic partition 
states for the given partitions. * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @return sequence of SetDataResponse whose contexts are the partitions they are associated with. */ - def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = { + def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch], expectedControllerEpochZkVersion: Int): Seq[SetDataResponse] = { val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val path = TopicPartitionStateZNode.path(partition) val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) - SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition)) + SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, Some(partition), + controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(setDataRequests.toSeq) } @@ -133,15 +199,16 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Creates topic partition state znodes for the given partitions. * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @return sequence of CreateResponse whose contexts are the partitions they are associated with. 
*/ - def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = { - createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq) - createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq) + def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = { + createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq, expectedControllerEpochZkVersion) + createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq, expectedControllerEpochZkVersion) val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val path = TopicPartitionStateZNode.path(partition) val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch) - CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition)) + CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests.toSeq) } @@ -172,9 +239,10 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * Update the partition states of multiple partitions in zookeeper. * @param leaderAndIsrs The partition states to update. * @param controllerEpoch The current controller epoch. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @return UpdateLeaderAndIsrResult instance containing per partition results. 
*/ - def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = { + def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int, expectedControllerEpochZkVersion: Int): UpdateLeaderAndIsrResult = { val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr] val updatesToRetry = mutable.Buffer.empty[TopicPartition] val failed = mutable.Map.empty[TopicPartition, Exception] @@ -182,8 +250,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) } val setDataResponses = try { - setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs) + setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs, expectedControllerEpochZkVersion) } catch { + case e: ControllerMovedException => throw e case e: Exception => leaderAndIsrs.keys.foreach(partition => failed.put(partition, e)) return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap) @@ -381,10 +450,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * Sets the topic znode with the given assignment. * @param topic the topic whose assignment is being set. * @param assignment the partition to replica mapping to set for the given topic + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
* @return SetDataResponse */ - def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = { - val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion) + def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int): SetDataResponse = { + val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(setDataRequest) } @@ -392,10 +463,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * Sets the topic znode with the given assignment. * @param topic the topic whose assignment is being set. * @param assignment the partition to replica mapping to set for the given topic + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @throws KeeperException if there is an error while setting assignment */ - def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = { - val setDataResponse = setTopicAssignmentRaw(topic, assignment) + def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion) = { + val setDataResponse = setTopicAssignmentRaw(topic, assignment, expectedControllerEpochZkVersion) setDataResponse.maybeThrow } @@ -443,11 +515,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes all log dir event notifications. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
*/ - def deleteLogDirEventNotifications(): Unit = { + def deleteLogDirEventNotifications(expectedControllerEpochZkVersion: Int): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } @@ -456,10 +529,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes the log dir event notifications associated with the given sequence numbers. * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = { + def deleteLogDirEventNotifications(sequenceNumbers: Seq[String], expectedControllerEpochZkVersion: Int): Unit = { val deleteRequests = sequenceNumbers.map { sequenceNumber => - DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion) + DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(deleteRequests) } @@ -677,9 +752,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Remove the given topics from the topics marked for deletion. * @param topics the topics to remove. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
*/ - def deleteTopicDeletions(topics: Seq[String]): Unit = { - val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion)) + def deleteTopicDeletions(topics: Seq[String], expectedControllerEpochZkVersion: Int): Unit = { + val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))) retryRequestsUntilConnected(deleteRequests) } @@ -708,18 +785,20 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * exists or not. * * @param reassignment the reassignment to set on the reassignment znode + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. * @throws KeeperException if there is an error while setting or creating the znode */ - def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit = { + def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int): Unit = { def set(reassignmentData: Array[Byte]): SetDataResponse = { - val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.MatchAnyVersion) + val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(setDataRequest) } def create(reassignmentData: Array[Byte]): CreateResponse = { val createRequest = CreateRequest(ReassignPartitionsZNode.path, reassignmentData, acls(ReassignPartitionsZNode.path), - CreateMode.PERSISTENT) + CreateMode.PERSISTENT, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(createRequest) } @@ -744,9 +823,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean 
/** * Deletes the partition reassignment znode. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deletePartitionReassignment(): Unit = { - val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion) + def deletePartitionReassignment(expectedControllerEpochZkVersion: Int): Unit = { + val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(deleteRequest) } @@ -851,11 +932,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes all isr change notifications. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deleteIsrChangeNotifications(): Unit = { + def deleteIsrChangeNotifications(expectedControllerEpochZkVersion: Int): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)) + deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber), expectedControllerEpochZkVersion) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } @@ -864,10 +946,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes the isr change notifications associated with the given sequence numbers. * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
*/ - def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = { + def deleteIsrChangeNotifications(sequenceNumbers: Seq[String], expectedControllerEpochZkVersion: Int): Unit = { val deleteRequests = sequenceNumbers.map { sequenceNumber => - DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion) + DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(deleteRequests) } @@ -897,9 +981,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes the preferred replica election znode. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deletePreferredReplicaElection(): Unit = { - val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.MatchAnyVersion) + def deletePreferredReplicaElection(expectedControllerEpochZkVersion: Int): Unit = { + val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(deleteRequest) } @@ -919,9 +1005,11 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes the controller znode. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
*/ - def deleteController(): Unit = { - val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.MatchAnyVersion) + def deleteController(expectedControllerEpochZkVersion: Int): Unit = { + val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion)) retryRequestUntilConnected(deleteRequest) } @@ -944,17 +1032,20 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Recursively deletes the topic znode. * @param topic the topic whose topic znode we wish to delete. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deleteTopicZNode(topic: String): Unit = { - deleteRecursive(TopicZNode.path(topic)) + def deleteTopicZNode(topic: String, expectedControllerEpochZkVersion: Int): Unit = { + deleteRecursive(TopicZNode.path(topic), expectedControllerEpochZkVersion) } /** * Deletes the topic configs for the given topics. * @param topics the topics whose configs we wish to delete. + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. */ - def deleteTopicConfigs(topics: Seq[String]): Unit = { - val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ZkVersion.MatchAnyVersion)) + def deleteTopicConfigs(topics: Seq[String], expectedControllerEpochZkVersion: Int): Unit = { + val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), + ZkVersion.MatchAnyVersion, zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))) retryRequestsUntilConnected(deleteRequests) } @@ -1403,18 +1494,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Deletes the given zk path recursively * @param path + * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. 
* @return true if path gets deleted successfully, false if root path doesn't exist * @throws KeeperException if there is an error while deleting the znodes */ - def deleteRecursive(path: String): Boolean = { + def deleteRecursive(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion): Boolean = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) getChildrenResponse.resultCode match { case Code.OK => - getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child")) - val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.MatchAnyVersion)) - if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) { + getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child", expectedControllerEpochZkVersion)) + val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.MatchAnyVersion, + zkVersionCheck = controllerZkVersionCheck(expectedControllerEpochZkVersion))) + if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) throw deleteResponse.resultException.get - } true case Code.NONODE => false case _ => throw getChildrenResponse.resultException.get @@ -1468,18 +1560,18 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean } - private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = { + private def createTopicPartition(partitions: Seq[TopicPartition], expectedControllerEpochZkVersion: Int): Seq[CreateResponse] = { val createRequests = partitions.map { partition => val path = TopicPartitionZNode.path(partition) - CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition)) + CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests) } - private def createTopicPartitions(topics: Seq[String]): 
Seq[CreateResponse] = { + private def createTopicPartitions(topics: Seq[String], expectedControllerEpochZkVersion: Int):Seq[CreateResponse] = { val createRequests = topics.map { topic => val path = TopicPartitionsZNode.path(topic) - CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic)) + CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic), controllerZkVersionCheck(expectedControllerEpochZkVersion)) } retryRequestsUntilConnected(createRequests) } @@ -1513,13 +1605,16 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean requestResponsePairs.foreach { case (request, response) => if (response.resultCode == Code.CONNECTIONLOSS) remainingRequests += request - else + else { + maybeThrowControllerMoveException(response) responses += response + } } if (remainingRequests.nonEmpty) zooKeeperClient.waitUntilConnected() } else { + batchResponses.foreach(maybeThrowControllerMoveException) remainingRequests.clear() responses ++= batchResponses } @@ -1599,4 +1694,31 @@ object KafkaZkClient { time, metricGroup, metricType) new KafkaZkClient(zooKeeperClient, isSecure, time) } + + + private def controllerZkVersionCheck(version: Int): Option[ZkVersionCheck] = { + if (version < KafkaController.InitialControllerEpochZkVersion) + None + else + Some(ZkVersionCheck(ControllerEpochZNode.path, version)) + } + + private def maybeThrowControllerMoveException(response: AsyncResponse): Unit = { + response.zkVersionCheckResult match { + case Some(zkVersionCheckResult) => + val zkVersionCheck = zkVersionCheckResult.zkVersionCheck + if (zkVersionCheck.checkPath.equals(ControllerEpochZNode.path)) + zkVersionCheckResult.opResult match { + case errorResult: ErrorResult => + val errorCode = Code.get(errorResult.getErr) + if (errorCode == Code.BADVERSION) + // Throw ControllerMovedException when the zkVersionCheck is performed on the controller epoch znode and the check fails + throw new ControllerMovedException(s"Controller epoch 
zkVersion check fails. Expected zkVersion = ${zkVersionCheck.expectedZkVersion}") + else if (errorCode != Code.OK) + throw KeeperException.create(errorCode, zkVersionCheck.checkPath) + case _ => + } + case None => + } + } } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 97ec9a44c36..59304143715 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -17,21 +17,23 @@ package kafka.zookeeper +import java.util import java.util.Locale import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} -import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit} +import java.util.concurrent._ import com.yammer.metrics.core.{Gauge, MetricName} import kafka.metrics.KafkaMetricsGroup import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock} import kafka.utils.{KafkaScheduler, Logging} import org.apache.kafka.common.utils.Time -import org.apache.zookeeper.AsyncCallback.{ACLCallback, Children2Callback, DataCallback, StatCallback, StringCallback, VoidCallback} +import org.apache.zookeeper.AsyncCallback._ import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.OpResult.{CreateResult, SetDataResult} import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} import org.apache.zookeeper.ZooKeeper.States import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooKeeper} +import org.apache.zookeeper._ import scala.collection.JavaConverters._ import scala.collection.mutable.Set @@ -156,6 +158,10 @@ class ZooKeeperClient(connectString: String, responseQueue.asScala.toBuffer } } + + def createTransaction(): Transaction = { + zooKeeper.transaction() + } // Visibility to override for testing private[zookeeper] def send[Req <: AsyncRequest](request: 
Req)(processResponse: Req#Response => Unit): Unit = { @@ -166,44 +172,76 @@ class ZooKeeperClient(connectString: String, val sendTimeMs = time.hiResClockMs() request match { - case ExistsRequest(path, ctx) => + case ExistsRequest(path, ctx, _) => zooKeeper.exists(path, shouldWatch(request), new StatCallback { override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = callback(ExistsResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))) }, ctx.orNull) - case GetDataRequest(path, ctx) => + case GetDataRequest(path, ctx, _) => zooKeeper.getData(path, shouldWatch(request), new DataCallback { override def processResult(rc: Int, path: String, ctx: Any, data: Array[Byte], stat: Stat): Unit = callback(GetDataResponse(Code.get(rc), path, Option(ctx), data, stat, responseMetadata(sendTimeMs))) }, ctx.orNull) - case GetChildrenRequest(path, ctx) => + case GetChildrenRequest(path, ctx, _) => zooKeeper.getChildren(path, shouldWatch(request), new Children2Callback { override def processResult(rc: Int, path: String, ctx: Any, children: java.util.List[String], stat: Stat): Unit = callback(GetChildrenResponse(Code.get(rc), path, Option(ctx), Option(children).map(_.asScala).getOrElse(Seq.empty), stat, responseMetadata(sendTimeMs))) }, ctx.orNull) - case CreateRequest(path, data, acl, createMode, ctx) => - zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { - override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit = - callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs))) - }, ctx.orNull) - case SetDataRequest(path, data, version, ctx) => - zooKeeper.setData(path, data, version, new StatCallback { - override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = - callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))) - }, ctx.orNull) - case DeleteRequest(path, version, ctx) => - 
zooKeeper.delete(path, version, new VoidCallback { - override def processResult(rc: Int, path: String, ctx: Any): Unit = - callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs))) - }, ctx.orNull) - case GetAclRequest(path, ctx) => + case CreateRequest(path, data, acl, createMode, ctx, zkVersionCheck) => + if (zkVersionCheck.isEmpty) + zooKeeper.create(path, data, acl.asJava, createMode, new StringCallback { + override def processResult(rc: Int, path: String, ctx: Any, name: String): Unit = + callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs))) + }, ctx.orNull) + else + zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.create(path, data, acl.asJava, createMode)).asJava, new MultiCallback { + override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = { + val (zkVersionCheckOpResult, requestOpResult) = (opResults.get(0), opResults.get(1)) + val name = requestOpResult match { + case c: CreateResult => c.getPath + case _ => null + } + callback(CreateResponse(Code.get(rc), path, Option(ctx), name, responseMetadata(sendTimeMs), + Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult)))) + }}, ctx.orNull) + case SetDataRequest(path, data, version, ctx, zkVersionCheck) => + if (zkVersionCheck.isEmpty) + zooKeeper.setData(path, data, version, new StatCallback { + override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = + callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))) + }, ctx.orNull) + else + zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.setData(path, data, version)).asJava, new MultiCallback { + override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = { + val (zkVersionCheckOpResult, requestOpResult) = (opResults.get(0), opResults.get(1)) + val stat = requestOpResult match { + case s: SetDataResult => 
s.getStat + case _ => null + } + callback(SetDataResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs), + Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult)))) + }}, ctx.orNull) + case DeleteRequest(path, version, ctx, zkVersionCheck) => + if (zkVersionCheck.isEmpty) + zooKeeper.delete(path, version, new VoidCallback { + override def processResult(rc: Int, path: String, ctx: Any): Unit = + callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs))) + }, ctx.orNull) + else + zooKeeper.multi(Seq(zkVersionCheck.get.checkOp, Op.delete(path, version)).asJava, new MultiCallback { + override def processResult(rc: Int, multiOpPath: String, ctx: scala.Any, opResults: util.List[OpResult]): Unit = { + val (zkVersionCheckOpResult, _) = (opResults.get(0), opResults.get(1)) + callback(DeleteResponse(Code.get(rc), path, Option(ctx), responseMetadata(sendTimeMs), + Some(ZkVersionCheckResult(zkVersionCheck.get, zkVersionCheckOpResult)))) + }}, ctx.orNull) + case GetAclRequest(path, ctx, _) => zooKeeper.getACL(path, null, new ACLCallback { override def processResult(rc: Int, path: String, ctx: Any, acl: java.util.List[ACL], stat: Stat): Unit = { callback(GetAclResponse(Code.get(rc), path, Option(ctx), Option(acl).map(_.asScala).getOrElse(Seq.empty), stat, responseMetadata(sendTimeMs))) - }}, ctx.orNull) - case SetAclRequest(path, acl, version, ctx) => + }}, ctx.orNull) + case SetAclRequest(path, acl, version, ctx, _) => zooKeeper.setACL(path, acl.asJava, version, new StatCallback { override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit = callback(SetAclResponse(Code.get(rc), path, Option(ctx), stat, responseMetadata(sendTimeMs))) @@ -329,7 +367,7 @@ class ZooKeeperClient(connectString: String, private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) { zooKeeper } - + private def reinitialize(): Unit = { // Initialization callbacks are invoked outside of the lock to 
avoid deadlock potential since their completion // may require additional Zookeeper requests, which will block to acquire the initialization lock @@ -447,45 +485,54 @@ sealed trait AsyncRequest { type Response <: AsyncResponse def path: String def ctx: Option[Any] + def zkVersionCheck: Option[ZkVersionCheck] +} + +case class ZkVersionCheck(checkPath: String, expectedZkVersion: Int) { + def checkOp: Op = Op.check(checkPath, expectedZkVersion) } +case class ZkVersionCheckResult(zkVersionCheck: ZkVersionCheck, opResult: OpResult) + case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, - ctx: Option[Any] = None) extends AsyncRequest { + ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = CreateResponse } -case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest { +case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = DeleteResponse } -case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { +case class ExistsRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = ExistsResponse } -case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { +case class GetDataRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = GetDataResponse } -case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest { +case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = SetDataResponse } -case class GetAclRequest(path: String, ctx: Option[Any] = 
None) extends AsyncRequest { +case class GetAclRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = GetAclResponse } -case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest { +case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = SetAclResponse } -case class GetChildrenRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { +case class GetChildrenRequest(path: String, ctx: Option[Any] = None, zkVersionCheck: Option[ZkVersionCheck] = None) extends AsyncRequest { type Response = GetChildrenResponse } + sealed abstract class AsyncResponse { def resultCode: Code def path: String def ctx: Option[Any] + def zkVersionCheckResult: Option[ZkVersionCheckResult] /** Return None if the result code is OK and KeeperException otherwise. 
*/ def resultException: Option[KeeperException] = @@ -506,17 +553,22 @@ case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) { def responseTimeMs: Long = receivedTimeMs - sendTimeMs } -case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String, metadata: ResponseMetadata) extends AsyncResponse -case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any], metadata: ResponseMetadata) extends AsyncResponse -case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse +case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String, + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse +case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any], + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse +case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse +case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class 
SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, metadata: ResponseMetadata) extends AsyncResponse + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse +case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse + metadata: ResponseMetadata, zkVersionCheckResult: Option[ZkVersionCheckResult] = None) extends AsyncResponse class ZooKeeperClientException(message: String) extends RuntimeException(message) class ZooKeeperClientExpiredException(message: String) extends ZooKeeperClientException(message) diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index 4f40b27f019..b44c239cfbd 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -20,7 +20,7 @@ import kafka.common.AdminCommandFailedException import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} -import kafka.zk.{ReassignPartitionsZNode, ZooKeeperTestHarness} +import kafka.zk.{ReassignPartitionsZNode, ZkVersion, ZooKeeperTestHarness} import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{After, Before, Test} import kafka.admin.ReplicationQuotaUtils._ @@ -613,7 +613,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { ) // Set znode directly to avoid non-existent topic validation - zkClient.setOrCreatePartitionReassignment(firstMove) + zkClient.setOrCreatePartitionReassignment(firstMove, 
ZkVersion.MatchAnyVersion) servers.foreach(_.startup()) waitForReassignmentToComplete() diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala index e5753e58030..e0a753cacaf 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala @@ -54,7 +54,7 @@ class ControllerEventManagerTest { val controllerStats = new ControllerStats val eventProcessedListenerCount = new AtomicInteger controllerEventManager = new ControllerEventManager(0, controllerStats.rateAndTimeMetrics, - _ => eventProcessedListenerCount.incrementAndGet) + _ => eventProcessedListenerCount.incrementAndGet, () => ()) controllerEventManager.start() val initialTimerCount = timer(metricName).count diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 5e5d84f9626..dc4076a27c7 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -17,23 +17,29 @@ package kafka.controller -import java.util.concurrent.LinkedBlockingQueue +import java.util.Properties +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue} import com.yammer.metrics.Metrics import com.yammer.metrics.core.Timer import kafka.api.LeaderAndIsr import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils -import kafka.zk.{PreferredReplicaElectionZNode, ZooKeeperTestHarness} +import kafka.zk._ import org.junit.{After, Before, Test} import org.junit.Assert.{assertEquals, assertTrue} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.log4j.Level +import kafka.utils.LogCaptureAppender import 
scala.collection.JavaConverters._ import scala.util.Try class ControllerIntegrationTest extends ZooKeeperTestHarness { var servers = Seq.empty[KafkaServer] + val firstControllerEpoch = KafkaController.InitialControllerEpoch + 1 + val firstControllerEpochZkVersion = KafkaController.InitialControllerEpochZkVersion + 1 @Before override def setUp() { @@ -51,30 +57,30 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { def testEmptyCluster(): Unit = { servers = makeServers(1) TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller") - waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch") + waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch") } @Test def testControllerEpochPersistsWhenAllBrokersDown(): Unit = { servers = makeServers(1) TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller") - waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch") + waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch") servers.head.shutdown() servers.head.awaitShutdown() TestUtils.waitUntilTrue(() => !zkClient.getControllerId.isDefined, "failed to kill controller") - waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure") + waitUntilControllerEpoch(firstControllerEpoch, "controller epoch was not persisted after broker failure") } @Test def testControllerMoveIncrementsControllerEpoch(): Unit = { servers = makeServers(1) TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller") - waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch") + waitUntilControllerEpoch(firstControllerEpoch, "broker failed to set controller epoch") servers.head.shutdown() 
servers.head.awaitShutdown() servers.head.startup() TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller") - waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move") + waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move") } @Test @@ -83,7 +89,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val tp = new TopicPartition("t", 0) val assignment = Map(tp.partition -> Seq(0)) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, + waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") } @@ -97,7 +103,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val tp = new TopicPartition("t", 0) val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId)) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers.take(1)) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, + waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") } @@ -109,8 +115,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val assignment = Map(tp0.partition -> Seq(0)) val expandedAssignment = Map(tp0 -> Seq(0), tp1 -> Seq(0)) TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers) - zkClient.setTopicAssignment(tp0.topic, expandedAssignment) - waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, + zkClient.setTopicAssignment(tp0.topic, 
expandedAssignment, firstControllerEpochZkVersion) + waitForPartitionState(tp1, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic partition expansion") TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition) } @@ -127,8 +133,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { TestUtils.createTopic(zkClient, tp0.topic, partitionReplicaAssignment = assignment, servers = servers) servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() - zkClient.setTopicAssignment(tp0.topic, expandedAssignment) - waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, + zkClient.setTopicAssignment(tp0.topic, expandedAssignment, firstControllerEpochZkVersion) + waitForPartitionState(tp1, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic partition expansion") TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition) } @@ -147,7 +153,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val reassignment = Map(tp -> Seq(otherBrokerId)) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) zkClient.createPartitionReassignment(reassignment) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3, + waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3, "failed to get expected partition state after partition reassignment") TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment, "failed to get updated partition assignment on topic znode after partition reassignment") @@ -169,8 +175,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { TestUtils.createTopic(zkClient, tp.topic, 
partitionReplicaAssignment = assignment, servers = servers) servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() - zkClient.setOrCreatePartitionReassignment(reassignment) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, + val controller = getController() + zkClient.setOrCreatePartitionReassignment(reassignment, controller.kafkaController.controllerContext.epochZkVersion) + waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, "failed to get expected partition state during partition reassignment with offline replica") TestUtils.waitUntilTrue(() => zkClient.reassignPartitionsInProgress(), "partition reassignment path should remain while reassignment in progress") @@ -188,10 +195,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() zkClient.createPartitionReassignment(reassignment) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, + waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, "failed to get expected partition state during partition reassignment with offline replica") servers(otherBrokerId).startup() - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4, + waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4, "failed to get expected partition state after partition reassignment") TestUtils.waitUntilTrue(() => zkClient.getReplicaAssignmentForTopics(Set(tp.topic)) == reassignment, "failed to get updated partition assignment on topic znode after partition reassignment") @@ -235,7 +242,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { zkClient.createPreferredReplicaElection(Set(tp)) 
TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path), "failed to remove preferred replica leader election path after giving up") - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, + waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, "failed to get expected partition state upon broker shutdown") } @@ -249,10 +256,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, + waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1, "failed to get expected partition state upon broker shutdown") servers(otherBrokerId).startup() - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2, + waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2, "failed to get expected partition state upon broker startup") } @@ -264,14 +271,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val tp = new TopicPartition("t", 0) val assignment = Map(tp.partition -> Seq(otherBrokerId)) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, + waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") servers(otherBrokerId).shutdown() servers(otherBrokerId).awaitShutdown() TestUtils.waitUntilTrue(() => { val 
leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) leaderIsrAndControllerEpochMap.contains(tp) && - isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && + isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId) }, "failed to get expected partition state after entire isr went offline") } @@ -284,14 +291,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val tp = new TopicPartition("t", 0) val assignment = Map(tp.partition -> Seq(otherBrokerId)) TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, + waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") servers(1).shutdown() servers(1).awaitShutdown() TestUtils.waitUntilTrue(() => { val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) leaderIsrAndControllerEpochMap.contains(tp) && - isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && + isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), firstControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) && leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId) }, "failed to get expected partition state after entire isr went offline") } @@ -341,18 +348,105 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { 
assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) } + @Test + def testControllerMoveOnTopicCreation(): Unit = { + servers = makeServers(1) + TestUtils.waitUntilControllerElected(zkClient) + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(0)) + + testControllerMove(() => { + val adminZkClient = new AdminZkClient(zkClient) + adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, assignment, new Properties()) + }) + } + + @Test + def testControllerMoveOnTopicDeletion(): Unit = { + servers = makeServers(1) + TestUtils.waitUntilControllerElected(zkClient) + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(0)) + TestUtils.createTopic(zkClient, tp.topic(), assignment, servers) + + testControllerMove(() => { + val adminZkClient = new AdminZkClient(zkClient) + adminZkClient.deleteTopic(tp.topic()) + }) + } + + @Test + def testControllerMoveOnPreferredReplicaElection(): Unit = { + servers = makeServers(1) + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(0)) + TestUtils.createTopic(zkClient, tp.topic(), assignment, servers) + + testControllerMove(() => zkClient.createPreferredReplicaElection(Set(tp))) + } + + @Test + def testControllerMoveOnPartitionReassignment(): Unit = { + servers = makeServers(1) + TestUtils.waitUntilControllerElected(zkClient) + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(0)) + TestUtils.createTopic(zkClient, tp.topic(), assignment, servers) + + val reassignment = Map(tp -> Seq(0)) + testControllerMove(() => zkClient.createPartitionReassignment(reassignment)) + } + + private def testControllerMove(fun: () => Unit): Unit = { + val controller = getController().kafkaController + val appender = LogCaptureAppender.createAndRegister() + val previousLevel = LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass, Level.INFO) + + 
try { + TestUtils.waitUntilTrue(() => { + controller.eventManager.state == ControllerState.Idle + }, "Controller event thread is still busy") + + val latch = new CountDownLatch(1) + + // Let the controller event thread await on a latch before the pre-defined logic is triggered. + // This is used to make sure that when the event thread resumes and starts processing events, the controller has already moved. + controller.eventManager.put(KafkaController.AwaitOnLatch(latch)) + // Execute pre-defined logic. This can be topic creation/deletion, preferred leader election, etc. + fun() + + // Delete the controller path, re-create /controller znode to emulate controller movement + zkClient.deleteController(controller.controllerContext.epochZkVersion) + zkClient.registerControllerAndIncrementControllerEpoch(servers.size) + + // Resume the controller event thread. At this point, the controller should see mismatch controller epoch zkVersion and resign + latch.countDown() + TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to resign") + + // Expect to capture the ControllerMovedException in the log of ControllerEventThread + val event = appender.getMessages.find(e => e.getLevel == Level.INFO + && e.getThrowableInformation != null + && e.getThrowableInformation.getThrowable.getClass.getName.equals(new ControllerMovedException("").getClass.getName)) + assertTrue(event.isDefined) + + } finally { + LogCaptureAppender.unregister(appender) + LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass, previousLevel) + } + } + private def preferredReplicaLeaderElection(controllerId: Int, otherBroker: KafkaServer, tp: TopicPartition, replicas: Set[Int], leaderEpoch: Int): Unit = { otherBroker.shutdown() otherBroker.awaitShutdown() - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, leaderEpoch + 1, + waitForPartitionState(tp, firstControllerEpoch, controllerId, leaderEpoch + 1, "failed to get expected partition state 
upon broker shutdown") otherBroker.startup() TestUtils.waitUntilTrue(() => zkClient.getInSyncReplicasForPartition(new TopicPartition(tp.topic, tp.partition)).get.toSet == replicas, "restarted broker failed to join in-sync replicas") zkClient.createPreferredReplicaElection(Set(tp)) TestUtils.waitUntilTrue(() => !zkClient.pathExists(PreferredReplicaElectionZNode.path), "failed to remove preferred replica leader election path after completion") - waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2, + waitForPartitionState(tp, firstControllerEpoch, otherBroker.config.brokerId, leaderEpoch + 2, "failed to get expected partition state upon broker startup") } @@ -395,4 +489,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer] } + private def getController(): KafkaServer = { + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + servers.filter(s => s.config.brokerId == controllerId).head + } + } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 6a587f3bd34..b89632e0a83 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -22,7 +22,7 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult -import kafka.zookeeper.{CreateResponse, GetDataResponse, ResponseMetadata, ZooKeeperClientException} +import kafka.zookeeper._ import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.Stat @@ -85,7 +85,7 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val 
leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion)) .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null, ResponseMetadata(0, 0)))) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true)) @@ -103,7 +103,7 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion)) .andThrow(new ZooKeeperClientException("test")) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) @@ -119,7 +119,7 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch), controllerContext.epochZkVersion)) 
.andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null, ResponseMetadata(0, 0)))) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) @@ -159,7 +159,7 @@ class PartitionStateMachineTest extends JUnitSuite { val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) @@ -190,7 +190,7 @@ class PartitionStateMachineTest extends JUnitSuite { val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId)) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), @@ -243,7 +243,7 @@ class PartitionStateMachineTest extends JUnitSuite { .andReturn((Map(partition.topic -> LogConfig()), Map.empty)) val leaderAndIsrAfterElection = 
leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) @@ -336,7 +336,7 @@ class PartitionStateMachineTest extends JUnitSuite { val updatedLeaderAndIsr = partitions.map { partition => partition -> leaderAndIsr.newLeaderAndIsr(brokerId, List(brokerId)) }.toMap - EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(updatedLeaderAndIsr, controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(updatedLeaderAndIsr, Seq.empty, Map.empty)) } prepareMockToUpdateLeaderAndIsr() diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala index c573c9f041e..ef274fa4fa7 100644 --- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala @@ -183,7 +183,7 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn( Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0)))) - EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch)) + 
EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockTopicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)).andReturn(false) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala new file mode 100644 index 00000000000..80472e9b19f --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.utils + +import org.apache.log4j.{AppenderSkeleton, Level, Logger} +import org.apache.log4j.spi.LoggingEvent + +import scala.collection.mutable.ListBuffer + +class LogCaptureAppender extends AppenderSkeleton { + private val events: ListBuffer[LoggingEvent] = ListBuffer.empty + + override protected def append(event: LoggingEvent): Unit = { + events.synchronized { + events += event + } + } + + def getMessages: ListBuffer[LoggingEvent] = { + events.synchronized { + return events.clone() + } + } + + override def close(): Unit = { + events.synchronized { + events.clear() + } + } + + override def requiresLayout: Boolean = false +} + +object LogCaptureAppender { + def createAndRegister(): LogCaptureAppender = { + val logCaptureAppender: LogCaptureAppender = new LogCaptureAppender + Logger.getRootLogger.addAppender(logCaptureAppender) + logCaptureAppender + } + + def setClassLoggerLevel(clazz: Class[_], logLevel: Level): Level = { + val logger = Logger.getLogger(clazz) + val previousLevel = logger.getLevel + Logger.getLogger(clazz).setLevel(logLevel) + previousLevel + } + + def unregister(logCaptureAppender: LogCaptureAppender): Unit = { + Logger.getRootLogger.removeAppender(logCaptureAppender) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala index 3389161e78b..65273eb01a5 100644 --- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala @@ -20,7 +20,7 @@ package kafka.utils import kafka.server.{KafkaConfig, ReplicaFetcherManager} import kafka.api.LeaderAndIsr import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.zk.{IsrChangeNotificationZNode, TopicZNode, ZooKeeperTestHarness} +import kafka.zk._ import org.apache.kafka.common.TopicPartition import org.junit.Assert._ import org.junit.{Before, Test} @@ -42,7 +42,7 @@ class ReplicationUtilsTest extends 
ZooKeeperTestHarness { val topicPartition = new TopicPartition(topic, partition) val leaderAndIsr = LeaderAndIsr(leader, leaderEpoch, isr, 1) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) - zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch)) + zkClient.createTopicPartitionStatesRaw(Map(topicPartition -> leaderIsrAndControllerEpoch), ZkVersion.MatchAnyVersion) } @Test diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2ca3a6c986d..3e8b8046d80 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -36,7 +36,7 @@ import kafka.server._ import kafka.server.checkpoints.OffsetCheckpointFile import Implicits._ import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.zk.{AdminZkClient, BrokerIdsZNode, BrokerInfo, KafkaZkClient} +import kafka.zk._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{AdminClient, AlterConfigsResult, Config, ConfigEntry} import org.apache.kafka.clients.consumer._ @@ -635,7 +635,7 @@ object TestUtils extends Logging { .getOrElse(LeaderAndIsr(leader, List(leader))) topicPartition -> LeaderIsrAndControllerEpoch(newLeaderAndIsr, controllerEpoch) } - zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs) + zkClient.setTopicPartitionStatesRaw(newLeaderIsrAndControllerEpochs, ZkVersion.MatchAnyVersion) } /** diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 9cffb517c09..61ca3bbb6f8 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -42,6 +42,7 @@ import scala.util.Random import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import 
kafka.zookeeper._ +import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.data.Stat @@ -55,12 +56,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val topicPartition11 = new TopicPartition(topic1, 1) val topicPartition20 = new TopicPartition(topic2, 0) val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + val controllerEpochZkVersion = 0 var otherZkClient: KafkaZkClient = _ @Before override def setUp(): Unit = { super.setUp() + zkClient.createControllerEpochRaw(1) otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) } @@ -69,6 +72,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { override def tearDown(): Unit = { if (otherZkClient != null) otherZkClient.close() + zkClient.deletePath(ControllerEpochZNode.path) super.tearDown() } @@ -99,15 +103,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createRecursive("/delete/some/random/path") assertTrue(zkClient.pathExists("/delete/some/random/path")) - zkClient.deleteRecursive("/delete") - assertFalse(zkClient.pathExists("/delete/some/random/path")) - assertFalse(zkClient.pathExists("/delete/some/random")) - assertFalse(zkClient.pathExists("/delete/some")) + assertTrue(zkClient.deleteRecursive("/delete")) assertFalse(zkClient.pathExists("/delete")) intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path")) } + @Test + def testDeleteRecursiveWithControllerEpochVersionCheck(): Unit = { + assertFalse(zkClient.deleteRecursive("/delete/does-not-exist", controllerEpochZkVersion)) + + zkClient.createRecursive("/delete/some/random/path") + assertTrue(zkClient.pathExists("/delete/some/random/path")) + intercept[ControllerMovedException]( + zkClient.deleteRecursive("/delete", controllerEpochZkVersion + 1)) + + 
assertTrue(zkClient.deleteRecursive("/delete", controllerEpochZkVersion)) + assertFalse(zkClient.pathExists("/delete")) + + intercept[IllegalArgumentException](zkClient.deleteRecursive( + "delete-invalid-path", controllerEpochZkVersion)) + } + @Test def testCreateRecursive() { zkClient.createRecursive("/create-newrootpath") @@ -268,7 +285,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testIsrChangeNotificationsDeletion(): Unit = { // Should not fail even if parent node does not exist - zkClient.deleteIsrChangeNotifications(Seq("0000000000")) + zkClient.deleteIsrChangeNotifications(Seq("0000000000"), controllerEpochZkVersion) zkClient.createRecursive("/isr_change_notification") @@ -276,13 +293,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.propagateIsrChanges(Set(topicPartition10)) zkClient.propagateIsrChanges(Set(topicPartition11)) - zkClient.deleteIsrChangeNotifications(Seq("0000000001")) + // Should throw exception if the controllerEpochZkVersion does not match + intercept[ControllerMovedException](zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion + 1)) + // Delete should not succeed + assertEquals(Set("0000000000", "0000000001", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet) + + zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion) // Should not fail if called on a non-existent notification - zkClient.deleteIsrChangeNotifications(Seq("0000000001")) + zkClient.deleteIsrChangeNotifications(Seq("0000000001"), controllerEpochZkVersion) assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet) - zkClient.deleteIsrChangeNotifications() - assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications) + zkClient.deleteIsrChangeNotifications(controllerEpochZkVersion) + assertEquals(Seq.empty, zkClient.getAllIsrChangeNotifications) } @Test @@ -335,7 +357,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def 
testLogDirEventNotificationsDeletion(): Unit = { // Should not fail even if parent node does not exist - zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002")) + zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion) zkClient.createRecursive("/log_dir_event_notification") @@ -346,13 +368,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.propagateLogDirEvent(brokerId) zkClient.propagateLogDirEvent(anotherBrokerId) - zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002")) + intercept[ControllerMovedException](zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion + 1)) + assertEquals(Seq("0000000000", "0000000001", "0000000002"), zkClient.getAllLogDirEventNotifications) + + zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"), controllerEpochZkVersion) assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications) zkClient.propagateLogDirEvent(anotherBrokerId) - zkClient.deleteLogDirEventNotifications() + zkClient.deleteLogDirEventNotifications(controllerEpochZkVersion) assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications) } @@ -368,14 +393,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { new TopicPartition("topic_b", 0) -> Seq(4, 5), new TopicPartition("topic_c", 0) -> Seq(5, 3) ) - zkClient.setOrCreatePartitionReassignment(reassignment) + + // Should throw ControllerMovedException if the controller epoch zkVersion does not match + intercept[ControllerMovedException](zkClient.setOrCreatePartitionReassignment(reassignment, controllerEpochZkVersion + 1)) + + zkClient.setOrCreatePartitionReassignment(reassignment, controllerEpochZkVersion) assertEquals(reassignment, zkClient.getPartitionReassignment) - val updatedReassingment = reassignment - new TopicPartition("topic_b", 0) - zkClient.setOrCreatePartitionReassignment(updatedReassingment) - assertEquals(updatedReassingment, 
zkClient.getPartitionReassignment) + val updatedReassignment = reassignment - new TopicPartition("topic_b", 0) + zkClient.setOrCreatePartitionReassignment(updatedReassignment, controllerEpochZkVersion) + assertEquals(updatedReassignment, zkClient.getPartitionReassignment) - zkClient.deletePartitionReassignment() + zkClient.deletePartitionReassignment(controllerEpochZkVersion) assertEquals(Map.empty, zkClient.getPartitionReassignment) zkClient.createPartitionReassignment(reassignment) @@ -513,9 +542,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testDeleteTopicZNode(): Unit = { - zkClient.deleteTopicZNode(topic1) + zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion) zkClient.createRecursive(TopicZNode.path(topic1)) - zkClient.deleteTopicZNode(topic1) + zkClient.deleteTopicZNode(topic1, controllerEpochZkVersion) assertFalse(zkClient.pathExists(TopicZNode.path(topic1))) } @@ -530,7 +559,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.isTopicMarkedForDeletion(topic1)) assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet) - zkClient.deleteTopicDeletions(Seq(topic1, topic2)) + intercept[ControllerMovedException](zkClient.deleteTopicDeletions(Seq(topic1, topic2), controllerEpochZkVersion + 1)) + assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet) + + zkClient.deleteTopicDeletions(Seq(topic1, topic2), controllerEpochZkVersion) assertTrue(zkClient.getTopicDeletions.isEmpty) } @@ -564,7 +596,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps) assertEquals(Set(topic1, topic2), zkClient.getAllEntitiesWithConfig(ConfigType.Topic).toSet) - zkClient.deleteTopicConfigs(Seq(topic1, topic2)) + zkClient.deleteTopicConfigs(Seq(topic1, topic2), controllerEpochZkVersion) assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty) } @@ -742,22 +774,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness 
{ Map( topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"), topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")), - zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4)) + zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4, controllerEpochZkVersion)) + + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion) - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + // Mismatch controller epoch zkVersion + intercept[ControllerMovedException](zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4, controllerEpochZkVersion + 1)) + // successful updates checkUpdateLeaderAndIsrResult( leaderIsrs(state = 1, zkVersion = 1), mutable.ArrayBuffer.empty, Map.empty, - zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4)) + zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4, controllerEpochZkVersion)) // Try to update with wrong ZK version checkUpdateLeaderAndIsrResult( Map.empty, ArrayBuffer(topicPartition10, topicPartition11), Map.empty, - zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4)) + zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4, controllerEpochZkVersion)) // Trigger successful, to be retried and failed partitions in same call val mixedState = Map( @@ -770,7 +806,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { ArrayBuffer(topicPartition11), Map( topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")), - zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4)) + zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4, controllerEpochZkVersion)) } private def checkGetDataResponse( @@ -786,9 +822,9 @@ 
class KafkaZkClientTest extends ZooKeeperTestHarness { TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion))) } - private def eraseMetadata(response: CreateResponse): CreateResponse = - response.copy(metadata = ResponseMetadata(0, 0)) - + private def eraseUncheckedInfoInCreateResponse(response: CreateResponse): CreateResponse = + response.copy(metadata = ResponseMetadata(0, 0), zkVersionCheckResult = None) + @Test def testGetTopicsAndPartitions(): Unit = { assertTrue(zkClient.getAllTopicsInCluster.isEmpty) @@ -800,7 +836,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getAllPartitions.isEmpty) - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion) assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions) } @@ -808,14 +844,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { def testCreateAndGetTopicPartitionStatesRaw(): Unit = { zkClient.createRecursive(TopicZNode.path(topic1)) + // Mismatch controller epoch zkVersion + intercept[ControllerMovedException](zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion + 1)) + assertEquals( Seq( CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)), CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))), - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) - .map(eraseMetadata).toList) + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion) + .map(eraseUncheckedInfoInCreateResponse).toList) val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) 
assertEquals(2, getResponses.size) @@ -824,11 +863,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // Trying to create existing topicPartition states fails assertEquals( Seq( - CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), - null, ResponseMetadata(0, 0)), - CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), - null, ResponseMetadata(0, 0))), - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList) + CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10), null, ResponseMetadata(0, 0)), + CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11), null, ResponseMetadata(0, 0))), + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion).map(eraseUncheckedInfoInCreateResponse).toList) } @Test @@ -837,7 +874,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) = topicPartitions.map { topicPartition => SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition), - Some(topicPartition), stat, ResponseMetadata(0, 0)) + Some(topicPartition), stat, ResponseMetadata(0, 0), None) } zkClient.createRecursive(TopicZNode.path(topic1)) @@ -845,16 +882,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // Trying to set non-existing topicPartition's data results in NONODE responses assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null), - zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map { - _.copy(metadata = ResponseMetadata(0, 0))}.toList) + zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion).map { + _.copy(metadata = ResponseMetadata(0, 0), 
zkVersionCheckResult = None)}.toList) - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion) assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)), - zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map { - eraseMetadataAndStat}.toList) + zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion).map { + eraseUncheckedInfoInSetDataResponse}.toList) + // Mismatch controller epoch zkVersion + intercept[ControllerMovedException](zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), controllerEpochZkVersion + 1)) val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) assertEquals(2, getResponses.size) @@ -863,8 +902,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // Other ZK client can also write the state of a partition assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)), - otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map { - eraseMetadataAndStat}.toList) + otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1), controllerEpochZkVersion).map { + eraseUncheckedInfoInSetDataResponse}.toList) } @Test @@ -881,7 +920,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createRecursive(TopicZNode.path(topic1)) - zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs) + zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs, controllerEpochZkVersion) assertEquals( initialLeaderIsrAndControllerEpochs, zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11)) @@ -906,36 +945,38 @@ class KafkaZkClientTest extends 
ZooKeeperTestHarness { } - private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = { + private def eraseUncheckedInfoInSetDataResponse(response: SetDataResponse): SetDataResponse = { val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null - response.copy(metadata = ResponseMetadata(0, 0), stat = stat) + response.copy(metadata = ResponseMetadata(0, 0), stat = stat, zkVersionCheckResult = None) } @Test def testControllerEpochMethods(): Unit = { + zkClient.deletePath(ControllerEpochZNode.path) + assertEquals(None, zkClient.getControllerEpoch) assertEquals("Setting non existing nodes should return NONODE results", SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), - eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0))) assertEquals("Creating non existing nodes is OK", CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)), - eraseMetadata(zkClient.createControllerEpochRaw(0))) + eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0))) assertEquals(0, zkClient.getControllerEpoch.get._1) assertEquals("Attemt to create existing nodes should return NODEEXISTS", CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), - eraseMetadata(zkClient.createControllerEpochRaw(0))) + eraseUncheckedInfoInCreateResponse(zkClient.createControllerEpochRaw(0))) assertEquals("Updating existing nodes is OK", SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)), - eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0))) assertEquals(1, zkClient.getControllerEpoch.get._1) assertEquals("Updating with wrong ZK version returns BADVERSION", SetDataResponse(Code.BADVERSION, 
ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)), - eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0))) + eraseUncheckedInfoInSetDataResponse(zkClient.setControllerEpochRaw(1, 0))) } @Test @@ -943,9 +984,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // No controller assertEquals(None, zkClient.getControllerId) // Create controller - zkClient.registerController(controllerId = 1, timestamp = 123456) + val (_, newEpochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(controllerId = 1) assertEquals(Some(1), zkClient.getControllerId) - zkClient.deleteController() + zkClient.deleteController(newEpochZkVersion) assertEquals(None, zkClient.getControllerId) } @@ -1002,7 +1043,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { zkClient.createPreferredReplicaElection(electionPartitions) } - zkClient.deletePreferredReplicaElection() + // Mismatch controller epoch zkVersion + intercept[ControllerMovedException](zkClient.deletePreferredReplicaElection(controllerEpochZkVersion + 1)) + assertEquals(electionPartitions, zkClient.getPreferredReplicaElection) + + zkClient.deletePreferredReplicaElection(controllerEpochZkVersion) assertTrue(zkClient.getPreferredReplicaElection.isEmpty) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > consider fencing zookeeper updates with controller epoch zkVersion > ------------------------------------------------------------------ > > Key: KAFKA-6082 > URL: https://issues.apache.org/jira/browse/KAFKA-6082 > Project: Kafka > Issue Type: Sub-task > Reporter: Onur Karaman > Assignee: Zhanxiang (Patrick) Huang > Priority: Major > Fix For: 2.1.0 > > > > Kafka controller may fail to function properly (even after repeated > controller movement) due to the following sequence of events: > - User requests topic deletion > - Controller A deletes the partition znode > - Controller B becomes controller and reads the topic znode > - Controller A deletes the topic znode and removes the topic from the topic > deletion znode > - Controller B reads the partition znode and topic deletion znode > - According to controller B's context, the topic znode exists, the topic is > not listed for deletion, and some partition is not found for the given topic. > Then controller B will create the topic znode with empty data (i.e. partition > assignment) and create the partition znodes. > - All controllers after controller B will fail because there is no data in > the topic znode. > The long term solution is to have a way to prevent an old controller from > writing to zookeeper if it is not the active controller. One idea is to use > the zookeeper multi API (See > [https://zookeeper.apache.org/doc/r3.4.3/api/org/apache/zookeeper/ZooKeeper.html#multi(java.lang.Iterable))] > such that the controller only writes to zookeeper if the zk version of the > controller epoch znode has not been changed. > The short term solution is to let the controller read the topic deletion znode > first. If the topic is still listed in the topic deletion znode, then the new > controller will properly handle partition states of this topic without > creating partition znodes for this topic. 
And if the topic is not listed in > the topic deletion znode, then both the topic znode and the partition znodes > of this topic should have been deleted by the time the new controller tries > to read them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)