Hi All, I came across a problem, If we use broker IP which is not reachable or out of network. Then it takes more than configured time(request.timeout.ms). After checking the log got to know that it is trying to fetch topic meta-data three times by changing correlation id. Due to this even though i keep (request.timeout.ms=1000) It takes 3 sec to throw Exception. I am using Kafka0.8.1.1 with patch https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
I have attached the log. Please check this and clarify why it is behaving like this. Whether it is by design or have to set some other property also. Regards Madhukar
[2015-04-16 19:43:42,712] INFO Verifying properties (kafka.utils.VerifiableProperties) [2015-04-16 19:43:42,741] INFO Property message.send.max.retries is overridden to 0 (kafka.utils.VerifiableProperties) [2015-04-16 19:43:42,741] INFO Property metadata.broker.list is overridden to 178.95.20.30:9092 (kafka.utils.VerifiableProperties) [2015-04-16 19:43:42,741] INFO Property request.required.acks is overridden to -1 (kafka.utils.VerifiableProperties) [2015-04-16 19:43:42,741] INFO Property request.timeout.ms is overridden to 1000 (kafka.utils.VerifiableProperties) [2015-04-16 19:43:42,742] INFO Property serializer.class is overridden to kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties) Sending... KeyedMessage(Test,1,1,Date: 2015/04/16 19:43:42, Message No1) [2015-04-16 19:43:42,804] DEBUG Handling 1 events (kafka.producer.async.DefaultEventHandler) [2015-04-16 19:43:42,843] TRACE Instantiating Scala Sync Producer with properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, request.timeout.ms=1000, port=9092, message.send.max.retries=0, serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} (kafka.producer.SyncProducer) [2015-04-16 19:43:42,844] INFO Fetching metadata from broker id:0,host:178.95.20.30,port:9092 with correlation id 0 for 1 topic(s) Set(Test) (kafka.client.ClientUtils$) [2015-04-16 19:43:42,849] TRACE verifying sendbuffer of size 20 (kafka.producer.SyncProducer) [2015-04-16 19:43:42,850] TRACE START BlockingChannel#connect (kafka.network.BlockingChannel) [2015-04-16 19:43:42,851] TRACE >STATE: Not connected.. (kafka.network.BlockingChannel) [2015-04-16 19:43:42,856] TRACE >STATE Now connecting (kafka.network.BlockingChannel) [2015-04-16 19:43:43,870] ERROR Producer connection to 178.95.20.30:9092 unsuccessful (kafka.producer.SyncProducer) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:43,878] WARN Fetching topic metadata with correlation id 0 for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed (kafka.client.ClientUtils$) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:43,881] ERROR fetching topic metadata for topics [Set(Test)] from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(Test)] from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) Caused by: java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) ... 9 more [2015-04-16 19:43:43,884] DEBUG Getting broker partition info for topic Test (kafka.producer.BrokerPartitionInfo) [2015-04-16 19:43:43,885] TRACE Instantiating Scala Sync Producer with properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, request.timeout.ms=1000, port=9092, message.send.max.retries=0, serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} (kafka.producer.SyncProducer) [2015-04-16 19:43:43,886] INFO Fetching metadata from broker id:0,host:178.95.20.30,port:9092 with correlation id 1 for 1 topic(s) Set(Test) (kafka.client.ClientUtils$) [2015-04-16 19:43:43,886] TRACE verifying sendbuffer of size 20 (kafka.producer.SyncProducer) [2015-04-16 19:43:43,887] TRACE START BlockingChannel#connect (kafka.network.BlockingChannel) [2015-04-16 19:43:43,887] TRACE >STATE: Not connected.. (kafka.network.BlockingChannel) [2015-04-16 19:43:43,887] TRACE >STATE Now connecting (kafka.network.BlockingChannel) [2015-04-16 19:43:44,889] ERROR Producer connection to 178.95.20.30:9092 unsuccessful (kafka.producer.SyncProducer) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:44,892] WARN Fetching topic metadata with correlation id 1 for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed (kafka.client.ClientUtils$) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:44,899] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(Test)] from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed (kafka.producer.async.DefaultEventHandler) [2015-04-16 19:43:44,900] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler) [2015-04-16 19:43:45,004] TRACE Instantiating Scala Sync Producer with properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, request.timeout.ms=1000, port=9092, message.send.max.retries=0, serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} (kafka.producer.SyncProducer) [2015-04-16 19:43:45,004] INFO Fetching metadata from broker id:0,host:178.95.20.30,port:9092 with correlation id 2 for 1 topic(s) Set(Test) (kafka.client.ClientUtils$) [2015-04-16 19:43:45,005] TRACE verifying sendbuffer of size 20 (kafka.producer.SyncProducer) [2015-04-16 19:43:45,005] TRACE START BlockingChannel#connect (kafka.network.BlockingChannel) [2015-04-16 19:43:45,005] TRACE >STATE: Not connected.. (kafka.network.BlockingChannel) [2015-04-16 19:43:45,005] TRACE >STATE Now connecting (kafka.network.BlockingChannel) [2015-04-16 19:43:46,007] ERROR Producer connection to 178.95.20.30:9092 unsuccessful (kafka.producer.SyncProducer) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:46,010] WARN Fetching topic metadata with correlation id 2 for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed (kafka.client.ClientUtils$) java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) [2015-04-16 19:43:46,013] ERROR fetching topic metadata for topics [Set(Test)] from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(Test)] from broker [ArrayBuffer(id:0,host:178.95.20.30,port:9092)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:46) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41) Caused by: java.net.SocketTimeoutException at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124) at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61) at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) ... 9 more [2015-04-16 19:43:46,017] ERROR Failed to send requests for topics Test with correlation ids in [0,2] (kafka.producer.async.DefaultEventHandler) Error while sending message kafka.common.FailedToSendMessageException: Failed to send messages after 0 tries.Time Passed:3 seconds, 3776 ms.Done!! at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.Producer.send(Producer.scala:76) at kafka.javaapi.producer.Producer.send(Producer.scala:33) at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)