Thanks Guozhang. Any ideas on what could be wrong on that machine? We set up 
multiple producers in the same way but only one has this issue.
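
One way to narrow this down is a plain TCP connect probe, run on both the affected machine and a healthy one, to see whether the timeouts already occur at the network level, below Kafka. This is a minimal sketch that assumes Python is available on those hosts. `probe_connect` and `summarize` are hypothetical helper names, not part of Kafka.

```python
import socket
import time


def probe_connect(host, port, attempts=5, timeout_s=3.0):
    """Attempt a plain TCP connect several times.

    Returns one (latency_seconds, error) tuple per attempt; latency is None
    on failure and error is None on success.
    """
    results = []
    for _ in range(attempts):
        start = time.monotonic()
        try:
            # Open and immediately close the connection; only the TCP
            # handshake is being measured, no Kafka protocol is spoken.
            with socket.create_connection((host, port), timeout=timeout_s):
                results.append((time.monotonic() - start, None))
        except OSError as exc:  # covers timeouts, refusals, DNS failures
            results.append((None, repr(exc)))
    return results


def summarize(results):
    """Condense probe results into attempt/failure counts and worst latency."""
    ok = [latency for latency, err in results if err is None]
    return {
        "attempts": len(results),
        "failures": len(results) - len(ok),
        "max_latency_s": max(ok) if ok else None,
    }
```

Calling `summarize(probe_connect(host, 9092))` with the broker host name from the error log, on each machine, gives a quick comparison. If only the affected machine shows failures, the problem is more likely in its network path or local configuration than in the producer settings.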


On Friday, August 8, 2014 2:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
 


This might be due to some issue on that producer machine. The "producer queue 
full and message sent rate low" symptom is likely the result of the frequent 
connection timeouts, not the cause of them.
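
To illustrate that direction of causality: while the send thread is blocked in a connect attempt, nothing drains the async queue, so it fills at the application's produce rate. Below is a rough back-of-the-envelope sketch. The queue capacity of 10000 (the `queue.buffering.max.messages` setting) and both rates are illustrative assumptions, not measurements from this thread, and `seconds_until_queue_full` is a hypothetical helper.

```python
def seconds_until_queue_full(queue_capacity, produce_rate_per_s, drain_rate_per_s):
    """Return how long an initially empty queue takes to fill, in seconds.

    Returns None when the queue drains at least as fast as it is filled.
    """
    net_rate = produce_rate_per_s - drain_rate_per_s
    if net_rate <= 0:
        return None
    return queue_capacity / net_rate


# Assumed numbers: 10000-message queue, 2000 msg/s produced, and no draining
# while the send thread waits on a connection that eventually times out.
print(seconds_until_queue_full(10000, 2000, 0))  # 5.0
```

Under these assumed numbers the queue is full after a few seconds of blocked connects, which would match a monitor that shows the queue as constantly full alongside a low send rate.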

Guozhang




On Fri, Aug 8, 2014 at 2:30 PM, S. Zhou <myx...@yahoo.com.invalid> wrote:

A Kafka producer frequently times out when connecting to a remote Kafka cluster, 
while producers on other machines (same data center) can connect to the Kafka 
cluster with no problem. From the monitoring, the ProducerQueueSize is always 
full and the message send rate is low. We use Kafka 0.8. We set 
"batch.num.messages=10000" and "queue.buffering.max.ms=5000". 
> 
>Here is the error message:
>[2014-08-08 17:52:02,786] ProducerSendThread producer.SyncProducer ERROR Producer connection to kafka-XXX.com:9092 unsuccessful
>java.net.ConnectException: Connection timed out
>        at sun.nio.ch.Net.connect(Native Method)
>        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>        at kafka.utils.Utils$.swallow(Utils.scala:187)
>        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>        at kafka.utils.Utils$.swallowError(Utils.scala:46)
>        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>        at scala.collection.immutable.Stream.foreach(Stream.scala:548)
>        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>


-- 

-- Guozhang
