[ 
https://issues.apache.org/jira/browse/KAFKA-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13656707#comment-13656707
 ] 

Jun Rao commented on KAFKA-904:
-------------------------------

This is already fixed in KAFKA-890.
                
> ClientUtils throws SocketTimeOut in client side at high load
> ------------------------------------------------------------
>
>                 Key: KAFKA-904
>                 URL: https://issues.apache.org/jira/browse/KAFKA-904
>             Project: Kafka
>          Issue Type: Improvement
>         Environment: Redhat linux with 3 physical nodes.
>            Reporter: Rajesh Balamohan
>
> While benchmarking under heavy load, we encountered a few
> "SocketTimeOut" exceptions on the client side (the producer pushing messages
> to the broker).
> Looking at the stack trace, the issue stems from
> core/src/main/scala/kafka/client/ClientUtils.scala
> We provide 3 brokers in the broker list. It appears that Kafka always hits the
> first broker, and if that broker is down, it is not removed from the broker
> list. This could delay message processing and reduce performance.
> Can we have different strategies, such as round-robin or weighted round-robin,
> to decide which broker to connect to? Also, if a broker is down, it should
> be removed from the broker list.
>   /**
>    * Used by the producer to send a metadata request since it has access to the ProducerConfig
>    * @param topics The topics for which the metadata needs to be fetched
>    * @param brokers The brokers in the cluster as configured on the producer through broker.list
>    * @param producerConfig The producer's config
>    * @return topic metadata response
>    */
>   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
>     var fetchMetaDataSucceeded: Boolean = false
>     var i: Int = 0
>     val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
>     var topicMetadataResponse: TopicMetadataResponse = null
>     var t: Throwable = null
>     while(i < brokers.size && !fetchMetaDataSucceeded) {
>       val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
>       info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
>       try {
>         topicMetadataResponse = producer.send(topicMetadataRequest)
>         fetchMetaDataSucceeded = true
>       }
>       catch {
>         case e =>
>           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
>             .format(correlationId, topics, brokers(i).toString), e)
>           t = e
>       } finally {
>         i = i + 1
>         producer.close()
>       }
>     }
>     if(!fetchMetaDataSucceeded){
>       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
>     } else {
>       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
>     }
>     return topicMetadataResponse
>   }
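
For illustration only, the behaviour the reporter asks for (not always
hitting the first broker, and moving on when one is down) could be sketched
roughly as below. This is not the actual Kafka code and not necessarily what
KAFKA-890 changed. BrokerAddr, BrokerFailover and firstSuccess are
hypothetical names made up for this example, and the request is passed in as
a plain function rather than going through SyncProducer.

```scala
import scala.util.{Failure, Random, Try}

// Hypothetical stand-in for Kafka's Broker class; not the real type.
final case class BrokerAddr(host: String, port: Int)

object BrokerFailover {
  /**
   * Tries `request` against the brokers in a randomly shuffled order, so that
   * repeated calls do not always start with the same broker. Returns the first
   * successful result, or the last failure if every broker fails.
   */
  def firstSuccess[A](brokers: Seq[BrokerAddr], rng: Random = new Random())
                     (request: BrokerAddr => A): Try[A] = {
    val candidates = rng.shuffle(brokers).iterator
    // Result for an empty list; overwritten by each attempt otherwise.
    var last: Try[A] = Failure(new NoSuchElementException("empty broker list"))
    while (candidates.hasNext && last.isFailure) {
      // Try(...) captures non-fatal exceptions such as SocketTimeoutException.
      last = Try(request(candidates.next()))
    }
    last
  }
}
```

A caller would pass the real metadata request as the function argument, e.g.
BrokerFailover.firstSuccess(brokers)(b => sendMetadataRequest(b)), where
sendMetadataRequest is again a placeholder. Shuffling spreads the initial
request across brokers; it does not remember which brokers were down between
calls, which would need extra state.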

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
