showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r616347985



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> 
allTopics,
         if (log.isDebugEnabled()) {
             log.debug("final assignment: " + assignment);
         }
-
+        
         return assignment;
     }
 
-    private SortedSet<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
-        SortedSet<TopicPartition> allPartitions =
-            new 
TreeSet<>(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
-        for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-            String topic = entry.getKey();
-            for (int i = 0; i < entry.getValue(); ++i) {
+    /**
+     * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+     * and sortedToBeRemovedPartitions. We use two pointers technique here:
+     *
+     * We loop the sortedPartition, and compare the ith element in sorted 
toBeRemovedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the the ith element, get next element from 
sortedToBeRemovedPartitions
+     *
+     * @param sortedPartitions: sorted all partitions
+     * @param sortedToBeRemovedPartitions: sorted partitions, all are included 
in the sortedPartitions
+     * @return the partitions don't assign to any current consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> 
sortedPartitions,
+                                                         List<TopicPartition> 
sortedToBeRemovedPartitions) {
+        List<TopicPartition> unassignedPartitions = new ArrayList<>(
+            sortedPartitions.size() - sortedToBeRemovedPartitions.size());
+
+        int index = 0;
+        boolean shouldAddDirectly = false;
+        int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
+        TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
+        for (TopicPartition topicPartition : sortedPartitions) {
+            if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // equal case, don't add to unassignedPartitions, just get 
next partition
+                if (index < sizeToBeRemovedPartitions - 1) {
+                    nextPartition = sortedToBeRemovedPartitions.get(++index);
+                } else {
+                    // add the remaining directly since there is no more 
toBeRemovedPartitions
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+
+    private List<TopicPartition> getTopicPartitions(Map<String, Integer> 
partitionsPerTopic) {
+        List<TopicPartition> allPartitions = new ArrayList<>(
+            partitionsPerTopic.values().stream().reduce(0, Integer::sum));
+
+        List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions 
by adding partitions starting from 0
+        Collections.sort(allTopics);

Review comment:
       adopt the technique of refactor 4 from 
https://github.com/apache/kafka/pull/10552#discussion_r615229332
   We used to maintain a SortedSet of the all topic partitions. It takes some 
time to build the set while adding the partitions. 
   
   Improve it by using ArrayList, and sorting all topics first(only 500 topics 
to sort, compared to the original 1 million partitions to sort), and then add 
the partitions by looping all sorted topics. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to