[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941296#comment-15941296
 ] 

Onur Karaman commented on KAFKA-4900:
-------------------------------------

I haven't figured out some of the small details but I think I figured out the 
gist of what happened at LinkedIn.

One problem is that ZkClient's ZkEventThread and a RequestSendThread can 
concurrently use objects that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker 
communication).

As per the "Concurrency Notes" section from the [SSLEngine 
javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
bq. two threads must not attempt to call the same method (either wrap() or 
unwrap()) concurrently

SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close

It turns out that the ZkEventThread and RequestSendThread can concurrently call 
SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from 
ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or 
SslTransportLayer.handshake from NetworkClient.poll

Suppose the controller moves for whatever reason. The former controller could 
have had a RequestSendThread who was in the middle of sending out messages to 
the cluster while the ZkEventThread began executing 
KafkaController.onControllerResignation, which calls 
ControllerChannelManager.shutdown, which sequentially cleans up the 
controller-to-broker queue and connection for every broker in the cluster. This 
cleanup includes the call to ControllerChannelManager.removeExistingBroker as 
mentioned earlier, causing the concurrent call to SSLEngine.wrap. This 
concurrent call throws a BufferOverflowException which 
ControllerChannelManager.removeExistingBroker catches so the 
ControllerChannelManager.shutdown moves onto cleaning up the next 
controller-to-broker queue and connection, skipping the cleanup steps such as 
clearing the queue, stopping the RequestSendThread, and removing the entry from 
its brokerStateInfo map.

By failing out of the Selector.close, the sensors corresponding to the broker 
connection has not been cleaned up. Any later attempt at initializing an 
identical Selector will result in a sensor collision and therefore cause 
Selector initialization to throw an exception. In other words, any later 
attempts by this broker to become controller again will fail on initialization. 
When controller initialization fails, the controller deletes the /controller 
znode and lets another broker take over.

Now suppose the controller moves enough times such that every broker hits the 
BufferOverflowException concurrency issue. We're now guaranteed to fail 
controller initialization due to the sensor collision on every controller 
transition, so the controller will move across brokers continuously.

The "connection-close-rate" is specifically cited because that's always the 
first sensor registered upon Selector initialization and therefore is the 
collision we see in the logs.

Now the question is why did the controller move enough times to hit every 
broker in the cluster with the BufferOverflowException to begin with? 
Considering the BufferOverflowException on resignation issue alone, it's 
plausible that the cluster should stabilize after the first 
BufferOverflowException.

We were running 0.10.1.1, where a controller can process events from ZkClient 
even after resigning as controller (fixed in 0.10.2.0 in KAFKA-4447). For 
instance, processing an isr change notification after resignation causes a 
NullPointerException while broadcasting an UpdateMetadataRequest to the cluster 
since the earlier controller resignation made the ControllerChannelManager 
null. This NullPointerException causes the controller to delete the /controller 
znode even though it's no longer the controller, kicking out the current active 
controller, potentially causing the same scenario to repeat. Every broker's 
logs had reported this NullPointerException from the 
IsrChangeNotificationListener, which explains how the controller moved around 
enough times such that every broker experienced the initial 
BufferOverflowException.

> Brokers stuck in controller re-election loop after failing to register metrics
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-4900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4900
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 0.10.1.1
>            Reporter: Nick Travers
>
> We hit this today in one of out three node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
>         at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
>         at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
>         at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
>         at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
>         at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
>         at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
>         at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
>         at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
>         at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
>         at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
>         at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
>         at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
>         at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for 
> a number of topics, which appeared to get stuck in the "in progress" state 
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed was to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
>         at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
>         at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
>         at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>         at scala.collection.immutable.List.exists(List.scala:84)
>         at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
>         at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
>         at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>         at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>         at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
>         at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to