The following log entries suggest that the controlled shutdown failed. Since
you have many partitions, you probably need to increase
controlled.shutdown.retry.backoff.ms and controlled.shutdown.max.retries to
give the controlled shutdown enough time to complete. Also, try
increasing controller.socket.timeout.ms, since the controller log shows a
SocketTimeoutException.

[2014-04-17 03:25:59.471-0600] WARN [Kafka Server 2], Retrying controlled
shutdown after the previous attempt failed... (kafka.server.KafkaServer)
[2014-04-17 03:25:59.472-0600] WARN [Kafka Server 2], Proceeding to do an
unclean shutdown as all the controlled shutdown attempts failed
(kafka.server.Kafka
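
As a rough sketch of the tuning suggested above: the snippet below compares
the time a broker spends backing off between controlled shutdown attempts
under the default settings and under a set of larger values, then prints the
overrides as server.properties lines. The defaults are the ones I recall for
0.8.1 and the proposed values are purely illustrative assumptions, not tested
recommendations; the helper names are hypothetical.

```python
# Defaults as recalled for Kafka 0.8.1 (assumption; check the broker
# configuration docs for your version).
DEFAULTS = {
    "controlled.shutdown.max.retries": 3,
    "controlled.shutdown.retry.backoff.ms": 5000,
    "controller.socket.timeout.ms": 30000,
}

# Hypothetical overrides for a broker hosting many partitions.
PROPOSED = {
    **DEFAULTS,
    "controlled.shutdown.max.retries": 10,
    "controlled.shutdown.retry.backoff.ms": 10000,
    "controller.socket.timeout.ms": 90000,
}


def backoff_budget_ms(settings):
    """Total time spent backing off across all retries (retries x backoff)."""
    return (settings["controlled.shutdown.max.retries"]
            * settings["controlled.shutdown.retry.backoff.ms"])


def render_properties(settings):
    """Render the settings as server.properties lines, sorted by key."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


if __name__ == "__main__":
    print("default backoff budget (ms):", backoff_budget_ms(DEFAULTS))
    print("proposed backoff budget (ms):", backoff_budget_ms(PROPOSED))
    print(render_properties(PROPOSED))
```

The backoff budget is only a lower bound on how long the broker keeps trying,
since each attempt also takes time of its own. How much headroom you need
depends on how long the controller takes to move leadership for all of the
broker's partitions.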

Thanks,

Jun


On Thu, Apr 17, 2014 at 8:44 AM, Bello, Bob <bob.be...@dish.com> wrote:

> I set up a 2-node cluster with replication factor 2, created 10 topics with
> 131 partitions each, then tried to do a controlled shutdown of broker #2
> (which is the current elected leader).
>
> (Sorry, our Broker #1 and Broker #2 server times are 6h off; I will have
> to get that fixed.)
>
>
> According to Zookeeper Broker #2 is still the controller:
>
> [zk: tm1mwwm001:2181(CONNECTED) 13] get /controller
> {"version":1,"brokerid":2,"timestamp":"1397726326047"}
> cZxid = 0x11e
> ctime = Thu Apr 17 09:18:43 MDT 2014
> mZxid = 0x11e
> mtime = Thu Apr 17 09:18:43 MDT 2014
> pZxid = 0x11e
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x1456658f6770011
> dataLength = 54
> numChildren = 0
>
>
> From Broker #2 - server.log
>
> [2014-04-17 03:25:59.471-0600] WARN [Kafka Server 2], Retrying controlled
> shutdown after the previous attempt failed... (kafka.server.KafkaServer)
> [2014-04-17 03:25:59.472-0600] WARN [Kafka Server 2], Proceeding to do an
> unclean shutdown as all the controlled shutdown attempts failed
> (kafka.server.KafkaServer)
> [2014-04-17 03:25:59.474-0600] INFO [Socket Server on Broker 2], Shutting
> down (kafka.network.SocketServer)
> [2014-04-17 03:25:59.481-0600] INFO [Socket Server on Broker 2], Shutdown
> completed (kafka.network.SocketServer)
> [2014-04-17 03:25:59.482-0600] INFO [Kafka Request Handler on Broker 2],
> shutting down (kafka.server.KafkaRequestHandlerPool)
> [2014-04-17 03:25:59.561-0600] WARN [KafkaApi-2] Fetch request with
> correlation id 688 from client ReplicaFetcherThread-4-2 on partition
> [test8,26] failed due to Leader not local for partition [test8,26] on
> broker 2 (kafka.server.KafkaApis)
> [2014-04-17 03:25:59.563-0600] WARN [KafkaApi-2] Fetch request with
> correlation id 688 from client ReplicaFetcherThread-6-2 on partition
> [test4,96] failed due to Leader not local for partition [test4,96] on
> broker 2 (kafka.server.KafkaApis)
> [2014-04-17 03:25:59.602-0600] WARN [KafkaApi-2] Fetch request with
> correlation id 430 from client ReplicaFetcherThread-1-2 on partition
> [test3,18] failed due to Leader not local for partition [test3,18] on
> broker 2 (kafka.server.KafkaApis)
> [2014-04-17 03:26:10.227-0600] INFO Partition [test9,124] on broker 2:
> Shrinking ISR for partition [test9,124] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:26:10.229-0600] INFO Partition [test1,87] on broker 2:
> Shrinking ISR for partition [test1,87] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:26:10.230-0600] INFO Partition [test9,68] on broker 2:
> Shrinking ISR for partition [test9,68] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:26:10.232-0600] ERROR Conditional update of path
> /brokers/topics/test9/partitions/68/state with data
> {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
> and expected version 0 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for /brokers/topics/test9/partitions/68/state
> (kafka.utils.ZkUtils$)
> [2014-04-17 03:26:10.233-0600] INFO Partition [test9,68] on broker 2:
> Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
>
> Broker #2 - state-change.log
> *only TRACE messages*
>
> [2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
> UpdateMetadata request (Leader:2,ISR:2,1,LeaderEpoch:0,ControllerEpoch:3)
> with correlationId 1966 to broker 2 for partition [test1,65]
> (state.change.logger)
> [2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
> UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3)
> with correlationId 1966 to broker 2 for partition [test9,110]
> (state.change.logger)
> [2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
> UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3)
> with correlationId 1966 to broker 2 for partition [test10,11]
> (state.change.logger)
> [2014-04-17 03:26:29.517-0600] TRACE Controller 2 epoch 3 sending
> UpdateMetadata request (Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3)
> with correlationId 1966 to broker 2 for partition [test8,61]
> (state.change.logger)
>
> Broker #2 - controller.log
>
> [2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete =
> true) sent to broker 2 is  (kafka.controller.ControllerBrokerRequestBatch)
> [2014-04-17 03:25:59.746-0600] DEBUG The stop replica request (delete =
> false) sent to broker 2 is [Topic=test7,Partition=123,Replica=2]
> (kafka.controller.ControllerBrokerRequestBatch)
> [2014-04-17 03:26:29.469-0600] WARN
> [Controller-2-to-broker-2-send-thread], Controller 2 fails to send a
> request to broker id:2,host:tm1mwwm002,port:9092
> (kafka.controller.RequestSendThread)
> java.net.SocketTimeoutException
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>         at kafka.utils.Utils$.read(Utils.scala:375)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:26:29.470-0600] INFO [Replica state machine on controller
> 2]: Invoking state change to OfflineReplica for replicas
> [Topic=test7,Partition=123,Replica=2] (kafka.controller.ReplicaStateMachine)
> [2014-04-17 03:26:29.470-0600] DEBUG [Controller 2]: Removing replica 2
> from ISR 1,2 for partition [test7,123]. (kafka.controller.KafkaController)
> [2014-04-17 03:26:29.471-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send
> LeaderAndIsr request with correlation id 1960 to broker
> id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:26:29.473-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker
> id:2,host:tm1mwwm002,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at
> kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:26:29.475-0600] INFO [Controller 2]: New leader and ISR for
> partition [test7,123] is {"leader":1,"leader_epoch":1,"isr":[1]}
> (kafka.controller.KafkaController)
>
>
> *seeing repeated messages of these*
>
> [2014-04-17 03:31:19.939-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send
> LeaderAndIsr request with correlation id 1960 to broker
> id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:31:19.940-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker
> id:2,host:tm1mwwm002,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at
> kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:31:20.240-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2 epoch 3 failed to send
> LeaderAndIsr request with correlation id 1960 to broker
> id:2,host:tm1mwwm002,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 03:31:20.240-0600] ERROR
> [Controller-2-to-broker-2-send-thread], Controller 2's connection to broker
> id:2,host:tm1mwwm002,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at
> kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>         at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> Broker #2 - server.log
>
> [2014-04-17 03:34:20.226-0600] INFO Partition [test3,108] on broker 2:
> Shrinking ISR for partition [test3,108] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:34:20.227-0600] ERROR Conditional update of path
> /brokers/topics/test3/partitions/108/state with data
> {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
> and expected version 0 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for /brokers/topics/test3/partitions/108/state
> (kafka.utils.ZkUtils$)
> [2014-04-17 03:34:20.227-0600] INFO Partition [test3,108] on broker 2:
> Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
> [2014-04-17 03:34:20.227-0600] INFO Partition [test5,101] on broker 2:
> Shrinking ISR for partition [test5,101] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:34:20.228-0600] ERROR Conditional update of path
> /brokers/topics/test5/partitions/101/state with data
> {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
> and expected version 0 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for /brokers/topics/test5/partitions/101/state
> (kafka.utils.ZkUtils$)
> [2014-04-17 03:34:20.228-0600] INFO Partition [test5,101] on broker 2:
> Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
> [2014-04-17 03:34:20.229-0600] INFO Partition [test5,33] on broker 2:
> Shrinking ISR for partition [test5,33] from 2,1 to 2
> (kafka.cluster.Partition)
> [2014-04-17 03:34:20.230-0600] ERROR Conditional update of path
> /brokers/topics/test5/partitions/33/state with data
> {"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
> and expected version 0 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for /brokers/topics/test5/partitions/33/state
> (kafka.utils.ZkUtils$)
> [2014-04-17 03:34:20.230-0600] INFO Partition [test5,33] on broker 2:
> Cached zkVersion [0] not equal to that in zookeeper, skip updating ISR
> (kafka.cluster.Partition)
>
>
>
> Broker #1 - server.log
>
> [2014-04-17 09:25:56.833-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test8,26]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:56.887-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test3,18]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:56.945-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test5,33]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:56.995-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test9,68]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:57.045-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test3,108]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:57.094-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test6,1]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:25:57.144-0600] INFO [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [test5,101]
> (kafka.server.ReplicaFetcherManager)
> [2014-04-17 09:26:26.465-0600] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-04-17 09:26:26.466-0600] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-04-17 09:26:26.467-0600] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-04-17 09:26:26.473-0600] ERROR [ReplicaFetcherThread-4-2], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 688; ClientId:
> ReplicaFetcherThread-4-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [test4,126] -> PartitionFetchInfo(0,10485760),[test8,66] ->
> PartitionFetchInfo(0,10485760),[test8,26] ->
> PartitionFetchInfo(0,10485760),[test4,22] -> PartitionFetchInfo(0,10485760)
> (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> *lots of spamming of these type of log entries*
>
> [2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-0-2], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 546078; ClientId:
> ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [test5,67] -> PartitionFetchInfo(0,10485760),[test4,122] ->
> PartitionFetchInfo(0,10485760),[test1,79] ->
> PartitionFetchInfo(0,10485760),[test1,127] ->
> PartitionFetchInfo(0,10485760),[test1,87] -> PartitionFetchInfo(0,10485760)
> (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 09:37:14.524-0600] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
> [2014-04-17 09:37:14.524-0600] ERROR [ReplicaFetcherThread-1-2], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 561475; ClientId:
> ReplicaFetcherThread-1-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [test3,10] -> PartitionFetchInfo(0,10485760),[test10,90] ->
> PartitionFetchInfo(0,10485760) (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
> Broker #1 - state-change.log
> *All TRACE messages*
>
>
> [2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info
> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
> for partition [test10,11] in response to UpdateMetadata request sent by
> controller 2 epoch 3 with correlation id 1964 (state.change.logger)
> [2014-04-17 09:25:57.195-0600] TRACE Broker 1 cached leader info
> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
> for partition [test8,61] in response to UpdateMetadata request sent by
> controller 2 epoch 3 with correlation id 1964 (state.change.logger)
> [2014-04-17 09:26:26.875-0600] TRACE Broker 1 received LeaderAndIsr
> request
> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:1,ControllerEpoch:3),ReplicationFactor:2),AllReplicas:1,2)
> correlation id 1966 from controller 2 epoch 3 for partition [test7,123]
> (state.change.logger)
> [2014-04-17 09:26:26.875-0600] TRACE Broker 1 handling LeaderAndIsr
> request correlationId 1966 from controller 2 epoch 3 starting the
> become-leader transition for partition [test7,123] (state.change.logger)
> [2014-04-17 09:26:26.877-0600] TRACE Broker 1 stopped fetchers as part of
> become-leader request from controller 2 epoch 3 with correlation id 1966
> for partition [test7,123] (state.change.logger)
> [2014-04-17 09:26:26.877-0600] TRACE Broker 1 completed LeaderAndIsr
> request correlationId 1966 from controller 2 epoch 3 for the become-leader
> transition for partition [test7,123] (state.change.logger)
>
> Broker #1 - controller.log
>
>
> [2014-04-17 09:18:42.887-0600] DEBUG [Partition state machine on
> Controller 1]: After leader election, leader cache is updated to
> Map([test_topic,0] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,7] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2),
> [test_topic,4] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,10] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,2] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,3] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2),
> [test_topic,9] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2),
> [test_topic,6] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,12] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,8] -> (Leader:2,ISR:2,LeaderEpoch:5,ControllerEpoch:2),
> [test_topic,1] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2),
> [test_topic,11] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2),
> [test_topic,5] -> (Leader:2,ISR:2,LeaderEpoch:4,ControllerEpoch:2))
> (kafka.controller.PartitionStateMachine)
> [2014-04-17 09:18:43.514-0600] INFO
> [Controller-1-to-broker-2-send-thread], Shutting down
> (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.515-0600] WARN
> [Controller-1-to-broker-2-send-thread], Controller 1 fails to send a
> request to broker id:2,host:tm1mwwm002,port:9092
> (kafka.controller.RequestSendThread)
> java.nio.channels.AsynchronousCloseException
>         at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
>         at kafka.utils.Utils$.read(Utils.scala:375)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-04-17 09:18:43.515-0600] INFO
> [Controller-1-to-broker-2-send-thread], Stopped
>  (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.515-0600] INFO
> [Controller-1-to-broker-2-send-thread], Shutdown completed
> (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.516-0600] INFO
> [Controller-1-to-broker-1-send-thread], Shutting down
> (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.516-0600] INFO
> [Controller-1-to-broker-1-send-thread], Stopped
>  (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.516-0600] INFO
> [Controller-1-to-broker-1-send-thread], Shutdown completed
> (kafka.controller.RequestSendThread)
> [2014-04-17 09:18:43.517-0600] INFO [Controller 1]: Controller shutdown
> complete (kafka.controller.KafkaController)
> [2014-04-17 09:20:09.860-0600] INFO [ControllerEpochListener on 1]:
> Initialized controller epoch to 3 and zk version 2
> (kafka.controller.ControllerEpochListener)
> [2014-04-17 09:20:09.904-0600] INFO [Controller 1]: Controller starting up
> (kafka.controller.KafkaController)
> [2014-04-17 09:20:09.961-0600] INFO [Controller 1]: Controller startup
> complete (kafka.controller.KafkaController)
>
>
>
> - Bob
>
>
> -----Original Message-----
> From: Seshadri, Balaji
> Sent: Thursday, April 17, 2014 7:54 AM
> To: users@kafka.apache.org; Bello, Bob
> Subject: RE: Controller is not being failed over 0.8.1
>
> The ZK data was showing controller as still broker-1.
>
> @Bob,
>
> Can you please send the errors you see in the state-change.log?
>
> Thanks,
>
> Balaji
> ________________________________________
> From: Jun Rao [jun...@gmail.com]
> Sent: Wednesday, April 16, 2014 9:37 PM
> To: users@kafka.apache.org
> Subject: Re: Controller is not being failed over 0.8.1
>
> What's the controller value in the zk path (see
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
> )?
> Any error in the controller/state-change log?
>
> Thanks,
>
> Jun
>
>
> On Wed, Apr 16, 2014 at 10:07 AM, Seshadri, Balaji
> <balaji.sesha...@dish.com> wrote:
>
> > Hi,
> >
> > The following error spams the logs when broker 1 is the controller and
> > we shut it down in a controlled manner (not kill -9).
> >
> > The leader is switched to broker 2 for all partitions, but the controller
> > is not failed over to broker 2.
> >
> > [2014-04-16 10:48:47.976-0600] ERROR
> > [Controller-1-to-broker-1-send-thread], Controller 1's connection to
> > broker id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> > (kafka.controller.RequestSendThread)
> > java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect0(Native Method)
> >         at sun.nio.ch.Net.connect(Net.java:465)
> >         at sun.nio.ch.Net.connect(Net.java:457)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> >         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-04-16 10:48:48.276-0600] ERROR
> > [Controller-1-to-broker-1-send-thread], Controller 1 epoch 38 failed to
> > send LeaderAndIsr request with correlation id 1766 to broker
> > id:1,host:tm1-kafkabroker101,port:9092. Reconnecting to broker.
> > (kafka.controller.RequestSendThread)
> > java.nio.channels.ClosedChannelException
> >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> >         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > [2014-04-16 10:48:48.277-0600] ERROR
> > [Controller-1-to-broker-1-send-thread], Controller 1's connection to
> > broker id:1,host:tm1-kafkabroker101,port:9092 was unsuccessful
> > (kafka.controller.RequestSendThread)
> > java.net.ConnectException: Connection refused
> >         at sun.nio.ch.Net.connect0(Native Method)
> >         at sun.nio.ch.Net.connect(Net.java:465)
> >         at sun.nio.ch.Net.connect(Net.java:457)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> >         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > This issue stopped our upgrade.
> >
> > Thanks,
> >
> > Balaji
> >
> >
> >
>
