[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitry Bugaychenko updated KAFKA-2029:
--------------------------------------
    Description: 
Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.

Note 1: I'm aware of KAFKA-1305, but the workaround proposed there (increasing the queue size) makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).

Note 3: These improvements are really workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).

The problems are:

# Controlled shutdown takes a long time (10+ minutes); the broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock while waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases its high watermarks (logging that the "replica does not exist", because the controller had not yet sent it a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller then processes the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. One solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default* (see the sketch below).
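A minimal sketch of what the dedicated setting could look like. The property name controlled.shutdown.socket.timeout.ms, its default, and the wiring into KafkaServer.controlledShutdown() are assumptions for illustration only, not an actual patch:
{code}
// KafkaConfig (sketch, hypothetical property): a timeout used only while waiting
// for the controlled shutdown response, defaulting much higher than
// controller.socket.timeout.ms so a busy controller has time to answer.
val controlledShutdownSocketTimeoutMs =
  props.getInt("controlled.shutdown.socket.timeout.ms", 10 * 60 * 1000)

// KafkaServer.controlledShutdown() (sketch): use the dedicated timeout for the
// channel carrying the shutdown request instead of the global controller timeout.
channel = new BlockingChannel(broker.host, broker.port,
  BlockingChannel.UseDefaultBufferSize,
  BlockingChannel.UseDefaultBufferSize,
  config.controlledShutdownSocketTimeoutMs)
{code}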
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop retrying the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout when adding a message to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
          // TODO: Move timeout to config
          if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
            error("Timed out trying to send message to broker " + brokerId.toString)
            // Do not throw, as it brings controller into completely non-functional state:
            // "Controller to broker state change requests batch is not empty while creating a new one"
            //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
{code}
# When the broker that is the controller starts shutting down while auto leader rebalance is running, it ends up in a deadlock (the shutdown thread owns the lock and waits for the rebalance thread to exit, while the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
{code}
  // ODKL Patch to prevent deadlocks in shutdown.
  /**
   * Execute the given function inside the lock
   */
  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    if (isRunning || lock.isHeldByCurrentThread) {
      // TODO: Configure timeout.
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to acquire controller lock in 
10 seconds.");
      }
      try {
        return fun
      } finally {
        lock.unlock()
      }
    } else {
      throw new IllegalStateException("Controller is not running, not allowed 
to lock.")
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
    // Use inLockIfRunning here instead of inLock
  }
{code}
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector prefer the oldest replica in the ISR (the one that joined the ISR first). During a rolling update this means moving leadership to the tail, which increases the overall number of movements and eventually significantly overloads the last broker (with 4 brokers and RF 3, the last one ends up with 3/4 of the leadership). In case of multiple failures this logic can cause a significant imbalance in leadership. *Proposed solution: move leadership to the preferred replica if possible, otherwise to the youngest replica (in controlled shutdown) or to the second preferred replica (in offline partition election)*:
{code}
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
...
            // ODKL Patch: Try to select a replica from the ISR not depending on ISR join order, but following the
            // assignment order. The preferred replica is the first one, so if possible it will be chosen, but most
            // probably it is the dead one, so we fall back to the second preferred replica. Here we do not care about
            // overloading the second preferred replica, as we do not expect rolling crashes.
            val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
...
}

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector
        with Logging {
...
    // ODKL Patch: Try to select a replica from the ISR not depending on ISR join order. If the preferred replica is
    // in the ISR, choose it; otherwise choose the last replica from the ISR - it is expected to be the youngest
    // (most probably it has already survived the rolling update).
    val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
...
}
{code}
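For illustration, a hypothetical walk-through of the proposed ordering in the offline case (the broker ids below are made up, not taken from the patch):
{code}
// Hypothetical example: broker 1 is the preferred replica and is currently dead;
// brokers 3 and 2 are alive in the ISR, with 3 having joined the ISR first.
val assignedReplicas = Seq(1, 2, 3)
val liveBrokersInIsr = Seq(3, 2)

// Old behaviour: pick the oldest ISR member -> broker 3.
// New behaviour: follow the assignment order -> broker 2, the second preferred replica.
val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head  // == 2
{code}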
# Auto leader rebalance started simultaneously with a controlled shutdown competes with it for space in the queue and can slow down the process. If the queue size is large it can also cause significant data loss (for a few minutes there may be multiple brokers considering themselves the leader and accepting produce requests). *Proposed solution: add throttling to the auto rebalance*:
{code}
private def checkAndTriggerPartitionRebalance(): Unit = {
...
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
            info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
            topicsNotInPreferredReplica.foreach {
              case (topicPartition, replicas) => {
                inLockIfRunning(controllerContext.controllerLock) {
                  // do this check only if the broker is live and there are no partitions being reassigned currently
                  // and preferred replica election is not in progress
                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                    controllerContext.partitionsBeingReassigned.size == 0 &&
                    controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                    !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
                    controllerContext.allTopics.contains(topicPartition.topic)) {
                    onPreferredReplicaElection(Set(topicPartition), true)
                  }
                }

                // ODKL patch: prevent too frequent preferred replica elections.
                // TODO: Make configurable / use true throttling
                Utils.swallow(Thread.sleep(2000))
              }
            }
            info("Balancing broker " + leaderBroker + " done")
          }
...
}
{code}


> Improving controlled shutdown for rolling updates
> -------------------------------------------------
>
>                 Key: KAFKA-2029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Neha Narkhede
>            Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
