Hi all,

I've been investigating how Kafka 0.8.1.1 responds when one broker loses connectivity (due to something like a hardware issue or a network partition). The brokers themselves appear to adjust within a few seconds, reassigning leaders and shrinking ISRs. However, my producer threads block for multiple minutes before timing out, regardless of which producer settings I use. Why would this be?
Here is my test procedure:

1. Start up three brokers.
2. Create a topic with 3 partitions and replication factor 3.
3. Start up a producer with producer.type=sync, request.required.acks=1, request.timeout.ms=1000, and message.send.max.retries=0. (With this configuration I'd expect every request to either complete or error out within a second.)
4. Make the producer send one message per second. (A rough sketch of this producer setup is included at the bottom of this message, below the logs.)
5. Disable connectivity for one broker via iptables.

The result is that the producer blocks for almost two minutes before timing out, far longer than the one-second timeout I configured. Often the first request to the bad broker times out after a second as expected, but a subsequent request takes minutes to time out. I've included example producer logs below.

Any idea why this happens, or is there a config option I'm missing that would prevent it? We would like to be able to recover from this scenario in seconds, not minutes.

Thanks,
Solon

First request times out after a second:

17:48:48.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler - Producer sending messages with correlation id 30 for topics [latency-measurer,0] to broker XXX on YYY:9092
17:48:49.604 [Producer timer] INFO  kafka.producer.SyncProducer - Disconnecting from YYY:9092
17:48:49.617 [Producer timer] WARN  k.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 30 to broker XXX with data for partitions [latency-measurer,0]
java.net.SocketTimeoutException: null
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[na:1.7.0_55]
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[na:1.7.0_55]
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[na:1.7.0_55]
        at kafka.utils.Utils$.read(Unknown Source)
        at kafka.network.BoundedByteBufferReceive.readFrom(Unknown Source)
        at kafka.network.Receive$class.readCompletely(Unknown Source)
        at kafka.network.BoundedByteBufferReceive.readCompletely(Unknown Source)
        at kafka.network.BlockingChannel.receive(Unknown Source)
        at kafka.producer.SyncProducer.liftedTree1$1(Unknown Source)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(Unknown Source)
        ...

The next request takes over two minutes:

17:48:50.602 [Producer timer] DEBUG k.producer.async.DefaultEventHandler - Producer sending messages with correlation id 35 for topics [latency-measurer,0] to broker XXX on YYY:9092
17:50:57.793 [Producer timer] ERROR kafka.producer.SyncProducer - Producer connection to YYY:9092 unsuccessful
java.net.ConnectException: Connection timed out
        at sun.nio.ch.Net.connect0(Native Method) ~[na:1.7.0_55]
        at sun.nio.ch.Net.connect(Net.java:465) ~[na:1.7.0_55]
        at sun.nio.ch.Net.connect(Net.java:457) ~[na:1.7.0_55]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670) ~[na:1.7.0_55]
        at kafka.network.BlockingChannel.connect(Unknown Source)
        at kafka.producer.SyncProducer.connect(Unknown Source)
        at kafka.producer.SyncProducer.getOrMakeConnection(Unknown Source)
        ...
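For reference, here is a minimal sketch of how the producer in steps 3-4 is configured and driven, using the standard 0.8 Java producer API. The class name, broker list, and the simple sleep-based send loop are placeholders for illustration (the real test harness drives sends from a timer thread); the config properties and the latency-measurer topic are the ones described above.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LatencyMeasurer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder broker list; the real test points at the three brokers from step 1.
        props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // Settings from step 3: synchronous sends, leader ack only,
        // a 1 s request timeout, and no retries.
        props.put("producer.type", "sync");
        props.put("request.required.acks", "1");
        props.put("request.timeout.ms", "1000");
        props.put("message.send.max.retries", "0");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Step 4: send one message per second and log how long each send() takes.
        while (true) {
            long start = System.currentTimeMillis();
            try {
                producer.send(new KeyedMessage<String, String>(
                        "latency-measurer", Long.toString(start)));
                System.out.println("send took "
                        + (System.currentTimeMillis() - start) + " ms");
            } catch (Exception e) {
                System.out.println("send failed after "
                        + (System.currentTimeMillis() - start) + " ms: " + e);
            }
            Thread.sleep(1000);
        }
    }
}

With the 1000 ms request timeout and zero retries above, I'd expect every iteration of that loop to return or fail within roughly a second, which is why the two-minute blocking send surprises me.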