[ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15186928#comment-15186928
 ] 

Flavio Junqueira commented on KAFKA-3173:
-----------------------------------------

There is a race in the controller failover that could be causing this. Here is
what we have in {{KafkaController.onControllerFailover}}:

{noformat}
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
{noformat}

Both the partition state machine and the replica state machine register their
zk listeners before they start up. In the partition state machine, the two
calls that invoke {{ControllerBrokerRequestBatch.newBatch()}} are
{{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}. They both
invoke a private method that checks {{hasStarted}} and throws an exception if
the state machine hasn't started. The variable {{hasStarted}} is set to true
upon executing {{startup}}. Consequently, if we register the listener before
starting the state machine and an event comes through, we leave the batch
dirty, which causes the exception in the description.

In both {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}, we
simply log the error and move on, leaving the batch dirty. Note that we have
the same issue in the replica state machine.
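
To make the failure mode concrete, here is a toy Scala model; it is not the Kafka source. {{ToyBatch}} and {{ToyStateMachine}} are hypothetical names, and the model assumes that a request has already been queued at the moment the {{hasStarted}} check fails, which is an ordering the code above does not show.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ControllerBrokerRequestBatch: newBatch() refuses
// to start while requests from an earlier round are still queued.
class ToyBatch {
  private val pending = mutable.Buffer.empty[String]

  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(s"batch is not empty while creating a new one: $pending")

  def add(request: String): Unit = pending += request
  def send(): Unit = pending.clear()
  def isDirty: Boolean = pending.nonEmpty
}

// Hypothetical stand-in for a state machine whose listener can fire before startup().
class ToyStateMachine(batch: ToyBatch) {
  @volatile private var hasStarted = false
  val errors = mutable.Buffer.empty[String]

  def startup(): Unit = { hasStarted = true }

  def handleStateChanges(partitions: Seq[String]): Unit =
    try {
      batch.newBatch()
      partitions.foreach { p =>
        batch.add(s"LeaderAndIsr($p)") // assumption: queued before the check runs
        if (!hasStarted)
          throw new IllegalStateException(s"state machine has not started, cannot handle $p")
      }
      batch.send()
    } catch {
      // Log and move on: nothing on this path clears the batch.
      case e: IllegalStateException => errors += e.getMessage
    }
}

object DirtyBatchDemo {
  def main(args: Array[String]): Unit = {
    val batch = new ToyBatch
    val machine = new ToyStateMachine(batch)
    machine.handleStateChanges(Seq("foo-0")) // listener fires before startup()
    machine.startup()
    machine.handleStateChanges(Seq("bar-0")) // newBatch() now trips over the leftovers
    machine.errors.foreach(println)
  }
}
```

In this toy, the second error only shows up on the next state change, which would be consistent with the exception surfacing some time after the failover itself.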

I believe the right execution order in {{onControllerFailover}} should be:

{noformat}
      partitionStateMachine.startup()
      replicaStateMachine.startup()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
{noformat}
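
The effect of the two orderings can be compared with a small toy model in Scala. This is only a sketch under assumptions: {{ToyEventSource}} and {{ToyMachine}} are hypothetical names, the event source delivers a pending event the moment a listener is registered (the worst case for the race), and {{initializeControllerContext()}} is left out entirely.

```scala
import scala.collection.mutable

// Hypothetical event source: delivers one already-pending event as soon as a
// listener is registered, mimicking a watch that fires immediately.
class ToyEventSource {
  def register(listener: String => Unit): Unit = listener("topic-created")
}

// Hypothetical state machine that rejects events arriving before startup().
class ToyMachine(source: ToyEventSource) {
  private var hasStarted = false
  val handled = mutable.Buffer.empty[String]
  val rejected = mutable.Buffer.empty[String]

  def startup(): Unit = { hasStarted = true }

  def registerListeners(): Unit = source.register { event =>
    if (hasStarted) handled += event else rejected += event
  }
}

object OrderingDemo {
  def main(args: Array[String]): Unit = {
    // Order from the existing code: register first, start afterwards.
    val current = new ToyMachine(new ToyEventSource)
    current.registerListeners()
    current.startup()
    println(s"register then start: handled=${current.handled} rejected=${current.rejected}")

    // Suggested order: start first, register afterwards.
    val proposed = new ToyMachine(new ToyEventSource)
    proposed.startup()
    proposed.registerListeners()
    println(s"start then register: handled=${proposed.handled} rejected=${proposed.rejected}")
  }
}
```

With the suggested order, no event can reach the toy state machine before {{hasStarted}} is true, so the window for the race closes.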

There could be other sources, but right now this looks like a clear one to me.
Also, something that looks really bad in lots of places in the controller code
is that if there is some error processing partition or replica changes, then
the code simply logs and moves on. Instead, I believe it should either recover
from the error or resign as controller; we can't simply skip an update.
This should be addressed when rewriting the controller, though.
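
As a rough sketch of what "recover or resign" could look like, here is a toy Scala example; it is not a proposal for the actual controller code. All names ({{ToyRequestBatch}}, {{ToyUpdater}}, {{applyUpdate}}, {{onResign}}) are hypothetical, and the sketch does both things at once for brevity: it clears the local batch so the next round starts clean, then invokes a resign hook.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical request batch with an explicit clear() for error paths.
class ToyRequestBatch {
  private val pending = mutable.Buffer.empty[String]
  def add(request: String): Unit = pending += request
  def send(): Unit = pending.clear()
  def clear(): Unit = pending.clear()
  def size: Int = pending.size
}

// onResign is a hypothetical hook that gives up controllership.
class ToyUpdater(batch: ToyRequestBatch, onResign: () => Unit) {
  // applyUpdate is a hypothetical per-partition state change that may throw.
  def process(partitions: Seq[String])(applyUpdate: String => Unit): Unit =
    try {
      partitions.foreach { p =>
        applyUpdate(p)
        batch.add(s"LeaderAndIsr($p)")
      }
      batch.send()
    } catch {
      case NonFatal(_) =>
        batch.clear() // do not leave half a batch behind for the next round
        onResign()    // do not silently skip the update either
    }
}

object RecoverOrResignDemo {
  def main(args: Array[String]): Unit = {
    var resigned = false
    val batch = new ToyRequestBatch
    val updater = new ToyUpdater(batch, () => resigned = true)
    updater.process(Seq("foo-0", "foo-1")) { p =>
      if (p == "foo-1") throw new RuntimeException(s"cannot update $p")
    }
    println(s"pending=${batch.size} resigned=$resigned")
  }
}
```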

As for this jira, I suggest we fix this race and revisit the issue if it
reappears.


> Error while moving some partitions to OnlinePartition state 
> ------------------------------------------------------------
>
>                 Key: KAFKA-3173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3173
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0
>            Reporter: Flavio Junqueira
>            Assignee: Flavio Junqueira
>            Priority: Critical
>             Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this
> time the error appeared in the partition state machine. In KAFKA-2300, we
> didn't clean up the state in {{PartitionStateMachine}} and
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)
> java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lost
>         at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
>         at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
>         at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
>         at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
>         at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
>         at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
>         at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
>         at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
