This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c29582bd57 [pipeline](split by segment)support segment split by scanner (#17738)
c29582bd57 is described below

commit c29582bd57ba50d5a510df3d25e95bfb93cc2fbf
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Mar 16 15:25:52 2023 +0800

    [pipeline](split by segment)support segment split by scanner (#17738)
    
    * support segment split by scanner
    
    * change code by cr
---
 be/src/common/status.h                             |   2 -
 be/src/olap/reader.h                               |   5 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |  21 ++-
 be/src/olap/rowset/beta_rowset_reader.h            |   6 +-
 be/src/olap/rowset/rowset_reader.h                 |   6 +-
 be/src/olap/tablet_manager.cpp                     |  27 ++-
 be/src/olap/tablet_manager.h                       |   3 +
 be/src/vec/exec/scan/new_olap_scan_node.cpp        | 203 ++++++++++++++++-----
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  61 ++++---
 be/src/vec/exec/scan/new_olap_scanner.h            |   4 +-
 be/src/vec/exec/scan/pip_scanner_context.h         |   2 -
 be/src/vec/olap/block_reader.cpp                   |  11 +-
 be/src/vec/olap/vertical_block_reader.cpp          |   3 +-
 .../java/org/apache/doris/plugin/AuditEvent.java   |   2 +-
 14 files changed, 256 insertions(+), 100 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 254c25e528..772eaa2230 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -404,8 +404,6 @@ public:
 
     bool ok() const { return _code == ErrorCode::OK; }
 
-    bool is_blocked_by_sc() const { return _code == 
ErrorCode::PIP_WAIT_FOR_SC; }
-
     bool is_io_error() const {
         return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == 
_code ||
                ErrorCode::CHECKSUM_ERROR == _code || 
ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 17f7e79286..813369fca9 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -101,6 +101,11 @@ public:
         DeleteBitmap* delete_bitmap {nullptr};
 
         std::vector<RowsetReaderSharedPtr> rs_readers;
+        // if rs_readers_segment_offsets is not empty, means we only scan
+        // [pair.first, pair.second) segment in rs_reader, only effective in 
dup key
+        // and pipeline
+        std::vector<std::pair<int, int>> rs_readers_segment_offsets;
+
         // return_columns is init from query schema
         std::vector<uint32_t> return_columns;
         // output_columns only contain columns in OrderByExprs and outputExprs
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0f4410a45f..0a4e7ccdb4 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -42,6 +42,10 @@ void BetaRowsetReader::reset_read_options() {
     _read_options.key_ranges.clear();
 }
 
+RowsetReaderSharedPtr BetaRowsetReader::clone() {
+    return RowsetReaderSharedPtr(new BetaRowsetReader(_rowset));
+}
+
 bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
     if (_iterator != nullptr) {
         return _iterator->update_profile(profile);
@@ -51,6 +55,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* 
profile) {
 
 Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* 
read_context,
                                                
std::vector<RowwiseIteratorUPtr>* out_iters,
+                                               const std::pair<int, int>& 
segment_offset,
                                                bool use_cache) {
     RETURN_NOT_OK(_rowset->load());
     _context = read_context;
@@ -174,7 +179,15 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
                                                            should_use_cache));
 
     // create iterator for each segment
-    for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
+    auto& segments = _segment_cache_handle.get_segments();
+    auto [seg_start, seg_end] = segment_offset;
+    if (seg_start == seg_end) {
+        seg_start = 0;
+        seg_end = segments.size();
+    }
+
+    for (int i = seg_start; i < seg_end; i++) {
+        auto& seg_ptr = segments[i];
         std::unique_ptr<RowwiseIterator> iter;
         auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter);
         if (!s.ok()) {
@@ -191,13 +204,15 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
         }
         out_iters->push_back(std::move(iter));
     }
+
     return Status::OK();
 }
 
-Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
+Status BetaRowsetReader::init(RowsetReaderContext* read_context,
+                              const std::pair<int, int>& segment_offset) {
     _context = read_context;
     std::vector<RowwiseIteratorUPtr> iterators;
-    RETURN_NOT_OK(get_segment_iterators(_context, &iterators));
+    RETURN_NOT_OK(get_segment_iterators(_context, &iterators, segment_offset));
 
     // merge or union segment iterator
     if (read_context->need_ordered_result && 
_rowset->rowset_meta()->is_segments_overlapping()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index eba2c2885c..45321c6302 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -31,10 +31,12 @@ public:
 
     ~BetaRowsetReader() override { _rowset->release(); }
 
-    Status init(RowsetReaderContext* read_context) override;
+    Status init(RowsetReaderContext* read_context,
+                const std::pair<int, int>& segment_offset) override;
 
     Status get_segment_iterators(RowsetReaderContext* read_context,
                                  std::vector<RowwiseIteratorUPtr>* out_iters,
+                                 const std::pair<int, int>& segment_offset,
                                  bool use_cache = false) override;
     void reset_read_options() override;
     Status next_block(vectorized::Block* block) override;
@@ -66,6 +68,8 @@ public:
 
     bool update_profile(RuntimeProfile* profile) override;
 
+    RowsetReaderSharedPtr clone() override;
+
 private:
     bool _should_push_down_value_predicates() const;
 
diff --git a/be/src/olap/rowset/rowset_reader.h 
b/be/src/olap/rowset/rowset_reader.h
index 795f841b0a..3d6b2c6fad 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -41,10 +41,12 @@ public:
     virtual ~RowsetReader() = default;
 
     // reader init
-    virtual Status init(RowsetReaderContext* read_context) = 0;
+    virtual Status init(RowsetReaderContext* read_context,
+                        const std::pair<int, int>& segment_offset = {0, 0}) = 
0;
 
     virtual Status get_segment_iterators(RowsetReaderContext* read_context,
                                          std::vector<RowwiseIteratorUPtr>* 
out_iters,
+                                         const std::pair<int, int>& 
segment_offset = {0, 0},
                                          bool use_cache = false) = 0;
     virtual void reset_read_options() = 0;
 
@@ -73,6 +75,8 @@ public:
     }
 
     virtual bool update_profile(RuntimeProfile* profile) = 0;
+
+    virtual RowsetReaderSharedPtr clone() = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f7e1ca25a5..5bc7019fda 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -18,36 +18,23 @@
 #include "olap/tablet_manager.h"
 
 #include <gen_cpp/Types_types.h>
-#include <rapidjson/document.h>
 #include <re2/re2.h>
-#include <thrift/protocol/TDebugProtocol.h>
 
 #include <algorithm>
-#include <boost/algorithm/string.hpp>
-#include <boost/algorithm/string/classification.hpp>
-#include <boost/algorithm/string/split.hpp>
 #include <cstdint>
 #include <cstdio>
-#include <cstdlib>
 #include <filesystem>
 
-#include "common/compiler_util.h"
 #include "env/env.h"
-#include "env/env_util.h"
 #include "gutil/strings/strcat.h"
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
 #include "olap/olap_common.h"
 #include "olap/push_handler.h"
-#include "olap/reader.h"
-#include "olap/rowset/rowset_id_generator.h"
-#include "olap/schema_change.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
-#include "olap/utils.h"
-#include "rapidjson/document.h"
 #include "rapidjson/prettywriter.h"
 #include "rapidjson/stringbuffer.h"
 #include "runtime/thread_context.h"
@@ -56,7 +43,6 @@
 #include "util/file_utils.h"
 #include "util/histogram.h"
 #include "util/path_util.h"
-#include "util/pretty_printer.h"
 #include "util/scoped_cleanup.h"
 #include "util/time.h"
 #include "util/trace.h"
@@ -534,6 +520,19 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId 
tablet_id, bool include_dele
     return _get_tablet_unlocked(tablet_id, include_deleted, err);
 }
 
+std::pair<TabletSharedPtr, Status> 
TabletManager::get_tablet_and_status(TTabletId tablet_id,
+                                                                        bool 
include_deleted) {
+    std::string err;
+    auto tablet = get_tablet(tablet_id, include_deleted, &err);
+    if (tablet == nullptr) {
+        auto err_str = fmt::format("failed to get tablet: {}, reason: {}", 
tablet_id, err);
+        LOG(WARNING) << err_str;
+        return {tablet, Status::InternalError(err_str)};
+    }
+
+    return {tablet, Status::OK()};
+}
+
 TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool 
include_deleted,
                                                     string* err) {
     TabletSharedPtr tablet;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 1b7e9cf89b..dca1b635ad 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -75,6 +75,9 @@ public:
     TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted = 
false,
                                std::string* err = nullptr);
 
+    std::pair<TabletSharedPtr, Status> get_tablet_and_status(TTabletId 
tablet_id,
+                                                             bool 
include_deleted = false);
+
     TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
                                bool include_deleted = false, std::string* err 
