[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105480#comment-14105480
 ] 

Yiyang Li commented on KAFKA-1029:
----------------------------------

Hello, I am totally new to Kafka. We are .NET developers testing Kafka with the 
kafka-net library. Its NuGet package is still an alpha release, so a lot of 
functionality is not implemented yet. 

We recently hit this problem after embedding a producer in a service. According 
to the library, it runs a ResponseTimeoutCheck every 30000 ms. However, one of 
the clients (the others are fine) triggers the following error:

ERROR Closing socket for /10.207.x.x because of error (kafka.network.Processor)
java.io.IOException: An existing connection was forcibly closed by the remote host
        at sun.nio.ch.SocketDispatcher.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:217)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:745)
log4j:ERROR Failed to rename [/cygdrive/c/kafka/bin/../logs/server.log] to [/cygdrive/c/kafka/bin/../logs/server.log.2014-08-20-17].

This appears in the output of kafka-server-start.sh.

I have changed the log4j properties to:

log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log

Other background:
Kafka binaries: Scala 2.9.2 - kafka_2.9.2-0.8.1.1
Brokers: 3 brokers on 3 different machines, using the same port on each IP 
(broker id = 0, 1, 2)
ZooKeeper: port 2181 on one of the brokers

I understand that this might be hard to reproduce, as the problem might lie in 
the incomplete .NET client.

Details of ResponseTimeoutCheck:

https://github.com/YiyangLi/kafka-net/blob/master/src/kafka-net/KafkaConnection.cs
 (Line 200)

Thanks. 


> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
>                 Key: KAFKA-1029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1029
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Blocker
>             Fix For: 0.8.0
>
>         Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration is added on every elect call - shouldn't it 
> be registered in startup()?), so it can update leaderId to the current leader 
> before the call to create. If that happens, we will continuously get node 
> exists exceptions and the checker function will always return true, i.e. we 
> will never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.
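
The retry decision described in the quoted issue can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Kafka's actual code: ElectionSketch, its regex-based parseControllerId and wouldRetry are hypothetical stand-ins, and the real loop lives in createEphemeralPathExpectConflictHandleZKBug. It only shows why comparing the stored node against leaderId (which the listener may already have set to the current leader) keeps the checker returning true, while comparing against brokerId does not.

```scala
// Hypothetical, simplified model of the checker from the issue; not Kafka code.
object ElectionSketch {
  // Stand-in for KafkaController.parseControllerId: extracts the broker id from
  // a /controller payload such as { "brokerid":2, "timestamp":"...", "version":1 }.
  def parseControllerId(data: String): Int = {
    val pattern = "\"brokerid\"\\s*:\\s*(\\d+)".r
    pattern.findFirstMatchIn(data) match {
      case Some(m) => m.group(1).toInt
      case None    => throw new IllegalArgumentException("no brokerid in: " + data)
    }
  }

  // Same shape as the checker passed to the create call in the issue: true when
  // the stored node looks like it was written by the broker with the given id.
  val checker: (String, Any) => Boolean =
    (controllerString, id) => parseControllerId(controllerString) == id.asInstanceOf[Int]

  // True if the create loop would back off and retry, i.e. it believes the
  // conflicting node is its own left-over from an earlier session.
  def wouldRetry(storedData: String, expectedCaller: Int): Boolean =
    checker(storedData, expectedCaller)
}
```

With the stored data from the log above (brokerid 2), a broker whose own id is 3 but whose leaderId has already been updated to 2 gets wouldRetry(stored, 2) == true and keeps looping, whereas passing its brokerId gives wouldRetry(stored, 3) == false, so the loop is only entered by the broker that previously owned the node.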



--
This message was sent by Atlassian JIRA
(v6.2#6252)
