Hi All,

I have run into a problem: if the producer is configured with a broker IP
that is unreachable or outside the network, it takes longer than the
configured request.timeout.ms to fail. From the log I can see that it tries
to fetch topic metadata three times, each time with a new correlation id
(0, 1 and 2). Because of this, even though I set request.timeout.ms=1000,
it takes about 3 seconds to throw the exception. I am using Kafka 0.8.1.1
with this patch:
https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
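
For reference, the producer settings reported in the attached log can be
collected into a java.util.Properties object roughly as below. This is only
a sketch: the class and method names (ProducerSettingsSketch, build) are made
up for illustration, and the Kafka calls are left in comments so that the
snippet compiles without the Kafka jar on the classpath.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper; the names are illustrative and not part of Kafka.
final class ProducerSettingsSketch {

    /** Builds the old (Scala) producer settings that appear in the attached log. */
    static Properties build(String brokerList, int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.setProperty("request.required.acks", "-1");
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        props.setProperty("message.send.max.retries", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("178.95.20.30:9092", 1000);
        // Print the settings in a stable (sorted) order.
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + "=" + props.getProperty(name));
        }
        // With the Kafka 0.8.1.1 jar available, the properties would then be
        // handed to the producer, along the lines of:
        //   ProducerConfig config = new ProducerConfig(props);
        //   Producer<String, String> producer = new Producer<String, String>(config);
        //   producer.send(new KeyedMessage<String, String>("Test", "1", "..."));
    }
}
```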


I have attached the log. Please check it and clarify why the producer
behaves this way: is it by design, or do I need to set some other property
as well?
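
To make the three-second figure concrete, the connect attempts in the
attached log can be timed from their own timestamps. The sketch below is an
illustration only: the class and method names are hypothetical, and the
timestamps are copied from the "Now connecting" and "Producer connection to
178.95.20.30:9092 unsuccessful" lines of the log.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; the names are illustrative and not part of Kafka.
final class ConnectAttemptTimings {

    // Time-of-day format used in the attached log, e.g. "19:43:42,856".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Milliseconds between two log timestamps taken on the same day. */
    static long elapsedMs(String start, String end) {
        return Duration.between(LocalTime.parse(start, LOG_TIME),
                                LocalTime.parse(end, LOG_TIME)).toMillis();
    }

    public static void main(String[] args) {
        // {"Now connecting", "connection unsuccessful"} for correlation ids 0, 1, 2.
        String[][] attempts = {
            {"19:43:42,856", "19:43:43,870"},
            {"19:43:43,887", "19:43:44,889"},
            {"19:43:45,005", "19:43:46,007"},
        };
        long total = 0;
        for (int id = 0; id < attempts.length; id++) {
            long ms = elapsedMs(attempts[id][0], attempts[id][1]);
            total += ms;
            System.out.println("correlation id " + id + ": " + ms + " ms");
        }
        System.out.println("sum of connect attempts: " + total + " ms");
        // Gap between the "Back off for 100 ms" line and the next log line.
        System.out.println("back-off: "
                + elapsedMs("19:43:44,900", "19:43:45,004") + " ms");
    }
}
```

With these timestamps the three attempts come to 1014 ms, 1002 ms and
1002 ms, with a pause of about 104 ms before the last one. Each attempt on
its own stays close to the 1000 ms limit; the overall delay comes from the
attempts running one after another. The stack traces show them being started
from DefaultEventHandler.scala lines 67, 72 (via partitionAndCollate) and 78.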



Regards
Madhukar
[2015-04-16 19:43:42,712] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property message.send.max.retries is overridden 
to 0 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property metadata.broker.list is overridden to 
178.95.20.30:9092 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property request.required.acks is overridden to 
-1 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property request.timeout.ms is overridden to 
1000 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,742] INFO Property serializer.class is overridden to 
kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
Sending... KeyedMessage(Test,1,1,Date: 2015/04/16 19:43:42, Message No1)
[2015-04-16 19:43:42,804] DEBUG Handling 1 events 
(kafka.producer.async.DefaultEventHandler)
[2015-04-16 19:43:42,843] TRACE Instantiating Scala Sync Producer with 
properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, 
request.timeout.ms=1000, port=9092, message.send.max.retries=0, 
serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:42,844] INFO Fetching metadata from broker 
id:0,host:178.95.20.30,port:9092 with correlation id 0 for 1 topic(s) Set(Test) 
(kafka.client.ClientUtils$)
[2015-04-16 19:43:42,849] TRACE verifying sendbuffer of size 20 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:42,850] TRACE START BlockingChannel#connect 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:42,851] TRACE >STATE: Not connected.. 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:42,856] TRACE >STATE Now connecting 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:43,870] ERROR Producer connection to 178.95.20.30:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:43,878] WARN Fetching topic metadata with correlation id 0 
for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:43,881] ERROR fetching topic metadata for topics [Set(Test)] 
from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed 
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(Test)] 
from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
Caused by: java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 9 more
[2015-04-16 19:43:43,884] DEBUG Getting broker partition info for topic Test 
(kafka.producer.BrokerPartitionInfo)
[2015-04-16 19:43:43,885] TRACE Instantiating Scala Sync Producer with 
properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, 
request.timeout.ms=1000, port=9092, message.send.max.retries=0, 
serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:43,886] INFO Fetching metadata from broker 
id:0,host:178.95.20.30,port:9092 with correlation id 1 for 1 topic(s) Set(Test) 
(kafka.client.ClientUtils$)
[2015-04-16 19:43:43,886] TRACE verifying sendbuffer of size 20 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:43,887] TRACE START BlockingChannel#connect 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:43,887] TRACE >STATE: Not connected.. 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:43,887] TRACE >STATE Now connecting 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:44,889] ERROR Producer connection to 178.95.20.30:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:44,892] WARN Fetching topic metadata with correlation id 1 
for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:44,899] ERROR Failed to collate messages by topic, partition 
due to: fetching topic metadata for topics [Set(Test)] from broker 
[ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed 
(kafka.producer.async.DefaultEventHandler)
[2015-04-16 19:43:44,900] INFO Back off for 100 ms before retrying send. 
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2015-04-16 19:43:45,004] TRACE Instantiating Scala Sync Producer with 
properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, 
request.timeout.ms=1000, port=9092, message.send.max.retries=0, 
serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:45,004] INFO Fetching metadata from broker 
id:0,host:178.95.20.30,port:9092 with correlation id 2 for 1 topic(s) Set(Test) 
(kafka.client.ClientUtils$)
[2015-04-16 19:43:45,005] TRACE verifying sendbuffer of size 20 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:45,005] TRACE START BlockingChannel#connect 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:45,005] TRACE >STATE: Not connected.. 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:45,005] TRACE >STATE Now connecting 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:46,007] ERROR Producer connection to 178.95.20.30:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:46,010] WARN Fetching topic metadata with correlation id 2 
for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:46,013] ERROR fetching topic metadata for topics [Set(Test)] 
from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed 
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(Test)] 
from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
Caused by: java.net.SocketTimeoutException
        at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 9 more
[2015-04-16 19:43:46,017] ERROR Failed to send requests for topics Test with 
correlation ids in [0,2] (kafka.producer.async.DefaultEventHandler)
Error while sending message
kafka.common.FailedToSendMessageException: Failed to send messages after 0 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:76)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
Time Passed:3 seconds, 3776 ms.Done!!
