[ https://issues.apache.org/jira/browse/KAFKA-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151673#comment-14151673 ]
James Lent commented on KAFKA-1387:
-----------------------------------

In case anyone is interested, here is the complete code for the new class I am testing with:

{noformat}
import kafka.utils.Logging
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState

// Watches an ephemeral node and calls recreateNode() when the node is deleted,
// or when a new ZooKeeper session starts and the node is missing.
class EphemeralNodeMonitor(zkClient: ZkClient, path: String, recreateNode: () => Unit) extends Logging {
  val dataListener = new DataListener
  val stateListener = new StateListener

  def start(): Unit = {
    zkClient.subscribeStateChanges(stateListener)
    zkClient.subscribeDataChanges(path, dataListener)
  }

  def close(): Unit = {
    zkClient.unsubscribeStateChanges(stateListener)
    zkClient.unsubscribeDataChanges(path, dataListener)
  }

  class DataListener extends IZkDataListener {
    // Last data seen, so that only actual changes are logged.
    var oldData: String = null

    def handleDataChange(dataPath: String, newData: Any): Unit = {
      // Guard against null node data instead of throwing a NullPointerException.
      val newValue = if (newData == null) null else newData.toString
      if (newValue != oldData) {
        oldData = newValue
        info("Ephemeral node %s has new data [%s]".format(dataPath, newData))
      }
    }

    def handleDataDeleted(dataPath: String): Unit = {
      // Another callback (e.g. handleNewSession) may already have recreated the node.
      if (zkClient.exists(path)) {
        info("Ephemeral node %s was deleted but has already been recreated".format(dataPath))
      } else {
        info("Ephemeral node %s was deleted; recreating it".format(dataPath))
        recreateNode()
      }
    }
  }

  class StateListener extends IZkStateListener {
    def handleStateChanged(state: KeeperState): Unit = {}

    def handleNewSession(): Unit = {
      // Recreate the node only if it is missing, so a second queued callback
      // does not try to create a node that this session already owns.
      if (zkClient.exists(path)) {
        info("New session started, but ephemeral node %s already/still exists".format(path))
      } else {
        info("New session started; recreating ephemeral node %s".format(path))
        recreateNode()
      }
    }
  }
}
{noformat}

> Kafka getting stuck creating ephemeral node it has already created when two
> zookeeper sessions are established in a very short period of time
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1387
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1387
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Fedor Korotkiy
>
> The Kafka broker re-registers itself in ZooKeeper every time the handleNewSession()
> callback is invoked.
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
>
> Now imagine the following sequence of events:
> 1) The ZooKeeper session is re-established. The handleNewSession() callback is
> queued by the zkClient, but not invoked yet.
> 2) The ZooKeeper session is re-established again, queueing the callback a second time.
> 3) The first callback is invoked, creating the /broker/[id] ephemeral path.
> 4) The second callback is invoked and tries to create the /broker/[id] path using
> the createEphemeralPathExpectConflictHandleZKBug() function. But the path already
> exists, so createEphemeralPathExpectConflictHandleZKBug() gets stuck in an
> infinite loop.
>
> The controller election code seems to have the same issue.
>
> I am able to reproduce this issue on the 0.8.1 branch from GitHub using the
> following configs:
>
> # zookeeper
> tickTime=10
> dataDir=/tmp/zk/
> clientPort=2101
> maxClientCnxns=0
>
> # kafka
> broker.id=1
> log.dir=/tmp/kafka
> zookeeper.connect=localhost:2101
> zookeeper.connection.timeout.ms=100
> zookeeper.sessiontimeout.ms=100
>
> Just start Kafka and ZooKeeper, and then pause ZooKeeper several times using
> Ctrl-Z.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
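The race in steps 1) to 4) of the issue can be illustrated without a ZooKeeper server. The Scala sketch below is purely illustrative: `FakeZk`, `EphemeralNode`, `registerNaive`, and `registerChecked` are hypothetical names, not Kafka or zkclient APIs, and ZooKeeper is replaced by an in-memory map in which each ephemeral node records the session that created it. `registerNaive` is an assumed model of how we read createEphemeralPathExpectConflictHandleZKBug(): on a conflict it keeps retrying, expecting the node to be a leftover from an expired session (bounded here so the demo terminates). `registerChecked` shows one possible approach, assuming the client can tell which session owns a conflicting node: a node owned by the current session is treated as an already-successful registration.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper (not a real API): each
// ephemeral node remembers the id of the session that created it.
case class EphemeralNode(data: String, ownerSession: Long)

class FakeZk {
  private val nodes = mutable.Map.empty[String, EphemeralNode]
  private var session = 0L

  def currentSession: Long = session

  // Starting a new session expires the old one; in this model every existing
  // ephemeral node belongs to an older session, so all of them are removed.
  def newSession(): Long = {
    session += 1
    nodes.clear()
    session
  }

  // Returns false on a conflict, like ZooKeeper's NodeExists error.
  def createEphemeral(path: String, data: String): Boolean =
    if (nodes.contains(path)) false
    else {
      nodes(path) = EphemeralNode(data, session)
      true
    }

  def get(path: String): Option[EphemeralNode] = nodes.get(path)
}

// Models the problematic pattern: on conflict, keep retrying in the hope that
// the node is a leftover from an expired session and will disappear. Bounded
// here so the demo terminates; a real backoff loop would sleep between tries.
def registerNaive(zk: FakeZk, path: String, data: String, maxAttempts: Int): Boolean = {
  var attempt = 0
  while (attempt < maxAttempts) {
    if (zk.createEphemeral(path, data)) return true
    attempt += 1
  }
  false
}

// Owner-checked variant: a conflicting node created by the current session is
// our own earlier registration, so it counts as success if the data matches.
def registerChecked(zk: FakeZk, path: String, data: String, maxAttempts: Int): Boolean = {
  var attempt = 0
  while (attempt < maxAttempts) {
    if (zk.createEphemeral(path, data)) return true
    zk.get(path) match {
      case Some(node) if node.ownerSession == zk.currentSession =>
        return node.data == data
      case _ =>
        attempt += 1 // node from an older session (or already gone): retry
    }
  }
  false
}

val path = "/brokers/ids/1" // illustrative path and payload
val data = "broker-1"

// Two sessions are established back to back, and both queued
// handleNewSession-style callbacks run only after the second one is current.
val zkA = new FakeZk
zkA.newSession(); zkA.newSession()
val naive = Seq(registerNaive(zkA, path, data, 5), registerNaive(zkA, path, data, 5))

val zkB = new FakeZk
zkB.newSession(); zkB.newSession()
val checked = Seq(registerChecked(zkB, path, data, 5), registerChecked(zkB, path, data, 5))

println(s"naive: $naive, checked: $checked")
```

In this model the second naive registration exhausts its retries because the node it collides with belongs to the live session and will never expire, while the owner-checked version recognises the node as its own. In the real ZooKeeper Java client the owning session of a node is reported by `Stat.getEphemeralOwner()` and the current one by `ZooKeeper.getSessionId()`; how to reach these through zkClient is not shown here.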