= nullptr);
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 55babf3066..51e13beb3c 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/exec/scan/new_olap_scan_node.h"
 
+#include <charconv>
+
 #include "common/status.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
@@ -404,57 +406,172 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
     }
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
-    std::unordered_set<std::string> disk_set;
-    for (auto& scan_range : _scan_ranges) {
-        auto tablet_id = scan_range->tablet_id;
-        std::string err;
-        TabletSharedPtr tablet =
-                
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
-        if (tablet == nullptr) {
-            auto err_str = fmt::format("failed to get tablet: {}, reason: {}", 
tablet_id, err);
-            LOG(WARNING) << err_str;
-            return Status::InternalError(err_str);
-        }
+    bool is_duplicate_key = false;
+    int segment_count = 0;
+    std::vector<std::vector<RowsetReaderSharedPtr>> 
rowset_readers_vector(_scan_ranges.size());
+    std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());
+
+    // Split tablet segment by scanner, only use in pipeline in duplicate key
+    // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
+    // TODO: some tablet may do not have segment, may need split segment all 
case
+    if (_shared_scan_opt && _scan_ranges.size() < 
config::doris_scanner_thread_pool_thread_num) {
+        for (int i = 0; i < _scan_ranges.size(); ++i) {
+            auto& scan_range = _scan_ranges[i];
+            auto tablet_id = scan_range->tablet_id;
+            auto [tablet, status] =
+                    
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
+                                                                               
        true);
+            RETURN_IF_ERROR(status);
+
+            is_duplicate_key = tablet->keys_type() == DUP_KEYS;
+            if (!is_duplicate_key) {
+                break;
+            }
 
-        std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = 
&cond_ranges;
-        int size_based_scanners_per_tablet = 1;
+            int64_t version = 0;
+            std::from_chars(scan_range->version.c_str(),
+                            scan_range->version.c_str() + 
scan_range->version.size(), version);
+
+            std::shared_lock rdlock(tablet->get_header_lock());
+            // acquire tablet rowset readers at the beginning of the scan node
+            // to prevent this case: when there are lots of olap scanners to 
run for example 10000
+            // the rowsets maybe compacted when the last olap scanner starts
+            Status acquire_reader_st =
+                    tablet->capture_rs_readers({0, version}, 
&rowset_readers_vector[i]);
+            if (!acquire_reader_st.ok()) {
+                LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
+                std::stringstream ss;
+                ss << "failed to initialize storage reader. tablet=" << 
tablet->full_name()
+                   << ", res=" << acquire_reader_st
+                   << ", backend=" << BackendOptions::get_localhost();
+                return Status::InternalError(ss.str());
+            }
 
-        if (config::doris_scan_range_max_mb > 0) {
-            size_based_scanners_per_tablet = std::max(
-                    1, (int)(tablet->tablet_footprint() / 
(config::doris_scan_range_max_mb << 20)));
+            for (const auto& rowset_reader : rowset_readers_vector[i]) {
+                auto num_segments = rowset_reader->rowset()->num_segments();
+                tablet_rs_seg_count[i].emplace_back(num_segments);
+                segment_count += num_segments;
+            }
         }
+    }
 
