This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f5739b7ddd6 [opt](scan) read scan ranges in the order of partitions (#33515) f5739b7ddd6 is described below commit f5739b7ddd6466873525fcb223b4e0de43b21733 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Apr 15 15:53:55 2024 +0800 [opt](scan) read scan ranges in the order of partitions (#33515) Follow #33410. scan_ranges are already sorted by path (and therefore by partition path) in FE, so BE now merges scan ranges in that order instead of round-robin. For INSERT statements, reading data in partition order can reduce the memory usage of BE and prevent the generation of smaller tables. --- be/src/pipeline/exec/file_scan_operator.cpp | 36 +++++++++++++++------- be/src/vec/exec/scan/new_file_scan_node.cpp | 36 +++++++++++++++------- .../doris/datasource/FederationBackendPolicy.java | 27 ++++++---------- .../doris/planner/FederationBackendPolicyTest.java | 19 ++++++++++-- 4 files changed, 77 insertions(+), 41 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ac193147dfb..f81781481df 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables.
+ _scan_ranges.resize(max_scanners); + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if (scan_ranges.size() > 0 && diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 2ce80f4463a..eed7cfaaec6 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state, _scan_ranges = scan_ranges; } else { // There is no need for the number of scanners to exceed the number of threads 
in thread pool. - _scan_ranges.clear(); - auto range_iter = scan_ranges.begin(); - for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) { - _scan_ranges.push_back(*range_iter); + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + _scan_ranges.resize(max_scanners); + int num_ranges = scan_ranges.size() / max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } } - for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) { - if (i == max_scanners) { - i = 0; + for (int i = num_add_one; i < max_scanners; ++i) { + _scan_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + _scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges; - auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges; - ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); } - _scan_ranges.shrink_to_fit(); LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size(); } if 
(scan_ranges.size() > 0 && diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index e3e3405a1b1..7938fba4d28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -211,24 +211,18 @@ public class FederationBackendPolicy { this.enableSplitsRedistribution = enableSplitsRedistribution; } + /** + * Assign splits to each backend. Ensure that each backend receives a similar amount of data. + * In order to make sure backends utilize the os page cache as much as possible, and all backends read splits + * in the order of partitions(reading data in partition order can reduce the memory usage of backends), + * splits should be sorted by path. + * Fortunately, the process of obtaining splits ensures that the splits have been sorted according to the path. + * If the splits are unordered, it is strongly recommended to sort them before calling this function. + */ public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException { - // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible. 
- splits.sort((split1, split2) -> { - int pathComparison = split1.getPathString().compareTo(split2.getPathString()); - if (pathComparison != 0) { - return pathComparison; - } - - int startComparison = Long.compare(split1.getStart(), split2.getStart()); - if (startComparison != 0) { - return startComparison; - } - return Long.compare(split1.getLength(), split2.getLength()); - }); - ListMultimap<Backend, Split> assignment = ArrayListMultimap.create(); - List<Split> remainingSplits = null; + List<Split> remainingSplits; List<Backend> backends = new ArrayList<>(); for (List<Backend> backendList : backendMap.values()) { @@ -242,8 +236,7 @@ public class FederationBackendPolicy { // locality information if (Config.split_assigner_optimized_local_scheduling) { remainingSplits = new ArrayList<>(splits.size()); - for (int i = 0; i < splits.size(); ++i) { - Split split = splits.get(i); + for (Split split : splits) { if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) { List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index c0307cbd6d1..82f46862674 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -288,6 +288,21 @@ public class FederationBackendPolicyTest { } + public static void sortSplits(List<Split> splits) { + splits.sort((split1, split2) -> { + int pathComparison = split1.getPathString().compareTo(split2.getPathString()); + if (pathComparison != 0) { + return pathComparison; + } + + int startComparison = Long.compare(split1.getStart(), split2.getStart()); + if (startComparison != 0) { + return startComparison; + } + return Long.compare(split1.getLength(), split2.getLength()); + }); + } + 
@Test public void testGenerateRandomly() throws UserException { SystemInfoService service = new SystemInfoService(); @@ -367,7 +382,7 @@ public class FederationBackendPolicyTest { List<Split> totalSplits = new ArrayList<>(); totalSplits.addAll(remoteSplits); totalSplits.addAll(localSplits); - Collections.shuffle(totalSplits); + sortSplits(totalSplits); Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits); if (i == 0) { result = ArrayListMultimap.create(assignment); @@ -489,7 +504,7 @@ public class FederationBackendPolicyTest { List<Split> totalSplits = new ArrayList<>(); totalSplits.addAll(remoteSplits); totalSplits.addAll(localSplits); - Collections.shuffle(totalSplits); + sortSplits(totalSplits); Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits); if (i == 0) { result = ArrayListMultimap.create(assignment); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org