Which ack are you using in the producer? On the broker, we have a jmx bean
under RequestChannel that measures the total time of serving each producer
request and the breakdown (queue time, local time, remote time, etc). Take
a look at those metrics and see if the total time exceeds 30 secs. If so,
find out which part takes the most amount of time.

Also, are you using the latest code in the 0.8 branch?

Thanks,

Jun


On Wed, Mar 20, 2013 at 11:06 AM, Bob Jervis <bjer...@gmail.com> wrote:

> We are seeing some odd socket timeouts from one of our producers.  This
> producer fans out data from one topic into dozens or hundreds of potential
> output topics.  We batch the send's to write 1,000 messages at a time.
>
> The odd thing is that the timeouts are happening in the socket read, so I
> assume that the socket.timeout.ms value applies, which we leave as the
> default of 30 seconds.  The odd thing is that these exceptions are getting
> thrown in clusters of 3-5 at a time with just a few seconds or less in
> between each.  We are running with 64 network threads in our brokers, which
> seems plenty given that the broker has only 8 cores.  From the clustering
> of timeouts, it looks perhaps like we are issuing multiple metadata
> requests in parallel.  Is that true?
>
> We haven't touched the io threads (still set at 2), but I'm wondering if
> these are just artifacts of congestion in the communication between the
> brokers and our clients.  Are we using too many distinct topics (~95) and
> should we try to cut down on them as a way to smooth the message exchanges
> between broker and client?  I think that we are expecting the number of
> topics in production to be much higher than these values.
>
> It does appear that the producer in this case is able to continue sending,
> but these exceptions in the logs make our testers unhappy.
>
> I won't include the very lengthy log messages in toto, but the stack traces
> look like:
>
> java.net.SocketTimeoutException
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         at kafka.utils.Utils$.read(Utils.scala:372)
>         at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:105)
>         at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:33)
>         at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:75)
>         at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:66)
>         at kafka.utils.Utils$.swallow(Utils.scala:164)
>         at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>         at kafka.utils.Utils$.swallowError(Utils.scala:43)
>         at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:66)
>         at kafka.producer.Producer.send(Producer.scala:76)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:41)
>         at
>
> com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:114)
>         at
>
> com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:92)
>         at
>
> com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
>         at
>
> com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
>         at
>
> com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
>         at
> com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
>

Reply via email to