GitHub user cmccabe opened a pull request: https://github.com/apache/kafka/pull/4183
Kafka 5692 elect preferred

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cmccabe/kafka KAFKA-5692-elect-preferred

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4183

----
commit f34bd4ba0e61a910e77ba0a6c9152974737412ca
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-06T14:39:24Z

    KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use AdminClient

    See also KIP-183.

    This implements the following algorithm:

    1. AdminClient sends ElectPreferredLeadersRequest.
    2. KafkaApis receives ElectPreferredLeadersRequest and delegates to
       ReplicaManager.electPreferredLeaders().
    3. ReplicaManager delegates to KafkaController.electPreferredLeaders().
    4. KafkaController adds a PreferredReplicaLeaderElection to the
       EventManager.
    5. ReplicaManager.electPreferredLeaders()'s callback uses the
       delayedElectPreferredReplicasPurgatory to wait for the results of
       the election to appear in the metadata cache. If there are no
       results because of errors, or because the preferred leaders are
       already leading the partitions, then a response is returned
       immediately.

    In the EventManager work thread the preferred leader is elected as
    follows:

    1. The EventManager runs PreferredReplicaLeaderElection.process().
    2. process() calls KafkaController.onPreferredReplicaElectionWithResults().
    3. KafkaController.onPreferredReplicaElectionWithResults() calls
       PartitionStateMachine.handleStateChangesWithResults() to perform
       the election (asynchronously the PSM will send LeaderAndIsrRequest
       to the new and old leaders and UpdateMetadataRequest to all
       brokers), then invokes the callback.

    Note: the change in parameter type for CollectionUtils.groupDataByTopic().
    This makes sense because the AdminClient APIs use Collection
    consistently, rather than List or Set. If binary compatibility is a
    consideration, the old version should be kept, delegating to the new
    version.

    I had to add PartitionStateMachine.handleStateChangesWithResults() in
    order to be able to process a set of state changes in the
    PartitionStateMachine *and get back individual results*.

    At the same time I noticed that all callers of the existing
    handleStateChange() were destructuring a TopicAndPartition that they
    already had in order to call handleStateChange(), and that
    handleStateChange() immediately instantiated a new TopicAndPartition.
    Since TopicAndPartition is immutable this is pointless, so I
    refactored it. handleStateChange() also now returns any exception it
    caught, which is necessary for handleStateChangesWithResults().

commit 74677b59bf925a834b4f84c5c5718aa4eeb38902
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-20T11:57:19Z

    Style

commit d3cc0f5c272573d688e614e08b66bfec9cf49fa7
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-20T13:19:26Z

    WIP

commit 89a3bf441cbf55fbc41be188b18a9404cbb8e6ee
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-22T14:30:58Z

    Properly detect and throw correct exception when leader unavailable

commit 98fe16acccf9a42732ae159bc84abb308be7a4da
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-22T14:34:49Z

    Javadoc + formatting

commit 756a5d558f8a03f2e8709bef8eddef702fcb26bf
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-22T14:36:10Z

    Formatting and DelayedElectionOperation

commit c734c0eef9e77f9efe620d969158e9f8eb73bf9a
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-22T14:36:37Z

    Add a test

    TODO: Add Authorizer test

commit 736383b7e6801270e33c6cb1aab0aa27f8f115a5
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-27T15:18:44Z

    Add ELECT_PREFERRED_LEADERS to AuthorizerIntegrationTests

commit f8c1d1ba0fe33ea9258a4b99a071a53d3c77028a
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-09-27T20:15:45Z

    Fix flaky test

commit 64fcbfc1e3cbf5949431f8c7ca4011f19fa3ac3d
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-10-12T11:30:58Z

    Add errorCounts() method

commit b4787c9a6d1dc8b0fbc7d78833cd19389cdc06cf
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-10-30T11:16:59Z

    Move exceptionalFuture() to KafkaFuture.

commit 3a92833ed321e7e9d885497fcf286cdfaede27e7
Author: Tom Bentley <tbent...@redhat.com>
Date:   2017-10-30T18:30:19Z

    WIP solution using thenCompose()

commit 0a57af9172985c4c382087aa167bd4421b375d5d
Author: Colin P. Mccabe <cmcc...@confluent.io>
Date:   2017-11-06T22:06:32Z

    Rework futures

----

---
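
For reference, the first commit's note on CollectionUtils.groupDataByTopic() can be illustrated with a minimal sketch. This is not the Kafka source: TopicPartitionSketch and GroupByTopicSketch are hypothetical stand-ins, and the return type is assumed to be a map from topic name to partition numbers. It shows a Collection-based method next to a retained List overload that delegates to it, which is the arrangement the commit message suggests if binary compatibility matters. Widening the parameter from List to Collection stays source-compatible, but it changes the compiled method descriptor, so callers compiled against the List version would no longer link without the retained overload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a topic/partition pair, so the sketch compiles on its own.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical utility class; the names and return type are assumptions.
final class GroupByTopicSketch {

    // Widened signature: accepts any Collection (List, Set, ...).
    static Map<String, List<Integer>> groupDataByTopic(
            Collection<TopicPartitionSketch> partitions) {
        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
        for (TopicPartitionSketch tp : partitions) {
            partitionsByTopic
                .computeIfAbsent(tp.topic, t -> new ArrayList<>())
                .add(tp.partition);
        }
        return partitionsByTopic;
    }

    // Old signature retained so already-compiled callers still link.
    // The cast selects the Collection overload; without it this method
    // would call itself.
    static Map<String, List<Integer>> groupDataByTopic(
            List<TopicPartitionSketch> partitions) {
        return groupDataByTopic((Collection<TopicPartitionSketch>) partitions);
    }

    public static void main(String[] args) {
        // A Set is accepted directly by the widened method.
        Set<TopicPartitionSketch> asSet = new HashSet<>(Arrays.asList(
            new TopicPartitionSketch("a", 0),
            new TopicPartitionSketch("b", 1),
            new TopicPartitionSketch("a", 2)));
        System.out.println(groupDataByTopic(asSet));
    }
}
```

A caller holding a List resolves to the retained overload, while a caller holding a Set or any other Collection resolves to the widened one; both paths run the same grouping logic.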