This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dae59ad7fa9 [bug](scan) Fix missing sync rowsets in cloud mode (#31756)
dae59ad7fa9 is described below
commit dae59ad7fa97fcb6865066ccec016f6c2fdfd945
Author: plat1ko <[email protected]>
AuthorDate: Mon Mar 4 23:36:21 2024 +0800
[bug](scan) Fix missing sync rowsets in cloud mode (#31756)
---
be/src/olap/parallel_scanner_builder.cpp | 14 -------
be/src/pipeline/exec/olap_scan_operator.cpp | 45 ++++++++++++----------
be/src/vec/exec/scan/vscan_node.cpp | 4 +-
.../cloud_p0/conf/regression-conf-custom.groovy | 2 +-
4 files changed, 28 insertions(+), 37 deletions(-)
diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index e9797d1a40a..46568ef4178 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,9 +17,6 @@
#include "parallel_scanner_builder.h"
-#include "cloud/cloud_meta_mgr.h"
-#include "cloud/cloud_tablet.h"
-#include "cloud/config.h"
#include "olap/rowset/beta_rowset.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
@@ -44,17 +41,6 @@ Status
ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
std::list<VScannerSPtr>& scanners) {
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
- if (config::is_cloud_mode()) {
- std::vector<std::function<Status()>> tasks;
- tasks.reserve(_tablets.size());
- for (auto&& [tablet, version] : _tablets) {
- tasks.emplace_back([tablet, version]() {
- return
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
- });
- }
- RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
- }
-
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
auto& rowsets = _all_rowsets[tablet->tablet_id()];
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index cc9270f0809..e33293c7215 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -264,34 +264,38 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;
+ std::vector<TabletWithVersion> tablets;
+ tablets.reserve(_scan_ranges.size());
+ for (auto&& scan_range : _scan_ranges) {
+ // TODO(plat1ko): Get cloud tablet in parallel
+ auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+ int64_t version = 0;
+ std::from_chars(scan_range->version.data(),
+ scan_range->version.data() +
scan_range->version.size(), version);
+ tablets.emplace_back(std::move(tablet), version);
+ }
+
+ if (config::is_cloud_mode()) {
+ std::vector<std::function<Status()>> tasks;
+ tasks.reserve(_scan_ranges.size());
+ for (auto&& [tablet, version] : tablets) {
+ tasks.emplace_back([tablet, version]() {
+ return
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+ });
+ }
+ RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+ }
+
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE) {
- std::vector<TabletWithVersion> tablets;
bool is_dup_mow_key = true;
- for (auto&& scan_range : _scan_ranges) {
- auto tablet =
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+ for (auto&& [tablet, _] : tablets) {
is_dup_mow_key =
tablet->keys_type() == DUP_KEYS || (tablet->keys_type() ==
UNIQUE_KEYS &&
tablet->enable_unique_key_merge_on_write());
if (!is_dup_mow_key) {
break;
}
-
- int64_t version = 0;
- std::from_chars(scan_range->version.data(),
- scan_range->version.data() +
scan_range->version.size(), version);
- tablets.emplace_back(TabletWithVersion {std::move(tablet),
version});
- }
-
- if (config::is_cloud_mode()) {
- std::vector<std::function<Status()>> tasks;
- tasks.reserve(tablets.size());
- for (auto&& [tablet, version] : tablets) {
- tasks.emplace_back([tablet, version]() {
- return
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
- });
- }
- RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
}
if (is_dup_mow_key) {
@@ -351,9 +355,10 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
});
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
scanner->set_compound_filters(_compound_filters);
- scanners->push_back(scanner);
+ scanners->push_back(std::move(scanner));
return Status::OK();
};
+
for (auto& scan_range : _scan_ranges) {
auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
int64_t version = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index d08fff4ba9b..cb9240014fd 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1303,8 +1303,8 @@ Status VScanNode::_prepare_scanners(const int
query_parallel_instance_num) {
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
// Init scanner wrapper
- for (auto it = scanners.begin(); it != scanners.end(); ++it) {
- _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+ for (auto& scanner : scanners) {
+ _scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
}
if (scanners.empty()) {
_eos = true;
diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index ba7214cc43c..8e8e4d13784 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -19,4 +19,4 @@ testGroups = "p0"
testDirectories =
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
[...]
//exclude groups and exclude suites is more prior than include groups and include suites.
excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
-excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster"
+excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]