[ https://issues.apache.org/jira/browse/KAFKA-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13656707#comment-13656707 ]
Jun Rao commented on KAFKA-904:
-------------------------------

This is already fixed in KAFKA-890.

> ClientUtils throws SocketTimeOut in client side at high load
> ------------------------------------------------------------
>
>                 Key: KAFKA-904
>                 URL: https://issues.apache.org/jira/browse/KAFKA-904
>             Project: Kafka
>          Issue Type: Improvement
>         Environment: Redhat linux with 3 physical nodes.
>            Reporter: Rajesh Balamohan
>
> When performance benchmarking under heavy load, we encountered a few "SocketTimeOut" exceptions on the client side (the producer pushing the message to the broker).
> Looking at the stack trace, the issue stems from core/src/main/scala/kafka/client/ClientUtils.scala.
> We provide 3 brokers in the list. It appears that Kafka always hits the first broker, and if it is down, its id is not removed from the broker list. This can delay message processing and also degrade performance.
> Can we have different strategies, such as round-robin or weighted round-robin, to decide which broker to connect to? Also, if a broker is down, it should be removed from the broker list.
>
>   /**
>    * Used by the producer to send a metadata request since it has access to the ProducerConfig
>    * @param topics The topics for which the metadata needs to be fetched
>    * @param brokers The brokers in the cluster as configured on the producer through broker.list
>    * @param producerConfig The producer's config
>    * @return topic metadata response
>    */
>   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
>     var fetchMetaDataSucceeded: Boolean = false
>     var i: Int = 0
>     val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
>     var topicMetadataResponse: TopicMetadataResponse = null
>     var t: Throwable = null
>     while(i < brokers.size && !fetchMetaDataSucceeded) {
>       val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
>       info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
>       try {
>         topicMetadataResponse = producer.send(topicMetadataRequest)
>         fetchMetaDataSucceeded = true
>       }
>       catch {
>         case e =>
>           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
>             .format(correlationId, topics, brokers(i).toString), e)
>           t = e
>       } finally {
>         i = i + 1
>         producer.close()
>       }
>     }
>     if(!fetchMetaDataSucceeded) {
>       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
>     } else {
>       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
>     }
>     return topicMetadataResponse
>   }
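As an illustration only: one way to realize the "try brokers in a different order" idea raised in the description is to shuffle the broker list before iterating, so that a dead broker configured first in broker.list is not hit on every metadata request. The sketch below is a minimal, self-contained Scala approximation; Broker, TopicMetadataResponse and fetchFromAnyBroker are simplified stand-ins rather than the real kafka.client.ClientUtils API, and this is not necessarily how KAFKA-890 resolved the issue.

    import scala.util.{Random, Success, Try}

    // Simplified stand-ins so the sketch compiles on its own; the real classes
    // live in kafka.cluster / kafka.api and carry more fields.
    case class Broker(id: Int, host: String, port: Int)
    case class TopicMetadataResponse(brokersQueried: Seq[Broker])

    // Try the brokers in a randomly shuffled order instead of always starting at
    // brokers(0), so one dead broker at the head of broker.list does not add a
    // SocketTimeout delay to every metadata request. `fetch` stands in for the
    // SyncProducer.send(topicMetadataRequest) call in fetchTopicMetadata.
    def fetchFromAnyBroker(brokers: Seq[Broker])
                          (fetch: Broker => TopicMetadataResponse): TopicMetadataResponse = {
      val shuffled = Random.shuffle(brokers)

      // Lazily attempt each broker in the shuffled order; stop at the first success.
      val firstSuccess = shuffled.iterator
        .map(b => Try(fetch(b)))
        .collectFirst { case Success(resp) => resp }

      firstSuccess.getOrElse(
        throw new RuntimeException(s"fetching topic metadata from brokers $shuffled failed"))
    }

    // Example: one dead broker and one live broker; the call succeeds regardless
    // of where the dead broker lands in the shuffled order.
    val brokers = Seq(Broker(0, "dead-host", 9092), Broker(1, "live-host", 9092))
    val resp = fetchFromAnyBroker(brokers) { b =>
      if (b.host == "dead-host") throw new java.net.SocketTimeoutException("connect timed out")
      else TopicMetadataResponse(Seq(b))
    }
    println(resp)

Shuffling keeps the change local to the metadata path and spreads the initial connection attempts across the configured brokers; removing a broker from the list permanently, as the description also suggests, would additionally require tracking broker health over time.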