Rajesh Balamohan created KAFKA-904:
--------------------------------------

             Summary: ClientUtils throws SocketTimeOut in client side at high load
                 Key: KAFKA-904
                 URL: https://issues.apache.org/jira/browse/KAFKA-904
             Project: Kafka
          Issue Type: Improvement
         Environment: Redhat linux with 3 physical nodes.
            Reporter: Rajesh Balamohan


While benchmarking performance under heavy load, we encountered a few 
"SocketTimeOut" exceptions on the client side (the producer pushing messages 
to the broker).

Looking at the stack trace, the issue stems from 
core/src/main/scala/kafka/client/ClientUtils.scala.

We provide 3 brokers in the broker list. It appears that Kafka always tries 
the first broker, and if it is down, it does not remove that broker from the 
list, so later metadata requests try the same broker again. This could delay 
message processing and reduce performance.

Can we have different strategies, such as round-robin or weighted round-robin, 
to decide which broker to connect to? Also, if a broker is down, it should be 
removed from the broker list.
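
To make the request concrete, here is a rough sketch of a round-robin selector that skips brokers marked as down. It is only an illustration: RoundRobinSelector, markDown, markUp and pick are hypothetical names, not existing Kafka classes or methods, and the broker type is left generic so the snippet stands on its own.

```scala
// Hypothetical helper, not part of Kafka: hands out brokers in round-robin
// order and skips any broker that has been marked down.
final class RoundRobinSelector[B](brokers: IndexedSeq[B]) {
  require(brokers.nonEmpty, "broker list must not be empty")

  private var next = 0             // index where the next search starts
  private var down = Set.empty[B]  // brokers currently considered down

  def markDown(b: B): Unit = synchronized { down += b }
  def markUp(b: B): Unit = synchronized { down -= b }

  /** Returns the next live broker, or None if every broker is marked down. */
  def pick(): Option[B] = synchronized {
    val n = brokers.size
    // Candidate indices in rotation order, starting at `next`.
    val candidates = (0 until n).map(k => (next + k) % n)
    candidates.find(i => !down.contains(brokers(i))).map { i =>
      next = (i + 1) % n
      brokers(i)
    }
  }
}
```

A caller would call markDown when a request to a broker fails and markUp once it is reachable again (for example after a retry interval), so a dead broker is not tried on every request.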

  /**
   * Used by the producer to send a metadata request since it has access to the ProducerConfig
   * @param topics The topics for which the metadata needs to be fetched
   * @param brokers The brokers in the cluster as configured on the producer through broker.list
   * @param producerConfig The producer's config
   * @return topic metadata response
   */
  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
    var fetchMetaDataSucceeded: Boolean = false
    var i: Int = 0
    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
    var topicMetadataResponse: TopicMetadataResponse = null
    var t: Throwable = null
    while(i < brokers.size && !fetchMetaDataSucceeded) {
      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
      info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
      try {
        topicMetadataResponse = producer.send(topicMetadataRequest)
        fetchMetaDataSucceeded = true
      }
      catch {
        case e =>
          warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
            .format(correlationId, topics, brokers(i).toString), e)
          t = e
      } finally {
        i = i + 1
        producer.close()
      }
    }
    if(!fetchMetaDataSucceeded){
      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
    } else {
      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
    }
    return topicMetadataResponse
  }
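
A smaller change than a full selection strategy would be to randomise the order in which the loop above walks the broker list, so the first configured broker is not always tried first. The sketch below shows the idea without any Kafka types: Failover and fetchFromAny are hypothetical names, and the fetch function stands in for producer.send(topicMetadataRequest) against one broker.

```scala
import scala.util.{Failure, Random, Success, Try}

// Hypothetical stand-in for the loop in fetchTopicMetadata, not Kafka code.
object Failover {
  /** Tries `fetch` against the brokers in random order until one succeeds. */
  def fetchFromAny[B, R](brokers: Seq[B], rng: Random = new Random)(fetch: B => R): R = {
    // Shuffle so that load and failures are spread over all configured brokers.
    val shuffled = rng.shuffle(brokers)
    var lastError: Throwable = null
    val it = shuffled.iterator
    while (it.hasNext) {
      val broker = it.next()
      // Try catches non-fatal exceptions only, e.g. a socket timeout.
      Try(fetch(broker)) match {
        case Success(result) => return result
        case Failure(e)      => lastError = e // remember it and try the next broker
      }
    }
    throw new RuntimeException(
      "fetch failed for all brokers [%s]".format(brokers.mkString(",")), lastError)
  }
}
```

This does not remove a dead broker from the list, so it only reduces how often that broker is hit first; it would need to be combined with some down-broker tracking to avoid it entirely.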


