Dmitry Bugaychenko created KAFKA-2029:
-----------------------------------------

             Summary: Improving controlled shutdown for rolling updates
                 Key: KAFKA-2029
                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
             Project: Kafka
          Issue Type: Improvement
          Components: controller
    Affects Versions: 0.8.1.1
            Reporter: Dmitry Bugaychenko
            Assignee: Neha Narkhede
            Priority: Critical


Controlled shutdown as currently implemented can cause numerous problems: 
deadlocks, local and global data loss, partitions without a leader, etc. In 
some cases the only way to restore the cluster is to stop it completely using 
kill -9 and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are 
extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
partitions per broker in our case).

The problems are:

# Controlled shutdown takes a long time (10+ minutes). The broker sends 
multiple shutdown requests, finally considers the shutdown failed and proceeds 
to an unclean shutdown, while the controller gets stuck for a while (holding a 
lock and waiting for free space in the controller-to-broker queue). After the 
broker starts back up, it receives follower requests and erases its high 
watermarks (with a message that "replica does not exists", because the 
controller has not yet sent a request with the replica assignment). When the 
controller then starts replicas on the broker, the broker deletes all local 
data (due to the missing high watermarks). Furthermore, the controller starts 
processing the pending shutdown request and stops replicas on the broker, 
leaving it in a non-functional state. A solution might be to increase the time 
the broker waits for the controller's response to a shutdown request, but this 
timeout is taken from controller.socket.timeout.ms, which is global for all 
broker-controller communication, and setting it to 30 minutes is dangerous. 
*Proposed solution: introduce a dedicated config parameter for this timeout 
with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the 
controller gets stuck in a deadlock (one thread owns the lock and tries to add 
a message to the dead broker's queue, the send thread is in an infinite loop 
retrying the message to the dead broker, and the broker failure handler is 
waiting for the lock). There are numerous partitions without a leader and the 
only way out is to kill -9 the controller. *Proposed solution: add a timeout 
for adding a message to the broker's queue*. 
ControllerChannelManager.sendRequest:
{code}
  def sendRequest(brokerId : Int, request : RequestOrResponse,
                  callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: prevent infinite hang on trying to send message to a
          // dead broker.
          // TODO: Move timeout to config
          if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
            error("Timed out trying to send message to broker " + brokerId.toString)
            // Do not throw, as it brings controller into completely
            // non-functional state:
            // "Controller to broker state change requests batch is not empty
            // while creating a new one"
            //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
{code}
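The bounded enqueue used in the patch above can be tried in isolation. This is 
only a minimal sketch outside Kafka: BoundedEnqueueSketch and offerWithTimeout 
are hypothetical names, and a capacity-1 queue with no consumer stands in for 
the queue of a dead broker.
{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object BoundedEnqueueSketch {
  // Hypothetical helper (not part of Kafka): try to enqueue within a deadline
  // and report success, instead of blocking indefinitely the way put() would.
  def offerWithTimeout[A](queue: LinkedBlockingQueue[A], item: A, timeoutMs: Long): Boolean =
    queue.offer(item, timeoutMs, TimeUnit.MILLISECONDS)

  def main(args: Array[String]): Unit = {
    // Capacity 1 and nobody draining the queue: the second offer cannot fit.
    val queue = new LinkedBlockingQueue[String](1)
    println(offerWithTimeout(queue, "first", 100))  // there is room
    println(offerWithTimeout(queue, "second", 100)) // full, gives up after 100 ms
  }
}
{code}
Returning a Boolean leaves the decision to the caller, which matches the patch 
above where the failure is logged rather than thrown.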
# When a broker that is the controller starts shutting down while auto leader 
rebalance is running, it eventually deadlocks (the shutdown thread owns the 
lock and waits for the rebalance thread to exit, while the rebalance thread 
waits for the lock). *Proposed solution: use a bounded wait in the rebalance 
thread*. KafkaController.scala:
{code}
  // ODKL Patch to prevent deadlocks in shutdown.
  /**
   * Execute the given function inside the lock
   */
  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    if (isRunning || lock.isHeldByCurrentThread) {
      // TODO: Configure timeout.
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
      }
      try {
        fun
      } finally {
        lock.unlock()
      }
    } else {
      throw new IllegalStateException("Controller is not running, not allowed to lock.")
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
    // Use inLockIfRunning here instead of inLock
  }
{code}
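The bounded wait itself can be exercised without a controller. The sketch below 
is an illustration only: BoundedLockSketch and withLockOrTimeout are 
hypothetical names, and it returns None on timeout instead of throwing as the 
patch above does.
{code}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object BoundedLockSketch {
  // Hypothetical helper (not part of Kafka): run `body` while holding `lock`,
  // or return None if the lock cannot be acquired within `timeoutMs`.
  def withLockOrTimeout[T](lock: ReentrantLock, timeoutMs: Long)(body: => T): Option[T] =
    if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      // The lock is always released, even if `body` throws.
      try Some(body) finally lock.unlock()
    } else {
      None
    }
}
{code}
With a result of None a periodic task such as the rebalance check could simply 
skip one round, so a thread that is shutting down and holds the lock is never 
waited on forever.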
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector 
prefer the oldest replica in the ISR (the one that joined the ISR first). 
During a rolling update this moves partitions toward the tail, which increases 
the overall number of movements and finally significantly overloads the last 
broker (with 4 brokers and RF 3 the last one gets 3/4 of the leadership). In 
case of multiple failures this logic can cause a significant imbalance in 
leadership. *Proposed solution: move leadership to the preferred replica if 
possible, or otherwise to the youngest replica (in controlled shutdown) or the 
second preferred replica (in offline partition)*:
{code}
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector with Logging {
...
            // ODKL Patch: Trying to select replica from ISR not depending on
            // ISR join order, but following the assignment order. Preferred
            // replica is the first one, thus if possible it'll be chosen, but
            // most probably it is the dead one, thus we fall back to the second
            // preferred replica. Here we do not care about overloading the
            // second preferred replica as we do not expect rolling crashes.
            val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
...
}

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector
        with Logging {
...
    // ODKL Patch: Trying to select replica from ISR not depending on ISR join
    // order. If preferred replica is in ISR, choose it; otherwise choose the
    // last replica from ISR, which is expected to be the youngest (most
    // probably it has already survived the rolling update).
    val newLeaderOpt =
      if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption
      else newIsr.lastOption
...
}
{code}
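To make the two selection rules easier to check by hand, here they are as pure 
functions over broker ids. This is a sketch, not the Kafka classes: 
LeaderSelectionSketch and both method names are hypothetical, the ISR is 
assumed to be listed in join order (oldest first), and every ISR member is 
assumed to appear in the assignment.
{code}
object LeaderSelectionSketch {
  // Offline partition: among live ISR members, pick the one that comes first
  // in assignment order (the preferred replica if alive, else the next one).
  def selectForOffline(assignedReplicas: Seq[Int], liveBrokersInIsr: Seq[Int]): Option[Int] =
    liveBrokersInIsr.sortBy(r => assignedReplicas.indexOf(r)).headOption

  // Controlled shutdown: pick the preferred replica if it is still in the ISR,
  // otherwise the last (youngest) ISR member.
  def selectForControlledShutdown(assignedReplicas: Seq[Int], newIsr: Seq[Int]): Option[Int] =
    assignedReplicas.headOption.filter(r => newIsr.contains(r)).orElse(newIsr.lastOption)

  def main(args: Array[String]): Unit = {
    val assigned = Seq(1, 2, 3)
    // Broker 1 (preferred) is gone; ISR join order is 3 then 2.
    println(selectForOffline(assigned, Seq(3, 2)))            // Some(2)
    println(selectForControlledShutdown(assigned, Seq(3, 2))) // Some(2)
    // Broker 3 is shutting down; preferred replica 1 is still in the ISR.
    println(selectForControlledShutdown(assigned, Seq(2, 1))) // Some(1)
  }
}
{code}
Picking the oldest ISR member in the first two cases would have chosen broker 
3, which is the drift toward the tail described above.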
# Auto leader rebalance started simultaneously with controlled shutdown 
competes with it for space in the queue and can slow down the process. If the 
queue size is large, it could also cause significant data loss (for a few 
minutes there might be multiple brokers each considering itself the leader and 
accepting produce requests). *Proposed solution: add throttling to the auto 
rebalance*:
{code}
private def checkAndTriggerPartitionRebalance(): Unit = {
...
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
            info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
            topicsNotInPreferredReplica.foreach {
              case (topicPartition, replicas) => {
                inLockIfRunning(controllerContext.controllerLock) {
                  // do this check only if the broker is live and there are no
                  // partitions being reassigned currently
                  // and preferred replica election is not in progress
                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                    controllerContext.partitionsBeingReassigned.size == 0 &&
                    controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                    !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
                    controllerContext.allTopics.contains(topicPartition.topic)) {
                    onPreferredReplicaElection(Set(topicPartition), true)
                  }
                }

                // ODKL patch: prevent too fast preferred replica elections.
                // TODO: Make configurable/use true throttling
                Utils.swallow(Thread.sleep(2000))
              }
            }
            info("Balancing broker " + leaderBroker + " done")
          }
...
}
{code}
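One possible direction for the "true throttling" TODO above is a 
minimum-interval throttle that sleeps only for the part of the interval not 
already spent on the election itself. This is a sketch under assumptions: 
MinIntervalThrottle is a hypothetical class, not an existing Kafka utility, and 
the interval would still have to come from configuration.
{code}
// Hypothetical helper (not part of Kafka): guarantees that consecutive
// acquire() calls return at least `minIntervalMs` apart.
final class MinIntervalThrottle(minIntervalMs: Long) {
  private var lastStartNanos: Option[Long] = None

  def acquire(): Unit = synchronized {
    lastStartNanos.foreach { last =>
      val elapsedMs = (System.nanoTime() - last) / 1000000L
      val remainingMs = minIntervalMs - elapsedMs
      // Sleep only for what is left of the interval; a slow previous step
      // means no extra delay is added.
      if (remainingMs > 0) Thread.sleep(remainingMs)
    }
    lastStartNanos = Some(System.nanoTime())
  }
}

object MinIntervalThrottleDemo {
  def main(args: Array[String]): Unit = {
    val throttle = new MinIntervalThrottle(200)
    Seq("topic-a/0", "topic-a/1", "topic-b/0").foreach { partition =>
      throttle.acquire() // the first call returns immediately
      println("electing preferred replica for " + partition)
    }
  }
}
{code}
In the rebalance loop, a call to acquire() before each election would take the 
place of the fixed Thread.sleep(2000).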



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
