This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5d99dffe6f1 [opt](scan) read scan ranges in the order of partitions (#31630)
5d99dffe6f1 is described below
commit 5d99dffe6f1a3fcb107ce56181aeff96ef222def
Author: Ashin Gau <[email protected]>
AuthorDate: Sat Mar 2 01:02:48 2024 +0800
[opt](scan) read scan ranges in the order of partitions (#31630)
---
be/src/pipeline/exec/file_scan_operator.cpp | 55 +++++++++++++++++++++++------
be/src/vec/exec/scan/new_file_scan_node.cpp | 55 +++++++++++++++++++++++------
2 files changed, 88 insertions(+), 22 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index ac193147dfb..9d48fce2552 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,20 +73,53 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of
threads in thread pool.
- _scan_ranges.clear();
- auto range_iter = scan_ranges.begin();
- for (int i = 0; i < max_scanners && range_iter != scan_ranges.end();
++i, ++range_iter) {
- _scan_ranges.push_back(*range_iter);
+ _scan_ranges.resize(max_scanners);
+ std::vector<TScanRangeParams>& scan_ranges_ =
+ const_cast<std::vector<TScanRangeParams>&>(scan_ranges);
+ auto& first_ranges =
scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges;
+ if (first_ranges[0].__isset.columns_from_path_keys &&
+ !first_ranges[0].columns_from_path_keys.empty()) {
+ int num_keys = first_ranges[0].columns_from_path_keys.size();
+ // In the insert statement, reading data in partition order can
reduce the memory usage of BE
+ // and prevent the generation of smaller tables.
+ std::sort(scan_ranges_.begin(), scan_ranges_.end(),
+ [&num_keys](TScanRangeParams r1, TScanRangeParams r2) {
+ auto& path1 =
r1.scan_range.ext_scan_range.file_scan_range.ranges[0]
+ .columns_from_path;
+ auto& path2 =
r2.scan_range.ext_scan_range.file_scan_range.ranges[0]
+ .columns_from_path;
+ for (int i = 0; i < num_keys; ++i) {
+ if (path1[i] < path2[i]) {
+ return true;
+ }
+ }
+ return false;
+ });
}
- for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
- if (i == max_scanners) {
- i = 0;
+ int num_ranges = scan_ranges.size() / max_scanners;
+ int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+ int scan_index = 0;
+ int range_index = 0;
+ for (int i = 0; i < num_add_one; ++i) {
+ _scan_ranges[scan_index] = scan_ranges_[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges; j++) {
+ auto& merged_ranges = scan_ranges_[range_index++]
+
.scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
+ }
+ }
+ for (int i = num_add_one; i < max_scanners; ++i) {
+ _scan_ranges[scan_index] = scan_ranges_[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges - 1; j++) {
+ auto& merged_ranges = scan_ranges_[range_index++]
+
.scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- auto& ranges =
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
- auto& merged_ranges =
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
- ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- _scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " <<
_scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 2ce80f4463a..a0ae03a8647 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -71,20 +71,53 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of
threads in thread pool.
- _scan_ranges.clear();
- auto range_iter = scan_ranges.begin();
- for (int i = 0; i < max_scanners && range_iter != scan_ranges.end();
++i, ++range_iter) {
- _scan_ranges.push_back(*range_iter);
+ _scan_ranges.resize(max_scanners);
+ std::vector<TScanRangeParams>& scan_ranges_ =
+ const_cast<std::vector<TScanRangeParams>&>(scan_ranges);
+ auto& first_ranges =
scan_ranges_[0].scan_range.ext_scan_range.file_scan_range.ranges;
+ if (first_ranges[0].__isset.columns_from_path_keys &&
+ !first_ranges[0].columns_from_path_keys.empty()) {
+ int num_keys = first_ranges[0].columns_from_path_keys.size();
+ // In the insert statement, reading data in partition order can
reduce the memory usage of BE
+ // and prevent the generation of smaller tables.
+ std::sort(scan_ranges_.begin(), scan_ranges_.end(),
+ [&num_keys](TScanRangeParams r1, TScanRangeParams r2) {
+ auto& path1 =
r1.scan_range.ext_scan_range.file_scan_range.ranges[0]
+ .columns_from_path;
+ auto& path2 =
r2.scan_range.ext_scan_range.file_scan_range.ranges[0]
+ .columns_from_path;
+ for (int i = 0; i < num_keys; ++i) {
+ if (path1[i] < path2[i]) {
+ return true;
+ }
+ }
+ return false;
+ });
}
- for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
- if (i == max_scanners) {
- i = 0;
+ int num_ranges = scan_ranges.size() / max_scanners;
+ int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+ int scan_index = 0;
+ int range_index = 0;
+ for (int i = 0; i < num_add_one; ++i) {
+ _scan_ranges[scan_index] = scan_ranges_[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges; j++) {
+ auto& merged_ranges = scan_ranges_[range_index++]
+
.scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
+ }
+ }
+ for (int i = num_add_one; i < max_scanners; ++i) {
+ _scan_ranges[scan_index] = scan_ranges_[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges - 1; j++) {
+ auto& merged_ranges = scan_ranges_[range_index++]
+
.scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- auto& ranges =
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
- auto& merged_ranges =
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
- ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- _scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " <<
_scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]