This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2f192019d3 [bugfix](delete hanlder) delete predicate is merged and could not find schema cause core dump (#12161) 2f192019d3 is described below commit 2f192019d30aa0f48c192ebf672391b1207374ff Author: yiguolei <676222...@qq.com> AuthorDate: Tue Aug 30 09:18:21 2022 +0800 [bugfix](delete hanlder) delete predicate is merged and could not find schema cause core dump (#12161) Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/exec/olap_scanner.cpp | 8 ++-- be/src/olap/base_compaction.cpp | 3 +- be/src/olap/cumulative_compaction.cpp | 4 +- be/src/olap/delete_handler.cpp | 15 +++---- be/src/olap/delete_handler.h | 5 ++- be/src/olap/merger.cpp | 16 +++---- be/src/olap/reader.cpp | 2 +- be/src/olap/reader.h | 2 +- be/src/olap/schema_change.cpp | 33 +++++++------- be/src/olap/schema_change.h | 7 +-- be/src/olap/tablet.cpp | 16 +++---- be/src/olap/tablet.h | 10 +++-- be/src/olap/tablet_meta.cpp | 50 ++++------------------ be/src/olap/tablet_meta.h | 7 +-- be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +- be/src/vec/exec/volap_scanner.cpp | 9 ++-- be/test/olap/cumulative_compaction_policy_test.cpp | 36 ++++++++++------ be/test/olap/delete_handler_test.cpp | 8 ++-- gensrc/proto/olap_file.proto | 2 +- 19 files changed, 103 insertions(+), 134 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index e052125a49..8ff6db2ab6 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -203,13 +203,13 @@ Status OlapScanner::_init_tablet_reader_params( std::copy(function_filters.cbegin(), function_filters.cend(), std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(), + auto& delete_preds = _tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), std::inserter(_tablet_reader_params.delete_predicates, _tablet_reader_params.delete_predicates.begin())); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) { - _tablet_schema->merge_dropped_columns( - _tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) { + _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version())); } // Range for (auto key_range : key_ranges) { diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 22485c2b7a..2dca9b9d78 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -105,7 +105,8 @@ void BaseCompaction::_filter_input_rowset() { Status BaseCompaction::pick_rowsets_to_compact() { _input_rowsets.clear(); - _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets, rdlock); std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator); RETURN_NOT_OK(check_version_continuity(_input_rowsets)); RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets)); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index ba52fbfdcb..4461a240b5 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -96,8 +96,8 @@ Status CumulativeCompaction::execute_compact_impl() { Status CumulativeCompaction::pick_rowsets_to_compact() { std::vector<RowsetSharedPtr> candidate_rowsets; - - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); if (candidate_rowsets.empty()) { return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION); diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 3c5f9ef5c6..bbb0735356 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -237,23 +237,22 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio return true; } -Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema, - const std::vector<DeletePredicatePB>& delete_conditions, - int64_t version) { +Status DeleteHandler::init(TabletSchemaSPtr tablet_schema, + const std::vector<RowsetMetaSharedPtr>& delete_preds, int64_t version) { DCHECK(!_is_inited) << "reinitialize delete handler."; DCHECK(version >= 0) << "invalid parameters. version=" << version; _predicate_mem_pool.reset(new MemPool()); - for (const auto& delete_condition : delete_conditions) { + for (const auto& delete_pred : delete_preds) { // Skip the delete condition with large version - if (delete_condition.version() > version) { + if (delete_pred->version().first > version) { continue; } // Need the tablet schema at the delete condition to parse the accurate column unique id - TabletSchemaSPtr delete_pred_related_schema = tablet->tablet_schema( - Version(delete_condition.version(), delete_condition.version())); + TabletSchemaSPtr delete_pred_related_schema = delete_pred->tablet_schema(); + auto& delete_condition = delete_pred->delete_predicate(); DeleteConditions temp; - temp.filter_version = delete_condition.version(); + temp.filter_version = delete_pred->version().first; for (const auto& sub_predicate : delete_condition.sub_predicates()) { TCondition condition; if (!_parse_condition(sub_predicate, &condition)) { diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h index 7d720a9701..3dc8e5b3b7 100644 --- a/be/src/olap/delete_handler.h +++ b/be/src/olap/delete_handler.h @@ -25,6 +25,7 @@ #include "olap/block_column_predicate.h" #include "olap/column_predicate.h" #include "olap/olap_define.h" +#include "olap/rowset/rowset_meta.h" #include "olap/tablet_schema.h" namespace doris { @@ -89,8 +90,8 @@ public: // return: // * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS): input parameters are not valid // * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory failed - Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema, - const std::vector<DeletePredicatePB>& delete_conditions, int64_t version); + Status init(TabletSchemaSPtr tablet_schema, + const std::vector<RowsetMetaSharedPtr>& delete_conditions, int64_t version); bool empty() const { return _del_conds.empty(); } diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 4ee7d18796..ace3d6b39a 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -43,16 +43,16 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, reader_params.version = dst_rowset_writer->version(); { std::shared_lock rdlock(tablet->get_header_lock()); - std::copy(tablet->delete_predicates().cbegin(), tablet->delete_predicates().cend(), + auto delete_preds = tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), std::inserter(reader_params.delete_predicates, reader_params.delete_predicates.begin())); } TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); merge_tablet_schema->copy_from(*cur_tablet_schema); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_pb : reader_params.delete_predicates) { - merge_tablet_schema->merge_dropped_columns( - tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + for (auto& del_pred_rs : reader_params.delete_predicates) { + merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version())); } reader_params.tablet_schema = merge_tablet_schema; RETURN_NOT_OK(reader.init(reader_params)); @@ -116,16 +116,16 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, reader_params.version = dst_rowset_writer->version(); { std::shared_lock rdlock(tablet->get_header_lock()); - std::copy(tablet->delete_predicates().cbegin(), tablet->delete_predicates().cend(), + auto delete_preds = tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), std::inserter(reader_params.delete_predicates, reader_params.delete_predicates.begin())); } TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); merge_tablet_schema->copy_from(*cur_tablet_schema); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_pb : reader_params.delete_predicates) { - merge_tablet_schema->merge_dropped_columns( - tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + for (auto& del_pred_rs : reader_params.delete_predicates) { + merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_rs->version())); } reader_params.tablet_schema = merge_tablet_schema; if (tablet->enable_unique_key_merge_on_write()) { diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 2f7eb06243..a074ec6ffd 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -497,7 +497,7 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { _filter_delete = true; } - return _delete_handler.init(_tablet, _tablet_schema, read_params.delete_predicates, + return _delete_handler.init(_tablet_schema, read_params.delete_predicates, read_params.version.second); } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 15d1f26598..004e75c773 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -78,7 +78,7 @@ public: std::vector<TCondition> conditions; std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>> bloom_filters; std::vector<FunctionFilter> function_filters; - std::vector<DeletePredicatePB> delete_predicates; + std::vector<RowsetMetaSharedPtr> delete_predicates; // For unique key table with merge-on-write DeleteBitmap* delete_bitmap {nullptr}; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 04683d75d1..65f9b61bb1 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1850,15 +1850,15 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& res = Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS); break; } - for (auto& delete_pred : base_tablet->delete_predicates()) { - if (delete_pred.version() > end_version) { + auto& all_del_preds = base_tablet->delete_predicates(); + for (auto& delete_pred : all_del_preds) { + if (delete_pred->version().first > end_version) { continue; } - base_tablet_schema->merge_dropped_columns(base_tablet->tablet_schema( - Version(delete_pred.version(), delete_pred.version()))); + base_tablet_schema->merge_dropped_columns( + base_tablet->tablet_schema(delete_pred->version())); } - res = delete_handler.init(base_tablet, base_tablet_schema, - base_tablet->delete_predicates(), end_version); + res = delete_handler.init(base_tablet_schema, all_del_preds, end_version); if (!res) { LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() << ", end_version=" << end_version; @@ -2013,9 +2013,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams bool sc_directly = false; // a.Parse the Alter request and convert it into an internal representation - Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, - &sc_sorting, &sc_directly, sc_params.materialized_params_map, - *sc_params.desc_tbl, sc_params.base_tablet_schema); + Status res = _parse_request(sc_params, &rb_changer, &sc_sorting, &sc_directly); auto process_alter_exit = [&]() -> Status { { @@ -2115,12 +2113,15 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams // @static // Analyze the mapping of the column and the mapping of the filter key -Status SchemaChangeHandler::_parse_request( - TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer, - bool* sc_sorting, bool* sc_directly, - const std::unordered_map<std::string, AlterMaterializedViewParam>& - materialized_function_map, - DescriptorTbl desc_tbl, TabletSchemaSPtr base_tablet_schema) { +Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, + RowBlockChanger* rb_changer, bool* sc_sorting, + bool* sc_directly) { + TabletSharedPtr base_tablet = sc_params.base_tablet; + TabletSharedPtr new_tablet = sc_params.new_tablet; + TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema; + const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map = + sc_params.materialized_params_map; + DescriptorTbl desc_tbl = *sc_params.desc_tbl; // set column mapping for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns(); i < new_schema_size; ++i) { @@ -2232,7 +2233,7 @@ Status SchemaChangeHandler::_parse_request( } } - if (base_tablet->delete_predicates().size() != 0) { + if (!sc_params.delete_handler->empty()) { // there exists delete condition in header, can't do linked schema change *sc_directly = true; } diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 45747a1400..1a8c26ced6 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -305,11 +305,8 @@ private: static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); - static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, - RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly, - const std::unordered_map<std::string, AlterMaterializedViewParam>& - materialized_function_map, - DescriptorTbl desc_tbl, TabletSchemaSPtr base_tablet_schema); + static Status _parse_request(const SchemaChangeParams& sc_params, RowBlockChanger* rb_changer, + bool* sc_sorting, bool* sc_directly); // Initialization Settings for creating a default value static Status _init_column_mapping(ColumnMapping* column_mapping, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 49ae2fd2ea..6aaeff9632 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -177,9 +177,6 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowset // delete versions from new local tablet_meta for (const Version& version : versions_to_delete) { new_tablet_meta->delete_rs_meta_by_version(version, nullptr); - if (new_tablet_meta->version_for_delete_predicate(version)) { - new_tablet_meta->remove_delete_predicate_by_version(version); - } LOG(INFO) << "delete version from new local tablet_meta when clone. [table=" << full_name() << ", version=" << version << "]"; } @@ -1137,17 +1134,18 @@ TabletInfo Tablet::get_tablet_info() const { } void Tablet::pick_candidate_rowsets_to_cumulative_compaction( - std::vector<RowsetSharedPtr>* candidate_rowsets) { + std::vector<RowsetSharedPtr>* candidate_rowsets, + std::shared_lock<std::shared_mutex>& /* meta lock*/) { if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return; } - std::shared_lock rdlock(_meta_lock); _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point, candidate_rowsets); } -void Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>* candidate_rowsets) { - std::shared_lock rdlock(_meta_lock); +void Tablet::pick_candidate_rowsets_to_base_compaction( + vector<RowsetSharedPtr>* candidate_rowsets, + std::shared_lock<std::shared_mutex>& /* meta lock*/) { for (auto& it : _rs_version_map) { // Do compaction on local rowsets only. if (it.first.first < _cumulative_point && it.second->is_local()) { @@ -1753,10 +1751,6 @@ Status Tablet::cooldown() { has_shutdown = tablet_state() == TABLET_SHUTDOWN; if (!has_shutdown) { modify_rowsets(to_add, to_delete); - if (new_rowset_meta->has_delete_predicate()) { - _tablet_meta->add_delete_predicate(new_rowset_meta->delete_predicate(), - new_rowset_meta->start_version()); - } _self_owned_remote_rowsets.insert(to_add.front()); save_meta(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 65cee05302..6326f3c13b 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -20,6 +20,7 @@ #include <functional> #include <memory> #include <set> +#include <shared_mutex> #include <string> #include <unordered_map> #include <unordered_set> @@ -156,7 +157,7 @@ public: Status capture_rs_readers(const std::vector<Version>& version_path, std::vector<RowsetReaderSharedPtr>* rs_readers) const; - const std::vector<DeletePredicatePB>& delete_predicates() { + const std::vector<RowsetMetaSharedPtr> delete_predicates() { return _tablet_meta->delete_predicates(); } bool version_for_delete_predicate(const Version& version); @@ -228,8 +229,11 @@ public: TabletInfo get_tablet_info() const; void pick_candidate_rowsets_to_cumulative_compaction( - std::vector<RowsetSharedPtr>* candidate_rowsets); - void pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets); + std::vector<RowsetSharedPtr>* candidate_rowsets, + std::shared_lock<std::shared_mutex>& /* meta lock*/); + void pick_candidate_rowsets_to_base_compaction( + std::vector<RowsetSharedPtr>* candidate_rowsets, + std::shared_lock<std::shared_mutex>& /* meta lock*/); void calculate_cumulative_point(); // TODO(ygl): diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 6e2d23ddc6..47d409abcd 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -208,7 +208,6 @@ TabletMeta::TabletMeta(const TabletMeta& b) _schema(b._schema), _rs_metas(b._rs_metas), _stale_rs_metas(b._stale_rs_metas), - _del_predicates(b._del_predicates), _in_restore_mode(b._in_restore_mode), _preferred_rowset_type(b._preferred_rowset_type), _storage_policy(b._storage_policy), @@ -447,9 +446,6 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { for (auto& it : tablet_meta_pb.rs_metas()) { RowsetMetaSharedPtr rs_meta(new RowsetMeta()); rs_meta->init_from_pb(it); - if (rs_meta->has_delete_predicate()) { - add_delete_predicate(rs_meta->delete_predicate(), rs_meta->version().first); - } _rs_metas.push_back(std::move(rs_meta)); } @@ -607,12 +603,7 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) { } } } - _rs_metas.push_back(rs_meta); - if (rs_meta->has_delete_predicate()) { - add_delete_predicate(rs_meta->delete_predicate(), rs_meta->version().first); - } - return Status::OK(); } @@ -640,9 +631,6 @@ void TabletMeta::modify_rs_metas(const std::vector<RowsetMetaSharedPtr>& to_add, auto it = _rs_metas.begin(); while (it != _rs_metas.end()) { if (rs_to_del->version() == (*it)->version()) { - if ((*it)->has_delete_predicate()) { - remove_delete_predicate_by_version((*it)->version()); - } _rs_metas.erase(it); // there should be only one rowset match the version break; @@ -721,36 +709,14 @@ RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version& return nullptr; } -void TabletMeta::add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version) { - for (auto& del_pred : _del_predicates) { - if (del_pred.version() == version) { - *del_pred.mutable_sub_predicates() = delete_predicate.sub_predicates(); - return; +const std::vector<RowsetMetaSharedPtr> TabletMeta::delete_predicates() const { + std::vector<RowsetMetaSharedPtr> res; + for (auto& del_pred : _rs_metas) { + if (del_pred->has_delete_predicate()) { + res.push_back(del_pred); } } - DeletePredicatePB copied_pred = delete_predicate; - copied_pred.set_version(version); - _del_predicates.emplace_back(copied_pred); -} - -void TabletMeta::remove_delete_predicate_by_version(const Version& version) { - DCHECK(version.first == version.second) << "version=" << version; - int pred_to_del = -1; - for (int i = 0; i < _del_predicates.size(); ++i) { - if (_del_predicates[i].version() == version.first) { - pred_to_del = i; - // one DeletePredicatePB stands for a nested predicate, such as user submit a delete predicate a=1 and b=2 - // they could be saved as a one DeletePredicatePB - break; - } - } - if (pred_to_del > -1) { - _del_predicates.erase(_del_predicates.begin() + pred_to_del); - } -} - -const std::vector<DeletePredicatePB>& TabletMeta::delete_predicates() const { - return _del_predicates; + return res; } bool TabletMeta::version_for_delete_predicate(const Version& version) { @@ -758,8 +724,8 @@ bool TabletMeta::version_for_delete_predicate(const Version& version) { return false; } - for (auto& del_pred : _del_predicates) { - if (del_pred.version() == version.first) { + for (auto& del_pred : _rs_metas) { + if (del_pred->version().first == version.first && del_pred->has_delete_predicate()) { return true; } } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index d0f591e354..e42105a40a 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -169,10 +169,7 @@ public: RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version) const; void delete_stale_rs_meta_by_version(const Version& version); RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version& version) const; - - void add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version); - void remove_delete_predicate_by_version(const Version& version); - const std::vector<DeletePredicatePB>& delete_predicates() const; + const std::vector<RowsetMetaSharedPtr> delete_predicates() const; bool version_for_delete_predicate(const Version& version); std::string full_name() const; @@ -241,8 +238,6 @@ private: // These stale rowsets meta are been removed when rowsets' pathVersion is expired, // this policy is judged and computed by TimestampedVersionTracker. std::vector<RowsetMetaSharedPtr> _stale_rs_metas; - - std::vector<DeletePredicatePB> _del_predicates; bool _in_restore_mode = false; RowsetTypePB _preferred_rowset_type = BETA_ROWSET; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 3ae67a34be..b99ce8281d 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -172,8 +172,8 @@ Status NewOlapScanner::_init_tablet_reader_params( std::copy(function_filters.cbegin(), function_filters.cend(), std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - - std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(), + auto& delete_preds = _tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), std::inserter(_tablet_reader_params.delete_predicates, _tablet_reader_params.delete_predicates.begin())); diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 835a3dd59b..efa80b36ac 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -192,15 +192,14 @@ Status VOlapScanner::_init_tablet_reader_params( std::copy(function_filters.cbegin(), function_filters.cend(), std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); - - std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(), + auto& delete_preds = _tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), std::inserter(_tablet_reader_params.delete_predicates, _tablet_reader_params.delete_predicates.begin())); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) { - _tablet_schema->merge_dropped_columns( - _tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) { + _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version())); } // Range diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp index deff7a6190..a7117fba43 100644 --- a/be/test/olap/cumulative_compaction_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_policy_test.cpp @@ -194,7 +194,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_candidate_rowsets) { _tablet->calculate_cumulative_point(); std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); EXPECT_EQ(2, candidate_rowsets.size()); } @@ -214,7 +215,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) { NumBasedCumulativeCompactionPolicy policy; std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -243,7 +245,8 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) { NumBasedCumulativeCompactionPolicy policy; std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -633,7 +636,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) { _tablet->calculate_cumulative_point(); std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); EXPECT_EQ(3, candidate_rowsets.size()); } @@ -651,7 +655,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base) _tablet->calculate_cumulative_point(); std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); EXPECT_EQ(3, candidate_rowsets.size()); } @@ -670,7 +675,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) { std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -699,7 +705,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) { std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -728,7 +735,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) { std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -757,7 +765,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -786,7 +795,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) { std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -815,7 +825,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; @@ -844,7 +855,8 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) { std::vector<RowsetSharedPtr> candidate_rowsets; - _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets); + std::shared_lock rdlock(_tablet->get_header_lock()); + _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock); std::vector<RowsetSharedPtr> input_rowsets; Version last_delete_version {-1, -1}; diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index b5a7c9f307..2bf6d74714 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -968,7 +968,7 @@ TEST_F(TestDeleteHandler, InitSuccess) { add_delete_predicate(del_pred_4, 5); // Get delete conditions which version <= 5 - res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 5); + res = _delete_handler.init(tablet->tablet_schema(), tablet->delete_predicates(), 5); EXPECT_EQ(Status::OK(), res); _delete_handler.finalize(); } @@ -1000,7 +1000,7 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) { add_delete_predicate(del_pred, 2); // 指定版本号为10以载入Header中的所有过滤条件(在这个case中,只有过滤条件1) - res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4); + res = _delete_handler.init(tablet->tablet_schema(), tablet->delete_predicates(), 4); EXPECT_EQ(Status::OK(), res); // 构造一行测试数据 @@ -1084,7 +1084,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) { add_delete_predicate(del_pred_3, 4); // 指定版本号为4以载入meta中的所有过滤条件(在这个case中,只有过滤条件1) - res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4); + res = _delete_handler.init(tablet->tablet_schema(), tablet->delete_predicates(), 4); EXPECT_EQ(Status::OK(), res); std::vector<string> data_str; @@ -1146,7 +1146,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) { add_delete_predicate(del_pred_2, 4); // 指定版本号为4以载入meta中的所有过滤条件(过滤条件1,过滤条件2) - res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4); + res = _delete_handler.init(tablet->tablet_schema(), tablet->delete_predicates(), 4); EXPECT_EQ(Status::OK(), res); // 构造一行测试数据 diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 020be00ead..f19228b935 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -149,7 +149,7 @@ enum KeysType { } message DeletePredicatePB { - required int32 version = 1; + required int32 version = 1; // This field is useless, but could not removed, not depend on it repeated string sub_predicates = 2; repeated InPredicatePB in_predicates = 3; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org