[ 
https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329318#comment-14329318
 ] 

Sriharsha Chintalapani commented on KAFKA-1887:
-----------------------------------------------

This can be fixed by moving KafkaHealthcheck.shutdown() after 
controller.shutdown() as suggested by [~nehanarkhede] [~gwenshap] . We also 
need to move kafkaHealthCheck.start() before controller.start() otherwise we 
will see the same error in state-change.log after broker started.  
The below patch causing the unit tests run time to go from 9min 30 sec to 
15mins on my machine and also causing intermittent test failure in 
ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown  as 
producer.send.get gets an NotEnoughReplicasAfterAppendException instead of 
NotEnoughReplicasException (probably not related to this patch). I am looking 
into the test slowness with the patch but if you have any idea/fix for this 
please go ahead and take the jira I don't want to hold up 0.8.2.1 release. I'll 
update the jira as soon as I've a fix. 
{code}
+        /* tell everyone we are alive */
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, 
config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, 
zkClient)
+        kafkaHealthcheck.startup()
+
         /* start kafka controller */
         kafkaController = new KafkaController(config, zkClient, brokerState)
         kafkaController.startup()
@@ -152,10 +156,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
         topicConfigManager = new TopicConfigManager(zkClient, logManager)
         topicConfigManager.startup()
 
-        /* tell everyone we are alive */
-        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, 
config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, 
zkClient)
-        kafkaHealthcheck.startup()
-
         /* register broker metrics */
         registerStats()
 
@@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
@@ -329,6 +327,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
           Utils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
           Utils.swallow(kafkaController.shutdown())
+        if(kafkaHealthcheck != null)
+          Utils.swallow(kafkaHealthcheck.shutdown())
         if(zkClient != null)
           Utils.swallow(zkClient.close())

{code}

> controller error message on shutting the last broker
> ----------------------------------------------------
>
>                 Key: KAFKA-1887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1887
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Sriharsha Chintalapani
>            Priority: Minor
>             Fix For: 0.8.3
>
>
> We always see the following error in state-change log on shutting down the 
> last broker.
> [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change 
> for partition [test,0] from OfflinePartition to OnlinePartition failed 
> (state.change.logger)
> kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
> alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
>         at 
> kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
>         at 
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
>         at 
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
>         at 
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
>         at 
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
>         at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>         at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to