I set up a 2-node cluster with replication factor 2, created 10 topics with 131 partitions each, and then tried to do a controlled shutdown of broker #2 (which is the currently elected leader).
(Sorry, our Broker #1 and Broker #2 server times are 6h off, I will have to get that fixed).

According to Zookeeper Broker #2 is still the controller:

[zk: tm1mwwm001:2181(CONNECTED) 13] get /controller
{"version":1,"brokerid":2,"timestamp":"1397726326047"}
cZxid = 0x11e
ctime = Thu Apr 17 09:18:43 MDT 2014
mZxid = 0x11e
mtime = Thu Apr 17 09:18:43 MDT 2014
pZxid = 0x11e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1456658f6770011
dataLength = 54
numChildren = 0

From Broker #2 - server.log

[2014-04-17 03:25:59.471-0600] WARN [Kafka Server 2], Retrying controlled shutdown after the previous attempt failed... (kafka.server.KafkaServer)
[2014-04-17 03:25:59.472-0600] WARN [Kafka Server 2], Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed (kafka.server.KafkaServer)
[2014-04-17 03:25:59.474-0600] INFO [Socket Server on Broker 2], Shutting down (kafka.network.SocketServer)
[2014-04-17 03:25:59.481-0600] INFO [Socket Server on Broker 2], Shutdown completed (kafka.network.SocketServer)
[2014-04-17 03:25:59.482-0600] INFO [Kafka Request Handler on Broker 2], shutting down (kafka.server.KafkaRequestHandlerPool)
[2014-04-17 03:25:59.561-0600] WARN [KafkaApi-2] Fetch request with correlation id 688 from client ReplicaFetcherThread-4-2 on partition [test8,26] failed due to Leader not local for partition [test8,26] on broker 2 (kafka.server.KafkaApis)
[2014-04-17 03:25:59.563-0600] WARN [KafkaApi-2] Fetch request with correlation id 688 from client ReplicaFetcherThread-6-2 on partition [test4,96] failed due to Leader not local for partition [test4,96] on broker 2 (kafka.server.KafkaApis)
[2014-04-17 03:25:59.602-0600] WARN [KafkaApi-2] Fetch request with correlation id 430 from client ReplicaFetcherThread-1-2 on partition [test3,18] failed due to Leader not local for partition [test3,18] on broker 2 (kafka.server.KafkaApis)
[2014-04-17 03:26:10.227-0600] INFO Partition [test9,124] on broker 2: Shrinking ISR for partition [test9,124] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.229-0600] INFO Partition [test1,87] on broker 2: Shrinking ISR for partition [test1,87] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.230-0600] INFO Partition [test9,68] on broker 2: Shrinking ISR for partition [test9,68] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.232-0600] ERROR Conditional update of path /brokers/topics/test9/partitions/68/state with data {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and expected version 0 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/test9/partitions/68/state (kafka.utils.ZkUtils$)
[2014-04-17 03:26:10.233-0600] INFO Partition [test9,68] on broker 2: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

Broker #2 - state-change.log *only TRACE messages*

[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending UpdateMetadata request (Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:3) with correlationId 1966 to broker 2 for partition [test1,65] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with correlationId 1966 to broker 2 for partition [test9,110] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with correlationId 1966 to broker 2 for partition [test10,11] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with correlationId 1966 to broker 2 for partition [test8,61] (state.change.logger)

Broker #2 - controller.log

[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = true) sent to broker 2 is
(kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = false) sent to broker 2 is [Topic=test7,Partition=123,Replica=2] (kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:26:29.469-0600] WARN [Controller-2-to-broker-2-send-thread], Controller 2 fails to send a request to broker id:2,host:tm1mwwm002,port:9092 (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:375)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.470-0600] INFO [Replica state machine on controller 2]: Invoking state change to OfflineReplica for replicas [Topic=test7,Partition=123,Replica=2] (kafka.controller.ReplicaStateMachine)
[2014-04-17 03:26:29.470-0600] DEBUG [Controller 2]: Removing replica 2 from ISR 1,2 for partition [test7,123]. (kafka.controller.KafkaController)
[2014-04-17 03:26:29.471-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.473-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.475-0600] INFO [Controller 2]: New leader and ISR for partition [test7,123] is {"leader":1,"leader_epoch":1,"isr":[1]} (kafka.controller.KafkaController)

*seeing repeated messages of these*

[2014-04-17 03:31:19.939-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:19.940-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id 1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Broker #2 - server.log

[2014-04-17 03:34:20.226-0600] INFO Partition [test3,108] on broker 2: Shrinking ISR for partition [test3,108] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] ERROR Conditional update of path /brokers/topics/test3/partitions/108/state with data {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and expected version 0 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/test3/partitions/108/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.227-0600] INFO Partition [test3,108] on broker 2: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] INFO Partition [test5,101] on broker 2: Shrinking ISR for partition [test5,101] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.228-0600] ERROR Conditional update of path /brokers/topics/test5/partitions/101/state with data {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and expected version 0 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/test5/partitions/101/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.228-0600] INFO Partition [test5,101] on broker 2: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-17 03:34:20.229-0600] INFO Partition [test5,33] on broker 2: Shrinking ISR for partition [test5,33] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.230-0600] ERROR Conditional update of path /brokers/topics/test5/partitions/33/state with data {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and expected version 0 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/test5/partitions/33/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.230-0600] INFO Partition [test5,33] on broker 2: Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

Broker #1 - server.log

[2014-04-17 09:25:56.833-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test8,26] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.887-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test3,18] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.945-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test5,33] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.995-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test9,68] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.045-0600] INFO
[ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test3,108] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.094-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test6,1] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.144-0600] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [test5,101] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:26:26.465-0600] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.466-0600] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.467-0600] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.473-0600] ERROR [ReplicaFetcherThread-4-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 688; ClientId: ReplicaFetcherThread-4-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test4,126] -> PartitionFetchInfo(0,10485760),[test8,66] -> PartitionFetchInfo(0,10485760),[test8,26] -> PartitionFetchInfo(0,10485760),[test4,22] -> PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

*lots of spamming of these type of log entries*

[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 546078; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test5,67] -> PartitionFetchInfo(0,10485760),[test4,122] -> PartitionFetchInfo(0,10485760),[test1,79] -> PartitionFetchInfo(0,10485760),[test1,127] -> PartitionFetchInfo(0,10485760),[test1,87] -> PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-1-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 561475; ClientId: ReplicaFetcherThread-1-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test3,10] -> PartitionFetchInfo(0,10485760),[test10,90] -> PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Broker #1 - state-change.log *All TRACE messages*

[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2) for partition [test10,11] in response to UpdateMetadata request sent by controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2) for partition [test8,61] in response to UpdateMetadata request sent by controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 received LeaderAndIsr request
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2) correlation id 1966 from controller 2 epoch 3 for partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 handling LeaderAndIsr request correlationId 1966 from controller 2 epoch 3 starting the become-leader transition for partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 stopped fetchers as part of become-leader request from controller 2 epoch 3 with correlation id 1966 for partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 completed LeaderAndIsr request correlationId 1966 from controller 2 epoch 3 for the become-leader transition for partition [test7,123] (state.change.logger)

Broker #1 - controller.log

[2014-04-17 09:18:42.887-0600] DEBUG [Partition state machine on Controller 1]: After leader election, leader cache is updated to Map([test_topic,0] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,7] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,4] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,2] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,3] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,9] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,6] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,8] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,11] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,5] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2)) (kafka.controller.PartitionStateMachine)
[2014-04-17 09:18:43.514-0600] INFO [Controller-1-to-broker-2-send-thread], Shutting down
(kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] WARN [Controller-1-to-broker-2-send-thread], Controller 1 fails to send a request to broker id:2,host:tm1mwwm002,port:9092 (kafka.controller.RequestSendThread)
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
    at kafka.utils.Utils$.read(Utils.scala:375)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread], Stopped (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.517-0600] INFO [Controller 1]: Controller shutdown complete (kafka.controller.KafkaController)
[2014-04-17 09:20:09.860-0600] INFO [ControllerEpochListener on 1]: Initialized controller epoch to 3 and zk version 2 (kafka.controller.ControllerEpochListener)
[2014-04-17 09:20:09.904-0600] INFO [Controller 1]: Controller starting up (kafka.controller.KafkaController)
[2014-04-17 09:20:09.961-0600] INFO [Controller 1]: Controller startup complete (kafka.controller.KafkaController)

- Bob

-----Original Message-----
From: Seshadri, Balaji
Sent: Thursday, April 17, 2014 7:54 AM
To: users@kafka.apache.org; Bello, Bob
Subject: RE: Controller is not being failed over 0.8.1

The ZK data was showing controller as still broker-1.

@Bob, Can you please send errors you see in the state-change.log.

Thanks,

Balaji
________________________________________
From: Jun Rao [jun...@gmail.com]
Sent: Wednesday, April 16, 2014 9:37 PM
To: users@kafka.apache.org
Subject: Re: Controller is not being failed over 0.8.1

What's the controller value in the zk path (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)? Any error in the controller/state-change log?

Thanks,

Jun

On Wed, Apr 16, 2014 at 10:07 AM, Seshadri, Balaji <balaji.sesha...@dish.com> wrote:

> Hi,
>
> We got the following error spamming the logs when broker 1 is the
> controller and we are shutting it down in controlled manner not kill -9.
>
> The leader being switched to broker 2 for all partitions but controller is
> not being failed over to broker 2.
>
> [2014-04-16 10:48:47.976-0600] ERROR [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:465)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.276-0600] ERROR [Controller-1-to-broker-1-send-thread], Controller 1 epoch 38 failed to send LeaderAndIsr request with correlation id 1766 to broker id:1,host:tm1-kafkabroker101,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.277-0600] ERROR [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:465)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> It stopped our upgrade because of this issue.
>
> Thanks,
>
> Balaji
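
A side note for anyone repeating the /controller check from earlier in this thread: since the two brokers' clocks disagree, it can help to decode the znode payload into an unambiguous UTC time before comparing it with the log timestamps. The sketch below is only illustrative. It assumes the "timestamp" field holds milliseconds since the Unix epoch, and the helper name describe_controller is made up for this example.

```python
import json
from datetime import datetime, timedelta, timezone

# Payload copied from the `get /controller` output quoted earlier in the thread.
CONTROLLER_ZNODE = '{"version":1,"brokerid":2,"timestamp":"1397726326047"}'


def describe_controller(payload):
    """Return (broker id, election time in UTC) from a /controller payload.

    Hypothetical helper; assumes "timestamp" is epoch milliseconds stored
    as a string, as it appears in the payload above.
    """
    data = json.loads(payload)
    millis = int(data["timestamp"])
    # Split seconds and milliseconds to avoid float rounding on conversion.
    seconds, remainder_ms = divmod(millis, 1000)
    elected_at = datetime.fromtimestamp(seconds, tz=timezone.utc)
    elected_at += timedelta(milliseconds=remainder_ms)
    return int(data["brokerid"]), elected_at


broker_id, elected_at = describe_controller(CONTROLLER_ZNODE)
print(f"controller is broker {broker_id}, elected at {elected_at.isoformat()}")
```

Printing the time in UTC sidesteps the question of which broker's local clock or timezone label to trust when lining the znode up against server.log and controller.log.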