This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c29582bd57 [pipeline](split by segment)support segment split by scanner (#17738) c29582bd57 is described below commit c29582bd57ba50d5a510df3d25e95bfb93cc2fbf Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Mar 16 15:25:52 2023 +0800 [pipeline](split by segment)support segment split by scanner (#17738) * support segment split by scanner * change code by cr --- be/src/common/status.h | 2 - be/src/olap/reader.h | 5 + be/src/olap/rowset/beta_rowset_reader.cpp | 21 ++- be/src/olap/rowset/beta_rowset_reader.h | 6 +- be/src/olap/rowset/rowset_reader.h | 6 +- be/src/olap/tablet_manager.cpp | 27 ++- be/src/olap/tablet_manager.h | 3 + be/src/vec/exec/scan/new_olap_scan_node.cpp | 203 ++++++++++++++++----- be/src/vec/exec/scan/new_olap_scanner.cpp | 61 ++++--- be/src/vec/exec/scan/new_olap_scanner.h | 4 +- be/src/vec/exec/scan/pip_scanner_context.h | 2 - be/src/vec/olap/block_reader.cpp | 11 +- be/src/vec/olap/vertical_block_reader.cpp | 3 +- .../java/org/apache/doris/plugin/AuditEvent.java | 2 +- 14 files changed, 256 insertions(+), 100 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 254c25e528..772eaa2230 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -404,8 +404,6 @@ public: bool ok() const { return _code == ErrorCode::OK; } - bool is_blocked_by_sc() const { return _code == ErrorCode::PIP_WAIT_FOR_SC; } - bool is_io_error() const { return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code || ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code || diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 17f7e79286..813369fca9 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -101,6 +101,11 @@ public: DeleteBitmap* delete_bitmap {nullptr}; std::vector<RowsetReaderSharedPtr> rs_readers; + // if rs_readers_segment_offsets is not empty, means we only scan + // [pair.first, pair.second) segment in rs_reader, only effective in dup key + // and pipeline + std::vector<std::pair<int, int>> rs_readers_segment_offsets; + // return_columns is init from query schema std::vector<uint32_t> return_columns; // output_columns only contain columns in OrderByExprs and outputExprs diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 0f4410a45f..0a4e7ccdb4 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -42,6 +42,10 @@ void BetaRowsetReader::reset_read_options() { _read_options.key_ranges.clear(); } +RowsetReaderSharedPtr BetaRowsetReader::clone() { + return RowsetReaderSharedPtr(new BetaRowsetReader(_rowset)); +} + bool BetaRowsetReader::update_profile(RuntimeProfile* profile) { if (_iterator != nullptr) { return _iterator->update_profile(profile); @@ -51,6 +55,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) { Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, + const std::pair<int, int>& segment_offset, bool use_cache) { RETURN_NOT_OK(_rowset->load()); _context = read_context; @@ -174,7 +179,15 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context should_use_cache)); // create iterator for each segment - for (auto& seg_ptr : _segment_cache_handle.get_segments()) { + auto& segments = _segment_cache_handle.get_segments(); + auto [seg_start, seg_end] = segment_offset; + if (seg_start == seg_end) { + seg_start = 0; + seg_end = segments.size(); + } + + for (int i = seg_start; i < seg_end; i++) { + auto& seg_ptr = segments[i]; std::unique_ptr<RowwiseIterator> iter; auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter); if (!s.ok()) { @@ -191,13 +204,15 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context } out_iters->push_back(std::move(iter)); } + return Status::OK(); } -Status BetaRowsetReader::init(RowsetReaderContext* read_context) { +Status BetaRowsetReader::init(RowsetReaderContext* read_context, + const std::pair<int, int>& segment_offset) { _context = read_context; std::vector<RowwiseIteratorUPtr> iterators; - RETURN_NOT_OK(get_segment_iterators(_context, &iterators)); + RETURN_NOT_OK(get_segment_iterators(_context, &iterators, segment_offset)); // merge or union segment iterator if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index eba2c2885c..45321c6302 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -31,10 +31,12 @@ public: ~BetaRowsetReader() override { _rowset->release(); } - Status init(RowsetReaderContext* read_context) override; + Status init(RowsetReaderContext* read_context, + const std::pair<int, int>& segment_offset) override; Status get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, + const std::pair<int, int>& segment_offset, bool use_cache = false) override; void reset_read_options() override; Status next_block(vectorized::Block* block) override; @@ -66,6 +68,8 @@ public: bool update_profile(RuntimeProfile* profile) override; + RowsetReaderSharedPtr clone() override; + private: bool _should_push_down_value_predicates() const; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 795f841b0a..3d6b2c6fad 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -41,10 +41,12 @@ public: virtual ~RowsetReader() = default; // reader init - virtual Status init(RowsetReaderContext* read_context) = 0; + virtual Status init(RowsetReaderContext* read_context, + const std::pair<int, int>& segment_offset = {0, 0}) = 0; virtual Status get_segment_iterators(RowsetReaderContext* read_context, std::vector<RowwiseIteratorUPtr>* out_iters, + const std::pair<int, int>& segment_offset = {0, 0}, bool use_cache = false) = 0; virtual void reset_read_options() = 0; @@ -73,6 +75,8 @@ public: } virtual bool update_profile(RuntimeProfile* profile) = 0; + + virtual RowsetReaderSharedPtr clone() = 0; }; } // namespace doris diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index f7e1ca25a5..5bc7019fda 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -18,36 +18,23 @@ #include "olap/tablet_manager.h" #include <gen_cpp/Types_types.h> -#include <rapidjson/document.h> #include <re2/re2.h> -#include <thrift/protocol/TDebugProtocol.h> #include <algorithm> -#include <boost/algorithm/string.hpp> -#include <boost/algorithm/string/classification.hpp> -#include <boost/algorithm/string/split.hpp> #include <cstdint> #include <cstdio> -#include <cstdlib> #include <filesystem> -#include "common/compiler_util.h" #include "env/env.h" -#include "env/env_util.h" #include "gutil/strings/strcat.h" #include "olap/base_compaction.h" #include "olap/cumulative_compaction.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/push_handler.h" -#include "olap/reader.h" -#include "olap/rowset/rowset_id_generator.h" -#include "olap/schema_change.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" -#include "olap/utils.h" -#include "rapidjson/document.h" #include "rapidjson/prettywriter.h" #include "rapidjson/stringbuffer.h" #include "runtime/thread_context.h" @@ -56,7 +43,6 @@ #include "util/file_utils.h" #include "util/histogram.h" #include "util/path_util.h" -#include "util/pretty_printer.h" #include "util/scoped_cleanup.h" #include "util/time.h" #include "util/trace.h" @@ -534,6 +520,19 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, bool include_dele return _get_tablet_unlocked(tablet_id, include_deleted, err); } +std::pair<TabletSharedPtr, Status> TabletManager::get_tablet_and_status(TTabletId tablet_id, + bool include_deleted) { + std::string err; + auto tablet = get_tablet(tablet_id, include_deleted, &err); + if (tablet == nullptr) { + auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err); + LOG(WARNING) << err_str; + return {tablet, Status::InternalError(err_str)}; + } + + return {tablet, Status::OK()}; +} + TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool include_deleted, string* err) { TabletSharedPtr tablet; diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 1b7e9cf89b..dca1b635ad 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -75,6 +75,9 @@ public: TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted = false, std::string* err = nullptr); + std::pair<TabletSharedPtr, Status> get_tablet_and_status(TTabletId tablet_id, + bool include_deleted = false); + TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid, bool include_deleted = false, std::string* err = nullptr); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 55babf3066..51e13beb3c 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -17,6 +17,8 @@ #include "vec/exec/scan/new_olap_scan_node.h" +#include <charconv> + #include "common/status.h" #include "olap/storage_engine.h" #include "olap/tablet.h" @@ -404,57 +406,172 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); - std::unordered_set<std::string> disk_set; - for (auto& scan_range : _scan_ranges) { - auto tablet_id = scan_range->tablet_id; - std::string err; - TabletSharedPtr tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); - if (tablet == nullptr) { - auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err); - LOG(WARNING) << err_str; - return Status::InternalError(err_str); - } + bool is_duplicate_key = false; + int segment_count = 0; + std::vector<std::vector<RowsetReaderSharedPtr>> rowset_readers_vector(_scan_ranges.size()); + std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size()); + + // Split tablet segment by scanner, only use in pipeline in duplicate key + // 1. if tablet count lower than scanner thread num, count segment num of all tablet ready for scan + // TODO: some tablet may do not have segment, may need split segment all case + if (_shared_scan_opt && _scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) { + for (int i = 0; i < _scan_ranges.size(); ++i) { + auto& scan_range = _scan_ranges[i]; + auto tablet_id = scan_range->tablet_id; + auto [tablet, status] = + StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, + true); + RETURN_IF_ERROR(status); + + is_duplicate_key = tablet->keys_type() == DUP_KEYS; + if (!is_duplicate_key) { + break; + } - std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; - int size_based_scanners_per_tablet = 1; + int64_t version = 0; + std::from_chars(scan_range->version.c_str(), + scan_range->version.c_str() + scan_range->version.size(), version); + + std::shared_lock rdlock(tablet->get_header_lock()); + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + Status acquire_reader_st = + tablet->capture_rs_readers({0, version}, &rowset_readers_vector[i]); + if (!acquire_reader_st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << tablet->full_name() + << ", res=" << acquire_reader_st + << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str()); + } - if (config::doris_scan_range_max_mb > 0) { - size_based_scanners_per_tablet = std::max( - 1, (int)(tablet->tablet_footprint() / (config::doris_scan_range_max_mb << 20))); + for (const auto& rowset_reader : rowset_readers_vector[i]) { + auto num_segments = rowset_reader->rowset()->num_segments(); + tablet_rs_seg_count[i].emplace_back(num_segments); + segment_count += num_segments; + } } + } - int ranges_per_scanner = - std::max(1, (int)ranges->size() / - std::min(scanners_per_tablet, size_based_scanners_per_tablet)); - int num_ranges = ranges->size(); - for (int i = 0; i < num_ranges;) { - std::vector<doris::OlapScanRange*> scanner_ranges; - scanner_ranges.push_back((*ranges)[i].get()); - ++i; - for (int j = 1; i < num_ranges && j < ranges_per_scanner && - (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; - ++j, ++i) { - scanner_ranges.push_back((*ranges)[i].get()); + std::unordered_set<std::string> disk_set; + auto build_new_scanner = [&](const TPaloScanRange& scan_range, + const std::vector<OlapScanRange*>& key_ranges, + const std::vector<RowsetReaderSharedPtr>& rs_readers, + const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) { + NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner, + _olap_scan_node.is_preaggregation, + _need_agg_finalize, _scanner_profile.get()); + + scanner->set_compound_filters(_compound_filters); + // add scanner to pool before doing prepare. + // so that scanner can be automatically deconstructed if prepare failed. + _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(), + _olap_filters, _filter_predicates, _push_down_functions, + _common_vexpr_ctxs_pushdown.get(), rs_readers, + rs_reader_seg_offsets)); + scanners->push_back((VScanner*)scanner); + disk_set.insert(scanner->scan_disk()); + return Status::OK(); + }; + if (is_duplicate_key) { + // 2. Split by segment count, each scanner need scan avg segment count + auto avg_segment_count = + std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1); + for (int i = 0; i < _scan_ranges.size(); ++i) { + auto& scan_range = _scan_ranges[i]; + std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; + int num_ranges = ranges->size(); + std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges); + for (int j = 0; j < num_ranges; ++j) { + scanner_ranges[j] = (*ranges)[j].get(); + } + + const auto& rs_seg_count = tablet_rs_seg_count[i]; + int rs_seg_count_index = 0; + int rs_seg_start_scan = 0; + int scanner_seg_occupy = 0; + std::vector<RowsetReaderSharedPtr> rs_readers; + std::vector<std::pair<int, int>> rs_reader_seg_offsets; + + while (rs_seg_count_index < rs_seg_count.size()) { + auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] - rs_seg_start_scan; + rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone()); + + if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count) { + auto need_add_seg_nums = avg_segment_count - scanner_seg_occupy; + rs_reader_seg_offsets.emplace_back( + rs_seg_start_scan, + rs_seg_start_scan + need_add_seg_nums); // only scan need_add_seg_nums + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, + rs_reader_seg_offsets)); + + rs_seg_start_scan += need_add_seg_nums; + scanner_seg_occupy = 0; + rs_readers.clear(); + rs_reader_seg_offsets.clear(); + } else if (scanner_seg_occupy + max_add_seg_nums == avg_segment_count) { + rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, + rs_seg_count[rs_seg_count_index]); + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, rs_readers, + rs_reader_seg_offsets)); + + rs_seg_start_scan = 0; + scanner_seg_occupy = 0; + rs_readers.clear(); + rs_reader_seg_offsets.clear(); + rs_seg_count_index++; + } else { + rs_reader_seg_offsets.emplace_back(rs_seg_start_scan, + rs_seg_count[rs_seg_count_index]); + + rs_seg_start_scan = 0; + scanner_seg_occupy += max_add_seg_nums; + rs_seg_count_index++; + } } - NewOlapScanner* scanner = new NewOlapScanner( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, - _need_agg_finalize, _scanner_profile.get()); - - scanner->set_compound_filters(_compound_filters); - // add scanner to pool before doing prepare. - // so that scanner can be automatically deconstructed if prepare failed. - _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare( - *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters, - _filter_predicates, _push_down_functions, _common_vexpr_ctxs_pushdown.get())); - scanners->push_back((VScanner*)scanner); - disk_set.insert(scanner->scan_disk()); + // dispose some segment tail + if (!rs_readers.empty()) { + build_new_scanner(*scan_range, scanner_ranges, rs_readers, rs_reader_seg_offsets); + } + } + } else { + for (auto& scan_range : _scan_ranges) { + auto tablet_id = scan_range->tablet_id; + auto [tablet, status] = + StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, + true); + RETURN_IF_ERROR(status); + + std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; + int size_based_scanners_per_tablet = 1; + + if (config::doris_scan_range_max_mb > 0) { + size_based_scanners_per_tablet = + std::max(1, (int)(tablet->tablet_footprint() / + (config::doris_scan_range_max_mb << 20))); + } + int ranges_per_scanner = + std::max(1, (int)ranges->size() / std::min(scanners_per_tablet, + size_based_scanners_per_tablet)); + int num_ranges = ranges->size(); + for (int i = 0; i < num_ranges;) { + std::vector<doris::OlapScanRange*> scanner_ranges; + scanner_ranges.push_back((*ranges)[i].get()); + ++i; + for (int j = 1; i < num_ranges && j < ranges_per_scanner && + (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; + ++j, ++i) { + scanner_ranges.push_back((*ranges)[i].get()); + } + RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {})); + } } + COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size())); } - - COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size())); // telemetry::set_span_attribute(span, _num_disks_accessed_counter); // telemetry::set_span_attribute(span, _num_scanners); diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 9a503bd16d..cb41de43c9 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -52,7 +52,9 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, const std::vector<TCondition>& filters, const FilterPredicates& filter_predicates, const std::vector<FunctionFilter>& function_filters, - VExprContext** common_vexpr_ctxs_pushdown) { + VExprContext** common_vexpr_ctxs_pushdown, + const std::vector<RowsetReaderSharedPtr>& rs_readers, + const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) { RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); if (common_vexpr_ctxs_pushdown != nullptr) { // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary. @@ -70,14 +72,10 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, TTabletId tablet_id = scan_range.tablet_id; _version = strtoul(scan_range.version.c_str(), nullptr, 10); { - std::string err; - _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); - if (_tablet.get() == nullptr) { - std::stringstream ss; - ss << "failed to get tablet. tablet_id=" << tablet_id << ", reason=" << err; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } + auto [tablet, status] = + StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, true); + RETURN_IF_ERROR(status); + _tablet = std::move(tablet); _tablet_schema->copy_from(*_tablet->tablet_schema()); TOlapScanNode& olap_scan_node = ((NewOlapScanNode*)_parent)->_olap_scan_node; @@ -116,27 +114,32 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, { std::shared_lock rdlock(_tablet->get_header_lock()); - const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); - if (rowset == nullptr) { - std::stringstream ss; - ss << "fail to get latest version of tablet: " << tablet_id; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } + if (rs_readers.empty()) { + const RowsetSharedPtr rowset = _tablet->rowset_with_max_version(); + if (rowset == nullptr) { + std::stringstream ss; + ss << "fail to get latest version of tablet: " << tablet_id; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } - // acquire tablet rowset readers at the beginning of the scan node - // to prevent this case: when there are lots of olap scanners to run for example 10000 - // the rowsets maybe compacted when the last olap scanner starts - Version rd_version(0, _version); - Status acquire_reader_st = - _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); - if (!acquire_reader_st.ok()) { - LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; - std::stringstream ss; - ss << "failed to initialize storage reader. tablet=" << _tablet->full_name() - << ", res=" << acquire_reader_st - << ", backend=" << BackendOptions::get_localhost(); - return Status::InternalError(ss.str()); + // acquire tablet rowset readers at the beginning of the scan node + // to prevent this case: when there are lots of olap scanners to run for example 10000 + // the rowsets maybe compacted when the last olap scanner starts + Version rd_version(0, _version); + Status acquire_reader_st = + _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers); + if (!acquire_reader_st.ok()) { + LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st; + std::stringstream ss; + ss << "failed to initialize storage reader. tablet=" << _tablet->full_name() + << ", res=" << acquire_reader_st + << ", backend=" << BackendOptions::get_localhost(); + return Status::InternalError(ss.str()); + } + } else { + _tablet_reader_params.rs_readers = rs_readers; + _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets; } // Initialize tablet_reader_params diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 83968e2535..2a04e021db 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -45,7 +45,9 @@ public: VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters, const FilterPredicates& filter_predicates, const std::vector<FunctionFilter>& function_filters, - VExprContext** common_vexpr_ctxs_pushdown); + VExprContext** common_vexpr_ctxs_pushdown, + const std::vector<RowsetReaderSharedPtr>& rs_readers = {}, + const std::vector<std::pair<int, int>>& rs_reader_seg_offsets = {}); const std::string& scan_disk() const { return _tablet->data_dir()->path(); } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 0e418256c2..6a38617a9d 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -84,7 +84,6 @@ public: void set_max_queue_size(int max_queue_size) override { for (int i = 0; i < max_queue_size; ++i) { - _blocks_queue_empty.emplace_back(true); _queue_mutexs.emplace_back(new std::mutex); _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>()); } @@ -92,7 +91,6 @@ public: private: int _next_queue_to_feed = 0; - std::vector<bool> _blocks_queue_empty; std::vector<std::unique_ptr<std::mutex>> _queue_mutexs; std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues; }; diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index a30abe2650..a9dbcd6226 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -75,10 +75,17 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; std::vector<RowsetReaderSharedPtr> valid_rs_readers; - for (auto& rs_reader : read_params.rs_readers) { + DCHECK(read_params.rs_readers_segment_offsets.empty() || + read_params.rs_readers_segment_offsets.size() == read_params.rs_readers.size()); + + bool is_empty = read_params.rs_readers_segment_offsets.empty(); + for (int i = 0; i < read_params.rs_readers.size(); ++i) { + auto& rs_reader = read_params.rs_readers[i]; // _vcollect_iter.topn_next() will init rs_reader by itself if (!_vcollect_iter.use_topn_next()) { - RETURN_NOT_OK(rs_reader->init(&_reader_context)); + RETURN_NOT_OK(rs_reader->init( + &_reader_context, + is_empty ? std::pair {0, 0} : read_params.rs_readers_segment_offsets[i])); } Status res = _vcollect_iter.add_child(rs_reader); if (!res.ok() && !res.is<END_OF_FILE>()) { diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 7195ed3381..097ddb7513 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -53,7 +53,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para // segment iterator will be inited here // In vertical compaction, every group will load segment so we should cache // segment to avoid tot many s3 head request - RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context, segment_iters, true)); + RETURN_NOT_OK( + rs_reader->get_segment_iterators(&_reader_context, segment_iters, {0, 0}, true)); // if segments overlapping, all segment iterator should be inited in // heap merge iterator. If segments are none overlapping, only first segment of this // rowset will be inited and push to heap, other segment will be inited later when current diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 597d156ea0..71ffa474b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -64,7 +64,7 @@ public class AuditEvent { public int errorCode = 0; @AuditField(value = "ErrorMessage") public String errorMessage = ""; - @AuditField(value = "Time") + @AuditField(value = "Time(ms)") public long queryTime = -1; @AuditField(value = "ScanBytes") public long scanBytes = -1; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org