[ https://issues.apache.org/jira/browse/KAFKA-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13656707#comment-13656707 ]
Jun Rao commented on KAFKA-904:
-------------------------------
This is already fixed in KAFKA-890.
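For illustration only, one way to keep every producer from always starting with the first configured broker is to shuffle the broker list on each metadata fetch and fall through to the next broker on failure. This is a hypothetical, self-contained sketch, not the actual KAFKA-890 change: `BrokerAddr` stands in for `kafka.cluster.Broker`, `ShuffledFailover.firstSuccess` is an invented name, and the caller-supplied `fetch` function stands in for the `SyncProducer` metadata request in the quoted `fetchTopicMetadata`.

```scala
import scala.util.{Failure, Random, Success, Try}

// Hypothetical stand-in for kafka.cluster.Broker: only the fields this sketch needs.
final case class BrokerAddr(id: Int, host: String, port: Int)

object ShuffledFailover {
  /** Tries `fetch` against each broker in a freshly shuffled order and returns the first
    * successful result. If every broker fails, throws an exception wrapping the last error.
    * `fetch` is a hypothetical callback standing in for the SyncProducer metadata request.
    */
  def firstSuccess[A](brokers: Seq[BrokerAddr], rng: Random = new Random)(fetch: BrokerAddr => A): A = {
    require(brokers.nonEmpty, "broker list must not be empty")
    // A new random order on every call, so brokers(0) is not always contacted first.
    val remaining = rng.shuffle(brokers).iterator
    var result: Option[A] = None
    var lastError: Throwable = null
    while (result.isEmpty && remaining.hasNext) {
      val broker = remaining.next()
      // Try catches only non-fatal exceptions (e.g. a socket timeout); fatal errors propagate.
      Try(fetch(broker)) match {
        case Success(value) => result = Some(value)
        case Failure(e)     => lastError = e
      }
    }
    result.getOrElse(
      throw new RuntimeException(s"metadata fetch failed on all ${brokers.size} brokers", lastError)
    )
  }
}
```

Compared with the quoted loop, which walks `brokers` from index 0 on every call, only the order of attempts changes; failover to the next broker on error works the same way.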
> ClientUtils throws SocketTimeOut in client side at high load
> ------------------------------------------------------------
>
> Key: KAFKA-904
> URL: https://issues.apache.org/jira/browse/KAFKA-904
> Project: Kafka
> Issue Type: Improvement
> Environment: Red Hat Linux with 3 physical nodes.
> Reporter: Rajesh Balamohan
>
> When benchmarking performance under heavy load, we encountered a few
> "SocketTimeOut" exceptions on the client side (the producer pushing messages to
> the broker).
> Looking at the stack trace, the issue stems from
> core/src/main/scala/kafka/client/ClientUtils.scala.
> We provide 3 brokers in the broker list. It appears that Kafka always tries the
> first broker first, and if that broker is down, it is not removed from the broker
> list, so every metadata fetch waits on the dead broker before moving on. This
> can delay message processing and reduce throughput.
> Could we have different strategies, such as round-robin or weighted round-robin,
> to decide which broker to connect to? Also, if a broker is down, it should be
> removed from the broker list.
> /**
>  * Used by the producer to send a metadata request since it has access to the ProducerConfig
>  * @param topics The topics for which the metadata needs to be fetched
>  * @param brokers The brokers in the cluster as configured on the producer through broker.list
>  * @param producerConfig The producer's config
>  * @param correlationId The correlation id attached to the metadata request
>  * @return topic metadata response
>  */
> def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
>   var fetchMetaDataSucceeded: Boolean = false
>   var i: Int = 0
>   val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
>   var topicMetadataResponse: TopicMetadataResponse = null
>   var t: Throwable = null
>   // Brokers are tried in configured order, so brokers(0) is contacted first on every call
>   while(i < brokers.size && !fetchMetaDataSucceeded) {
>     val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
>     info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
>     try {
>       topicMetadataResponse = producer.send(topicMetadataRequest)
>       fetchMetaDataSucceeded = true
>     } catch {
>       case e: Throwable =>
>         warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
>           .format(correlationId, topics, brokers(i).toString), e)
>         t = e
>     } finally {
>       i = i + 1
>       producer.close()
>     }
>   }
>   if(!fetchMetaDataSucceeded) {
>     throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
>   } else {
>     debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
>   }
>   return topicMetadataResponse
> }
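The reporter's request (rotate across brokers and stop retrying a broker that is down) could be sketched as a small selector that a metadata loop asks for its next broker. This is a hypothetical helper, not part of Kafka's API: `BrokerEndpoint`, `RoundRobinBrokerSelector`, `retryBackoffMs`, `pick`, `markFailed` and `markSucceeded` are names invented for illustration, and weighted round-robin is left out.

```scala
import scala.collection.mutable

// Hypothetical endpoint type; the real client uses kafka.cluster.Broker.
final case class BrokerEndpoint(id: Int, host: String, port: Int)

/** Hypothetical round-robin picker: hands out brokers in rotation and skips any broker
  * that failed less than `retryBackoffMs` ago, so a dead broker leaves the rotation
  * temporarily instead of being dropped for good. `now` is injectable for testing.
  */
final class RoundRobinBrokerSelector(
    brokers: IndexedSeq[BrokerEndpoint],
    retryBackoffMs: Long,
    now: () => Long = () => System.currentTimeMillis()
) {
  require(brokers.nonEmpty, "broker list must not be empty")

  private var next = 0                                // index of the next broker to hand out
  private val failedAt = mutable.Map.empty[Int, Long] // broker id -> time of its last failure

  private def isAvailable(b: BrokerEndpoint): Boolean =
    failedAt.get(b.id).forall(t => now() - t >= retryBackoffMs)

  /** Next available broker in rotation, or None if every broker is still backing off. */
  def pick(): Option[BrokerEndpoint] = synchronized {
    val n = brokers.size
    // Scan at most one full lap, starting where the previous pick left off.
    val idx = (0 until n).iterator.map(off => (next + off) % n).find(i => isAvailable(brokers(i)))
    idx.foreach(i => next = (i + 1) % n) // continue the rotation after the chosen broker
    idx.map(i => brokers(i))
  }

  def markFailed(b: BrokerEndpoint): Unit = synchronized { failedAt(b.id) = now() }

  def markSucceeded(b: BrokerEndpoint): Unit = synchronized { failedAt.remove(b.id); () }
}
```

Rather than deleting a failed broker from the list permanently, the sketch parks it for a backoff interval, so a broker that comes back is used again without editing broker.list. A caller would loop on `pick()`, calling `markFailed` on a timeout and `markSucceeded` on a response, and give up when `pick()` returns `None`.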