Rajesh Balamohan created KAFKA-904:
--------------------------------------
Summary: ClientUtils throws SocketTimeOut in client side at high load
Key: KAFKA-904
URL: https://issues.apache.org/jira/browse/KAFKA-904
Project: Kafka
Issue Type: Improvement
Environment: Redhat linux with 3 physical nodes.
Reporter: Rajesh Balamohan
While benchmarking performance under heavy load, we encountered a few
"SocketTimeOut" exceptions on the client side (the producer pushing messages
to the broker).
Looking at the stack trace, the issue stems from fetchTopicMetadata in
core/src/main/scala/kafka/client/ClientUtils.scala (quoted at the end of this
report).
We provide 3 brokers in the broker list. It appears that Kafka always tries
the first broker in the list first, and if that broker is down, it is not
removed from the list, so every metadata request waits on it before moving on.
This could delay message processing and reduce throughput.
Can we have different strategies, such as round-robin or weighted round-robin,
to decide which broker to connect to? Also, if a broker is down, it should be
removed from the broker list.
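To make the request concrete, here is a minimal sketch of a round-robin selector that skips brokers marked as down until a backoff has elapsed. It is only an illustration of the idea, not a proposed patch: `BrokerAddr`, `RoundRobinBrokerSelector`, `markDown`, `markUp` and `retryBackoffMs` are hypothetical names and do not exist in Kafka.

```scala
import scala.collection.mutable

// Hypothetical stand-in for kafka.cluster.Broker; only the fields used here.
final case class BrokerAddr(id: Int, host: String, port: Int)

// Round-robin selector that skips brokers marked as down until a retry
// backoff has elapsed. All names are illustrative, not Kafka APIs.
class RoundRobinBrokerSelector(brokers: Seq[BrokerAddr], retryBackoffMs: Long) {
  require(brokers.nonEmpty, "broker list must not be empty")

  private var nextIndex = 0
  // broker id -> time (ms) at which the broker was marked down
  private val downSince = mutable.Map.empty[Int, Long]

  def markDown(broker: BrokerAddr, nowMs: Long): Unit = synchronized {
    downSince(broker.id) = nowMs
  }

  def markUp(broker: BrokerAddr): Unit = synchronized {
    downSince.remove(broker.id)
  }

  // Returns the next eligible broker in rotation, or None if every broker
  // is marked down and still inside its backoff window.
  def next(nowMs: Long): Option[BrokerAddr] = synchronized {
    var attempts = 0
    var chosen: Option[BrokerAddr] = None
    while (attempts < brokers.size && chosen.isEmpty) {
      val candidate = brokers(nextIndex)
      nextIndex = (nextIndex + 1) % brokers.size
      attempts += 1
      // A broker is eligible if it was never marked down or its backoff expired.
      val eligible = downSince.get(candidate.id).forall(t => nowMs - t >= retryBackoffMs)
      if (eligible) chosen = Some(candidate)
    }
    chosen
  }
}
```

A caller would invoke `markDown` when a request to a broker times out and `markUp` after a successful request. A down broker is retried once the backoff expires rather than being dropped permanently, so it can rejoin the rotation when it recovers.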
/**
 * Used by the producer to send a metadata request since it has access to the ProducerConfig
 * @param topics The topics for which the metadata needs to be fetched
 * @param brokers The brokers in the cluster as configured on the producer through broker.list
 * @param producerConfig The producer's config
 * @return topic metadata response
 */
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
  var fetchMetaDataSucceeded: Boolean = false
  var i: Int = 0
  val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
  var topicMetadataResponse: TopicMetadataResponse = null
  var t: Throwable = null
  while(i < brokers.size && !fetchMetaDataSucceeded) {
    val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
    info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics))
    try {
      topicMetadataResponse = producer.send(topicMetadataRequest)
      fetchMetaDataSucceeded = true
    }
    catch {
      case e =>
        warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
          .format(correlationId, topics, brokers(i).toString), e)
        t = e
    } finally {
      i = i + 1
      producer.close()
    }
  }
  if(!fetchMetaDataSucceeded){
    throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t)
  } else {
    debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
  }
  return topicMetadataResponse
}
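
A smaller change to the loop above would be to visit the brokers in a random order on each call, so that the first configured broker is not always tried first. The sketch below shows that idea in isolation, with a generic endpoint type instead of Kafka's classes. `ShuffledFailover` and `firstSuccess` are hypothetical names, and the behaviour shown is an assumption about what such a change might look like, not existing Kafka code.

```scala
import scala.util.{Failure, Random, Success, Try}

object ShuffledFailover {
  // Tries `request` against each endpoint in a random order and returns the
  // first successful result. If every endpoint fails, throws an exception
  // whose cause is the last failure seen.
  def firstSuccess[E, R](endpoints: Seq[E], rng: Random = new Random())(request: E => R): R = {
    require(endpoints.nonEmpty, "endpoint list must not be empty")
    var lastError: Throwable = null
    var result: Option[R] = None
    val it = rng.shuffle(endpoints).iterator
    while (it.hasNext && result.isEmpty) {
      val endpoint = it.next()
      // Try captures non-fatal exceptions, so one failing endpoint does not
      // stop the loop from moving on to the next one.
      Try(request(endpoint)) match {
        case Success(r) => result = Some(r)
        case Failure(e) => lastError = e
      }
    }
    result.getOrElse(
      throw new RuntimeException(
        "request failed on all %d endpoint(s)".format(endpoints.size), lastError))
  }
}
```

Shuffling spreads the load from metadata requests across brokers, but it does not remember which brokers are down, so a dead broker can still cost one timeout per call when it happens to be picked first.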