Hello Solon,

request.timeout.ms only controls the timeout of produce requests. When the producer's first produce request times out, it tries to refresh its metadata by sending a metadata request. But when this non-produce request hits the broker whose connectivity has been disabled (i.e. the producer tries to re-connect to that broker), it does not respect the 1-second timeout.
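To make the difference concrete, here is a minimal Java sketch using plain java.net sockets. It is not Kafka's code, and the class name BoundedSocket and its parameters are made up for illustration. It shows that a read timeout and a connect timeout are set separately; my assumption, based on the ConnectException in your second trace, is that the reconnect is waiting on the operating system's TCP connect timeout rather than on request.timeout.ms.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative only: plain java.net, not Kafka's BlockingChannel.
// The class and parameter names are hypothetical.
final class BoundedSocket {
    private BoundedSocket() {}

    /** Opens a TCP connection whose connect phase and later reads are both bounded. */
    static Socket open(String host, int port, int connectTimeoutMs, int readTimeoutMs)
            throws IOException {
        Socket socket = new Socket();
        try {
            // Bounds blocking reads; exceeding it raises SocketTimeoutException.
            socket.setSoTimeout(readTimeoutMs);
            // Bounds the TCP handshake. Without the second argument, the wait
            // is governed by the operating system's connect timeout instead.
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return socket;
        } catch (IOException e) {
            // Do not leak the socket when the connect attempt fails.
            socket.close();
            throw e;
        }
    }
}
```

With the connect timeout passed explicitly, an attempt to reach an unreachable host fails with a SocketTimeoutException after roughly connectTimeoutMs instead of waiting for the OS default.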
I think this is indeed an issue: basically, when we get a request timeout from a broker, we should avoid trying to re-connect to it when refreshing metadata. Could you file a JIRA for this?

Guozhang

On Tue, Nov 4, 2014 at 10:43 AM, Solon Gordon <so...@knewton.com> wrote:

> Hi all,
>
> I've been investigating how Kafka 0.8.1.1 responds to the scenario where
> one broker loses connectivity (due to something like a hardware issue or
> network partition.) It looks like the brokers themselves adjust within a
> few seconds to reassign leaders and shrink ISRs. However, I see producer
> threads block for multiple minutes before timing out, regardless of what
> producer settings I use. Why would this be?
>
> Here is my test procedure:
> 1. Start up three brokers.
> 2. Create a topic with 3 partitions and replication factor 3.
> 3. Start up a producer with producer.type=sync, request.required.acks=1,
> request.timeout.ms=1000, message.send.max.retries=0. (With this
> configuration I'd expect all requests to complete or error within a
> second.)
> 4. Make the producer send one message per second.
> 5. Disable connectivity for one broker via iptables.
>
> The result is that I see the producer block for almost two minutes before
> timing out, way more than the one second timeout I configured. Often I see
> that the first request to the bad broker times out after a second as
> expected, but a subsequent request takes minutes to time out. I've included
> example producer logs below.
>
> Any idea why this would happen or if there is some config option I'm
> missing to prevent it? We would like to be able to recover from this
> scenario in seconds, not minutes.
>
> Thanks,
> Solon
>
>
> First request times out after a second:
> 17:48:48.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler - Producer sending messages with correlation id 30 for topics [latency-measurer,0] to broker XXX on YYY:9092
> 17:48:49.604 [Producer timer] INFO kafka.producer.SyncProducer - Disconnecting from YYY:9092
> 17:48:49.617 [Producer timer] WARN k.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 30 to broker XXX with data for partitions [latency-measurer,0]
> java.net.SocketTimeoutException: null
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[na:1.7.0_55]
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_55]
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_55]
>         at kafka.utils.Utils$.read(Unknown Source)
>         at kafka.network.BoundedByteBufferReceive.readFrom(Unknown Source)
>         at kafka.network.Receive$class.readCompletely(Unknown Source)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(Unknown Source)
>         at kafka.network.BlockingChannel.receive(Unknown Source)
>         at kafka.producer.SyncProducer.liftedTree1$1(Unknown Source)
>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(Unknown Source)
>         ...
>
> The next takes over two minutes:
> 17:48:50.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler - Producer sending messages with correlation id 35 for topics [latency-measurer,0] to broker XXX on YYY:9092
> 17:50:57.793 [Producer timer] ERROR kafka.producer.SyncProducer - Producer connection to YYY:9092 unsuccessful
> java.net.ConnectException: Connection timed out
>         at sun.nio.ch.Net.connect0(Native Method) ~[na:1.7.0_55]
>         at sun.nio.ch.Net.connect(Net.java:465) ~[na:1.7.0_55]
>         at sun.nio.ch.Net.connect(Net.java:457) ~[na:1.7.0_55]
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) ~[na:1.7.0_55]
>         at kafka.network.BlockingChannel.connect(Unknown Source)
>         at kafka.producer.SyncProducer.connect(Unknown Source)
>         at kafka.producer.SyncProducer.getOrMakeConnection(Unknown Source)
>         ...
>

--
-- Guozhang