Hello Solon,

request.timeout.ms only controls the timeout for produce requests. When the
producer's first produce request times out, it tries to refresh its
metadata by sending a metadata request. But when this non-produce request
hits the broker whose connectivity has been disabled (i.e. the producer is
trying to re-connect to that broker), it does not respect the 1 sec
timeout.
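
To illustrate the distinction, here is a minimal Java sketch. It is not
Kafka's code: the class TimeoutSketch and its methods are hypothetical
names, and it uses only plain java.net sockets. It shows that a read
timeout (SO_TIMEOUT) and a connect timeout are set separately. If only the
read timeout is set, a connect attempt to an unreachable host falls back to
the OS-level TCP connect timeout, which is consistent with the "Connection
timed out" after roughly two minutes in the log below.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class TimeoutSketch {

    // Hypothetical helper: bounds BOTH the connect attempt and later reads.
    // Without the second argument to connect(), only reads would be bounded.
    static Socket open(String host, int port,
                       int connectTimeoutMs, int readTimeoutMs) throws IOException {
        Socket socket = new Socket();
        socket.setSoTimeout(readTimeoutMs);            // applies to read() only
        socket.connect(new InetSocketAddress(host, port),
                       connectTimeoutMs);              // applies to connect only
        return socket;
    }

    // Connects to a local server that never sends anything and reports how
    // long read() blocked before timing out (-1 if read() returned instead).
    static long timeSilentRead(int readTimeoutMs) throws IOException {
        try (ServerSocket server = new ServerSocket(0);
             Socket socket = open("127.0.0.1", server.getLocalPort(),
                                  1000, readTimeoutMs)) {
            long start = System.nanoTime();
            try {
                socket.getInputStream().read();
                return -1;
            } catch (SocketTimeoutException e) {
                return (System.nanoTime() - start) / 1_000_000;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out after ~"
                + timeSilentRead(1000) + " ms");
    }
}
```

The point of the sketch is only that the two timeouts are independent
knobs; whether and how the producer should bound its connect attempts is
the open question here.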

I think this is indeed an issue: basically, when we get a request timeout
from a broker, we should avoid trying to re-connect to it when refreshing
metadata. Could you file a JIRA for this?

Guozhang


On Tue, Nov 4, 2014 at 10:43 AM, Solon Gordon <so...@knewton.com> wrote:

> Hi all,
>
> I've been investigating how Kafka 0.8.1.1 responds to the scenario where
> one broker loses connectivity (due to something like a hardware issue or
> network partition.) It looks like the brokers themselves adjust within a
> few seconds to reassign leaders and shrink ISRs. However, I see producer
> threads block for multiple minutes before timing out, regardless of what
> producer settings I use. Why would this be?
>
> Here is my test procedure:
> 1. Start up three brokers.
> 2. Create a topic with 3 partitions and replication factor 3.
> 3. Start up a producer with producer.type=sync, request.required.acks=1,
> request.timeout.ms=1000, message.send.max.retries=0. (With this
> configuration I'd expect all requests to complete or error within a
> second.)
> 4. Make the producer send one message per second.
> 5. Disable connectivity for one broker via iptables.
>
> The result is that I see the producer block for almost two minutes before
> timing out, way more than the one second timeout I configured. Often I see
> that the first request to the bad broker times out after a second as
> expected, but a subsequent request takes minutes to time out. I've included
> example producer logs below.
>
> Any idea why this would happen or if there is some config option I'm
> missing to prevent it? We would like to be able to recover from this
> scenario in seconds, not minutes.
>
> Thanks,
> Solon
>
>
> First request times out after a second:
> 17:48:48.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler -
> Producer sending messages with correlation id 30 for topics
> [latency-measurer,0] to broker XXX on YYY:9092
> 17:48:49.604 [Producer timer] INFO  kafka.producer.SyncProducer -
> Disconnecting from YYY:9092
> 17:48:49.617 [Producer timer] WARN  k.producer.async.DefaultEventHandler -
> Failed to send producer request with correlation id 30
> to broker XXX with data for partitions [latency-measurer,0]
> java.net.SocketTimeoutException: null
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
> ~[na:1.7.0_55]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> ~[na:1.7.0_55]
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> ~[na:1.7.0_55]
>         at kafka.utils.Utils$.read(Unknown Source)
>         at kafka.network.BoundedByteBufferReceive.readFrom(Unknown Source)
>         at kafka.network.Receive$class.readCompletely(Unknown Source)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(Unknown
> Source)
>         at kafka.network.BlockingChannel.receive(Unknown Source)
>         at kafka.producer.SyncProducer.liftedTree1$1(Unknown Source)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(Unknown
> Source)
>         ...
>
> The next takes over two minutes:
> 17:48:50.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler -
> Producer sending messages with correlation id 35 for topics
> [latency-measurer,0] to broker XXX on YYY:9092
> 17:50:57.793 [Producer timer] ERROR kafka.producer.SyncProducer - Producer
> connection to YYY:9092 unsuccessful
> java.net.ConnectException: Connection timed out
>         at sun.nio.ch.Net.connect0(Native Method) ~[na:1.7.0_55]
>         at sun.nio.ch.Net.connect(Net.java:465) ~[na:1.7.0_55]
>         at sun.nio.ch.Net.connect(Net.java:457) ~[na:1.7.0_55]
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> ~[na:1.7.0_55]
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.producer.SyncProducer.connect(Unknown Source)
>         at kafka.producer.SyncProducer.getOrMakeConnection(Unknown Source)
>         ...
>



-- 
-- Guozhang
