[ https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105480#comment-14105480 ]
Yiyang Li commented on KAFKA-1029:
----------------------------------

Hello,

I am totally new to Kafka. We are .Net developers testing Kafka with the kafka-net library. The NuGet package is still in alpha, so a lot of functionality is not implemented yet.

We recently ran into this problem after embedding a producer in a service where, according to the library code, a ResponseTimeoutCheck runs every 30000 ms. However, one of the clients throws an error (the others are fine):

ERROR Closing socket for /10.207.x.x because of error (kafka.network.Processor)
java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
	at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:217)
	at kafka.network.Processor.write(SocketServer.scala:375)
	at kafka.network.Processor.run(SocketServer.scala:247)
	at java.lang.Thread.run(Thread.java:745)
log4j:ERROR Failed to rename [/cygdrive/c/kafka/bin/../logs/server.log] to [/cygdrive/c/kafka/bin/../logs/server.log.2014-08-20-17].

In kafka-server-start.sh I have changed the log4j properties to:
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log

Other background:
Kafka binaries: Scala 2.9.2 - kafka_2.9.2-0.8.1.1
Brokers: 3 brokers on 3 different machines, each using the same port on its own IP (broker id = 0, 1, 2)
ZooKeeper: port 2181 on one of the brokers

I understand that this might be hard to reproduce, as it may be a problem in the incomplete .Net client.

Details of ResponseTimeoutCheck: https://github.com/YiyangLi/kafka-net/blob/master/src/kafka-net/KafkaConnection.cs (Line 200)

Thanks.
> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
>                 Key: KAFKA-1029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1029
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Blocker
>             Fix For: 0.8.0
>
>         Attachments: 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
>
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
>
> where the broker is essentially stuck in the loop that is trying to deal with left-over ephemeral nodes. The code looks a bit racy to me. In particular:
>
> ZookeeperLeaderElector:
>
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
>
> leaderChangeListener is registered before the create call (by the way, it looks like a new registration will be added on every elect call; shouldn't it register in startup()?), so it can update leaderId to the current leader before the call to create. If that happens, then we will continuously get node-exists exceptions and the checker function will always return true, i.e. we will never get out of the while(true) loop.
>
> I think the right fix here is to pass brokerId instead of leaderId when calling create, i.e.
>
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
>
> The loop dealing with the ephemeral node bug is now only triggered for the broker that owned the node previously, although I am still not 100% sure if that is sufficient.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
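To make the race in the quoted issue concrete, here is a minimal, self-contained Scala simulation of the conflict-handling retry loop. It does not talk to ZooKeeper or Kafka: FakeZk, createExpectConflict, the Outcome values, the simplified parseControllerId (which assumes the node data is just the broker id, not the JSON shown in the logs) and the maxRetries bound (standing in for while(true) so the demo terminates) are all hypothetical. It only illustrates why comparing the stored controller id against leaderId, which the listener may already have set to the current controller, keeps the loop spinning, while comparing against the caller's own brokerId lets a non-owner give up.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the /controller znode (not a ZooKeeper client).
final class FakeZk {
  private val nodes = mutable.Map.empty[String, String]
  // Like an ephemeral create: fails if the path already exists.
  def create(path: String, data: String): Boolean =
    if (nodes.contains(path)) false else { nodes(path) = data; true }
  def read(path: String): Option[String] = nodes.get(path)
}

sealed trait Outcome
case object Created extends Outcome
case object OtherBrokerOwnsNode extends Outcome // analogous to rethrowing the node-exists error
case object StillRetrying extends Outcome       // retry budget used up while backing off

// Hypothetical simplified parser: node data is assumed to be just "<brokerId>".
def parseControllerId(data: String): Int = data.trim.toInt

// Checker in the same shape as the one passed in elect.
val isOwnNode: (String, Any) => Boolean =
  (controllerString, caller) => parseControllerId(controllerString) == caller.asInstanceOf[Int]

// Sketch of the conflict-handling loop; maxRetries replaces while(true) for the demo only.
def createExpectConflict(zk: FakeZk, path: String, data: String, expectedCallerData: Any,
                         checker: (String, Any) => Boolean, maxRetries: Int): Outcome = {
  var attempt = 0
  while (attempt < maxRetries) {
    if (zk.create(path, data)) return Created
    zk.read(path) match {
      case Some(stored) if checker(stored, expectedCallerData) =>
        // Checker says "I wrote this node in an old session": back off and retry
        // (the real code sleeps for the session timeout here).
        attempt += 1
      case Some(_) =>
        // Another broker owns the node: stop retrying.
        return OtherBrokerOwnsNode
      case None =>
        // Node disappeared between create and read: just retry.
        attempt += 1
    }
  }
  StillRetrying
}
```

For example, with broker 2 holding /controller (stored data "2") and broker 3 running elect with its own data "3": passing leaderId = 2 makes the checker return true on every attempt, so the call ends as StillRetrying (and would loop forever without the demo bound), whereas passing brokerId = 3 makes the checker return false and the call returns OtherBrokerOwnsNode. If broker 3 really did leave a stale node holding "3", the checker with brokerId = 3 still returns true, so the back-off path is kept for the case it was meant for.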