Hi all,

I've been investigating how Kafka 0.8.1.1 responds to the scenario where
one broker loses connectivity (due to something like a hardware issue or
network partition). It looks like the brokers themselves adjust within a
few seconds to reassign leaders and shrink ISRs. However, I see producer
threads block for multiple minutes before timing out, regardless of what
producer settings I use. Why would this be?

Here is my test procedure:
1. Start up three brokers.
2. Create a topic with 3 partitions and replication factor 3.
3. Start up a producer with producer.type=sync, request.required.acks=1,
request.timeout.ms=1000, message.send.max.retries=0. (With this
configuration I'd expect all requests to complete or error within a second.)
4. Make the producer send one message per second. (A sketch of the producer
setup follows this list.)
5. Disable connectivity for one broker via iptables.
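
For reference, here is roughly what the test producer looks like. It is
trimmed down for the email: the broker list is a placeholder (the real host
names are elided, same as the XXX/YYY in the logs below) and the payload is
just a timestamp.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LatencyMeasurer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder broker list; real hosts elided.
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Settings from step 3: synchronous sends, ack from the leader only,
        // 1s request timeout, no retries.
        props.put("producer.type", "sync");
        props.put("request.required.acks", "1");
        props.put("request.timeout.ms", "1000");
        props.put("message.send.max.retries", "0");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Step 4: one message per second to the 3-partition topic.
        while (true) {
            producer.send(new KeyedMessage<String, String>(
                    "latency-measurer", Long.toString(System.currentTimeMillis())));
            Thread.sleep(1000);
        }
    }
}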

The result is that I see the producer block for over two minutes before
timing out, far longer than the one-second timeout I configured. Often I see
that the first request to the bad broker times out after a second as
expected, but a subsequent request takes minutes to time out. I've included
example producer logs below.

Any idea why this would happen or if there is some config option I'm
missing to prevent it? We would like to be able to recover from this
scenario in seconds, not minutes.

Thanks,
Solon


First request times out after a second:
17:48:48.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler -
Producer sending messages with correlation id 30 for topics
[latency-measurer,0] to broker XXX on YYY:9092
17:48:49.604 [Producer timer] INFO  kafka.producer.SyncProducer -
Disconnecting from YYY:9092
17:48:49.617 [Producer timer] WARN  k.producer.async.DefaultEventHandler -
Failed to send producer request with correlation id 30
to broker XXX with data for partitions [latency-measurer,0]
java.net.SocketTimeoutException: null
        at
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
~[na:1.7.0_55]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
~[na:1.7.0_55]
        at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
~[na:1.7.0_55]
        at kafka.utils.Utils$.read(Unknown Source)
        at kafka.network.BoundedByteBufferReceive.readFrom(Unknown Source)
        at kafka.network.Receive$class.readCompletely(Unknown Source)
        at kafka.network.BoundedByteBufferReceive.readCompletely(Unknown
Source)
        at kafka.network.BlockingChannel.receive(Unknown Source)
        at kafka.producer.SyncProducer.liftedTree1$1(Unknown Source)
        at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(Unknown
Source)
        ...

The next takes over two minutes:
17:48:50.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler -
Producer sending messages with correlation id 35 for topics
[latency-measurer,0] to broker XXX on YYY:9092
17:50:57.793 [Producer timer] ERROR kafka.producer.SyncProducer - Producer
connection to YYY:9092 unsuccessful
java.net.ConnectException: Connection timed out
        at sun.nio.ch.Net.connect0(Native Method) ~[na:1.7.0_55]
        at sun.nio.ch.Net.connect(Net.java:465) ~[na:1.7.0_55]
        at sun.nio.ch.Net.connect(Net.java:457) ~[na:1.7.0_55]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
~[na:1.7.0_55]
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.producer.SyncProducer.connect(Unknown Source)
        at kafka.producer.SyncProducer.getOrMakeConnection(Unknown Source)
        ...
