showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616349554



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,

Review comment:
       adopt the refactor 2 from 
https://github.com/apache/kafka/pull/10552#discussion_r615228600.
   
   We used to have an SortedSet of unassignedPartitions, with all partitions 
(ex: 1 million partitions), and loop through current assignment, to remove 
already assigned partitions, ex: 999,000 of them, so we'll only have 1000 
partitions left. However, SortedSet element removing need some time because it 
needs to find element first, and then, do some tree node movement to maintain 
balanced. This situation should happen a lot since each rebalance, we should 
only have small set of changes (ex: 1 consumer dropped), so this is an 
important improvement.
   
   To refactor it, I used two pointer technique to loop through 2 sorted list: 
sortedPartitions and sortedToBeRemovedPartitions. And only add the difference 
set of the 2 lists. The looping and element adding is very fast in ArrayList. 
So, it improves a lot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to