Ok, I've reproduced this again and made sure to grab the broker logs before
the instance was terminated. I posted a writeup with what seemed like the
relevant bits of the logs here:
https://gist.github.com/lukesteensen/793a467a058af51a7047

To summarize, it looks like Gwen was correct: the broker that answered
the metadata request hadn't yet processed the leader change. While this
race condition isn't great on its own, a couple of producer behaviors
turned it into a much larger issue:

1. No explicit connection timeouts
2. Requests are expired from the RecordAccumulator after request.timeout.ms
instead of max.block.ms (see the config sketch below)
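
To make the configs in play concrete, here's a minimal sketch of a 0.9 Java
producer setup; the hostnames, serializers, and timeout values are
placeholders for illustration, not our actual production config:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    // Placeholder bootstrap hosts, not our real brokers
    props.put("bootstrap.servers", "broker-1:9092,broker-2:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("acks", "all");
    props.put("retries", "3");
    // In 0.9 this bounds in-flight requests *and* how long batches can sit
    // in the RecordAccumulator before being expired (behavior 2 above)
    props.put("request.timeout.ms", "30000");
    // Bounds how long send() itself may block, e.g. waiting on metadata
    props.put("max.block.ms", "60000");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);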

It seems the correct way to handle this would be for connection attempts to
time out after request.timeout.ms, triggering a metadata update and allowing
requests stuck in the RecordAccumulator to be retried as long as they are
still under max.block.ms.
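
For completeness, here's roughly how the expiry surfaces today, continuing
the sketch above (the topic "foo" matches the logs below; the key, value, and
logging are made up): the expired batch's TimeoutException is handed to the
send callback rather than the record being retried.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.TimeoutException;

    producer.send(new ProducerRecord<String, String>("foo", "some-key", "some-value"),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof TimeoutException) {
                    // The batch was expired out of the RecordAccumulator after
                    // request.timeout.ms while the node sat in the "connecting"
                    // state; no retry happens even though retries > 0
                    System.err.println("Batch expired without retry: " + exception);
                } else if (exception != null) {
                    System.err.println("Send failed: " + exception);
                }
            }
        });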

Thanks,
Luke


On Thu, Jan 14, 2016 at 11:48 AM, Luke Steensen <
luke.steen...@braintreepayments.com> wrote:

> I don't have broker logs at the moment, but I'll work on getting some I
> can share. We are running 0.9.0.0 for both the brokers and producer in this
> case. I've pasted some bits from the producer log below in case that's
> helpful. Of particular note is how long it takes for the second disconnect
> to occur when the connection times out. During that time, all produce
> requests to partition 0 were being expired in the RecordAccumulator.
>
> DEBUG [2016-01-05 21:42:49,086] Updated cluster metadata
> Cluster(nodes = [Node(1, ip-10-42-21-178.us-west-2.compute.internal,
> 9092), Node(0, ip-10-42-18-0.us-west-2.compute.internal, 9092), Node(2,
> ip-10-42-26-175.us-west-2.compute.internal, 9092)], partitions = [
> Partition(topic = foo, partition = 7, leader = 1, replicas = [1,2,0,], isr
> = [1,2,0,],
> Partition(topic = foo, partition = 6, leader = 0, replicas = [0,1,2,], isr
> = [0,1,2,],
> Partition(topic = foo, partition = 5, leader = 2, replicas = [2,1,0,], isr
> = [2,1,0,],
> Partition(topic = foo, partition = 4, leader = 1, replicas = [1,0,2,], isr
> = [1,0,2,],
> Partition(topic = foo, partition = 3, leader = 0, replicas = [0,2,1,], isr
> = [0,2,1,],
> Partition(topic = foo, partition = 2, leader = 2, replicas = [2,0,1,], isr
> = [2,0,1,],
> Partition(topic = foo, partition = 1, leader = 1, replicas = [1,2,0,], isr
> = [1,2,0,],
> Partition(topic = foo, partition = 0, leader = 0, replicas = [0,1,2,], isr
> = [0,1,2,]])
>
> DEBUG [2016-01-05 21:46:51,454] [] o.a.kafka.common.network.Selector -
> Connection with ip-10-42-18-0.us-west-2.compute.internal/10.42.18.0
> disconnected
> java.io.EOFException: null
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286) ~[vault_deploy.jar:na]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) [vault_deploy.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [vault_deploy.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) [vault_deploy.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
> DEBUG [2016-01-05 21:46:51,455] [] o.apache.kafka.clients.NetworkClient -
> Node 0 disconnected.
>
> DEBUG [2016-01-05 21:46:51,456] Updated cluster metadata
> Cluster(nodes = [Node(2, ip-10-42-26-175.us-west-2.compute.internal,
> 9092), Node(1, ip-10-42-21-178.us-west-2.compute.internal, 9092), Node(0,
> ip-10-42-18-0.us-west-2.compute.internal, 9092)], partitions = [
> Partition(topic = foo, partition = 7, leader = 1, replicas = [1,2,0,], isr
> = [1,2,],
> Partition(topic = foo, partition = 6, leader = 1, replicas = [0,1,2,], isr
> = [1,2,],
> Partition(topic = foo, partition = 5, leader = 2, replicas = [2,1,0,], isr
> = [2,1,],
> Partition(topic = foo, partition = 4, leader = 1, replicas = [1,0,2,], isr
> = [1,2,],
> Partition(topic = foo, partition = 3, leader = 2, replicas = [0,2,1,], isr
> = [2,1,],
> Partition(topic = foo, partition = 2, leader = 2, replicas = [2,0,1,], isr
> = [2,1,],
> Partition(topic = foo, partition = 1, leader = 1, replicas = [1,2,0,], isr
> = [1,2,],
> Partition(topic = foo, partition = 0, leader = 0, replicas = [0,1,2,], isr
> = [0,1,2,]])
>
> DEBUG [2016-01-05 21:49:18,159] [] o.a.kafka.common.network.Selector -
> Connection with ip-10-42-18-0.us-west-2.compute.internal/10.42.18.0
> disconnected
> java.net.ConnectException: Connection timed out
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_66]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_66]
>     at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79) ~[vault_deploy.jar:na]
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:274) ~[vault_deploy.jar:na]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) [vault_deploy.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) [vault_deploy.jar:na]
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) [vault_deploy.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
> DEBUG [2016-01-05 21:49:18,159] [] o.apache.kafka.clients.NetworkClient -
> Node 0 disconnected.
>
>
> On Thu, Jan 14, 2016 at 10:47 AM, Gwen Shapira <g...@confluent.io> wrote:
>
>> Do you happen to have broker-logs and state-change logs from the
>> controlled
>> shutdown attempt?
>>
>> In theory, the producer should not really see a disconnect - it should get
>> NotALeader exception (because leaders are re-assigned before the shutdown)
>> that will cause it to get the metadata. I am guessing that leadership
>> actually *was* transferred but the broker that answered the metadata
>> request did not get the news.
>>
>> In 0.8.2 we had some bugs regarding how membership info is distributed to
>> all nodes. This was resolved in 0.9.0.0, so perhaps an upgrade will help.
>>
>> Gwen
>>
>> On Wed, Jan 13, 2016 at 11:22 AM, Luke Steensen <
>> luke.steen...@braintreepayments.com> wrote:
>>
>> > Yes, that was my intention and we have both of those configs turned on.
>> For
>> > some reason, however, the controlled shutdown wasn't transferring
>> > leadership of all partitions, which caused the issues I described in my
>> > initial email.
>> >
>> >
>> > On Wed, Jan 13, 2016 at 12:05 AM, Ján Koščo <3k.stan...@gmail.com>
>> wrote:
>> >
>> > > Not sure, but should combination of auto.leader.rebalance.enable=true
>> > > and controlled.shutdown.enable=true sort this out for you?
>> > >
>> > > 2016-01-13 1:13 GMT+01:00 Scott Reynolds <sreyno...@twilio.com>:
>> > >
>> > > > we use 0.9.0.0 and it is working fine. Not all the features work
>> and a
>> > > few
>> > > > things make a few assumptions about how zookeeper is used. But as a
>> > tool
>> > > > for provisioning, expanding and failure recovery it is working fine
>> so
>> > > far.
>> > > >
>> > > > *knocks on wood*
>> > > >
>> > > > On Tue, Jan 12, 2016 at 4:08 PM, Luke Steensen <
>> > > > luke.steen...@braintreepayments.com> wrote:
>> > > >
>> > > > > Ah, that's a good idea. Do you know if kafka-manager works with
>> kafka
>> > > 0.9
>> > > > > by chance? That would be a nice improvement of the cli tools.
>> > > > >
>> > > > > Thanks,
>> > > > > Luke
>> > > > >
>> > > > >
>> > > > > On Tue, Jan 12, 2016 at 4:53 PM, Scott Reynolds <
>> > sreyno...@twilio.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Luke,
>> > > > > >
>> > > > > > We practice the same immutable pattern on AWS. To decommission a
>> > > > broker,
>> > > > > we
>> > > > > > use partition reassignment first to move the partitions off of
>> the
>> > > node
>> > > > > and
>> > > > > > preferred leadership election. To do this with a web ui, so that
>> > you
>> > > > can
>> > > > > > handle it on lizard brain at 3 am, we have the Yahoo Kafka
>> Manager
>> > > > > running
>> > > > > > on the broker hosts.
>> > > > > >
>> > > > > > https://github.com/yahoo/kafka-manager
>> > > > > >
>> > > > > > On Tue, Jan 12, 2016 at 2:50 PM, Luke Steensen <
>> > > > > > luke.steen...@braintreepayments.com> wrote:
>> > > > > >
>> > > > > > > Hello,
>> > > > > > >
>> > > > > > > We've run into a bit of a head-scratcher with a new kafka
>> > > deployment
>> > > > > and
>> > > > > > > I'm curious if anyone has any ideas.
>> > > > > > >
>> > > > > > > A little bit of background: this deployment uses "immutable
>> > > > > > infrastructure"
>> > > > > > > on AWS, so instead of configuring the host in-place, we stop
>> the
>> > > > > broker,
>> > > > > > > tear down the instance, and replace it wholesale. My
>> > understanding
>> > > > was
>> > > > > > that
>> > > > > > > controlled shutdown combined with producer retries would allow
>> > this
>> > > > > > > operation to be zero-downtime. Unfortunately, things aren't
>> > working
>> > > > > quite
>> > > > > > > as I expected.
>> > > > > > >
>> > > > > > > After poring over the logs, I pieced together to following
>> chain
>> > of
>> > > > > > events:
>> > > > > > >
>> > > > > > >    1. our operations script stops the broker process and
>> proceeds
>> > > to
>> > > > > > >    terminate the instance
>> > > > > > >    2. our producer application detects the disconnect and
>> > requests
>> > > > > > updated
>> > > > > > >    metadata from another node
>> > > > > > >    3. updated metadata is returned successfully, but the
>> downed
>> > > > broker
>> > > > > is
>> > > > > > >    still listed as leader for a single partition of the given
>> > topic
>> > > > > > >    4. on the next produce request bound for that partition,
>> the
>> > > > > producer
>> > > > > > >    attempts to initiate a connection to the downed host
>> > > > > > >    5. because the instance has been terminated, the node is
>> now
>> > in
>> > > > the
>> > > > > > >    "connecting" state until the system-level tcp timeout
>> expires
>> > > (2-3
>> > > > > > > minutes)
>> > > > > > >    6. during this time, all produce requests to the given
>> > partition
>> > > > sit
>> > > > > > in
>> > > > > > >    the record accumulator until they expire and are
>> immediately
>> > > > failed
>> > > > > > > without
>> > > > > > >    retries
>> > > > > > >    7. the tcp timeout finally fires, the node is recognized as
>> > > > > > >    disconnected, more metadata is fetched, and things return
>> to
>> > > > sanity
>> > > > > > >
>> > > > > > > I was able to work around the issue by waiting 60 seconds
>> between
>> > > > > > shutting
>> > > > > > > down the broker and terminating that instance, as well as
>> raising
>> > > > > > > request.timeout.ms on the producer to 2x our zookeeper
>> timeout.
>> > > This
>> > > > > > gives
>> > > > > > > the broker a much quicker "connection refused" error instead
>> of
>> > the
>> > > > > > > connection timeout and seems to give enough time for normal
>> > failure
>> > > > > > > detection and leader election to kick in before requests are
>> > timed
>> > > > out.
>> > > > > > >
>> > > > > > > So two questions really: (1) are there any known issues that
>> > would
>> > > > > cause
>> > > > > > a
>> > > > > > > controlled shutdown to fail to release leadership of all
>> > > partitions?
>> > > > > and
>> > > > > > > (2) should the producer be timing out connection attempts more
>> > > > > > proactively?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Luke
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
