I set up a 2-node cluster with replication factor 2, created 10 topics with 131
partitions each, and then tried a controlled shutdown of Broker #2 (which is
the currently elected controller).
(Sorry, the clocks on Broker #1 and Broker #2 are 6 hours apart; I will have to
get that fixed.)
According to ZooKeeper, Broker #2 is still the controller:
[zk: tm1mwwm001:2181(CONNECTED) 13] get /controller
{"version":1,"brokerid":2,"timestamp":"1397726326047"}
cZxid = 0x11e
ctime = Thu Apr 17 09:18:43 MDT 2014
mZxid = 0x11e
mtime = Thu Apr 17 09:18:43 MDT 2014
pZxid = 0x11e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1456658f6770011
dataLength = 54
numChildren = 0
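As a rough aid for reading the znode above, here is a minimal Python sketch that decodes the `/controller` payload. It assumes the `timestamp` field is epoch milliseconds taken from the controller broker's own clock; the helper name `decode_controller` is made up for illustration and is not part of Kafka or ZooKeeper.

```python
import json
from datetime import datetime, timezone

# Payload copied verbatim from the `get /controller` output above.
controller_znode = '{"version":1,"brokerid":2,"timestamp":"1397726326047"}'


def decode_controller(payload):
    """Return (broker id, election time in UTC) from a /controller payload.

    Assumption: "timestamp" is epoch milliseconds written by the broker
    that won the controller election, using that broker's local clock.
    """
    data = json.loads(payload)
    millis = int(data["timestamp"])
    # Integer division avoids float rounding; sub-second precision is dropped.
    elected_at = datetime.fromtimestamp(millis // 1000, tz=timezone.utc)
    return data["brokerid"], elected_at


broker_id, elected_at = decode_controller(controller_znode)
print(broker_id, elected_at.isoformat())
```

Under that assumption the payload decodes to broker 2 at 09:18:46 UTC, while ZooKeeper's own ctime is 09:18:43 MDT (15:18:43 UTC). The gap of roughly 6 hours is consistent with the clock difference between the two brokers mentioned above.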
From Broker #2 - server.log
[2014-04-17 03:25:59.471-0600] WARN [Kafka Server 2], Retrying controlled
shutdown after the previous attempt failed... (kafka.server.KafkaServer)
[2014-04-17 03:25:59.472-0600] WARN [Kafka Server 2], Proceeding to do an
unclean shutdown as all the controlled shutdown attempts failed
(kafka.server.KafkaServer)
[2014-04-17 03:25:59.474-0600] INFO [Socket Server on Broker 2], Shutting down
(kafka.network.SocketServer)
[2014-04-17 03:25:59.481-0600] INFO [Socket Server on Broker 2], Shutdown
completed (kafka.network.SocketServer)
[2014-04-17 03:25:59.482-0600] INFO [Kafka Request Handler on Broker 2],
shutting down (kafka.server.KafkaRequestHandlerPool)
[2014-04-17 03:25:59.561-0600] WARN [KafkaApi-2] Fetch request with correlation
id 688 from client ReplicaFetcherThread-4-2 on partition [test8,26] failed due
to Leader not local for partition [test8,26] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:25:59.563-0600] WARN [KafkaApi-2] Fetch request with correlation
id 688 from client ReplicaFetcherThread-6-2 on partition [test4,96] failed due
to Leader not local for partition [test4,96] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:25:59.602-0600] WARN [KafkaApi-2] Fetch request with correlation
id 430 from client ReplicaFetcherThread-1-2 on partition [test3,18] failed due
to Leader not local for partition [test3,18] on broker 2
(kafka.server.KafkaApis)
[2014-04-17 03:26:10.227-0600] INFO Partition [test9,124] on broker 2:
Shrinking ISR for partition [test9,124] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.229-0600] INFO Partition [test1,87] on broker 2: Shrinking
ISR for partition [test1,87] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.230-0600] INFO Partition [test9,68] on broker 2: Shrinking
ISR for partition [test9,68] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:26:10.232-0600] ERROR Conditional update of path
/brokers/topics/test9/partitions/68/state with data
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and
expected version 0 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/test9/partitions/68/state (kafka.utils.ZkUtils$)
[2014-04-17 03:26:10.233-0600] INFO Partition [test9,68] on broker 2: Cached
zkVersion [0] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
Broker #2 - state-change.log
*only TRACE messages*
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
UpdateMetadata request (Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:3) with
correlationId 1966 to broker 2 for partition [test1,65] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with
correlationId 1966 to broker 2 for partition [test9,110] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with
correlationId 1966 to broker 2 for partition [test10,11] (state.change.logger)
[2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3) with
correlationId 1966 to broker 2 for partition [test8,61] (state.change.logger)
Broker #2 - controller.log
[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = true)
sent to broker 2 is (kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete = false)
sent to broker 2 is [Topic=test7,Partition=123,Replica=2]
(kafka.controller.ControllerBrokerRequestBatch)
[2014-04-17 03:26:29.469-0600] WARN [Controller-2-to-broker-2-send-thread],
Controller 2 fails to send a request to broker id:2,host:tm1mwwm002,port:9092
(kafka.controller.RequestSendThread)
java.net.SocketTimeoutException
at
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:375)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.470-0600] INFO [Replica state machine on controller 2]:
Invoking state change to OfflineReplica for replicas
[Topic=test7,Partition=123,Replica=2] (kafka.controller.ReplicaStateMachine)
[2014-04-17 03:26:29.470-0600] DEBUG [Controller 2]: Removing replica 2 from
ISR 1,2 for partition [test7,123]. (kafka.controller.KafkaController)
[2014-04-17 03:26:29.471-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.473-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at
kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:26:29.475-0600] INFO [Controller 2]: New leader and ISR for
partition [test7,123] is {"leader":1,"leader_epoch":1,"isr":[1]}
(kafka.controller.KafkaController)
*seeing repeated messages of these*
[2014-04-17 03:31:19.939-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:19.940-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at
kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2 epoch 3 failed to send LeaderAndIsr request with correlation id
1960 to broker id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 03:31:20.240-0600] ERROR [Controller-2-to-broker-2-send-thread],
Controller 2's connection to broker id:2,host:tm1mwwm002,port:9092 was
unsuccessful (kafka.controller.RequestSendThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at
kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Broker #2 - server.log
[2014-04-17 03:34:20.226-0600] INFO Partition [test3,108] on broker 2:
Shrinking ISR for partition [test3,108] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] ERROR Conditional update of path
/brokers/topics/test3/partitions/108/state with data
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and
expected version 0 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/test3/partitions/108/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.227-0600] INFO Partition [test3,108] on broker 2: Cached
zkVersion [0] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2014-04-17 03:34:20.227-0600] INFO Partition [test5,101] on broker 2:
Shrinking ISR for partition [test5,101] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.228-0600] ERROR Conditional update of path
/brokers/topics/test5/partitions/101/state with data
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and
expected version 0 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/test5/partitions/101/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.228-0600] INFO Partition [test5,101] on broker 2: Cached
zkVersion [0] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2014-04-17 03:34:20.229-0600] INFO Partition [test5,33] on broker 2: Shrinking
ISR for partition [test5,33] from 2,1 to 2 (kafka.cluster.Partition)
[2014-04-17 03:34:20.230-0600] ERROR Conditional update of path
/brokers/topics/test5/partitions/33/state with data
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and
expected version 0 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/test5/partitions/33/state (kafka.utils.ZkUtils$)
[2014-04-17 03:34:20.230-0600] INFO Partition [test5,33] on broker 2: Cached
zkVersion [0] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
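The BadVersion errors above come from a versioned compare-and-set on the partition state znode. The sketch below is a small in-memory Python model of that pattern, not the ZooKeeper client API and not Kafka's code: `VersionedStore`, `BadVersionError` and `try_shrink_isr` are hypothetical names. The intermediate controller write is also an assumption, chosen to mirror the `{"leader":1,...,"isr":[1]}` state the controller log reports for [test7,123].

```python
class BadVersionError(Exception):
    """Raised when the caller's expected version is stale (models BadVersion)."""


class VersionedStore:
    """In-memory stand-in for znodes that carry a data version."""

    def __init__(self):
        self._nodes = {}  # path -> (data, version)

    def create(self, path, data):
        self._nodes[path] = (data, 0)

    def get(self, path):
        return self._nodes[path]

    def set(self, path, data, expected_version):
        _, current = self._nodes[path]
        if expected_version != current:
            raise BadVersionError(f"expected {expected_version}, found {current}")
        self._nodes[path] = (data, current + 1)
        return current + 1


def try_shrink_isr(store, path, new_state, cached_version):
    """Attempt the conditional update; skip it if the cached version is stale."""
    try:
        store.set(path, new_state, cached_version)
        return "updated"
    except BadVersionError:
        return "skipped"


store = VersionedStore()
path = "/brokers/topics/test9/partitions/68/state"
store.create(path, {"leader": 2, "leader_epoch": 0, "isr": [2, 1]})
cached_version = store.get(path)[1]  # the broker caches version 0

# Assumed for illustration: the controller rewrites the state in the meantime.
store.set(path, {"leader": 1, "leader_epoch": 1, "isr": [1]}, 0)

# The broker still holds version 0, so its ISR shrink is rejected.
outcome = try_shrink_isr(store, path, {"leader": 2, "leader_epoch": 0, "isr": [2]}, cached_version)
print(outcome)
```

In this model the broker's write can never succeed until it refreshes its cached version, which would match the same "skip updating ISR" message repeating for the same partitions minutes apart.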
Broker #1 - server.log
[2014-04-17 09:25:56.833-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test8,26] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.887-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test3,18] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.945-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test5,33] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:56.995-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test9,68] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.045-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test3,108] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.094-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test6,1] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:25:57.144-0600] INFO [ReplicaFetcherManager on broker 1] Removed
fetcher for partitions [test5,101] (kafka.server.ReplicaFetcherManager)
[2014-04-17 09:26:26.465-0600] WARN Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.466-0600] WARN Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.467-0600] WARN Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:26:26.473-0600] ERROR [ReplicaFetcherThread-4-2], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 688; ClientId:
ReplicaFetcherThread-4-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [test4,126] -> PartitionFetchInfo(0,10485760),[test8,66] ->
PartitionFetchInfo(0,10485760),[test8,26] ->
PartitionFetchInfo(0,10485760),[test4,22] -> PartitionFetchInfo(0,10485760)
(kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
*many repeated log entries of this type*
[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-0-2], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 546078; ClientId:
ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [test5,67] -> PartitionFetchInfo(0,10485760),[test4,122] ->
PartitionFetchInfo(0,10485760),[test1,79] ->
PartitionFetchInfo(0,10485760),[test1,127] ->
PartitionFetchInfo(0,10485760),[test1,87] -> PartitionFetchInfo(0,10485760)
(kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null
(kafka.consumer.SimpleConsumer)
[2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-1-2], Error in fetch
Name: FetchRequest; Version: 0; CorrelationId: 561475; ClientId:
ReplicaFetcherThread-1-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
RequestInfo: [test3,10] -> PartitionFetchInfo(0,10485760),[test10,90] ->
PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Broker #1 - state-change.log
*All TRACE messages*
[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
for partition [test10,11] in response to UpdateMetadata request sent by
controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
for partition [test8,61] in response to UpdateMetadata request sent by
controller 2 epoch 3 with correlation id 1964 (state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 received LeaderAndIsr request
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
correlation id 1966 from controller 2 epoch 3 for partition [test7,123]
(state.change.logger)
[2014-04-17 09:26:26.875-0600] TRACE Broker 1 handling LeaderAndIsr request
correlationId 1966 from controller 2 epoch 3 starting the become-leader
transition for partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 stopped fetchers as part of
become-leader request from controller 2 epoch 3 with correlation id 1966 for
partition [test7,123] (state.change.logger)
[2014-04-17 09:26:26.877-0600] TRACE Broker 1 completed LeaderAndIsr request
correlationId 1966 from controller 2 epoch 3 for the become-leader transition
for partition [test7,123] (state.change.logger)
Broker #1 - controller.log
[2014-04-17 09:18:42.887-0600] DEBUG [Partition state machine on Controller 1]:
After leader election, leader cache is updated to Map([test_topic,0] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,7] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,4] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,2] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,3] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,9] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,6] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,8] ->
(Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,11] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2), [test_topic,5] ->
(Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2))
(kafka.controller.PartitionStateMachine)
[2014-04-17 09:18:43.514-0600] INFO [Controller-1-to-broker-2-send-thread],
Shutting down (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] WARN [Controller-1-to-broker-2-send-thread],
Controller 1 fails to send a request to broker id:2,host:tm1mwwm002,port:9092
(kafka.controller.RequestSendThread)
java.nio.channels.AsynchronousCloseException
at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
at kafka.utils.Utils$.read(Utils.scala:375)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread],
Stopped (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.515-0600] INFO [Controller-1-to-broker-2-send-thread],
Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread],
Shutting down (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread],
Stopped (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.516-0600] INFO [Controller-1-to-broker-1-send-thread],
Shutdown completed (kafka.controller.RequestSendThread)
[2014-04-17 09:18:43.517-0600] INFO [Controller 1]: Controller shutdown
complete (kafka.controller.KafkaController)
[2014-04-17 09:20:09.860-0600] INFO [ControllerEpochListener on 1]: Initialized
controller epoch to 3 and zk version 2
(kafka.controller.ControllerEpochListener)
[2014-04-17 09:20:09.904-0600] INFO [Controller 1]: Controller starting up
(kafka.controller.KafkaController)
[2014-04-17 09:20:09.961-0600] INFO [Controller 1]: Controller startup complete
(kafka.controller.KafkaController)
- Bob
-----Original Message-----
From: Seshadri, Balaji
Sent: Thursday, April 17, 2014 7:54 AM
To: [email protected]; Bello, Bob
Subject: RE: Controller is not being failed over 0.8.1
The ZK data was showing controller as still broker-1.
@Bob,
Can you please send the errors you see in state-change.log?
Thanks,
Balaji
________________________________________
From: Jun Rao [[email protected]]
Sent: Wednesday, April 16, 2014 9:37 PM
To: [email protected]
Subject: Re: Controller is not being failed over 0.8.1
What's the controller value in the zk path (see
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)?
Any error in the controller/state-change log?
Thanks,
Jun
On Wed, Apr 16, 2014 at 10:07 AM, Seshadri, Balaji <[email protected]> wrote:
> Hi,
>
> We got the following error spamming the logs when broker 1 was the
> controller and we shut it down in a controlled manner (not kill -9).
>
> The leader is being switched to broker 2 for all partitions, but the
> controller is not being failed over to broker 2.
>
> [2014-04-16 10:48:47.976-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker
> id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at
> kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.276-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1 epoch 38 failed to
> send LeaderAndIsr request with correlation id 1766 to broker
> id:1,host:tm1-kafkabroker101,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-16 10:48:48.277-0600] ERROR
> [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker
> id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at
> kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> This issue has blocked our upgrade.
>
> Thanks,
>
> Balaji
>
>
>