cmccabe commented on a change in pull request #10688: URL: https://github.com/apache/kafka/pull/10688#discussion_r632697059
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -800,19 +800,44 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List<ApiMessageAndVers boolean uncleanOk = electionTypeIsUnclean(request.electionType()); List<ApiMessageAndVersion> records = new ArrayList<>(); ElectLeadersResponseData response = new ElectLeadersResponseData(); - for (TopicPartitions topic : request.topicPartitions()) { - ReplicaElectionResult topicResults = - new ReplicaElectionResult().setTopic(topic.topic()); - response.replicaElectionResults().add(topicResults); - for (int partitionId : topic.partitions()) { - ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records); - topicResults.partitionResult().add(new PartitionResult(). - setPartitionId(partitionId). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + if (request.topicPartitions() == null) { + // If topicPartitions is null, we try to elect a new leader for every partition. + // There are some obvious issues with this wire protocol. For example, what + // if we have too many partitions to fit the results in a single RPC? Or what + // if we generate too many records to fit in a single batch? This behavior Review comment: Thinking about this more, it doesn't seem necessary to do this atomically. We can just do it non-atomically. It's not a logically indivisible operation like creating a topic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org