We fixed a couple of issues related to automatic leader balancing and controlled shutdown. Would you mind trying out 0.8.2-beta?
On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon <so...@knewton.com> wrote:

> We're using 0.8.1.1 with auto.leader.rebalance.enable=true.
>
> On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Solon,
> >
> > Which version of Kafka are you running and are you enabling auto leader
> > rebalance at the same time?
> >
> > Guozhang
> >
> > On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon <so...@knewton.com> wrote:
> >
> > > Hi all,
> > >
> > > My team has observed that if a broker process is killed in the middle of
> > > the controlled shutdown procedure, the remaining brokers start spewing
> > > errors and do not properly rebalance leadership. The cluster cannot
> > > recover without major manual intervention.
> > >
> > > Here is how to reproduce the problem:
> > > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them
> > >    A, B, and C.) Set controlled.shutdown.enable=true.
> > > 2. Create a topic with replication_factor = 3 and a large number of
> > >    partitions (say 100).
> > > 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> > > 4. Before controlled shutdown completes, quickly send a KILL signal to
> > >    broker A.
> > >
> > > Result:
> > > - Brokers B and C start logging ReplicaFetcherThread connection errors
> > >   every few milliseconds. (See below for an example.)
> > > - Broker A is still listed as the leader and in the ISR for any partitions
> > >   which were not transferred during controlled shutdown. This causes
> > >   connection errors when clients try to produce to or consume from these
> > >   partitions.
> > >
> > > This scenario is difficult to recover from. The only ways we have found
> > > are to restart broker A multiple times (if it still exists) or to kill
> > > both B and C and then start them one by one. Without this kind of
> > > intervention, the above issues persist indefinitely.
> > >
> > > This may sound like a contrived scenario, but it's exactly what we have
> > > seen when a Kafka EC2 instance gets terminated by AWS. So this seems like
> > > a real liability.
> > >
> > > Are there any existing JIRA tickets which cover this behavior? And do you
> > > have any recommendations for avoiding it, other than forsaking controlled
> > > shutdowns entirely?
> > >
> > > Thanks,
> > > Solon
> > >
> > > Error example:
> > > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error
> > > in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> > > ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms;
> > > MinBytes: 1 bytes; RequestInfo:
> > > [my-topic,42] -> PartitionFetchInfo(503,10485760),
> > > [my-topic,63] -> PartitionFetchInfo(386,10485760),
> > > [my-topic,99] -> PartitionFetchInfo(525,10485760),
> > > [my-topic,84] -> PartitionFetchInfo(436,10485760),
> > > [my-topic,48] -> PartitionFetchInfo(484,10485760),
> > > [my-topic,75] -> PartitionFetchInfo(506,10485760),
> > > [my-topic,45] -> PartitionFetchInfo(473,10485760),
> > > [my-topic,66] -> PartitionFetchInfo(532,10485760),
> > > [my-topic,30] -> PartitionFetchInfo(435,10485760),
> > > [my-topic,96] -> PartitionFetchInfo(517,10485760),
> > > [my-topic,27] -> PartitionFetchInfo(470,10485760),
> > > [my-topic,36] -> PartitionFetchInfo(472,10485760),
> > > [my-topic,9] -> PartitionFetchInfo(514,10485760),
> > > [my-topic,33] -> PartitionFetchInfo(582,10485760),
> > > [my-topic,69] -> PartitionFetchInfo(504,10485760),
> > > [my-topic,57] -> PartitionFetchInfo(444,10485760),
> > > [my-topic,78] -> PartitionFetchInfo(559,10485760),
> > > [my-topic,12] -> PartitionFetchInfo(417,10485760),
> > > [my-topic,90] -> PartitionFetchInfo(429,10485760),
> > > [my-topic,18] -> PartitionFetchInfo(497,10485760),
> > > [my-topic,0] -> PartitionFetchInfo(402,10485760),
> > > [my-topic,6] -> PartitionFetchInfo(527,10485760),
> > > [my-topic,54] -> PartitionFetchInfo(524,10485760),
> > > [my-topic,15] -> PartitionFetchInfo(448,10485760),
> > > [console,0] -> PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > > java.net.ConnectException: Connection refused
> > >         at sun.nio.ch.Net.connect0(Native Method)
> > >         at sun.nio.ch.Net.connect(Net.java:465)
> > >         at sun.nio.ch.Net.connect(Net.java:457)
> > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > >         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null
> > > (kafka.consumer.SimpleConsumer)
> > >
> > > We also see these errors repeatedly in the controller log:
> > > [2014-11-06 21:37:50,945] ERROR
> > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > 1359390395 epoch 6 failed to send StopReplica request with correlation id
> > > 118 to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092.
> > > Reconnecting to broker. (kafka.controller.RequestSendThread)
> > > java.nio.channels.ClosedChannelException
> > >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > >         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> > >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > [2014-11-06 21:37:50,947] ERROR
> > > [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> > > 1359390395's connection to broker
> > > id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was unsuccessful
> > > (kafka.controller.RequestSendThread)
> > > java.net.ConnectException: Connection refused
> > >         at sun.nio.ch.Net.connect0(Native Method)
> > >         at sun.nio.ch.Net.connect(Net.java:465)
> > >         at sun.nio.ch.Net.connect(Net.java:457)
> > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >         at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
> > >         at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
> > >         at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> > >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > --
> > -- Guozhang
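For anyone trying to reproduce the scenario described in the thread above, the numbered steps translate roughly into the sketch below. This is a minimal illustration, not the original poster's exact procedure: KAFKA_HOME, the ZooKeeper address, the topic name, and BROKER_A_PID are placeholder assumptions, and it presumes a local three-broker 0.8.1.1 cluster is already running with controlled.shutdown.enable=true on each broker.

    #!/usr/bin/env python
    # Sketch of the reproduction steps (hypothetical paths and PID).
    import os
    import signal
    import subprocess
    import time

    KAFKA_HOME = "/opt/kafka"      # assumed install location
    ZOOKEEPER = "localhost:2181"   # assumed ZooKeeper connect string
    BROKER_A_PID = 12345           # assumed PID of broker A's process

    # Step 2: create a fully replicated topic with many partitions.
    subprocess.check_call([
        os.path.join(KAFKA_HOME, "bin/kafka-topics.sh"),
        "--create", "--zookeeper", ZOOKEEPER,
        "--topic", "my-topic",
        "--replication-factor", "3",
        "--partitions", "100",
    ])

    # Step 3: TERM starts controlled shutdown on broker A.
    os.kill(BROKER_A_PID, signal.SIGTERM)

    # Step 4: KILL broker A before controlled shutdown can finish,
    # leaving it as leader/ISR for partitions it never handed off.
    time.sleep(0.5)
    os.kill(BROKER_A_PID, signal.SIGKILL)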