Solon,

Which version of Kafka are you running, and are you enabling auto leader
rebalance at the same time?
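Both settings in question live in each broker's server.properties. Below is a minimal Python sketch for reporting them, assuming a plain key=value file. The helper names are hypothetical, and the parser deliberately skips the full Java properties syntax (continuations, ':' separators, escapes).

```python
from pathlib import Path

def read_properties(path):
    """Parse simple key=value lines from a Java-style .properties file.

    Simplification: blank lines and lines starting with '#' or '!' are skipped;
    line continuations, ':' separators and escape sequences are not handled."""
    props = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '='
            props[key.strip()] = value.strip()
    return props

def shutdown_settings(path):
    """Report the two settings asked about; None means 'not set, broker default applies'."""
    props = read_properties(path)
    return {
        name: props.get(name)
        for name in ("controlled.shutdown.enable", "auto.leader.rebalance.enable")
    }
```

This file does not record the Kafka version, so the version still has to be checked separately. The sketch returns None rather than guessing a default, because defaults have changed between releases.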

Guozhang

On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon <so...@knewton.com> wrote:

> Hi all,
>
> My team has observed that if a broker process is killed in the middle of
> the controlled shutdown procedure, the remaining brokers start spewing
> errors and do not properly rebalance leadership. The cluster cannot recover
> without major manual intervention.
>
> Here is how to reproduce the problem:
> 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
> B, and C.) Set controlled.shutdown.enable=true.
> 2. Create a topic with replication_factor = 3 and a large number of
> partitions (say 100).
> 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> 4. Before controlled shutdown completes, quickly send a KILL signal to
> broker A.
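Scripting steps 3 and 4 makes the race easier to reproduce on demand. The following is a minimal Python sketch, assuming a POSIX host where broker A's PID is already known. The function name and the `delay_s` value are hypothetical and need tuning, because the window depends on how long controlled shutdown takes with this many partitions.

```python
import os
import signal
import time

def interrupt_controlled_shutdown(pid, delay_s=1.0):
    """Send SIGTERM to start controlled shutdown, then SIGKILL before it finishes.

    `delay_s` is a hypothetical tuning knob: it must be shorter than the time the
    broker needs to move leadership off itself. Returns False if the process was
    already gone when SIGKILL was sent (i.e. shutdown won the race)."""
    os.kill(pid, signal.SIGTERM)  # step 3: begin controlled shutdown
    time.sleep(delay_s)
    try:
        os.kill(pid, signal.SIGKILL)  # step 4: kill it mid-shutdown
    except ProcessLookupError:
        return False
    return True
```

A process that ignores SIGTERM can stand in for a slow-to-shut-down broker. After the call, that process should exit with SIGKILL.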
>
> Result:
> - Brokers B and C start logging ReplicaFetcherThread connection errors
> every few milliseconds. (See below for an example.)
> - Broker A is still listed as the leader and as an ISR member for any
> partitions that were not transferred during controlled shutdown. This causes
> connection errors when clients try to produce to or consume from these
> partitions.
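One way to list the partitions still pinned to broker A is to inspect each partition's state in ZooKeeper. The Python sketch below assumes the 0.8-era layout, in which `/brokers/topics/<topic>/partitions/<n>/state` holds JSON with `leader` and `isr` fields. It also assumes those strings have already been fetched with your ZooKeeper client of choice. The function name is hypothetical.

```python
import json

def stale_partitions(states, live_brokers):
    """Return partitions whose recorded leader or ISR still names a dead broker.

    `states` maps partition number -> the JSON string read from the partition's
    state znode (assumed shape: {"controller_epoch":..,"leader":..,"version":1,
    "leader_epoch":..,"isr":[..]}). `live_brokers` is a set of broker ids."""
    stale = {}
    for partition, raw in states.items():
        state = json.loads(raw)
        dead_in_isr = [b for b in state["isr"] if b not in live_brokers]
        if state["leader"] not in live_brokers or dead_in_isr:
            stale[partition] = {"leader": state["leader"], "dead_in_isr": dead_in_isr}
    return dict(sorted(stale.items()))
```

When the only live broker ids are B and C, every partition this returns is one that controlled shutdown never moved off A.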
>
> This scenario is difficult to recover from. The only ways we have found are
> to restart broker A multiple times (if it still exists) or to kill both B
> and C and then start them one by one. Without this kind of intervention,
> the above issues persist indefinitely.
>
> This may sound like a contrived scenario, but it's exactly what we have
> seen when a Kafka EC2 instance gets terminated by AWS. So this seems like a
> real liability.
>
> Are there any existing JIRA tickets which cover this behavior? And do you
> have any recommendations for avoiding it, other than forsaking controlled
> shutdowns entirely?
>
> Thanks,
> Solon
>
> Error example:
> [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error
> in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms;
> MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> PartitionFetchInfo(503,10485760),[my-topic,63] ->
> PartitionFetchInfo(386,10485760),[my-topic,99] ->
> PartitionFetchInfo(525,10485760),[my-topic,84] ->
> PartitionFetchInfo(436,10485760),[my-topic,48] ->
> PartitionFetchInfo(484,10485760),[my-topic,75] ->
> PartitionFetchInfo(506,10485760),[my-topic,45] ->
> PartitionFetchInfo(473,10485760),[my-topic,66] ->
> PartitionFetchInfo(532,10485760),[my-topic,30] ->
> PartitionFetchInfo(435,10485760),[my-topic,96] ->
> PartitionFetchInfo(517,10485760),[my-topic,27] ->
> PartitionFetchInfo(470,10485760),[my-topic,36] ->
> PartitionFetchInfo(472,10485760),[my-topic,9] ->
> PartitionFetchInfo(514,10485760),[my-topic,33] ->
> PartitionFetchInfo(582,10485760),[my-topic,69] ->
> PartitionFetchInfo(504,10485760),[my-topic,57] ->
> PartitionFetchInfo(444,10485760),[my-topic,78] ->
> PartitionFetchInfo(559,10485760),[my-topic,12] ->
> PartitionFetchInfo(417,10485760),[my-topic,90] ->
> PartitionFetchInfo(429,10485760),[my-topic,18] ->
> PartitionFetchInfo(497,10485760),[my-topic,0] ->
> PartitionFetchInfo(402,10485760),[my-topic,6] ->
> PartitionFetchInfo(527,10485760),[my-topic,54] ->
> PartitionFetchInfo(524,10485760),[my-topic,15] ->
> PartitionFetchInfo(448,10485760),[console,0] ->
> PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:465)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-11-06 17:10:21,462] WARN Reconnect due to socket error: null
> (kafka.consumer.SimpleConsumer)
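The RequestInfo list in that error names every partition the fetcher is trying to replicate from broker 1978259225. That id is the trailing number in the thread name `ReplicaFetcherThread-0-1978259225`, which I read as the source broker id. A small Python sketch for pulling those entries out of a wrapped, mail-quoted log entry follows. The regex and function name are my own and are not part of Kafka.

```python
import re

# One RequestInfo entry, e.g. "[my-topic,42] -> PartitionFetchInfo(503,10485760)":
# captures topic, partition, fetch offset, max bytes.
FETCH_ENTRY = re.compile(
    r"\[([^,\]]+),(\d+)\]\s*->\s*PartitionFetchInfo\((\d+),(\d+)\)"
)

def fetch_targets(log_text):
    """Return (topic, partition, offset, max_bytes) tuples from a fetch-error log entry."""
    # Undo mail quoting and line wrapping so entries split across lines still match.
    flat = " ".join(line.lstrip("> ").strip() for line in log_text.splitlines())
    return [
        (topic, int(part), int(offset), int(size))
        for topic, part, offset, size in FETCH_ENTRY.findall(flat)
    ]
```

Applied to the full entry, `sorted(p for t, p, _, _ in fetch_targets(text) if t == "my-topic")` yields the partition numbers that broker B still follows from A.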
>
> We also see these errors repeatedly in the controller log:
> [2014-11-06 21:37:50,945] ERROR
> [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> 1359390395 epoch 6 failed to send StopReplica request with correlation id
> 118 to broker id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092.
> Reconnecting to broker. (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
>   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2014-11-06 21:37:50,947] ERROR
> [Controller-1359390395-to-broker-1978259225-send-thread], Controller
> 1359390395's connection to broker
> id:1978259225,host:ip-10-164-59-90.ec2.internal,port:9092 was unsuccessful
> (kafka.controller.RequestSendThread)
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.Net.connect0(Native Method)
>   at sun.nio.ch.Net.connect(Net.java:465)
>   at sun.nio.ch.Net.connect(Net.java:457)
>   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>   at kafka.controller.RequestSendThread.connectToBroker(ControllerChannelManager.scala:173)
>   at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:140)
>   at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>



-- 
-- Guozhang
