Hi All,

I have a situation where one producer/consumer is causing timeout errors on
the Kafka broker. The exception in the logs looks like this:

[2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30 because of error (kafka.network.Processor)
java.io.IOException: Connection timed out
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
        at sun.nio.ch.IOUtil.read(IOUtil.java:175)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
        at kafka.utils.Utils$.read(Utils.scala:395)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:347)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:662)

When this happens, _another separate_ producer hangs for about 2-5 seconds.
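For what it's worth, this is roughly how I'm measuring that hang: timing each send and logging anything slow. The Runnable here just stands in for the `producer.send(...)` call inside my own MessageProducer class (not included):

```java
// Sketch of the timing wrapper around a send call; the Runnable is a
// placeholder for the real producer.send(...) invocation.
public class SendTimer {
    // Run the given action and return how long it took in milliseconds.
    public static long timeMillis(Runnable send) {
        long start = System.nanoTime();
        send.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long elapsed = timeMillis(() -> { /* producer.send(...) goes here */ });
        // During the incident this fires with values around 2000-5000 ms.
        if (elapsed > 1000) {
            System.out.println("send stalled for " + elapsed + " ms");
        }
        System.out.println("elapsed >= 0: " + (elapsed >= 0));
    }
}
```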

In the producer log I get this exception:

[2013-12-16 16:32:14,961] INFO Disconnecting from qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
[2013-12-16 16:32:14,982] WARN Failed to send producer request with correlation id 3290 to broker 1 with data for partitions [ktr3,4] (kafka.producer.async.DefaultEventHandler)
java.io.IOException: An established connection was aborted by the software in your host machine.
        at sun.nio.ch.SocketDispatcher.writev0(Native Method)
        at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
        at sun.nio.ch.IOUtil.write(IOUtil.java:171)
        at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
        at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
        at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
        at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
        at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at kafka.producer.Producer.send(Producer.scala:74)
        at kafka.javaapi.producer.Producer.send(Producer.scala:32)
        at producer.MessageProducer.send(MessageProducer.java:33)
        at producer.MessageProducer.main(MessageProducer.java:103)
[2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
[2013-12-16 16:32:15,088] INFO Fetching metadata from broker id:0,host:qa-hermes003.phx.qa.com,port:9092 with correlation id 3291 for 1 topic(s) Set(ktr3) (kafka.client.ClientUtils$)
[2013-12-16 16:32:15,134] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
[2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
[2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
[2013-12-16 16:32:15,232] INFO Connected to qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
[2013-12-16 16:32:15,279] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)

It's almost as if the error from the other producer/consumer makes the broker
itself hang, which in turn causes my producer's connection to be aborted.
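
In the meantime I've been looking at the producer-side retry settings, since the "Back off for 100 ms ... Remaining retries = 3" line above matches the old producer's defaults. A minimal sketch of the properties involved (names are the Kafka 0.8 producer config keys; the values and broker list here are illustrative, not my actual MessageProducer config):

```java
import java.util.Properties;

public class ProducerTuning {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Illustrative broker list for the old (Scala) producer client.
        props.put("metadata.broker.list",
                "qa-hermes003.phx.qa.com:9092,qa-hermes004.phx.qa.com:9092");
        // How long the producer waits for a broker response before failing
        // the request (0.8 default: 10000 ms).
        props.put("request.timeout.ms", "10000");
        // Retry count and backoff seen in the log above
        // (0.8 defaults: 3 retries, 100 ms backoff).
        props.put("message.send.max.retries", "5");
        props.put("retry.backoff.ms", "300");
        // These props would then feed kafka.producer.ProducerConfig and
        // kafka.javaapi.producer.Producer, as in the stack trace:
        //   Producer<String, String> p =
        //       new Producer<String, String>(new ProducerConfig(props));
        System.out.println(props.getProperty("message.send.max.retries"));
    }
}
```

Raising the retries/backoff only papers over the stall, of course; it doesn't explain why the broker goes unresponsive in the first place.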

Has anyone else seen anything like this before? Any idea what's going on?