-        int ranges_per_scanner =
-                std::max(1, (int)ranges->size() /
-                                    std::min(scanners_per_tablet, 
size_based_scanners_per_tablet));
-        int num_ranges = ranges->size();
-        for (int i = 0; i < num_ranges;) {
-            std::vector<doris::OlapScanRange*> scanner_ranges;
-            scanner_ranges.push_back((*ranges)[i].get());
-            ++i;
-            for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
-                            (*ranges)[i]->end_include == (*ranges)[i - 
1]->end_include;
-                 ++j, ++i) {
-                scanner_ranges.push_back((*ranges)[i].get());
+    std::unordered_set<std::string> disk_set;
+    auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+                                 const std::vector<OlapScanRange*>& key_ranges,
+                                 const std::vector<RowsetReaderSharedPtr>& 
rs_readers,
+                                 const std::vector<std::pair<int, int>>& 
rs_reader_seg_offsets) {
+        NewOlapScanner* scanner = new NewOlapScanner(_state, this, 
_limit_per_scanner,
+                                                     
_olap_scan_node.is_preaggregation,
+                                                     _need_agg_finalize, 
_scanner_profile.get());
+
+        scanner->set_compound_filters(_compound_filters);
+        // add scanner to pool before doing prepare.
+        // so that scanner can be automatically deconstructed if prepare 
failed.
+        _scanner_pool.add(scanner);
+        RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, 
_vconjunct_ctx_ptr.get(),
+                                         _olap_filters, _filter_predicates, 
_push_down_functions,
+                                         _common_vexpr_ctxs_pushdown.get(), 
rs_readers,
+                                         rs_reader_seg_offsets));
+        scanners->push_back((VScanner*)scanner);
+        disk_set.insert(scanner->scan_disk());
+        return Status::OK();
+    };
+    if (is_duplicate_key) {
+        // 2. Split by segment count, each scanner need scan avg segment count
+        auto avg_segment_count =
+                std::max(segment_count / 
config::doris_scanner_thread_pool_thread_num, 1);
+        for (int i = 0; i < _scan_ranges.size(); ++i) {
+            auto& scan_range = _scan_ranges[i];
+            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = 
&cond_ranges;
+            int num_ranges = ranges->size();
+            std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
+            for (int j = 0; j < num_ranges; ++j) {
+                scanner_ranges[j] = (*ranges)[j].get();
+            }
+
+            const auto& rs_seg_count = tablet_rs_seg_count[i];
+            int rs_seg_count_index = 0;
+            int rs_seg_start_scan = 0;
+            int scanner_seg_occupy = 0;
+            std::vector<RowsetReaderSharedPtr> rs_readers;
+            std::vector<std::pair<int, int>> rs_reader_seg_offsets;
+
+            while (rs_seg_count_index < rs_seg_count.size()) {
+                auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - 
rs_seg_start_scan;
+                
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
+
+                if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) 
{
+                    auto need_add_seg_nums = avg_segment_count - 
scanner_seg_occupy;
+                    rs_reader_seg_offsets.emplace_back(
+                            rs_seg_start_scan,
+                            rs_seg_start_scan + need_add_seg_nums); // only 
scan need_add_seg_nums
+                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_readers,
+                                                      rs_reader_seg_offsets));
+
+                    rs_seg_start_scan += need_add_seg_nums;
+                    scanner_seg_occupy = 0;
+                    rs_readers.clear();
+                    rs_reader_seg_offsets.clear();
+                } else if (scanner_seg_occupy + max_add_seg_nums == 
avg_segment_count) {
+                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
+                                                       
rs_seg_count[rs_seg_count_index]);
+                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_readers,
+                                                      rs_reader_seg_offsets));
+
+                    rs_seg_start_scan = 0;
+                    scanner_seg_occupy = 0;
+                    rs_readers.clear();
+                    rs_reader_seg_offsets.clear();
+                    rs_seg_count_index++;
+                } else {
+                    rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
+                                                       
rs_seg_count[rs_seg_count_index]);
+
+                    rs_seg_start_scan = 0;
+                    scanner_seg_occupy += max_add_seg_nums;
+                    rs_seg_count_index++;
+                }
             }
 
-            NewOlapScanner* scanner = new NewOlapScanner(
-                    _state, this, _limit_per_scanner, 
_olap_scan_node.is_preaggregation,
-                    _need_agg_finalize, _scanner_profile.get());
-
-            scanner->set_compound_filters(_compound_filters);
-            // add scanner to pool before doing prepare.
-            // so that scanner can be automatically deconstructed if prepare 
failed.
-            _scanner_pool.add(scanner);
-            RETURN_IF_ERROR(scanner->prepare(
-                    *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), 
_olap_filters,
-                    _filter_predicates, _push_down_functions, 
_common_vexpr_ctxs_pushdown.get()));
-            scanners->push_back((VScanner*)scanner);
-            disk_set.insert(scanner->scan_disk());
+            // dispose some segment tail
+            if (!rs_readers.empty()) {
+                build_new_scanner(*scan_range, scanner_ranges, rs_readers, 
rs_reader_seg_offsets);
+            }
+        }
+    } else {
+        for (auto& scan_range : _scan_ranges) {
+            auto tablet_id = scan_range->tablet_id;
+            auto [tablet, status] =
+                    
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
+                                                                               
        true);
+            RETURN_IF_ERROR(status);
+
+            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = 
&cond_ranges;
+            int size_based_scanners_per_tablet = 1;
+
+            if (config::doris_scan_range_max_mb > 0) {
+                size_based_scanners_per_tablet =
+                        std::max(1, (int)(tablet->tablet_footprint() /
+                                          (config::doris_scan_range_max_mb << 
20)));
+            }
+            int ranges_per_scanner =
+                    std::max(1, (int)ranges->size() / 
std::min(scanners_per_tablet,
+                                                               
size_based_scanners_per_tablet));
+            int num_ranges = ranges->size();
+            for (int i = 0; i < num_ranges;) {
+                std::vector<doris::OlapScanRange*> scanner_ranges;
+                scanner_ranges.push_back((*ranges)[i].get());
+                ++i;
+                for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
+                                (*ranges)[i]->end_include == (*ranges)[i - 
1]->end_include;
+                     ++j, ++i) {
+                    scanner_ranges.push_back((*ranges)[i].get());
+                }
+                RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, 
{}, {}));
+            }
         }
