When you say it pauses, do you mean producing and consuming? Can you get
metrics from before it happens, during, and after?
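
If the pause is on the producing side: the stack trace further down shows the
standard 0.8 Java producer, so the stall-and-retry behavior around a broken
connection is governed by a handful of producer properties. A minimal sketch
for reference (the class name, broker list and topic are placeholders, and the
values are just the defaults your log already reflects, 3 retries with a 100 ms
backoff, not recommendations):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class PauseTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker list, taken from the hosts in your log.
        props.put("metadata.broker.list",
                  "qa-hermes003.phx.qa.com:9092,qa-hermes004.phx.qa.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Defaults, matching the "Remaining retries = 3" / 100 ms backoff in your log.
        props.put("message.send.max.retries", "3");
        props.put("retry.backoff.ms", "100");
        // How long a request waits for a broker response before failing.
        props.put("request.timeout.ms", "10000");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("ktr3", "test message"));
        producer.close();
    }
}

A dropped connection plus the request timeout, the retry/backoff cycle and a
metadata refresh is roughly the window during which a send would appear frozen.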

Could be GC pauses ... are you using the JVM settings from
http://kafka.apache.org/documentation.html#java or the defaults?
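
A quick way to rule GC in or out is to turn on GC logging on the brokers and
line the pause timestamps up with the socket errors. Something along these
lines in the broker JVM options would do it (standard HotSpot flags; the log
path is only an example, and how you pass them, e.g. via KAFKA_OPTS, depends
on your start scripts):

  -verbose:gc -Xloggc:/var/log/kafka/gc.log
  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
  -XX:+PrintGCApplicationStoppedTime

Application-stopped times of a few seconds right around the moment the other
client's socket gets closed would make GC the prime suspect.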

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/


On Tue, Dec 17, 2013 at 2:01 PM, Tom Amon <ta46...@gmail.com> wrote:

> I'm on Kafka 0.8 final. Both brokers are up. The behavior is: my producer
> produces messages just fine, then it pauses for a few seconds, then it
> continues. The brokers are not stopping and starting. The broker logs show
> that another producer/consumer has a connection error at the same time my
> producer pauses. The exception at my producer (the one that pauses) indicates
> that the connection was aborted, which to my understanding usually means the
> TCP connection was closed underneath it because the broker end stopped
> responding. It's as if the socket error shown in the logs below for
> "10.236.67.30" (the other producer/consumer) causes the broker to hang long
> enough for my producer's TCP connection to time out.
>
> Thanks
>
> -------------------------------------------------------------------------
>
> What version of Kafka are you on?
>
> It seems like your producers are not seeing your broker(s). Can you confirm
> the brokers are up?
>
> On Mon, Dec 16, 2013 at 7:52 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> > Hi All,
> >
> > I have a situation where one producer/consumer is causing timeout
> > errors on the Kafka broker. The exception in the logs looks like this:
> >
> > [2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30 because of error (kafka.network.Processor)
> > java.io.IOException: Connection timed out
> >         at sun.nio.ch.FileDispatcher.read0(Native Method)
> >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
> >         at sun.nio.ch.IOUtil.read(IOUtil.java:175)
> >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> >         at kafka.utils.Utils$.read(Utils.scala:395)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Processor.read(SocketServer.scala:347)
> >         at kafka.network.Processor.run(SocketServer.scala:245)
> >         at java.lang.Thread.run(Thread.java:662)
> >
> > When this happens, _another separate_ producer hangs for about 2-5 seconds.
> >
> > In the producer log I get this exception:
> >
> > [2013-12-16 16:32:14,961] INFO Disconnecting from qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > [2013-12-16 16:32:14,982] WARN Failed to send producer request with correlation id 3290 to broker 1 with data for partitions [ktr3,4] (kafka.producer.async.DefaultEventHandler)
> > java.io.IOException: An established connection was aborted by the software in your host machine.
> >         at sun.nio.ch.SocketDispatcher.writev0(Native Method)
> >         at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
> >         at sun.nio.ch.IOUtil.write(IOUtil.java:171)
> >         at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
> >         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
> >         at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
> >         at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> >         at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> >         at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
> >         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
> >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> >         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> >         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
> >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> >         at kafka.producer.Producer.send(Producer.scala:74)
> >         at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> >         at producer.MessageProducer.send(MessageProducer.java:33)
> >         at producer.MessageProducer.main(MessageProducer.java:103)
> > [2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> > [2013-12-16 16:32:15,088] INFO Fetching metadata from broker id:0,host:qa-hermes003.phx.qa.com,port:9092 with correlation id 3291 for 1 topic(s) Set(ktr3) (kafka.client.ClientUtils$)
> > [2013-12-16 16:32:15,134] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > [2013-12-16 16:32:15,232] INFO Connected to qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > [2013-12-16 16:32:15,279] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> >
> > It's almost as if the error from the other producer/consumer is causing
> > the broker itself to hang, causing my producer to abort the connection.
> >
> > Has anyone else seen anything like this before? Any idea what's going on?