This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dae59ad7fa9 [bug](scan) Fix missing sync rowsets in cloud mode (#31756)
dae59ad7fa9 is described below

commit dae59ad7fa97fcb6865066ccec016f6c2fdfd945
Author: plat1ko <[email protected]>
AuthorDate: Mon Mar 4 23:36:21 2024 +0800

    [bug](scan) Fix missing sync rowsets in cloud mode (#31756)
---
 be/src/olap/parallel_scanner_builder.cpp           | 14 -------
 be/src/pipeline/exec/olap_scan_operator.cpp        | 45 ++++++++++++----------
 be/src/vec/exec/scan/vscan_node.cpp                |  4 +-
 .../cloud_p0/conf/regression-conf-custom.groovy    |  2 +-
 4 files changed, 28 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index e9797d1a40a..46568ef4178 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -17,9 +17,6 @@
 
 #include "parallel_scanner_builder.h"
 
-#include "cloud/cloud_meta_mgr.h"
-#include "cloud/cloud_tablet.h"
-#include "cloud/config.h"
 #include "olap/rowset/beta_rowset.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "vec/exec/scan/new_olap_scanner.h"
@@ -44,17 +41,6 @@ Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
         std::list<VScannerSPtr>& scanners) {
     DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
 
-    if (config::is_cloud_mode()) {
-        std::vector<std::function<Status()>> tasks;
-        tasks.reserve(_tablets.size());
-        for (auto&& [tablet, version] : _tablets) {
-            tasks.emplace_back([tablet, version]() {
-                return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
-            });
-        }
-        RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
-    }
-
     for (auto&& [tablet, version] : _tablets) {
         DCHECK(_all_rowsets.contains(tablet->tablet_id()));
         auto& rowsets = _all_rowsets[tablet->tablet_id()];
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index cc9270f0809..e33293c7215 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -264,34 +264,38 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
                          
state()->query_options().resource_limit.__isset.cpu_limit;
 
+    std::vector<TabletWithVersion> tablets;
+    tablets.reserve(_scan_ranges.size());
+    for (auto&& scan_range : _scan_ranges) {
+        // TODO(plat1ko): Get cloud tablet in parallel
+        auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+        int64_t version = 0;
+        std::from_chars(scan_range->version.data(),
+                        scan_range->version.data() + 
scan_range->version.size(), version);
+        tablets.emplace_back(std::move(tablet), version);
+    }
+
+    if (config::is_cloud_mode()) {
+        std::vector<std::function<Status()>> tasks;
+        tasks.reserve(_scan_ranges.size());
+        for (auto&& [tablet, version] : tablets) {
+            tasks.emplace_back([tablet, version]() {
+                return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+            });
+        }
+        RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+    }
+
     if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
         p._push_down_agg_type == TPushAggOp::NONE) {
-        std::vector<TabletWithVersion> tablets;
         bool is_dup_mow_key = true;
-        for (auto&& scan_range : _scan_ranges) {
-            auto tablet = 
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+        for (auto&& [tablet, _] : tablets) {
             is_dup_mow_key =
                     tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == 
UNIQUE_KEYS &&
                                                         
tablet->enable_unique_key_merge_on_write());
             if (!is_dup_mow_key) {
                 break;
             }
-
-            int64_t version = 0;
-            std::from_chars(scan_range->version.data(),
-                            scan_range->version.data() + 
scan_range->version.size(), version);
-            tablets.emplace_back(TabletWithVersion {std::move(tablet), 
version});
-        }
-
-        if (config::is_cloud_mode()) {
-            std::vector<std::function<Status()>> tasks;
-            tasks.reserve(tablets.size());
-            for (auto&& [tablet, version] : tablets) {
-                tasks.emplace_back([tablet, version]() {
-                    return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
-                });
-            }
-            RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
         }
 
         if (is_dup_mow_key) {
@@ -351,9 +355,10 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
                       });
         RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
         scanner->set_compound_filters(_compound_filters);
-        scanners->push_back(scanner);
+        scanners->push_back(std::move(scanner));
         return Status::OK();
     };
+
     for (auto& scan_range : _scan_ranges) {
         auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
         int64_t version = 0;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index d08fff4ba9b..cb9240014fd 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1303,8 +1303,8 @@ Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
     // Init scanner wrapper
-    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+    for (auto& scanner : scanners) {
+        _scanners.emplace_back(std::make_shared<ScannerDelegate>(scanner));
     }
     if (scanners.empty()) {
         _eos = true;
diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index ba7214cc43c..8e8e4d13784 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -19,4 +19,4 @@ testGroups = "p0"
 testDirectories = 
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
 [...]
 //exclude groups and exclude suites is more prior than include groups and 
include suites.
 excludeSuites = 
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
-excludeDirectories = 
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster"
+excludeDirectories = 
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster,point_query_p0,nereids_rules_p0/mv"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to