I set up a 2-node cluster with replication factor 2, created 10 topics with 131
partitions each, and then tried to do a controlled shutdown of broker #2 (which
is the currently elected controller).

(Sorry, the server clocks on Broker #1 and Broker #2 are 6 hours apart; I will
have to get that fixed.)
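Because the two clocks disagree, entries from the two brokers' logs only line up after one side is shifted. Below is a minimal Python sketch of that alignment. It assumes broker 2's log timestamps read exactly 6 hours earlier than broker 1's (the real offset may be off by a few seconds). It also assumes every entry starts with a bracketed timestamp such as `[2014-04-17 03:25:59.471-0600]`. `BROKER2_SKEW`, `parse_log_timestamp` and `to_broker1_clock` are hypothetical helper names, not Kafka APIs.

```python
from datetime import datetime, timedelta

# Assumption: broker 2's log timestamps read 6 hours earlier than broker 1's.
BROKER2_SKEW = timedelta(hours=6)

# Timestamp layout used in these logs, e.g. 2014-04-17 03:25:59.471-0600
TS_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"


def parse_log_timestamp(line: str) -> datetime:
    """Parse the bracketed timestamp at the start of a log entry."""
    stamp = line[1:line.index("]")]
    return datetime.strptime(stamp, TS_FORMAT)


def to_broker1_clock(line: str) -> datetime:
    """Shift a broker 2 log timestamp onto broker 1's clock (assumed skew)."""
    return parse_log_timestamp(line) + BROKER2_SKEW


b2 = "[2014-04-17 03:25:59.561-0600] WARN [KafkaApi-2] Fetch request with correlation id 688"
b1 = "[2014-04-17 09:25:56.833-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher"

print(to_broker1_clock(b2))                            # 2014-04-17 09:25:59.561000-06:00
print(to_broker1_clock(b2) - parse_log_timestamp(b1))  # 0:00:02.728000
```

After the shift, these two entries are about 2.7 seconds apart. The logs alone cannot show whether that gap is real or residual skew.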


According to ZooKeeper, Broker #2 is still the controller:

[zk: tm1mwwm001:2181(CONNECTED) 13] get /controller
{"version":1,"brokerid":2,"timestamp":"1397726326047"}
cZxid = 0x11e
ctime = Thu Apr 17 09:18:43 MDT 2014
mZxid = 0x11e
mtime = Thu Apr 17 09:18:43 MDT 2014
pZxid = 0x11e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1456658f6770011
dataLength = 54
numChildren = 0
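The data of the /controller znode is a small JSON document. The sketch below decodes it. It assumes, as the value above suggests, that `timestamp` holds epoch milliseconds stored as a string. `decode_controller` is a hypothetical helper.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Data of the /controller znode exactly as printed by the ZooKeeper CLI above.
RAW = b'{"version":1,"brokerid":2,"timestamp":"1397726326047"}'


def decode_controller(data: bytes) -> Tuple[int, datetime]:
    """Return (controller broker id, election time in UTC)."""
    info = json.loads(data)
    # Assumption: "timestamp" is epoch milliseconds stored as a string.
    millis = int(info["timestamp"])
    return info["brokerid"], EPOCH + timedelta(milliseconds=millis)


broker_id, elected_at = decode_controller(RAW)
print(broker_id, elected_at)  # 2 2014-04-17 09:18:46.047000+00:00
```

Decoded this way, the value is 09:18:46 UTC, i.e. 03:18:46 MDT. That is about six hours before the znode's `ctime` of 09:18:43 MDT, which is set by the ZooKeeper server. If the timestamp comes from the electing broker's clock, the gap is consistent with broker 2's clock being roughly 6 hours off.

Against a live ensemble, the same bytes can be fetched with a ZooKeeper client. With kazoo, for example, that is `data, stat = zk.get("/controller")`.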


From Broker #2 - server.log

[2014-04-17 03:25:59.471-0600] WARN [Kafka Server 2], Retrying controlled 
shutdown after the previous attempt failed... (kafka.server.KafkaServer)
[2014-04-17 03:25:59.472-0600] WARN [Kafka Server 2], Proceeding to do an
unclean shutdown as all the controlled shutdown attempts failed
(kafka.server.KafkaServer)
[2014-04-17 03:25:59.474-0600] INFO [Socket Server on Broker 2], Shutting down 
(kafka.network.SocketServer)
[2014-04-17 03:25:59.481-0600] INFO [Socket Server on Broker 2], Shutdown 
completed (kafka.network.SocketServer)
[2014-04-17 03:25:59.482-0600] INFO [Kafka Request Handler on Broker 2], 
shutting down (kafka.server.KafkaRequestHandlerPool)
[2014-04-17 03:25:59.561-0600] WARN [KafkaApi-2] Fetch request with correlation
id 688 from client ReplicaFetcherThread-4-2 on partition [test8,26] failed
due to Leader not local for partition [test8,26] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:25:59.563-0600] WARN [KafkaApi-2] Fetch request with correlation
id 688 from client ReplicaFetcherThread-6-2 on partition [test4,96] failed
due to Leader not local for partition [test4,96] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:25:59.602-0600] WARN [KafkaApi-2] Fetch request with correlation
id 430 from client ReplicaFetcherThread-1-2 on partition [test3,18] failed
due to Leader not local for partition [test3,18] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:26:10.227-0600] INFO Partition [test9,124] on broker 2: 
Shrinking ISR for partition [test9,124] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.229-0600] INFO Partition [test1,87] on broker 2: Shrinking 
ISR for partition [test1,87] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.230-0600] INFO Partition [test9,68] on broker 2: Shrinking 
ISR for partition [test9,68] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.232-0600] ERROR Conditional update of path
/brokers/topics/test9/partitions/68/state with data
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and
expected version 0 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/test9/partitions/68/state (kafka.utils.ZkUtils$)
[2014-04-17 03:26:10.233-0600] INFO Partition [test9,68] on broker 2: Cached
zkVersion [0] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
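The `BadVersionException` entries come from ZooKeeper's optimistic concurrency check. The partition-state znode is written together with an expected version, and the write is rejected if someone else has written since. The toy in-memory model below illustrates that check. It assumes one plausible sequence: the controller rewrote the state for [test9,68] while broker 2 still held zkVersion 0. `ZNode`, `set_if_version` and `BadVersionError` are hypothetical stand-ins, not the ZooKeeper client API.

```python
from dataclasses import dataclass


class BadVersionError(Exception):
    """Toy stand-in for ZooKeeper's KeeperException.BadVersionException."""


@dataclass
class ZNode:
    data: dict
    version: int = 0

    def set_if_version(self, data: dict, expected_version: int) -> int:
        # Conditional write: succeed only if nobody has written since the
        # caller last read the znode (versions match), then bump the version.
        if expected_version != self.version:
            raise BadVersionError(
                f"expected version {expected_version}, found {self.version}")
        self.data = data
        self.version += 1
        return self.version


# Partition state for [test9,68] as broker 2 last saw it: zkVersion 0.
state = ZNode({"controller_epoch": 3, "leader": 2, "leader_epoch": 0, "isr": [2, 1]})
cached_zk_version = state.version

# Assumed: the controller moves leadership to broker 1, bumping the version to 1.
state.set_if_version(
    {"controller_epoch": 3, "leader": 1, "leader_epoch": 1, "isr": [1]}, expected_version=0)

# Broker 2 still thinks it leads and tries to shrink the ISR with its cached version.
try:
    state.set_if_version(
        {"controller_epoch": 3, "leader": 2, "leader_epoch": 0, "isr": [2]},
        expected_version=cached_zk_version)
    outcome = "ISR updated"
except BadVersionError:
    outcome = "skip updating ISR"  # what broker 2 logs after the BadVersion error

print(outcome, state.data["leader"], state.version)  # skip updating ISR 1 1
```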

Broker #2 - state-change.log
*only TRACE messages*

[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending 
UpdateMetadata request (Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:3) with 
correlationId 1966 to broker 2 for partition [test1,65] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending 
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with 
correlationId 1966 to broker 2 for partition [test9,110] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending 
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with 
correlationId 1966 to broker 2 for partition [test10,11] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending 
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with 
correlationId 1966 to broker 2 for partition [test8,61] (state.change.logger)

Broker #2 - controller.log

[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = true) 
sent to broker 2 is  (kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = false) 
sent to broker 2 is [Topic=test7,Partition=123,Replica=2] 
(kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:26:29.469-0600] WARN [Controller-2-to-broker-2-send-thread], 
Controller 2 fails to send a request to broker id:2,host:tm1mwwm002,port:9092 
(kafka.controller.RequestSendThread)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.470-0600] INFO [Replica state machine on controller 2]: 
Invoking state change to OfflineReplica for replicas 
[Topic=test7,Partition=123,Replica=2] (kafka.controller.ReplicaStateMachine)
[2014-04-17 03:26:29.470-0600] DEBUG [Controller 2]: Removing replica 2 from 
ISR 1,2 for partition [test7,123]. (kafka.controller.KafkaController)
[2014-04-17 03:26:29.471-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker. 
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.473-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was 
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.475-0600] INFO [Controller 2]: New leader and ISR for 
partition [test7,123] is {"leader":1,"leader_epoch":1,"isr":[1]} 
(kafka.controller.KafkaController)


*seeing these messages repeated*

[2014-04-17 03:31:19.939-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker. 
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:19.940-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was 
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker. 
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread], 
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was 
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
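Each retry above carries the same correlation id (1960). That means controller 2's send thread keeps resending one LeaderAndIsr request to broker 2, which refuses connections. The small sketch below counts those repeats in a log, assuming the message text matches the entries above. The regex and `count_failed_sends` are hypothetical. Whitespace is collapsed first because the entries are wrapped in this mail.

```python
import re
from collections import Counter

# Text of the controller send-thread failures, in the form shown above.
FAILED_SEND = re.compile(
    r"Controller (\d+) epoch (\d+) failed to send (\w+) request "
    r"with correlation id (\d+) to broker id:(\d+)")


def count_failed_sends(log_text: str) -> Counter:
    """Count failures keyed by (controller, epoch, request, correlation id, target broker)."""
    flat = " ".join(log_text.split())  # undo the mail's line wrapping
    return Counter(m.groups() for m in FAILED_SEND.finditer(flat))


sample = """
[2014-04-17 03:31:19.939-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
"""

for key, n in count_failed_sends(sample).items():
    print(key, n)  # ('2', '3', 'LeaderAndIsr', '1960', '2') 2
```

On the two entries quoted above, it reports two failures for correlation id 1960, roughly 300 ms apart (03:31:19.939 and 03:31:20.240).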


Broker #2 - server.log

[2014-04-17 03:34:20.226-0600] INFO Partition [test3,108] on broker 2: 
Shrinking ISR for partition [test3,108] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] ERROR Conditional update of path 
/brokers/topics/test3/partitions/108/state with data 
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and 
expected version 0 failed due to 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /brokers/topics/test3/partitions/108/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.227-0600] INFO Partition [test3,108] on broker 2: Cached 
zkVersion [0] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] INFO Partition [test5,101] on broker 2: 
Shrinking ISR for partition [test5,101] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.228-0600] ERROR Conditional update of path 
/brokers/topics/test5/partitions/101/state with data 
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and 
expected version 0 failed due to 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /brokers/topics/test5/partitions/101/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.228-0600] INFO Partition [test5,101] on broker 2: Cached 
zkVersion [0] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
[2014-04-17 03:34:20.229-0600] INFO Partition [test5,33] on broker 2: Shrinking 
ISR for partition [test5,33] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.230-0600] ERROR Conditional update of path 
/brokers/topics/test5/partitions/33/state with data 
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and 
expected version 0 failed due to 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /brokers/topics/test5/partitions/33/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.230-0600] INFO Partition [test5,33] on broker 2: Cached 
zkVersion [0] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)



Broker #1 - server.log

[2014-04-17 09:25:56.833-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test8,26] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.887-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test3,18] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.945-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test5,33] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.995-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test9,68] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.045-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test3,108] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.094-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test6,1] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.144-0600] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions [test5,101] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:26:26.465-0600] WARN Reconnect due to socket error: null 
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.466-0600] WARN Reconnect due to socket error: null 
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.467-0600] WARN Reconnect due to socket error: null 
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.473-0600] ERROR [ReplicaFetcherThread-4-2], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 688; ClientId: 
ReplicaFetcherThread-4-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [test4,126] -> PartitionFetchInfo(0,10485760),[test8,66] -> 
PartitionFetchInfo(0,10485760),[test8,26] -> 
PartitionFetchInfo(0,10485760),[test4,22] -> PartitionFetchInfo(0,10485760) 
(kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


*the log is flooded with entries of this type*

[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null 
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-0-2], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 546078; ClientId: 
ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [test5,67] -> PartitionFetchInfo(0,10485760),[test4,122] -> 
PartitionFetchInfo(0,10485760),[test1,79] -> 
PartitionFetchInfo(0,10485760),[test1,127] -> 
PartitionFetchInfo(0,10485760),[test1,87] -> PartitionFetchInfo(0,10485760) 
(kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null 
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-1-2], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 561475; ClientId: 
ReplicaFetcherThread-1-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [test3,10] -> PartitionFetchInfo(0,10485760),[test10,90] -> 
PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
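Broker 1's fetch errors list the partitions it is still trying to replicate from broker 2. In the Kafka 0.8 API, `PartitionFetchInfo` holds a fetch offset and a fetch size. Read that way, `(0,10485760)` is offset 0 with a 10485760-byte limit (10 × 1024 × 1024, i.e. 10 MiB). The sketch below pulls those tuples out of an entry, assuming the RequestInfo format shown above. `fetched_partitions` is a hypothetical helper.

```python
import re
from typing import List, Tuple

# "[topic,partition] -> PartitionFetchInfo(offset,fetchSize)" as in the RequestInfo above.
PARTITION_FETCH = re.compile(r"\[([^,\]]+),(\d+)\] -> PartitionFetchInfo\((\d+),(\d+)\)")


def fetched_partitions(entry: str) -> List[Tuple[str, int, int, int]]:
    """Return (topic, partition, fetch offset, fetch size) for each partition in a fetch error."""
    flat = " ".join(entry.split())  # undo the mail's line wrapping
    return [(t, int(p), int(off), int(size))
            for t, p, off, size in PARTITION_FETCH.findall(flat)]


entry = """
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-1-2], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 561475; ClientId:
ReplicaFetcherThread-1-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [test3,10] -> PartitionFetchInfo(0,10485760),[test10,90] ->
PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
"""

for topic, partition, offset, size in fetched_partitions(entry):
    print(topic, partition, offset, size // (1024 * 1024), "MiB")
```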


Broker #1 - state-change.log
*All TRACE messages*


[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
 for partition [test10,11] in response to UpdateMetadata request sent by 
controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
 for partition [test8,61] in response to UpdateMetadata request sent by 
controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
 correlation id 1966 from controller 2 epoch 3 for partition [test7,123] 
(state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 handling LeaderAndIsr request 
correlationId 1966 from controller 2 epoch 3 starting the become-leader 
transition for partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 stopped fetchers as part of 
become-leader request from controller 2 epoch 3 with correlation id 1966 for 
partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 completed LeaderAndIsr request 
correlationId 1966 from controller 2 epoch 3 for the become-leader transition 
for partition [test7,123] (state.change.logger)

Broker #1 - controller.log


[2014-04-17 09:18:42.887-0600] DEBUG [Partition state machine on Controller 1]: 
After leader election, leader cache is updated to Map([test_topic,0] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,7] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,4] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,2] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,3] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,9] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,6] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,8] -> 
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,11] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,5] -> 
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2)) 
(kafka.controller.PartitionStateMachine)
[2014-04-17 09:18:43.514-0600] INFO [Controller-1-to-broker-2-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] WARN [Controller-1-to-broker-2-send-thread], 
Controller 1 fails to send a request to broker id:2,host:tm1mwwm002,port:9092 
(kafka.controller.RequestSendThread)
java.nio.channels.AsynchronousCloseException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], 
Shutting down (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], 
Stopped  (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], 
Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.517-0600] INFO [Controller 1]: Controller shutdown 
complete (kafka.controller.KafkaController)
[2014-04-17 09:20:09.860-0600] INFO [ControllerEpochListener on 1]: Initialized 
controller epoch to 3 and zk version 2 
(kafka.controller.ControllerEpochListener)
[2014-04-17 09:20:09.904-0600] INFO [Controller 1]: Controller starting up 
(kafka.controller.KafkaController)
[2014-04-17 09:20:09.961-0600] INFO [Controller 1]: Controller startup complete 
(kafka.controller.KafkaController)



- Bob


-----Original Message-----
From: Seshadri, Balaji
Sent: Thursday, April 17, 2014 7:54 AM
To: users@kafka.apache.org; Bello, Bob
Subject: RE: Controller is not being failed over 0.8.1

The ZK data was still showing broker 1 as the controller.

@Bob,

Can you please send the errors you see in state-change.log?

Thanks,

Balaji
________________________________________
From: Jun Rao [jun...@gmail.com]
Sent: Wednesday, April 16, 2014 9:37 PM
To: users@kafka.apache.org
Subject: Re: Controller is not being failed over 0.8.1

What's the controller value in the zk path (see
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)?
Any error in the controller/state-change log?

Thanks,

Jun


On Wed, Apr 16, 2014 at 10:07 AM, Seshadri, Balaji <balaji.sesha...@dish.com> wrote:

> Hi,
>
> We got the following error spamming the logs when broker 1 was the
> controller and we shut it down in a controlled manner (not kill -9).
>
> Leadership was switched to broker 2 for all partitions, but the controller
> is not being failed over to broker 2.
>
> [2014-04-16 10:48:47.976-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker
> id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.276-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1 epoch 38 failed to
> send LeaderAndIsr request with correlation id 1766 to broker
> id:1,host:tm1-kafkabroker101,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.277-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker
> id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> This issue stopped our upgrade.
>
> Thanks,
>
> Balaji
>
>
>
