Maysam Yabandeh created KAFKA-3964:
--------------------------------------

             Summary: Metadata update requests are sometimes received after LeaderAndIsrRequests
                 Key: KAFKA-3964
                 URL: https://issues.apache.org/jira/browse/KAFKA-3964
             Project: Kafka
          Issue Type: Bug
            Reporter: Maysam Yabandeh
            Priority: Minor


A broker needs the metadata of the leader before it can process a 
LeaderAndIsrRequest from the controller. For this reason, on broker startup the 
controller first sends the metadata update requests and only AFTER that sends the 
LeaderAndIsrRequests:
{code}
  def onBrokerStartup(newBrokers: Seq[Int]) {
    info("New broker startup callback for %s".format(newBrokers.mkString(",")))
    val newBrokersSet = newBrokers.toSet
    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
    // broker via this update.
    // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
    // common controlled shutdown case, the metadata will reach the new brokers faster
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
    // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
{code}

However, this protocol is not followed when a node becomes the controller: it 
sends LeaderAndIsrRequests BEFORE sending the metadata update requests:
{code}
  def onControllerFailover() {
...
      replicaStateMachine.startup()
...
      /* send partition leadership info to all live brokers */
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
{code}
Here ReplicaStateMachine.startup() calls handleStateChanges:
{code}
  def startup() {
...
    // move all Online replicas to Online
    handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
{code}
which triggers LeaderAndIsrRequest messages.

Here are the symptoms that one would observe when this problem manifests:
# The first set of messages that the broker receives from the controller is 
LeaderAndIsrRequests
# The broker fails to become the follower as requested by the controller
{code}
2016-07-12 21:03:53,081 ERROR change.logger: Broker 14 received LeaderAndIsrRequest with correlation id 0 from controller 21 epoch 290 for partition [topicxyz,7] but cannot become follower since the new leader 22 is unavailable.
{code}
{code}
# The fetcher hence does not start and the partition remains under-replicated.
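
The ordering dependency behind these symptoms can be illustrated with a small toy model. This is only a sketch and not Kafka code: ToyBroker, UpdateMetadata, LeaderAndIsr, OrderingDemo and replay are hypothetical names made up for illustration, and the sketch assumes that the broker does not retry a rejected request once the metadata arrives.
{code}
// Toy model only: every name below is hypothetical and none of it is Kafka code.
sealed trait ControllerRequest
case class UpdateMetadata(liveBrokers: Set[Int]) extends ControllerRequest
case class LeaderAndIsr(partition: String, leader: Int) extends ControllerRequest

class ToyBroker(val id: Int) {
  private var knownLiveBrokers = Set.empty[Int]
  private var followed = Map.empty[String, Int]

  // Returns true if the request was applied.
  def handle(request: ControllerRequest): Boolean = request match {
    case UpdateMetadata(live) =>
      knownLiveBrokers = live
      true
    case LeaderAndIsr(partition, leader) =>
      if (knownLiveBrokers.contains(leader)) {
        followed += (partition -> leader) // leader is known: start following
        true
      } else {
        false // leader is unknown: cannot become follower, nothing starts
      }
  }

  def isFollowing(partition: String): Boolean = followed.contains(partition)
}

object OrderingDemo {
  val metadata = UpdateMetadata(Set(14, 21, 22))
  val becomeFollower = LeaderAndIsr("topicxyz-7", 22)

  // Order used on broker startup: metadata first.
  val startupOrder = Seq(metadata, becomeFollower)
  // Order observed on controller failover: LeaderAndIsr first.
  val failoverOrder = Seq(becomeFollower, metadata)

  // Replays the requests against a fresh broker and reports whether it
  // ends up following the partition.
  def replay(requests: Seq[ControllerRequest]): Boolean = {
    val broker = new ToyBroker(14)
    requests.foreach(r => broker.handle(r))
    broker.isFollowing("topicxyz-7")
  }
}
{code}
In this model OrderingDemo.replay(OrderingDemo.startupOrder) is true, while OrderingDemo.replay(OrderingDemo.failoverOrder) is false: the late metadata update does not undo the earlier rejection.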



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
