[ https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13658505#comment-13658505 ]
Jun Rao commented on KAFKA-901:
-------------------------------

Thanks for the patch. Overall, this is a good patch: it adds the new functionality to the controller in a less intrusive way. Let's spend a bit more time on the wire protocol change, since it may not be trivial to change later. Some comments:

1. KafkaController:
1.1 It seems that every time we send a LeaderAndIsrRequest, we follow it with an UpdateMetadataRequest. Would it be simpler to consolidate the logic in ControllerBrokerRequestBatch.sendRequestsToBrokers(), so that for every LeaderAndIsrRequest we send, we also send an UpdateMetadataRequest (with the partitions in the LeaderAndIsrRequest) to all live brokers? See the first sketch below.
1.2 ControllerContext: Not sure why we need allBrokers. Could we just return liveBrokersUnderlying as all brokers?
1.3 The comment above sendUpdateMetadataRequest is not accurate, since the method doesn't always send just to new brokers.
1.4 removeReplicaFromIsr():
1.4.1 The logging in the following statement says ISR, but actually prints the whole leaderAndIsr (both the leader and the ISR): debug("Removing replica %d from ISR %s for partition %s."
1.4.2 Two new statements are added that change partitionLeadershipInfo. Are those fixing an existing issue unrelated to metadata?
2. KafkaApis: Not sure why we need to maintain both aliveBrokers and allBrokers. It seems that storing just aliveBrokers (with the broker info) is enough for answering metadata requests.
3. ControllerBrokerRequestBatch: If we only need to send live brokers in the UpdateMetadataRequest, there is no need to maintain both aliveBrokers and allBrokers here.
4. PartitionStateInfo:
4.1 Now that we are sending allReplicas, there is no need to explicitly send replicationFactor.
4.2 We encode the ISR as strings, but all replicas as ints. We should make them consistent; it's better to encode the ISR as ints too. See the second sketch below.
5. UpdateMetadataRequest: Not sure why we need ackTimeoutMs.
6. BrokerPartitionInfo: Are the changes in getBrokerPartitionInfo() necessary? If partitionMetadata is empty, the caller in DefaultEventHandler already throws NoBrokersForPartitionException.
7. ConsumerFetcherManager.LeaderFinderThread: The code change here is just for logging. Would it be simpler to just log the metadata response in debug mode? If we want to see the exception type associated with the error code, we can fix the toString() method in the metadata response.
8. AdminUtils:
8.1 Could you explain why the test testGetTopicMetadata() was deleted?
8.2 To ensure that the metadata is propagated to all brokers, could we add a utility function waitUntilMetadataPropagated() that takes a list of brokers, a list of topics, and a timeout? We can reuse this function in all relevant tests. See the third sketch below.
9. AsyncProducerTest.testInvalidPartition(): Not sure about this change. If we hit any exception (including UnknownTopicOrPartitionException) during partitionAndCollate(), the event handler will retry.
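Re 1.1, a rough, self-contained sketch of the consolidation (first sketch). The class and method names only loosely mirror the real ControllerBrokerRequestBatch; PartitionStateInfo, the send callback, and the map layout are simplified stand-ins, not the patch's actual code:

{code:scala}
import scala.collection.mutable

// Simplified stand-in for the real PartitionStateInfo (which carries more fields).
case class PartitionStateInfo(leader: Int, isr: Seq[Int], allReplicas: Seq[Int])

// Hypothetical sketch: every flush of queued LeaderAndIsrRequests also pushes an
// UpdateMetadataRequest covering the same partitions to all live brokers.
class ControllerBrokerRequestBatch(
    send: (Int, String, Map[(String, Int), PartitionStateInfo]) => Unit) {

  // brokerId -> (topic, partition) -> state queued in this batch
  private val leaderAndIsrRequestMap =
    mutable.Map.empty[Int, mutable.Map[(String, Int), PartitionStateInfo]]

  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String,
                                       partition: Int, state: PartitionStateInfo): Unit =
    brokerIds.foreach { b =>
      leaderAndIsrRequestMap
        .getOrElseUpdate(b, mutable.Map.empty)
        .put((topic, partition), state)
    }

  def sendRequestsToBrokers(liveBrokerIds: Set[Int]): Unit = {
    // Send the queued LeaderAndIsrRequests to the affected brokers only.
    leaderAndIsrRequestMap.foreach { case (brokerId, partitions) =>
      send(brokerId, "LeaderAndIsrRequest", partitions.toMap)
    }
    // Piggyback: the same partitions go to ALL live brokers as an
    // UpdateMetadataRequest, so every broker can answer metadata requests locally.
    val touched = leaderAndIsrRequestMap.values
      .foldLeft(Map.empty[(String, Int), PartitionStateInfo])(_ ++ _)
    if (touched.nonEmpty)
      liveBrokerIds.foreach(b => send(b, "UpdateMetadataRequest", touched))
    leaderAndIsrRequestMap.clear()
  }
}
{code}

With something like this in place, callers in KafkaController would no longer need to pair every leader-and-ISR change with an explicit sendUpdateMetadataRequest call.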
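Re 4.2, the second sketch contrasts the two encodings using plain java.nio, just to illustrate the consistency point; this is not the actual kafka.api serialization code:

{code:scala}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object IsrEncodingSketch {
  // String encoding (what the patch does today): a length-prefixed
  // UTF-8 string like "1,2,3" that the reader must split and parse.
  def writeIsrAsString(buf: ByteBuffer, isr: Seq[Int]): Unit = {
    val bytes = isr.mkString(",").getBytes(StandardCharsets.UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  // Int encoding (consistent with how allReplicas is written): a size
  // followed by fixed 4-byte ints; no parsing and no ambiguity on read.
  def writeIsrAsInts(buf: ByteBuffer, isr: Seq[Int]): Unit = {
    buf.putInt(isr.size)
    isr.foreach(i => buf.putInt(i))
  }

  // Reading the int encoding back. Note that replicationFactor never needs
  // to be sent explicitly: it is just allReplicas.size (comment 4.1).
  def readIsrAsInts(buf: ByteBuffer): Seq[Int] =
    Seq.fill(buf.getInt())(buf.getInt())
}
{code}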
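Re 8.2, the third sketch shows the shape such a TestUtils helper could take; the fetchTopicMetadata parameter is a hypothetical stand-in for however the test harness queries one broker's view of one topic's metadata:

{code:scala}
object MetadataTestUtilsSketch {
  // Hypothetical helper: poll every broker for every topic until all of them
  // can answer, or fail the test once the timeout expires.
  def waitUntilMetadataPropagated[B](brokers: Seq[B],
                                     topics: Seq[String],
                                     timeoutMs: Long)
                                    (fetchTopicMetadata: (B, String) => Option[Any]): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    def allPropagated =
      brokers.forall(b => topics.forall(t => fetchTopicMetadata(b, t).isDefined))
    while (!allPropagated) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(
          "Metadata for topics %s not propagated to all brokers within %d ms"
            .format(topics.mkString(","), timeoutMs))
      Thread.sleep(100)
    }
  }
}
{code}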
> Kafka server can become unavailable if clients send several metadata requests
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-901
>                 URL: https://issues.apache.org/jira/browse/KAFKA-901
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>         Attachments: kafka-901.patch, metadata-request-improvement.patch
>
> Currently, if a broker is bounced without controlled shutdown and there are several clients talking to the Kafka cluster, each of the clients realizes the unavailability of leaders for some partitions. This leads to several metadata requests being sent to the Kafka brokers. Since metadata requests are pretty slow, all the I/O threads quickly become busy serving them. This leads to a full request queue, which stalls the handling of finished responses, since the same network thread handles both requests and responses. In this situation, clients time out on metadata requests and send even more metadata requests. This quickly makes the Kafka cluster unavailable.