These configs are reasonable and shouldn't affect consumer timeouts. Did
you get the time breakdown from the request log?

Thanks,

Jun
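For reference, the request log Jun mentions is configured through log4j. A sketch of the relevant log4j.properties entries, loosely based on the file shipped with Kafka 0.8 — file path, pattern, and appender details here are assumptions to adapt, not an exact copy:

```properties
# Assumed sketch -- compare against the log4j.properties in your distribution.
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.File=logs/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# TRACE on this logger emits the per-request time breakdown being asked about
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
```

With this at TRACE, completed requests are logged with their timing components, which is the breakdown referred to above.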


On Fri, Dec 20, 2013 at 6:14 PM, Tom Amon <ta46...@gmail.com> wrote:

> I figured out the cause but I'm stumped as to the reason. If the brokers do
> _not_ have the replica settings in them, everything works fine. Putting the
> replica settings into the config file causes all consumers to experience a
> socket timeout every 30 seconds.
>
> Specifically here are the settings I put into the file:
>
> num.replica.fetchers=4
> replica.fetch.max.bytes=1048576
> replica.fetch.wait.max.ms=500
> replica.high.watermark.checkpoint.interval.ms=5000
> replica.socket.timeout.ms=30000
> replica.socket.receive.buffer.bytes=65536
> replica.lag.time.max.ms=10000
> replica.lag.max.messages=4000
>
> I copied them from the Kafka docs. Have I missed something important in
> the documentation? These are the only properties I changed, and changing
> them is enough to trigger the issue. Everything else is the default or
> comes from the config file shipped with the Kafka binaries.
>
> Thanks
>
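One relationship worth checking mechanically with settings like these is that any long-poll wait stays below the corresponding socket timeout (the same rule Jun asks about below for the consumer side). A minimal sketch, using the broker values from the message above; the consumer values are Kafka 0.8 defaults as I understand them, so treat them as assumptions:

```python
# Sanity check: a long-poll wait at or above the socket timeout guarantees
# spurious socket timeouts on an otherwise idle connection.
broker = {
    "replica.fetch.wait.max.ms": 500,
    "replica.socket.timeout.ms": 30000,
}
consumer = {
    "fetch.wait.max.ms": 100,      # assumed 0.8 default
    "socket.timeout.ms": 30000,    # assumed 0.8 default
}

def check_long_poll(wait_ms, socket_timeout_ms, side):
    # Raise if the server could legitimately hold a response longer than
    # the client is willing to wait on the socket.
    if wait_ms >= socket_timeout_ms:
        raise ValueError(f"{side}: wait {wait_ms} ms >= socket timeout {socket_timeout_ms} ms")
    return True

check_long_poll(broker["replica.fetch.wait.max.ms"],
                broker["replica.socket.timeout.ms"], "broker")
check_long_poll(consumer["fetch.wait.max.ms"],
                consumer["socket.timeout.ms"], "consumer")
print("long-poll waits are below their socket timeouts")
```

For the values pasted above the check passes, which is part of why the 30-second timeouts are puzzling.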
>
> ------------------------------------------------------------------------
>
> The fetch.wait.max.ms is unchanged. The request log is empty. I'll turn it
> up to TRACE (it's currently at INFO) and see what it contains.
>
> Thanks
>
> -----------------------------------------------------------------------
>
> Did you change fetch.wait.max.ms in the consumer config? If so, did you
> make sure that it is smaller than socket.timeout.ms? Also, if you look at
> the request log, how long does it take to complete the timed-out fetch
> request?
>
> Thanks,
>
> Jun
>
> On Tue, Dec 17, 2013 at 2:30 PM, Tom Amon <ta46...@gmail.com> wrote:
>
> > It appears that consumers that do not get messages regularly are
> > timing out every 30 seconds. This interval coincides with the default
> > setting for "socket.timeout.ms" at the consumer. When the timeout
> > happens it looks like the broker socket hangs for a few seconds,
> > causing all other connected consumers and producers to hang along
> > with it. Producers recover by reconnecting; consumers do the same.
> > The exception in the consumer log (every 30 seconds) is below. Am I
> > misreading something? What is supposed to happen with consumers that
> > don't regularly consume messages? Can a consumer timeout affect the
> > broker socket such that it causes it to hang long enough for other
> > connected producers to abort their connections?
> >
> > [2013-12-17 00:00:25,107] INFO Reconnect due to socket error:
> >  (kafka.consumer.SimpleConsumer)
> > java.net.SocketTimeoutException
> >         at java.io.InterruptedIOException.<init>(InterruptedIOException.java:43)
> >         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:184)
> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:98)
> >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:232)
> >         at kafka.utils.Utils$.read(Utils.scala:374)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
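The failure mode in a trace like this — a blocking read that gives up after the socket timeout while the server is still silently holding the request — can be reproduced with plain sockets. This is an illustrative sketch only, not Kafka code; the hold time and timeout are arbitrary:

```python
import socket
import threading
import time

def demo_socket_timeout(hold_s=2.0, timeout_s=0.5):
    # Server side: accept a connection, then hold it silently -- roughly how
    # a broker servicing a long-poll fetch looks to a blocked consumer read.
    srv = socket.socket()
    srv.bind(("127.0.0.1", 0))   # ephemeral port
    srv.listen(1)
    port = srv.getsockname()[1]

    def hold():
        conn, _ = srv.accept()
        time.sleep(hold_s)
        conn.close()

    threading.Thread(target=hold, daemon=True).start()

    # Client side: a blocking read bounded by a socket timeout, like a
    # SimpleConsumer with socket.timeout.ms configured.
    cli = socket.create_connection(("127.0.0.1", port), timeout=timeout_s)
    try:
        cli.recv(1)
        return "got data"
    except socket.timeout:
        # Analogous to java.net.SocketTimeoutException in the trace above.
        return "timed out"
    finally:
        cli.close()
        srv.close()

if __name__ == "__main__":
    print(demo_socket_timeout())
```

The read gives up before the server ever replies, which matches the symptom: idle consumers whose fetches are held longer than their socket timeout see this exception on a fixed cadence.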
> >
> > -----------------------------------------------------------------------
> >
> > When you say it pauses, do you mean producing and consuming? Can you
> > get metrics from before that is happening, during, and after?
> >
> > Could be gc pauses ... are you using this
> > http://kafka.apache.org/documentation.html#java or defaults?
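On the GC point: the linked page covers Java/JVM settings. As a hedged illustration only — the specific flags and heap sizes below are my assumptions, not the page's contents — era-appropriate tuning and GC logging for a 0.8 broker might look like this, via the environment variables read by kafka-run-class.sh:

```shell
# Assumed example only -- consult the linked documentation for the settings
# the project actually recommends.
export KAFKA_HEAP_OPTS="-Xms3g -Xmx3g"   # fixed heap avoids resize pauses
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseConcMarkSweepGC -XX:+CMSScavengeBeforeRemark"
# GC logging makes it possible to correlate collector pauses with client hiccups
export KAFKA_GC_LOG_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
```

With GC logs enabled, a multi-second pause showing up at the same timestamps as the producer/consumer stalls would confirm or rule out Joe's theory.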
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> > On Tue, Dec 17, 2013 at 2:01 PM, Tom Amon <ta46...@gmail.com> wrote:
> >
> > > I'm on Kafka 0.8 final. Both brokers are up. The behavior is: my
> > > producer produces messages just fine, then it pauses for a few
> > > seconds, then it continues. The brokers are not stopping and
> > > starting. The broker logs show that another producer/consumer has a
> > > connection error at the same time my producer pauses. The exception
> > > at my producer (the one that pauses) indicates that the connection
> > > was aborted, which to my understanding usually means that the TCP
> > > connection was closed underneath it because the broker end stopped
> > > responding. It's as if the socket error indicated in the logs below
> > > for "10.236.67.30" (the other producer/consumer) causes the broker
> > > to hang long enough for the TCP connection for my producer to time
> > > out.
> > >
> > > Thanks
> > >
> > > -----------------------------------------------------------------------
> > >
>
> > > What version of kafka are you on?
> > >
> > > It seems like your producers are not seeing your broker(s). Can you
> > > confirm brokers are up?
> > >
> > > On Mon, Dec 16, 2013 at 7:52 PM, Tom Amon <ta46...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I have a situation where one producer/consumer is causing timeout
> > > > errors on the Kafka broker. The exception in the logs looks like this:
> > > >
> > > > [2013-12-16 17:32:25,992] ERROR Closing socket for /10.236.67.30
> > > > because of error (kafka.network.Processor)
> > > > java.io.IOException: Connection timed out
> > > >         at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > >         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
> > > >         at sun.nio.ch.IOUtil.read(IOUtil.java:175)
> > > >         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> > > >         at kafka.utils.Utils$.read(Utils.scala:395)
> > > >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > >         at kafka.network.Processor.read(SocketServer.scala:347)
> > > >         at kafka.network.Processor.run(SocketServer.scala:245)
> > > >         at java.lang.Thread.run(Thread.java:662)
> > > >
> > > > When this happens, _another separate_ producer hangs for about 2-5
> > > > seconds.
> > > >
> > > > In the producer log I get this exception:
> > > >
> > > > [2013-12-16 16:32:14,961] INFO Disconnecting from
> > > > qa-hermes004.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > > [2013-12-16 16:32:14,982] WARN Failed to send producer request with
> > > > correlation id 3290 to broker 1 with data for partitions [ktr3,4]
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > java.io.IOException: An established connection was aborted by the
> > > > software in your host machine.
> > > >         at sun.nio.ch.SocketDispatcher.writev0(Native Method)
> > > >         at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:49)
> > > >         at sun.nio.ch.IOUtil.write(IOUtil.java:171)
> > > >         at sun.nio.ch.SocketChannelImpl.write0(SocketChannelImpl.java:377)
> > > >         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:400)
> > > >         at java.nio.channels.SocketChannel.write(SocketChannel.java:371)
> > > >         at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> > > >         at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> > > >         at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> > > >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
> > > >         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72)
> > > >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > > >         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:245)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > > >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > > >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > > >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > > >         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:97)
> > > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > >         at kafka.producer.Producer.send(Producer.scala:74)
> > > >         at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > >         at producer.MessageProducer.send(MessageProducer.java:33)
> > > >         at producer.MessageProducer.main(MessageProducer.java:103)
> > > > [2013-12-16 16:32:14,987] INFO Back off for 100 ms before retrying send.
> > > > Remaining retries = 3 (kafka.producer.async.DefaultEventHandler)
> > > > [2013-12-16 16:32:15,088] INFO Fetching metadata from broker
> > > > id:0,host:qa-hermes003.phx.qa.com,port:9092 with correlation id 3291
> > > > for 1 topic(s) Set(ktr3) (kafka.client.ClientUtils$)
> > > > [2013-12-16 16:32:15,134] INFO Connected to
> > > > qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > > > [2013-12-16 16:32:15,185] INFO Disconnecting from
> > > > qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > > [2013-12-16 16:32:15,185] INFO Disconnecting from
> > > > qa-hermes003.phx.qa.com:9092 (kafka.producer.SyncProducer)
> > > > [2013-12-16 16:32:15,232] INFO Connected to
> > > > qa-hermes004.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > > > [2013-12-16 16:32:15,279] INFO Connected to
> > > > qa-hermes003.phx.qa.com:9092 for producing (kafka.producer.SyncProducer)
> > > >
> > > > It's almost as if the error from the other producer/consumer is
> > > > causing the broker itself to hang, causing my producer to abort the
> > > > connection.
> > > >
> > > > Has anyone else seen anything like this before? Any idea what's
> > > > going on?
