Rajesh Balamohan created KAFKA-904:
--------------------------------------

Summary: ClientUtils throws SocketTimeOut in client side at high load
Key: KAFKA-904
URL: https://issues.apache.org/jira/browse/KAFKA-904
Project: Kafka
Issue Type: Improvement
Environment: Redhat linux with 3 physical nodes.
Reporter: Rajesh Balamohan
While benchmarking performance under heavy load, we encountered a few "SocketTimeOut" exceptions on the client side (the producer pushing messages to the broker). According to the stack trace, the issue stems from core/src/main/scala/kafka/client/ClientUtils.scala.

We provide 3 brokers in the broker list. It appears that Kafka always tries the first broker first, and if that broker is down, its id is not removed from the broker list. Because the loop below always starts at brokers(0), each metadata fetch can wait up to the socket timeout on a dead first broker before falling through to the next one. This delays message processing and degrades performance.

Could we support different strategies, such as round-robin or weighted round-robin, for deciding which broker to connect to? Also, if a broker is down, it should be removed from the broker list.

The relevant method in ClientUtils.scala:

    /**
     * Used by the producer to send a metadata request since it has access to the ProducerConfig
     * @param topics The topics for which the metadata needs to be fetched
     * @param brokers The brokers in the cluster as configured on the producer through broker.list
     * @param producerConfig The producer's config
     * @return topic metadata response
     */
    def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
      var fetchMetaDataSucceeded: Boolean = false
      var i: Int = 0
      val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
      var topicMetadataResponse: TopicMetadataResponse = null
      var t: Throwable = null
      // Brokers are always tried in list order, starting from brokers(0) on every call
      while(i < brokers.size && !fetchMetaDataSucceeded) {
        val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
        info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
        try {
          topicMetadataResponse = producer.send(topicMetadataRequest)
          fetchMetaDataSucceeded = true
        }
        catch {
          // A failed broker is only logged; nothing records it, so the next call tries it first again
          case e: Throwable =>
            warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
              .format(correlationId, topics, brokers(i).toString), e)
            t = e
        } finally {
          i = i + 1
          producer.close()
        }
      }
      if(!fetchMetaDataSucceeded){
        throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
      } else {
        debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
      }
      return topicMetadataResponse
    }
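A minimal sketch of the requested behavior, assuming a plain round-robin starting offset combined with temporary exclusion (a cooldown) of brokers that recently failed, instead of permanent removal, so that a restarted broker is eventually retried. `BrokerAddress`, `BrokerSelector`, `Failover.fetchWithFailover` and `cooldownMs` are hypothetical names for illustration, not part of Kafka's API; the `request` function stands in for the `producer.send(topicMetadataRequest)` call in fetchTopicMetadata.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for kafka.cluster.Broker (not Kafka's class).
final case class BrokerAddress(id: Int, host: String, port: Int)

// Hypothetical selector: rotates the first broker tried on each call (round-robin)
// and moves brokers that failed within the last `cooldownMs` ms to the end of the order.
final class BrokerSelector(
    brokers: Seq[BrokerAddress],
    cooldownMs: Long,
    clock: () => Long = () => System.currentTimeMillis()) {
  require(brokers.nonEmpty, "broker list must not be empty")

  private val next = new AtomicInteger(0)
  // broker id -> timestamp (ms) of its most recent failure; guarded by `this`
  private val failedAt = mutable.Map.empty[Int, Long]

  /** Order in which to try brokers: healthy ones first (rotated), recently failed ones last. */
  def orderForAttempt(): Seq[BrokerAddress] = {
    // floorMod keeps the index non-negative even after the counter overflows
    val start = Math.floorMod(next.getAndIncrement(), brokers.size)
    val rotated = brokers.drop(start) ++ brokers.take(start)
    val now = clock()
    val (coolingDown, healthy) = synchronized {
      rotated.partition(b => failedAt.get(b.id).exists(t => now - t < cooldownMs))
    }
    healthy ++ coolingDown
  }

  def markFailed(b: BrokerAddress): Unit = synchronized { failedAt(b.id) = clock() }
  def markSucceeded(b: BrokerAddress): Unit = synchronized { failedAt -= b.id }
}

object Failover {
  /** Tries `request` on each broker in the selector's order and returns the first success. */
  def fetchWithFailover[T](selector: BrokerSelector)(request: BrokerAddress => T): T = {
    @tailrec
    def loop(remaining: List[BrokerAddress], lastError: Throwable): T = remaining match {
      case Nil =>
        throw new RuntimeException("request failed on all brokers", lastError)
      case b :: rest =>
        Try(request(b)) match {
          case Success(result) =>
            selector.markSucceeded(b)
            result
          case Failure(e) =>
            selector.markFailed(b) // deprioritize this broker for cooldownMs
            loop(rest, e)
        }
    }
    loop(selector.orderForAttempt().toList, null)
  }
}
```

Recently failed brokers are tried last rather than dropped outright, so a request can still succeed even if every broker has failed recently. A weighted variant would only need to change how `orderForAttempt` orders the healthy brokers.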