+        COUNTER_SET(_num_disks_accessed_counter, 
static_cast<int64_t>(disk_set.size()));
     }
-
-    COUNTER_SET(_num_disks_accessed_counter, 
static_cast<int64_t>(disk_set.size()));
     // telemetry::set_span_attribute(span, _num_disks_accessed_counter);
     // telemetry::set_span_attribute(span, _num_scanners);
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 9a503bd16d..cb41de43c9 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -52,7 +52,9 @@ Status NewOlapScanner::prepare(const TPaloScanRange& 
scan_range,
                                const std::vector<TCondition>& filters,
                                const FilterPredicates& filter_predicates,
                                const std::vector<FunctionFilter>& 
function_filters,
-                               VExprContext** common_vexpr_ctxs_pushdown) {
+                               VExprContext** common_vexpr_ctxs_pushdown,
+                               const std::vector<RowsetReaderSharedPtr>& 
rs_readers,
+                               const std::vector<std::pair<int, int>>& 
rs_reader_seg_offsets) {
     RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
     if (common_vexpr_ctxs_pushdown != nullptr) {
         // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's 
_common_vexpr_ctxs_pushdown, just necessary.
@@ -70,14 +72,10 @@ Status NewOlapScanner::prepare(const TPaloScanRange& 
scan_range,
     TTabletId tablet_id = scan_range.tablet_id;
     _version = strtoul(scan_range.version.c_str(), nullptr, 10);
     {
-        std::string err;
-        _tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
-        if (_tablet.get() == nullptr) {
-            std::stringstream ss;
-            ss << "failed to get tablet. tablet_id=" << tablet_id << ", 
reason=" << err;
-            LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str());
-        }
+        auto [tablet, status] =
+                
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, 
true);
+        RETURN_IF_ERROR(status);
+        _tablet = std::move(tablet);
         _tablet_schema->copy_from(*_tablet->tablet_schema());
 
         TOlapScanNode& olap_scan_node = 
((NewOlapScanNode*)_parent)->_olap_scan_node;
@@ -116,27 +114,32 @@ Status NewOlapScanner::prepare(const TPaloScanRange& 
scan_range,
 
         {
             std::shared_lock rdlock(_tablet->get_header_lock());
-            const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
-            if (rowset == nullptr) {
-                std::stringstream ss;
-                ss << "fail to get latest version of tablet: " << tablet_id;
-                LOG(WARNING) << ss.str();
-                return Status::InternalError(ss.str());
-            }
+            if (rs_readers.empty()) {
+                const RowsetSharedPtr rowset = 
_tablet->rowset_with_max_version();
+                if (rowset == nullptr) {
+                    std::stringstream ss;
+                    ss << "fail to get latest version of tablet: " << 
tablet_id;
+                    LOG(WARNING) << ss.str();
+                    return Status::InternalError(ss.str());
+                }
 
-            // acquire tablet rowset readers at the beginning of the scan node
-            // to prevent this case: when there are lots of olap scanners to 
run for example 10000
-            // the rowsets maybe compacted when the last olap scanner starts
-            Version rd_version(0, _version);
-            Status acquire_reader_st =
-                    _tablet->capture_rs_readers(rd_version, 
&_tablet_reader_params.rs_readers);
-            if (!acquire_reader_st.ok()) {
-                LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
-                std::stringstream ss;
-                ss << "failed to initialize storage reader. tablet=" << 
_tablet->full_name()
-                   << ", res=" << acquire_reader_st
-                   << ", backend=" << BackendOptions::get_localhost();
-                return Status::InternalError(ss.str());
+                // acquire tablet rowset readers at the beginning of the scan 
node
+                // to prevent this case: when there are lots of olap scanners 
to run for example 10000
+                // the rowsets maybe compacted when the last olap scanner 
starts
+                Version rd_version(0, _version);
+                Status acquire_reader_st =
+                        _tablet->capture_rs_readers(rd_version, 
&_tablet_reader_params.rs_readers);
+                if (!acquire_reader_st.ok()) {
+                    LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
+                    std::stringstream ss;
+                    ss << "failed to initialize storage reader. tablet=" << 
_tablet->full_name()
+                       << ", res=" << acquire_reader_st
+                       << ", backend=" << BackendOptions::get_localhost();
+                    return Status::InternalError(ss.str());
+                }
+            } else {
+                _tablet_reader_params.rs_readers = rs_readers;
+                _tablet_reader_params.rs_readers_segment_offsets = 
rs_reader_seg_offsets;
             }
 
             // Initialize tablet_reader_params
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index 83968e2535..2a04e021db 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -45,7 +45,9 @@ public:
                    VExprContext** vconjunct_ctx_ptr, const 
std::vector<TCondition>& filters,
                    const FilterPredicates& filter_predicates,
                    const std::vector<FunctionFilter>& function_filters,
-                   VExprContext** common_vexpr_ctxs_pushdown);
+                   VExprContext** common_vexpr_ctxs_pushdown,
+                   const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
+                   const std::vector<std::pair<int, int>>& 
rs_reader_seg_offsets = {});
 
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); 
}
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 0e418256c2..6a38617a9d 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -84,7 +84,6 @@ public:
 
     void set_max_queue_size(int max_queue_size) override {
         for (int i = 0; i < max_queue_size; ++i) {
-            _blocks_queue_empty.emplace_back(true);
             _queue_mutexs.emplace_back(new std::mutex);
             _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
         }
@@ -92,7 +91,6 @@ public:
 
 private:
     int _next_queue_to_feed = 0;
-    std::vector<bool> _blocks_queue_empty;
     std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
     std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
 };
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index a30abe2650..a9dbcd6226 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -75,10 +75,17 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params) {
 
     _reader_context.push_down_agg_type_opt = 
read_params.push_down_agg_type_opt;
     std::vector<RowsetReaderSharedPtr> valid_rs_readers;
-    for (auto& rs_reader : read_params.rs_readers) {
+    DCHECK(read_params.rs_readers_segment_offsets.empty() ||
+           read_params.rs_readers_segment_offsets.size() == 
read_params.rs_readers.size());
+
+    bool is_empty = read_params.rs_readers_segment_offsets.empty();
+    for (int i = 0; i < read_params.rs_readers.size(); ++i) {
+        auto& rs_reader = read_params.rs_readers[i];
         // _vcollect_iter.topn_next() will init rs_reader by itself
         if (!_vcollect_iter.use_topn_next()) {
-            RETURN_NOT_OK(rs_reader->init(&_reader_context));
+            RETURN_NOT_OK(rs_reader->init(
+                    &_reader_context,
+                    is_empty ? std::pair {0, 0} : 
read_params.rs_readers_segment_offsets[i]));
         }
         Status res = _vcollect_iter.add_child(rs_reader);
         if (!res.ok() && !res.is<END_OF_FILE>()) {
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 7195ed3381..097ddb7513 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -53,7 +53,8 @@ Status VerticalBlockReader::_get_segment_iterators(const 
ReaderParams& read_para
         // segment iterator will be inited here
         // In vertical compaction, every group will load segment so we should 
cache
         // segment to avoid tot many s3 head request
-        RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context, 
segment_iters, true));
+        RETURN_NOT_OK(
+                rs_reader->get_segment_iterators(&_reader_context, 
segment_iters, {0, 0}, true));
         // if segments overlapping, all segment iterator should be inited in
         // heap merge iterator. If segments are none overlapping, only first 
segment of this
         // rowset will be inited and push to heap, other segment will be 
inited later when current
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 597d156ea0..71ffa474b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -64,7 +64,7 @@ public class AuditEvent {
     public int errorCode = 0;
     @AuditField(value = "ErrorMessage")
     public String errorMessage = "";
-    @AuditField(value = "Time")
+    @AuditField(value = "Time(ms)")
     public long queryTime = -1;
     @AuditField(value = "ScanBytes")
     public long scanBytes = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to