[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Bugaychenko updated KAFKA-2029:
--------------------------------------
    Attachment: KAFKA-2029.patch

Improving controlled shutdown for rolling updates
-------------------------------------------------

                 Key: KAFKA-2029
                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
             Project: Kafka
          Issue Type: Improvement
          Components: controller
    Affects Versions: 0.8.1.1
            Reporter: Dmitry Bugaychenko
            Assignee: Neha Narkhede
            Priority: Critical
         Attachments: KAFKA-2029.patch, KAFKA-2029.patch

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are extremely painful in a setup with large brokers (36 disks, 1000+ assigned partitions per broker in our case).

Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).

The problems and improvements are:

# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock while waiting for free space in the controller-to-broker queue). After the broker starts back up it receives a follower request and erases its high watermarks (with a message that the "replica does not exist"; the controller has not yet sent a request with the replica assignment), and then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default* (a hedged sketch of such a setting follows below).
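A minimal sketch of what such a dedicated setting might look like, assuming a hypothetical property name controlled.shutdown.response.timeout.ms and an illustrative default; the actual patch may name and wire the parameter differently:
{code}
// Hypothetical sketch only: a dedicated timeout for waiting on the controller's
// response to a controlled shutdown request, instead of reusing the global
// controller.socket.timeout.ms. Property name and default are illustrative.
import java.util.Properties

object ControlledShutdownTimeout {
  val TimeoutProp = "controlled.shutdown.response.timeout.ms"
  val DefaultTimeoutMs = 10 * 60 * 1000 // high default, e.g. 10 minutes

  def timeoutMs(props: Properties): Int =
    Option(props.getProperty(TimeoutProp)).map(_.toInt).getOrElse(DefaultTimeoutMs)
}

// Usage idea: pass ControlledShutdownTimeout.timeoutMs(props) to the blocking
// channel used for the shutdown request, leaving controller.socket.timeout.ms
// untouched for all other broker-controller communication.
{code}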
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). Numerous partitions are left without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout when adding a message to a broker's queue*. ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings controller into completely non-functional state
          // "Controller to broker state change requests batch is not empty while creating a new one"
          //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker that is the controller starts shutting down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, while the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
{code}
// ODKL Patch to prevent deadlocks in shutdown.
/**
 * Execute the given function inside the lock
 */
def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
  if (isRunning || lock.isHeldByCurrentThread) {
    // TODO: Configure timeout.
    if (!lock.tryLock(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
    }
    try {
      return fun
    } finally {
      lock.unlock()
    }
  } else {
    throw new IllegalStateException("Controller is not running, not allowed to lock.")
  }
}

private def checkAndTriggerPartitionRebalance(): Unit = {
  // Use inLockIfRunning here instead of inLock
}
{code}
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector prefer the oldest replica in the ISR (the one that joined the ISR first). During a rolling update this means moving partitions towards the tail, which increases the overall amount of movement and finally significantly overloads the last broker (with 4 brokers and RF 3 the last one gets 3/4 of the leadership). In case of multiple failures this logic can cause a significant imbalance in the leadership. *Proposed solution: move leadership to the preferred replica if possible, otherwise to the youngest replica (in controlled shutdown) or to the second preferred replica (in offline partition election)* (a small illustration follows the code):
{code}
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  ...
  // ODKL Patch: Trying to select a replica from the ISR not depending on the ISR join order, but following the
  // assignment order. The preferred replica is the first one, thus if possible it'll be chosen, but most
  // probably it is the dead one, thus we fall back to the second preferred replica. Here we do not care about
  // overloading the second preferred replica as we do not expect rolling crashes.
  val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
  ...
}

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
  with Logging {
  ...
  // ODKL Patch: Trying to select a replica from the ISR not depending on the ISR join order. If the preferred replica
  // is in the ISR, choose it; otherwise choose the last replica from the ISR - it is expected to be the youngest
  // (most probably it has already survived the rolling update).
  val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
  ...
}
{code}
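For illustration, a self-contained sketch (with invented broker ids and ISR ordering, not taken from the patch) showing how the two selection rules above pick a leader:
{code}
// Standalone illustration of the two selection rules; the replica lists are made up.
object LeaderSelectionExample extends App {
  val assignedReplicas = Seq(1, 2, 3) // assignment order: broker 1 is the preferred replica
  val liveBrokersInIsr = Seq(3, 2)    // broker 1 is down; broker 3 joined the ISR first

  // Offline partition rule: follow the assignment order, not the ISR join order.
  // Broker 1 is not alive, so the second preferred replica (2) wins, not the oldest ISR member (3).
  val offlineLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
  println("offline partition leader: " + offlineLeader) // 2

  // Controlled shutdown rule: preferred replica if it is still in the ISR,
  // otherwise the last (youngest) ISR member.
  val newIsr = Seq(3, 2)
  val shutdownLeader =
    if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
  println("controlled shutdown leader: " + shutdownLeader) // Some(2)
}
{code}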
# Auto leader rebalance started simultaneously with a controlled shutdown competes with it for space in the queue and can slow down the process. If the queue size is large it can also cause significant data loss (for a few minutes there may be multiple brokers considering themselves the leader and accepting produce requests). *Proposed solution: add throttling to the auto rebalance*:
{code}
private def checkAndTriggerPartitionRebalance(): Unit = {
  ...
  if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
    info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
    topicsNotInPreferredReplica.foreach {
      case (topicPartition, replicas) => {
        inLockIfRunning(controllerContext.controllerLock) {
          // do this check only if the broker is live and there are no partitions being reassigned currently
          // and preferred replica election is not in progress
          if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
              controllerContext.partitionsBeingReassigned.size == 0 &&
              controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
              !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
              !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
              controllerContext.allTopics.contains(topicPartition.topic)) {
            onPreferredReplicaElection(Set(topicPartition), true)
          }
        }
        // ODKL patch: prevent too frequent preferred replica elections.
        // TODO: Make configurable / use true throttling
        Utils.swallow(Thread.sleep(2000))
      }
    }
    info("Balancing broker " + leaderBroker + " done")
  }
  ...
}
{code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)