Did you change fetch.wait.max.ms in the consumer config? If so, did you
make sure that it is smaller than socket.timeout.ms? Also, if you look at
the request log, how long does it take to complete the timed out fetch
request?
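
For reference, a minimal sketch of the two consumer settings in question,
using the 0.8 high-level consumer API. The host, group id, and class name are
illustrative only, and the values shown are the defaults as I understand them,
not a recommendation. The point is just that fetch.wait.max.ms should stay
well below socket.timeout.ms so a long-polling fetch cannot outlive the
socket read timeout:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class ConsumerTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");   // hypothetical ZooKeeper host
            props.put("group.id", "example-group");       // hypothetical consumer group
            props.put("fetch.wait.max.ms", "100");        // max time broker holds a fetch (default 100)
            props.put("socket.timeout.ms", "30000");      // consumer socket read timeout (default 30000)
            ConsumerConfig config = new ConsumerConfig(props);
        }
    }

If the request log isn't enabled already, I believe it is controlled by the
kafka.request.logger category in config/log4j.properties.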

Thanks,

Jun


On Tue, Dec 17, 2013 at 2:30 PM, Tom Amon <ta46...@gmail.com> wrote:

> It appears that consumers that do not get messages regularly are timing out
> every 30 seconds. This interval coincides with the default setting for "
> socket.timeout.ms" at the consumer. When the timeout happens it looks like
> the broker socket hangs for a few seconds, causing all other connected
> consumers and producers to hang along with it. Producers recover by
> reconnecting, consumers do the same. The exception in the consumer log
> (every 30 seconds) is below. Am I misreading something? What is supposed to
> happen with consumers that don't regularly consume messages? Can a consumer
> timeout affect the broker socket such that it causes it to hang long enough
> for other connected producers to abort their connections?
>
> [2013-12-17 00:00:25,107] INFO Reconnect due to socket error:
> (kafka.consumer.SimpleConsumer)
> java.net.SocketTimeoutException
> at java.io.InterruptedIOException.<init>(InterruptedIOException.java:43)
> at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:184)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:98)
> at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:232)
> at kafka.utils.Utils$.read(Utils.scala:374)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> -----------------------------------------------------------------------
>
> When you say it pauses, do you mean both producing and consuming? Can you get
> metrics from before, during, and after it happens?
>
> Could be GC pauses ... are you using the JVM settings from
> http://kafka.apache.org/documentation.html#java or the defaults?
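>
> One quick way to check is to turn on GC logging on the broker JVM and see
> whether long pauses line up with the client timeouts. These are standard
> HotSpot flags, not Kafka-specific; the log path is a placeholder, and they go
> in whatever JVM options you use to start the broker:
>
>     -verbose:gc -Xloggc:/var/log/kafka/kafka-gc.log
>     -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>     -XX:+PrintGCApplicationStoppedTime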
>
>
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Tue, Dec 17, 2013 at 2:01 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> > I'm on Kafka 0.8 final. Both brokers are up. The behavior is that my
> > producer produces messages just fine, then it pauses for a few
> > seconds, then it continues. The brokers are not stopping and starting.
> > The broker logs show that another producer/consumer has a connection
> > error at the same time my producer pauses. The exception at my
> > producer (the one that pauses) indicates that the connection was aborted,
> > which to my understanding usually means that the TCP connection
> > was closed underneath it because the broker end stopped responding. It's as
> > if the socket error indicated in the logs below for "10.236.67.30"
> > (the other producer/consumer) causes the broker to hang long enough for
> > the TCP connection from my producer to time out.
> >
> > Thanks
> >
> > -------------------------------------------------------------------------
>
> > What version of Kafka are you on?
> >
> > It seems like your producers are not seeing your broker(s). Can you
> > confirm the brokers are up?
> >
> > On Mon, Dec 16, 2013 at 7:52 PM, Tom Amon <ta46...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I have a situation where one producer/consumer is causing timeout
> > > errors on the Kafka broker. The exception in the logs looks like this:
> > >
> > > [2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30
> > > because of error (kafka.network.Processor)
> > > java.io.IOException: Connection timed out
> > >         at sun.nio.ch.FileDispatcher.read0(Native Method)
> > >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
> > >         at sun.nio.ch.IOUtil.read(IOUtil.java:175)
> > >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> > >         at kafka.utils.Utils$.read(Utils.scala:395)
> > >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > >         at kafka.network.Processor.read(SocketServer.scala:347)
> > >         at kafka.network.Processor.run(SocketServer.scala:245)
> > >         at java.lang.Thread.run(Thread.java:662)
> > >
> > > When this happens, _another separate_ producer hangs for about 2-5 seconds.
> > >
> > > In the producer log I get this exception:
> > >
> > > [2013-12-16 16:32:14,961] INFO Disconnecting from
> > > qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > [2013-12-16 16:32:14,982] WARN Failed to send producer request with
> > > correlation id 3290 to broker 1 with data for partitions [ktr3,4]
> > > (kafka.producer.async.DefaultEventHandler)
> > > java.io.IOException: An established connection was aborted by the
> > > software in your host machine.
> > >         at sun.nio.ch.SocketDispatcher.writev0(Native Method)
> > >         at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
> > >         at sun.nio.ch.IOUtil.write(IOUtil.java:171)
> > >         at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
> > >         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
> > >         at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
> > >         at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> > >         at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> > >         at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> > >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
> > >         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
> > >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > >         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
> > >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > >         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
> > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > >         at kafka.producer.Producer.send(Producer.scala:74)
> > >         at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > >         at producer.MessageProducer.send(MessageProducer.java:33)
> > >         at producer.MessageProducer.main(MessageProducer.java:103)
> > > [2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send.
> > > Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> > > [2013-12-16 16:32:15,088] INFO Fetching metadata from broker id:0,host:
> > > qa-hermes003.phx.qa.com,port:9092 with correlation id 3291 for 1 topic(s)
> > > Set(ktr3) (kafka.client.ClientUtils$)
> > > [2013-12-16 16:32:15,134] INFO Connected to
> > > qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > > [2013-12-16 16:32:15,185] INFO Disconnecting from
> > > qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > [2013-12-16 16:32:15,185] INFO Disconnecting from
> > > qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > [2013-12-16 16:32:15,232] INFO Connected to
> > > qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > > [2013-12-16 16:32:15,279] INFO Connected to
> > > qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
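> > >
> > > (For reference, the "Back off for 100 ms" and "Remaining retries = 3" lines
> > > above match what I understand to be the 0.8 producer defaults. A minimal
> > > illustrative sketch of the relevant settings follows; the values and class
> > > name are hypothetical, not taken from the actual config:)
> > >
> > >     import java.util.Properties;
> > >     import kafka.producer.ProducerConfig;
> > >
> > >     public class ProducerRetryExample {
> > >         public static void main(String[] args) {
> > >             Properties props = new Properties();
> > >             props.put("metadata.broker.list",
> > >                     "qa-hermes003.phx.qa.com:9092,qa-hermes004.phx.qa.com:9092");
> > >             props.put("request.timeout.ms", "10000");    // per-request ack timeout (default 10000)
> > >             props.put("message.send.max.retries", "3");  // resend attempts after a failure (default 3)
> > >             props.put("retry.backoff.ms", "100");        // pause between retries (default 100)
> > >             ProducerConfig config = new ProducerConfig(props);
> > >         }
> > >     }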
> > >
> > > It's almost as if the error from the other producer/consumer causes the
> > > broker itself to hang, which in turn causes my producer to abort the
> > > connection.
> > >
> > > Has anyone else seen anything like this before? Any idea what's going on?
> > >
