[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929381#comment-15929381 ]
Nick Travers commented on KAFKA-4900:
-------------------------------------

Circling back on this, now that we (seemingly) have a root cause on our end for the issues we saw.

It appears that while running the partition re-assignment tool, we persisted state in ZK that violated the type constraints on certain items in the re-assignment JSON (specifically, a List of Strings instead of a List of Integers for the broker assignment). I've created KAFKA-4914 to improve this.

This initially threw a ClassCastException, as you'd expect, on each broker at startup time. I assume this then led to a situation where metrics had already been registered for the broker. Part of the controller (re-)election process is registering metrics for the controller. If there are already metrics registered, an IllegalArgumentException is thrown, crashing the (re-)election method, which triggers the same issue on the next elected broker, and so on.

I'm unsure exactly how the first exception triggered the second, but the ordering is as stated: ClassCastException first, and eventually an IllegalArgumentException.

Could one potential solution here be to guard against re-registering metrics by first checking to see if the metrics are registered and then taking appropriate action (i.e. create or overwrite the metric)? It feels a little heavy-handed to throw an exception deep within the controller election process due to a metrics issue. This resulted in the whole cluster falling into a sustained controller re-election spin.

> Brokers stuck in controller re-election loop after failing to register metrics
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-4900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4900
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 0.10.1.1
>            Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] server.ZookeeperLeaderElector - Error while electing or becoming leader on broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics,description=Connections closed per second in the window., tags={broker-id=10}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
>     at org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
>     at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
>     at kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
>     at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
>     at kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
>     at kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
>     at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
>     at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
>     at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
>     at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
>     at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this
> exception, and leadership transitions to the next, which then hits the
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for
> a number of topics, which appeared to get stuck in the "in progress" state
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed to turn on
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in
> an attempt to clear the exception. We were able to restart the cluster, but
> we continue to see the exceptions
> We also observed, at the same time as the exception above, the following
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
>     at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
>     at kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
>     at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>     at scala.collection.immutable.List.exists(List.scala:84)
>     at kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
>     at kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
>     at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>     at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>     at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>     at kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
>     at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
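
For reference, the guard suggested in the comment (check whether a metric is already registered, then create or overwrite it) could look roughly like the sketch below. This is only an illustration under stated assumptions: `MetricRegistrySketch`, `registerStrict`, `registerOrReplace` and `read` are hypothetical names, and the class is a stand-in built on a plain map, not Kafka's `Metrics` class or its API. The strict method mimics the failure mode in the first stack trace; the lenient one shows the proposed alternative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metrics registry; NOT Kafka's Metrics class. */
final class MetricRegistrySketch {

    // Metric name -> supplier that produces the current value on demand.
    private final Map<String, Supplier<Double>> metrics = new ConcurrentHashMap<>();

    /** Strict registration: fails if the name is taken, as in the reported trace. */
    void registerStrict(String name, Supplier<Double> metric) {
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException(
                "A metric named '" + name + "' already exists, can't register another one.");
        }
    }

    /**
     * Lenient registration: creates the metric, or overwrites a stale one.
     * Returns true if an existing entry was replaced, so the caller can log it.
     */
    boolean registerOrReplace(String name, Supplier<Double> metric) {
        return metrics.put(name, metric) != null;
    }

    /** Reads the current value of a registered metric. */
    double read(String name) {
        Supplier<Double> metric = metrics.get(name);
        if (metric == null) {
            throw new IllegalArgumentException("No metric named '" + name + "'");
        }
        return metric.get();
    }

    public static void main(String[] args) {
        MetricRegistrySketch registry = new MetricRegistrySketch();
        String name = "connection-close-rate{broker-id=10}"; // illustrative key only

        // First (failed) election attempt leaves the metric behind.
        registry.registerStrict(name, () -> 1.0);

        // A later attempt overwrites the leftover entry instead of throwing.
        boolean replaced = registry.registerOrReplace(name, () -> 2.0);
        System.out.println("replaced=" + replaced + ", value=" + registry.read(name));
    }
}
```

Whether overwriting is actually safe depends on who else holds a reference to the old metric, which this sketch does not model; cleaning up the leftover metrics when election fails would be another option.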