[ https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329318#comment-14329318 ]
Sriharsha Chintalapani commented on KAFKA-1887:
-----------------------------------------------

This can be fixed by moving kafkaHealthcheck.shutdown() after controller.shutdown(), as suggested by [~nehanarkhede] [~gwenshap]. We also need to move kafkaHealthcheck.startup() before controller.startup(); otherwise we will see the same error in state-change.log after the broker starts.

The patch below causes the unit test run time to go from 9 min 30 sec to 15 min on my machine, and also causes an intermittent failure in ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown: producer.send.get gets a NotEnoughReplicasAfterAppendException instead of a NotEnoughReplicasException (probably not related to this patch). I am looking into the test slowness with the patch, but if you have any idea or fix for this, please go ahead and take the jira; I don't want to hold up the 0.8.2.1 release. I'll update the jira as soon as I have a fix.

{code}
+    /* tell everyone we are alive */
+    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+    kafkaHealthcheck.startup()
+
     /* start kafka controller */
     kafkaController = new KafkaController(config, zkClient, brokerState)
     kafkaController.startup()
@@ -152,10 +156,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     topicConfigManager = new TopicConfigManager(zkClient, logManager)
     topicConfigManager.startup()

-    /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-    kafkaHealthcheck.startup()
-
     /* register broker metrics */
     registerStats()
@@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
@@ -329,6 +327,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
          Utils.swallow(kafkaController.shutdown())
+        if(kafkaHealthcheck != null)
+          Utils.swallow(kafkaHealthcheck.shutdown())
         if(zkClient != null)
           Utils.swallow(zkClient.close())
{code}
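
To make the resulting order easy to see, here is a minimal, self-contained sketch of the lifecycle invariant the patch enforces. The Healthcheck and Controller classes are hypothetical stand-ins for KafkaHealthcheck and KafkaController, reduced to just the ordering; this is not the actual KafkaServer code.

{code}
// Sketch only: stand-in classes illustrating the startup/shutdown ordering.
object BrokerLifecycleSketch {

  // Stand-in for KafkaHealthcheck: registers/deregisters the broker in ZK.
  class Healthcheck(brokerId: Int) {
    def startup(): Unit  = println(s"broker $brokerId: registered in ZK as alive")
    def shutdown(): Unit = println(s"broker $brokerId: deregistered from ZK")
  }

  // Stand-in for KafkaController: reacts to live-broker changes.
  class Controller {
    def startup(): Unit  = println("controller: started")
    def shutdown(): Unit = println("controller: stopped")
  }

  def main(args: Array[String]): Unit = {
    val healthcheck = new Healthcheck(0)
    val controller  = new Controller

    // Startup: register liveness BEFORE the controller starts, so the
    // controller never runs leader election against an empty live-broker set.
    healthcheck.startup()
    controller.startup()

    // Shutdown: the mirror image. Stop the controller FIRST, then deregister,
    // so our own ZK deregistration never triggers onBrokerFailure handling
    // in a controller that is still running on this broker.
    controller.shutdown()
    healthcheck.shutdown()
  }
}
{code}

The shutdown half of this ordering is exactly what the patch's last two hunks change: the healthcheck shutdown moves from before the socket server to after the controller.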

> controller error message on shutting the last broker
> ----------------------------------------------------
>
>                 Key: KAFKA-1887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1887
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Sriharsha Chintalapani
>            Priority: Minor
>             Fix For: 0.8.3
>
>
> We always see the following error in the state-change log on shutting down the last broker.
> [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
> kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
>         at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
>         at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
>         at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
>         at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
>         at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
>         at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>         at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
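
For readers who want the reported failure in isolation: the sketch below (a hypothetical simplification, not the real OfflinePartitionLeaderSelector) shows why the election at the top of this stack trace must fail once the last broker has deregistered, since no assigned replica remains in the live set.

{code}
object LeaderElectionSketch {

  // Stand-in for kafka.common.NoReplicaOnlineException.
  case class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

  // Pick the first assigned replica that is still alive; throw if none is,
  // mirroring the message the offline-partition leader selector reports.
  def selectLeader(assignedReplicas: List[Int], liveBrokers: Set[Int]): Int =
    assignedReplicas.find(liveBrokers.contains).getOrElse(
      throw NoReplicaOnlineException(
        s"No replica is alive. Live brokers are: [$liveBrokers], " +
          s"Assigned replicas are: [$assignedReplicas]"))

  def main(args: Array[String]): Unit = {
    // The last broker (id 0) has already deregistered, but the controller
    // still tries to move [test,0] from OfflinePartition to OnlinePartition.
    selectLeader(assignedReplicas = List(0), liveBrokers = Set.empty[Int])
  }
}
{code}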