[ 
https://issues.apache.org/jira/browse/KAFKA-2448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2448:
--------------------------------
    Description: 
When a broker gets bounced, ideally the sequence should be as follows:
1.1. Broker shuts down its resources.
1.2. Broker closes its zkClient (this causes the ephemeral node 
/brokers/ids/BROKER_ID to be deleted).
1.3. Broker restarts and loads the log segments.
1.4. Broker creates the ephemeral node /brokers/ids/BROKER_ID.
The broker-side logs are:
{noformat}
...
2015/08/17 22:42:37.663 INFO [SocketServer] [Thread-1] [kafka-server] [] 
[Socket Server on Broker 1140], Shutting down
2015/08/17 22:42:37.735 INFO [SocketServer] [Thread-1] [kafka-server] [] 
[Socket Server on Broker 1140], Shutdown completed
...
2015/08/17 22:42:53.898 INFO [ZooKeeper] [Thread-1] [kafka-server] [] Session: 
0x14d43fd905f68d7 closed
2015/08/17 22:42:53.898 INFO [ClientCnxn] [main-EventThread] [kafka-server] [] 
EventThread shut down
2015/08/17 22:42:53.898 INFO [KafkaServer] [Thread-1] [kafka-server] [] [Kafka 
Server 1140], shut down completed
...
2015/08/17 22:43:03.306 INFO [ClientCnxn] 
[main-SendThread(zk-ei1-kafkatest.stg.linkedin.com:12913)] [kafka-server] [] 
Session establishment complete on server zk-ei1-kafkatest.stg.linkedin
.com/172.20.73.211:12913, sessionid = 0x24d43fd93d96821, negotiated timeout = 
12000
2015/08/17 22:43:03.306 INFO [ZkClient] [main-EventThread] [kafka-server] [] 
zookeeper state changed (SyncConnected)
...
{noformat}
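
For reference, step 1.4 boils down to creating an EPHEMERAL znode under 
/brokers/ids, and step 1.2 to closing the session that owns it. A minimal 
sketch against the raw ZooKeeper API (Kafka itself goes through ZkClient; 
host, port and payload here are illustrative):
{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class BrokerRegistrationSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 12000, event -> {});

        // Step 1.4: register the broker as an EPHEMERAL node (assuming
        // /brokers/ids already exists). The node lives exactly as long as
        // the session that created it.
        zk.create("/brokers/ids/1140", "{}".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Step 1.2: an explicit close() should delete the ephemeral node
        // immediately, without waiting for the session timeout to expire.
        zk.close();
    }
}
{code}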


On the controller side, the sequence should be:
2.1. Controlled shutdown of the broker.
2.2. BrokerChangeListener fires for a /brokers/ids child change because the 
ephemeral node is deleted in step 1.2.
2.3. BrokerChangeListener fires again for a /brokers/ids child change because 
the ephemeral node is created in step 1.4.

The issue I saw was that, on the controller side, the broker change listener 
only fired once, after step 1.4, so the controller did not see any broker 
change.

{noformat}
2015/08/17 22:41:46.189 [KafkaController] [Controller 1507]: Shutting down 
broker 1140
...
2015/08/17 22:42:38.031 [RequestSendThread] 
[Controller-1507-to-broker-1140-send-thread], Controller 1507 epoch 799 fails 
to send request Name: StopReplicaRequest; Version: 0; CorrelationId: 5334; 
ClientId: ; DeletePartitions: false; ControllerId: 1507; ControllerEpoch: 799; 
Partitions: [seas-decisionboard-searcher-service_call,1] to broker 1140 : 
(EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)). Reconnecting to 
broker.
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015/08/17 22:42:38.031 [RequestSendThread] 
[Controller-1507-to-broker-1140-send-thread], Controller 1507 connected to 1140 
: (EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)) for sending state 
change requests
2015/08/17 22:42:38.332 [RequestSendThread] 
[Controller-1507-to-broker-1140-send-thread], Controller 1507 epoch 799 fails 
to send request Name: StopReplicaRequest; Version: 0; CorrelationId: 5334; 
ClientId: ; DeletePartitions: false; ControllerId: 1507; ControllerEpoch: 799; 
Partitions: [seas-decisionboard-searcher-service_call,1] to broker 1140 : 
(EndPoint(eat1-app1140.corp.linkedin.com,10251,PLAINTEXT)). Reconnecting to 
broker.
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at 
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

....
2015/08/17 22:43:09.035 [ReplicaStateMachine$BrokerChangeListener] 
[BrokerChangeListener on Controller 1507]: Broker change listener fired for 
path /brokers/ids with children 
1140,1282,1579,871,1556,872,1511,873,874,852,1575,875,1574,1530,854,857,858,859,1493,1272,880,1547,1568,1500,1521,863,864,865,867,1507
2015/08/17 22:43:09.082 [ReplicaStateMachine$BrokerChangeListener] 
[BrokerChangeListener on Controller 1507]: Newly added brokers: , deleted 
brokers: , all live brokers: 
873,1507,1511,1568,1521,852,874,857,1493,1530,875,1282,1574,880,863,858,1556,1547,872,1579,864,1272,859,1575,854,867,865,1500,871
{noformat}
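
One detail worth keeping in mind when reading 2.2/2.3: ZooKeeper child 
watches are one-shot, so the listener has to call getChildren() again on 
every event to re-arm the watch, and it only ever sees the children as they 
are at that moment. A sketch of the pattern (not the actual 
ReplicaStateMachine code, which goes through ZkClient):
{code:java}
import java.util.List;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class BrokerIdsWatchSketch {
    // Re-arm the watch on every callback. If the watch is not armed when
    // the deletion in step 1.2 happens, that change is silently missed and
    // the next callback already shows the broker re-registered (step 1.4).
    static void watchBrokerIds(ZooKeeper zk) throws Exception {
        List<String> children = zk.getChildren("/brokers/ids", event -> {
            if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    watchBrokerIds(zk); // re-register, read current state
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("live brokers: " + children);
    }
}
{code}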

From the ZK transaction log, the old zk session had already been closed 
before the new session in step 1.4 was established:
{noformat}
2015-08-17T22:42:53.899Z, s:0x14d43fd905f68d7, zx:0x26088e0cda UNKNOWN(null)
{noformat}

In this case, there were about 10 seconds between step 1.2 and step 1.4, 
while the ZK session timeout was set to 12 seconds. According to our test, 
the ephemeral node in zookeeper is deleted as soon as the session is 
explicitly closed, but that does not seem to be the case when the broker 
shuts down.
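
The kind of test referred to above can be sketched with two independent 
sessions (connection string and path are illustrative): after an explicit 
close() on the owning session, the observer should see the ephemeral node 
gone immediately rather than 12 seconds later.
{code:java}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralCloseSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper owner = new ZooKeeper("localhost:2181", 12000, e -> {});
        ZooKeeper observer = new ZooKeeper("localhost:2181", 12000, e -> {});

        owner.create("/brokers/ids/1140", new byte[0],
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        // Explicit close: the server processes a closeSession transaction
        // and should delete the ephemeral node right away.
        owner.close();

        // Expected to print null immediately, not after the session timeout.
        System.out.println(observer.exists("/brokers/ids/1140", false));
        observer.close();
    }
}
{code}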

Another suspicious thing is that even after the socket server on the broker 
had shut down, the controller was still able to connect to the broker and 
send requests.

Currently we are setting the zk session timeout to 6 seconds, and this seems 
to solve the problem. My hunch is that zookeeper somehow did not fire the 
watch callback for the deletion in step 1.2, so step 2.2 was not triggered.

From the available logs, the missing piece is: how do we know whether the zk 
watcher was actually fired by zookeeper in step 2.2?
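
One way to get at this without more application-side logging is to ask the 
ZK server itself which watches it is holding: the four-letter command wchp 
lists registered watches per path, so /brokers/ids should show up there 
whenever the controller's watch is armed. A small sketch that sends the 
command over the client port (host and port are illustrative):
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class DumpWatchesSketch {
    public static void main(String[] args) throws Exception {
        // Four-letter commands are sent as plain bytes on the client port;
        // the server writes back its answer and closes the connection.
        try (Socket s = new Socket("localhost", 2181)) {
            s.getOutputStream().write("wchp".getBytes());
            s.getOutputStream().flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(s.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // look for /brokers/ids here
            }
        }
    }
}
{code}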

> BrokerChangeListener missed broker id path ephemeral node deletion event.
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-2448
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2448
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Flavio Junqueira
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
