This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5739b7ddd6 [opt](scan) read scan ranges in the order of partitions (#33515)
f5739b7ddd6 is described below

commit f5739b7ddd6466873525fcb223b4e0de43b21733
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Apr 15 15:53:55 2024 +0800

    [opt](scan) read scan ranges in the order of partitions (#33515)
    
    Follow-up to #33410. scan_ranges are already sorted by path (and hence by
    partition path) in FE, so merge scan ranges in order instead of round-robin.
    For INSERT statements, reading data in partition order reduces the memory
    usage of BE and avoids generating many small files.
---
 be/src/pipeline/exec/file_scan_operator.cpp        | 36 +++++++++++++++-------
 be/src/vec/exec/scan/new_file_scan_node.cpp        | 36 +++++++++++++++-------
 .../doris/datasource/FederationBackendPolicy.java  | 27 ++++++----------
 .../doris/planner/FederationBackendPolicyTest.java | 19 ++++++++++--
 4 files changed, 77 insertions(+), 41 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index ac193147dfb..f81781481df 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* 
state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of 
threads in thread pool.
-        _scan_ranges.clear();
-        auto range_iter = scan_ranges.begin();
-        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); 
++i, ++range_iter) {
-            _scan_ranges.push_back(*range_iter);
+        // scan_ranges is sorted by path(as well as partition path) in FE, so 
merge scan ranges in order.
+        // In the insert statement, reading data in partition order can reduce 
the memory usage of BE
+        // and prevent the generation of smaller tables.
+        _scan_ranges.resize(max_scanners);
+        int num_ranges = scan_ranges.size() / max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
+            }
         }
-        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
-            if (i == max_scanners) {
-                i = 0;
+        for (int i = num_add_one; i < max_scanners; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
             }
-            auto& ranges = 
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
-            auto& merged_ranges = 
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
-            ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
         }
-        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << 
_scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 2ce80f4463a..eed7cfaaec6 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
         _scan_ranges = scan_ranges;
     } else {
         // There is no need for the number of scanners to exceed the number of 
threads in thread pool.
-        _scan_ranges.clear();
-        auto range_iter = scan_ranges.begin();
-        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); 
++i, ++range_iter) {
-            _scan_ranges.push_back(*range_iter);
+        // scan_ranges is sorted by path(as well as partition path) in FE, so 
merge scan ranges in order.
+        // In the insert statement, reading data in partition order can reduce 
the memory usage of BE
+        // and prevent the generation of smaller tables.
+        _scan_ranges.resize(max_scanners);
+        int num_ranges = scan_ranges.size() / max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
+            }
         }
-        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
-            if (i == max_scanners) {
-                i = 0;
+        for (int i = num_add_one; i < max_scanners; ++i) {
+            _scan_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& merged_ranges =
+                        
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
             }
-            auto& ranges = 
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
-            auto& merged_ranges = 
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
-            ranges.insert(ranges.end(), merged_ranges.begin(), 
merged_ranges.end());
         }
-        _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << 
_scan_ranges.size();
     }
     if (scan_ranges.size() > 0 &&
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index e3e3405a1b1..7938fba4d28 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -211,24 +211,18 @@ public class FederationBackendPolicy {
         this.enableSplitsRedistribution = enableSplitsRedistribution;
     }
 
+    /**
+     * Assign splits to each backend. Ensure that each backend receives a 
similar amount of data.
+     * In order to make sure backends utilize the os page cache as much as 
possible, and all backends read splits
+     * in the order of partitions(reading data in partition order can reduce 
the memory usage of backends),
+     * splits should be sorted by path.
+     * Fortunately, the process of obtaining splits ensures that the splits 
have been sorted according to the path.
+     * If the splits are unordered, it is strongly recommended to sort them 
before calling this function.
+     */
     public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> 
splits) throws UserException {
-        // Sorting splits is to ensure that the same query utilizes the os 
page cache as much as possible.
-        splits.sort((split1, split2) -> {
-            int pathComparison = 
split1.getPathString().compareTo(split2.getPathString());
-            if (pathComparison != 0) {
-                return pathComparison;
-            }
-
-            int startComparison = Long.compare(split1.getStart(), 
split2.getStart());
-            if (startComparison != 0) {
-                return startComparison;
-            }
-            return Long.compare(split1.getLength(), split2.getLength());
-        });
-
         ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
 
-        List<Split> remainingSplits = null;
+        List<Split> remainingSplits;
 
         List<Backend> backends = new ArrayList<>();
         for (List<Backend> backendList : backendMap.values()) {
@@ -242,8 +236,7 @@ public class FederationBackendPolicy {
         // locality information
         if (Config.split_assigner_optimized_local_scheduling) {
             remainingSplits = new ArrayList<>(splits.size());
-            for (int i = 0; i < splits.size(); ++i) {
-                Split split = splits.get(i);
+            for (Split split : splits) {
                 if (split.isRemotelyAccessible() && (split.getHosts() != null 
&& split.getHosts().length > 0)) {
                     List<Backend> candidateNodes = 
selectExactNodes(backendMap, split.getHosts());
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index c0307cbd6d1..82f46862674 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -288,6 +288,21 @@ public class FederationBackendPolicyTest {
 
     }
 
+    public static void sortSplits(List<Split> splits) {
+        splits.sort((split1, split2) -> {
+            int pathComparison = 
split1.getPathString().compareTo(split2.getPathString());
+            if (pathComparison != 0) {
+                return pathComparison;
+            }
+
+            int startComparison = Long.compare(split1.getStart(), 
split2.getStart());
+            if (startComparison != 0) {
+                return startComparison;
+            }
+            return Long.compare(split1.getLength(), split2.getLength());
+        });
+    }
+
     @Test
     public void testGenerateRandomly() throws UserException {
         SystemInfoService service = new SystemInfoService();
@@ -367,7 +382,7 @@ public class FederationBackendPolicyTest {
             List<Split> totalSplits = new ArrayList<>();
             totalSplits.addAll(remoteSplits);
             totalSplits.addAll(localSplits);
-            Collections.shuffle(totalSplits);
+            sortSplits(totalSplits);
             Multimap<Backend, Split> assignment = 
policy.computeScanRangeAssignment(totalSplits);
             if (i == 0) {
                 result = ArrayListMultimap.create(assignment);
@@ -489,7 +504,7 @@ public class FederationBackendPolicyTest {
             List<Split> totalSplits = new ArrayList<>();
             totalSplits.addAll(remoteSplits);
             totalSplits.addAll(localSplits);
-            Collections.shuffle(totalSplits);
+            sortSplits(totalSplits);
             Multimap<Backend, Split> assignment = 
policy.computeScanRangeAssignment(totalSplits);
             if (i == 0) {
                 result = ArrayListMultimap.create(assignment);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to