These configs are reasonable and shouldn't affect consumer timeouts. Did you get the time breakdown from the request log?
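If the request log stays empty at INFO, turning the request logger up to TRACE in the broker's config/log4j.properties should make every fetch request show up with its timings. A minimal sketch, assuming the appender is named requestAppender as in the log4j.properties shipped with the broker:

    log4j.logger.kafka.request.logger=TRACE, requestAppender
    log4j.additivity.kafka.request.logger=false

The timed-out fetch requests should then appear in the request log file with a total time you can compare against the consumer's socket.timeout.ms.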
Thanks,

Jun

On Fri, Dec 20, 2013 at 6:14 PM, Tom Amon <ta46...@gmail.com> wrote:

> I figured out the cause, but I'm stumped as to the reason. If the brokers do
> _not_ have the replica settings in them, everything works fine. Putting the
> replica settings into the config file causes all consumers to experience a
> socket timeout every 30 seconds.
>
> Specifically, here are the settings I put into the file:
>
> num.replica.fetchers=4
> replica.fetch.max.bytes=1048576
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30000
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=10000
> replica.lag.max.messages=4000
>
> I copied them from the Kafka docs. Have I missed something important in the
> documentation? These are the only properties I changed, and changing them is
> enough to cause the issue. Everything else is either the default or comes
> from the config file shipped with the Kafka binaries.
>
> Thanks
>
> ------------------------------------------------------------------------
>
> The fetch.wait.max.ms is unchanged. The request log is empty. I'll turn it
> up to TRACE (it's currently at INFO) and see what it contains.
>
> Thanks
>
> ------------------------------------------------------------------------
>
> Did you change fetch.wait.max.ms in the consumer config? If so, did you make
> sure that it is smaller than socket.timeout.ms? Also, if you look at the
> request log, how long does it take to complete the timed-out fetch request?
>
> Thanks,
>
> Jun
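For reference, the relationship above lives in the consumer configuration: a fetch request may wait on the broker for up to fetch.wait.max.ms before returning, so the socket-level timeout has to stay comfortably larger or the consumer gives up on a request the broker is still legitimately holding. A minimal sketch with the 0.8 defaults, shown only to illustrate the relationship between the two properties:

    # consumer.properties
    # how long a fetch request may wait on the broker for new data
    fetch.wait.max.ms=100
    # socket read timeout; must stay well above fetch.wait.max.ms
    socket.timeout.ms=30000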
> On Tue, Dec 17, 2013 at 2:30 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> It appears that consumers that do not get messages regularly are timing out
> every 30 seconds. This interval coincides with the default setting for
> "socket.timeout.ms" at the consumer. When the timeout happens it looks like
> the broker socket hangs for a few seconds, causing all other connected
> consumers and producers to hang along with it. Producers recover by
> reconnecting, and consumers do the same. The exception in the consumer log
> (every 30 seconds) is below. Am I misreading something? What is supposed to
> happen with consumers that don't regularly consume messages? Can a consumer
> timeout affect the broker socket such that it hangs long enough for other
> connected producers to abort their connections?
>
> [2013-12-17 00:00:25,107] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
> java.net.SocketTimeoutException
>         at java.io.InterruptedIOException.<init>(InterruptedIOException.java:43)
>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:184)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:98)
>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:232)
>         at kafka.utils.Utils$.read(Utils.scala:374)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> ------------------------------------------------------------------------
>
> When you say it pauses, do you mean producing and consuming? Can you get
> metrics from before that is happening, during, and after?
>
> Could be GC pauses ... are you using this
> http://kafka.apache.org/documentation.html#java or the defaults?
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
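One way to confirm or rule out GC pauses on the broker is to turn on GC logging and look for long stop-the-world collections that line up with the socket errors. A sketch using standard HotSpot flags (file path illustrative; pass them to the broker JVM however your start script allows, e.g. via KAFKA_OPTS, or directly on the java command line):

    -verbose:gc
    -Xloggc:/var/log/kafka/gc.log
    -XX:+PrintGCDetails
    -XX:+PrintGCDateStamps
    -XX:+PrintGCApplicationStoppedTime

A multi-second pause in that log at the same timestamp as a "Closing socket ... because of error" entry would point at the JVM rather than the network.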
> On Tue, Dec 17, 2013 at 2:01 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> I'm on Kafka 0.8 final. Both brokers are up. The behavior is that my
> producer produces messages just fine, then pauses for a few seconds, then
> continues. The brokers are not stopping and starting. The broker logs show
> that another producer/consumer has a connection error at the same time my
> producer pauses. The exception at my producer (the one that pauses)
> indicates that the connection was aborted, which to my understanding usually
> means the TCP connection was closed underneath it because the broker end
> stopped responding. It's as if the socket error shown in the logs below for
> "10.236.67.30" (the other producer/consumer) causes the broker to hang long
> enough for my producer's TCP connection to time out.
>
> Thanks
>
> ------------------------------------------------------------------------
>
> What version of Kafka are you on?
>
> It seems like your producers are not seeing your broker(s). Can you confirm
> the brokers are up?
>
> On Mon, Dec 16, 2013 at 7:52 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> Hi All,
>
> I have a situation where one producer/consumer is causing timeout errors on
> the Kafka broker.
> The exception in the logs looks like this:
>
> [2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30 because of error (kafka.network.Processor)
> java.io.IOException: Connection timed out
>         at sun.nio.ch.FileDispatcher.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:175)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>         at kafka.utils.Utils$.read(Utils.scala:395)
>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Processor.read(SocketServer.scala:347)
>         at kafka.network.Processor.run(SocketServer.scala:245)
>         at java.lang.Thread.run(Thread.java:662)
>
> When this happens, _another separate_ producer hangs for about 2-5 seconds.
> In the producer log I get this exception:
>
> [2013-12-16 16:32:14,961] INFO Disconnecting from qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:14,982] WARN Failed to send producer request with correlation id 3290 to broker 1 with data for partitions [ktr3,4] (kafka.producer.async.DefaultEventHandler)
> java.io.IOException: An established connection was aborted by the software in your host machine.
>         at sun.nio.ch.SocketDispatcher.writev0(Native Method)
>         at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:171)
>         at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
>         at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
>         at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
>         at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
>         at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>         at kafka.producer.Producer.send(Producer.scala:74)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>         at producer.MessageProducer.send(MessageProducer.java:33)
>         at producer.MessageProducer.main(MessageProducer.java:103)
> [2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send. Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> [2013-12-16 16:32:15,088] INFO Fetching metadata from broker id:0,host:qa-hermes003.phx.qa.com,port:9092 with correlation id 3291 for 1 topic(s) Set(ktr3) (kafka.client.ClientUtils$)
> [2013-12-16 16:32:15,134] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,185] INFO Disconnecting from qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,232] INFO Connected to qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> [2013-12-16 16:32:15,279] INFO Connected to qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
>
> It's almost as if the error from the other producer/consumer is causing the
> broker itself to hang, causing my producer to abort the connection.
>
> Has anyone else seen anything like this before? Any idea what's going on?
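A side note on the recovery sequence visible in that producer log: the "Back off for 100 ms before retrying send. Remaining retries = 3" line corresponds to the 0.8 producer's retry settings at their defaults. A sketch of the two properties involved (values are the defaults, shown only for reference):

    # producer.properties
    # number of resend attempts after a failed send
    message.send.max.retries=3
    # how long to back off before a retry; metadata is refreshed before resending
    retry.backoff.ms=100

So after the aborted connection the producer backs off, re-fetches metadata, reconnects, and retries, which is exactly the sequence of INFO lines above.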