[ https://issues.apache.org/jira/browse/KAFKA-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin updated KAFKA-2448:
--------------------------------
    Assignee: Flavio Junqueira

> BrokerChangeListener missed broker id path ephemeral node deletion event.
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-2448
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2448
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Flavio Junqueira
>
> When a broker gets bounced, the sequence should ideally be:
> 1.1. The broker shuts down its resources.
> 1.2. The broker closes its zkClient (this causes the ephemeral node
> /brokers/ids/BROKER_ID to be deleted).
> 1.3. The broker restarts and loads its log segments.
> 1.4. The broker creates the ephemeral node /brokers/ids/BROKER_ID.
> The broker-side logs are:
> {noformat}
> ...
> 2015/08/17 22:42:37.663 INFO [SocketServer] [Thread-1] [kafka-server] [] [Socket Server on Broker 1140], Shutting down
> 2015/08/17 22:42:37.735 INFO [SocketServer] [Thread-1] [kafka-server] [] [Socket Server on Broker 1140], Shutdown completed
> ...
> 2015/08/17 22:42:53.898 INFO [ZooKeeper] [Thread-1] [kafka-server] [] Session: 0x14d43fd905f68d7 closed
> 2015/08/17 22:42:53.898 INFO [ClientCnxn] [main-EventThread] [kafka-server] [] EventThread shut down
> 2015/08/17 22:42:53.898 INFO [KafkaServer] [Thread-1] [kafka-server] [] [Kafka Server 1140], shut down completed
> ...
> 2015/08/17 22:43:03.306 INFO [ClientCnxn] [main-SendThread(zk-ei1-kafkatest.stg.linkedin.com:12913)] [kafka-server] [] Session establishment complete on server zk-ei1-kafkatest.stg.linkedin.com/172.20.73.211:12913, sessionid = 0x24d43fd93d96821, negotiated timeout = 12000
> 2015/08/17 22:43:03.306 INFO [ZkClient] [main-EventThread] [kafka-server] [] zookeeper state changed (SyncConnected)
> ...
> {noformat}
> On the controller side, the sequence should be:
> 2.1. The controller performs a controlled shutdown of the broker.
> 2.2. BrokerChangeListener fires for the /brokers/ids child change because
> the ephemeral node is deleted in step 1.2.
> 2.3. BrokerChangeListener fires again for the /brokers/ids child change because
> the ephemeral node is created in step 1.4.
> The issue I saw was on the controller side: the broker change listener fired only
> once, after step 1.4, so the controller did not see any broker change.
> {noformat}
> 2015/08/17 22:41:46.189 [KafkaController] [Controller 1507]: Shutting down broker 1140
> ...
> 2015/08/17 22:42:38.031 [RequestSendThread] [Controller-1507-to-broker-1140-send-thread], Controller 1507 epoch 799 fails to send request Name: StopReplicaRequest; Version: 0; CorrelationId: 5334; ClientId: ; DeletePartitions: false; ControllerId: 1507; ControllerEpoch: 799; Partitions: [seas-decisionboard-searcher-service_call,1] to broker 1140 : (EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)). Reconnecting to broker.
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 2015/08/17 22:42:38.031 [RequestSendThread] [Controller-1507-to-broker-1140-send-thread], Controller 1507 connected to 1140 : (EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)) for sending state change requests
> 2015/08/17 22:42:38.332 [RequestSendThread] [Controller-1507-to-broker-1140-send-thread], Controller 1507 epoch 799 fails to send request Name: StopReplicaRequest; Version: 0; CorrelationId: 5334; ClientId: ; DeletePartitions: false; ControllerId: 1507; ControllerEpoch: 799; Partitions: [seas-decisionboard-searcher-service_call,1] to broker 1140 : (EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)). Reconnecting to broker.
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> ....
> 2015/08/17 22:43:09.035 [ReplicaStateMachine$BrokerChangeListener] [BrokerChangeListener on Controller 1507]: Broker change listener fired for path /brokers/ids with children 1140,1282,1579,871,1556,872,1511,873,874,852,1575,875,1574,1530,854,857,858,859,1493,1272,880,1547,1568,1500,1521,863,864,865,867,1507
> 2015/08/17 22:43:09.082 [ReplicaStateMachine$BrokerChangeListener] [BrokerChangeListener on Controller 1507]: Newly added brokers: , deleted brokers: , all live brokers: 873,1507,1511,1568,1521,852,874,857,1493,1530,875,1282,1574,880,863,858,1556,1547,872,1579,864,1272,859,1575,854,867,865,1500,871
> {noformat}
> According to the ZK transaction log, the old zk session had already been closed before step 1.4:
> {noformat}
> 2015-08-17T22:42:53.899Z, s:0x14d43fd905f68d7, zx:0x26088e0cda UNKNOWN(null)
> {noformat}
> In this case, there were about 10 seconds between step 1.2 and step 1.4,
> and the ZK session timeout was set to 12 seconds.
> In our tests, the ephemeral node in ZooKeeper is deleted once the session
> is explicitly closed. But that does not seem to be the case when the broker
> shuts down.
> Another suspicious thing is that even after the socket server on the broker had
> shut down, the controller was still able to connect to the broker and send
> requests.
> We are currently setting the zk session timeout to 6 seconds, and this seems to
> solve the problem. My hunch is that ZooKeeper somehow did not fire the
> callback in step 1.4, so step 2.2 was not triggered.
> From the available logs, the missing piece is: how do we know whether ZooKeeper
> fired the watcher in step 2.2?
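The timing observation can be checked against the broker log timestamps quoted in the description. The sketch below computes the gap between the old session closing (22:42:53.898, step 1.2) and the new session being established (22:43:03.306). Because the broker creates its ephemeral node only after the new session is up, this gap is a lower bound on the time between steps 1.2 and 1.4. It then compares the gap with the two timeouts from the report: 12 s (matching `negotiated timeout = 12000` in the log) and 6 s. `would_expire_first` is a hypothetical helper, not a ZooKeeper or Kafka API, and it assumes, for illustration only, that a session which was not cleanly closed would expire roughly one session timeout after it went quiet.

```python
from datetime import datetime

# Broker log timestamp format used in the report, e.g. "2015/08/17 22:42:53.898".
LOG_TS_FORMAT = "%Y/%m/%d %H:%M:%S.%f"

# Old session 0x14d43fd905f68d7 closed (step 1.2).
old_session_closed = datetime.strptime("2015/08/17 22:42:53.898", LOG_TS_FORMAT)
# New session 0x24d43fd93d96821 established; step 1.4 happens after this point.
new_session_up = datetime.strptime("2015/08/17 22:43:03.306", LOG_TS_FORMAT)

gap_s = (new_session_up - old_session_closed).total_seconds()


def would_expire_first(gap_seconds: float, session_timeout_s: float) -> bool:
    """Hypothetical helper: True if a session that went quiet at the start of
    the gap would be expired by the server before the gap ends (assumes expiry
    happens roughly one session timeout after the last heartbeat)."""
    return gap_seconds > session_timeout_s


print(f"gap between step 1.2 and the new session: {gap_s:.3f} s")
for timeout_s in (12.0, 6.0):
    print(f"timeout {timeout_s:>4} s -> old session expires first: "
          f"{would_expire_first(gap_s, timeout_s)}")
```

With these numbers the gap (about 9.4 s) is shorter than 12 s but longer than 6 s. That is consistent with the report's observation that lowering the timeout to 6 s made the symptom go away, but it does not by itself show which mechanism is responsible.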
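The report expects two listener firings (steps 2.2 and 2.3), but the controller log shows a single firing with empty "Newly added" and "deleted" sets. The ZooKeeper documentation describes watches as one-time triggers: once a watch fires, a client must set a new one to hear about later changes, so changes made between the notification and the re-read are not observed individually. The toy model below is a minimal in-memory sketch of that behaviour, assuming a listener that re-reads the children (re-registering its watch) and diffs them against the last set it saw. `ToyZk`, `ToyBrokerListener` and `run_bounce` are hypothetical illustrations, not the ZooKeeper, ZkClient, or Kafka APIs, and the model shows one possible mechanism rather than a confirmed diagnosis of this issue.

```python
from collections import defaultdict

# Toy, in-memory model of ZooKeeper-style one-shot child watches.
# ToyZk, ToyBrokerListener and run_bounce are hypothetical illustrations,
# not the ZooKeeper, ZkClient, or Kafka APIs.


class ToyZk:
    def __init__(self):
        self.children = defaultdict(set)   # path -> current child names
        self.watchers = {}                 # path -> one-shot callbacks
        self.pending = []                  # notifications not yet delivered

    def get_children(self, path, watch=None):
        # Reading the children can (re-)register a one-shot watch.
        if watch is not None:
            self.watchers.setdefault(path, []).append(watch)
        return set(self.children[path])

    def _trigger(self, path):
        # A watch fires at most once: it is removed as soon as it triggers.
        for callback in self.watchers.pop(path, []):
            self.pending.append((callback, path))

    def create(self, path, child):
        self.children[path].add(child)
        self._trigger(path)

    def delete(self, path, child):
        self.children[path].discard(child)
        self._trigger(path)

    def deliver(self):
        # The client's event thread delivers queued notifications, possibly late.
        while self.pending:
            callback, path = self.pending.pop(0)
            callback(path)


class ToyBrokerListener:
    """Diffs the current children against the last set it saw."""

    def __init__(self, zk, path):
        self.zk = zk
        self.fired = 0
        self.history = []  # one (added, deleted) pair per notification
        self.known = zk.get_children(path, watch=self.on_change)

    def on_change(self, path):
        self.fired += 1
        # Re-read the children and re-register the watch in one step.
        current = self.zk.get_children(path, watch=self.on_change)
        added = sorted(current - self.known)
        deleted = sorted(self.known - current)
        self.history.append((added, deleted))
        self.known = current


def run_bounce(deliver_between: bool):
    zk = ToyZk()
    for broker_id in ("1140", "1507"):
        zk.create("/brokers/ids", broker_id)
    listener = ToyBrokerListener(zk, "/brokers/ids")
    zk.delete("/brokers/ids", "1140")   # step 1.2: ephemeral node removed
    if deliver_between:
        zk.deliver()                    # listener runs before the node returns
    zk.create("/brokers/ids", "1140")   # step 1.4: broker re-registers
    zk.deliver()
    return listener.fired, listener.history


print(run_bounce(deliver_between=True))
print(run_bounce(deliver_between=False))
```

With `deliver_between=True` the listener fires twice (broker 1140 deleted, then added back), matching steps 2.2 and 2.3. With `deliver_between=False` the deletion and re-creation both happen before the listener re-reads, so it fires once and both the added and deleted sets are empty, the same shape as the controller log line `Newly added brokers: , deleted brokers: `.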
-- This message was sent by Atlassian JIRA (v6.3.4#6332)