This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 48065fce19 [bugfix](merge-on-write) optimize rowset tree and tablet header lock (#20911) 48065fce19 is described below commit 48065fce19170d7d3eb5c0d5ae023f560fac6277 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Sun Jun 18 19:26:02 2023 +0800 [bugfix](merge-on-write) optimize rowset tree and tablet header lock (#20911) --- be/src/olap/delta_writer.cpp | 4 +- be/src/olap/memtable.cpp | 17 +- be/src/olap/rowset/rowset_meta.h | 3 + be/src/olap/rowset/segment_v2/segment.cpp | 1 + be/src/olap/rowset/segment_v2/segment_writer.cpp | 8 +- be/src/olap/tablet.cpp | 299 +++++++++++------------ be/src/olap/tablet.h | 22 +- be/src/olap/tablet_meta.cpp | 1 + be/src/service/point_query_executor.cpp | 10 +- be/test/olap/tablet_test.cpp | 62 ----- 10 files changed, 191 insertions(+), 236 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 394950a193..df9167484b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -454,11 +454,9 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, _delete_bitmap)); } - int64_t cur_max_version = _tablet->max_version().second; RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap( - _cur_rowset, _rowset_ids, _delete_bitmap, cur_max_version, segments, _req.txn_id, + _cur_rowset, _rowset_ids, _delete_bitmap, segments, _req.txn_id, _rowset_writer.get())); - _rowset_ids = _tablet->all_rs_id(cur_max_version); } Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, _req.load_id, _cur_rowset, false); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2cdc440848..ace9f4f689 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -453,15 +453,18 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) { auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get()); std::vector<segment_v2::SegmentSharedPtr> segments; RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments)); - std::shared_lock meta_rlock(_tablet->get_header_lock()); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { - return Status::OK(); + std::vector<RowsetSharedPtr> specified_rowsets; + { + std::shared_lock meta_rlock(_tablet->get_header_lock()); + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (_tablet->tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { + return Status::OK(); + } + specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); } - OlapStopWatch watch; - RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, &_mow_context->rowset_ids, + RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets, _mow_context->delete_bitmap, _mow_context->max_version)); size_t total_rows = std::accumulate( diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 6736c53cac..68bb1e7075 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -321,6 +321,9 @@ public: segments_key_bounds->push_back(key_range); } } + + auto& get_segments_key_bounds() { return _rowset_meta_pb.segments_key_bounds(); } + virtual bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) { // for compatibility, old version has not segment key bounds if (_rowset_meta_pb.segments_key_bounds_size() == 0) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 527f8391cc..e046785bb0 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -378,6 +378,7 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, RowLocation* } row_location->row_id = index_iterator->get_current_ordinal(); row_location->segment_id = _segment_id; + row_location->rowset_id = _rowset_id; if (has_seq_col) { size_t num_to_read = 1; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 8f7d27a947..a5c9fa147b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -366,9 +366,13 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* std::vector<bool> use_default_flag; use_default_flag.reserve(num_rows); std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches; - // locate rows in base data + std::vector<RowsetSharedPtr> specified_rowsets; { std::shared_lock rlock(_tablet->get_header_lock()); + specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); + } + // locate rows in base data + { for (size_t pos = row_pos; pos < num_rows; pos++) { std::string key = _full_encode_keys(key_columns, pos); RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); @@ -377,7 +381,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* RowLocation loc; // save rowset shared ptr so this rowset wouldn't delete RowsetSharedPtr rowset; - auto st = _tablet->lookup_row_key(key, false, &_mow_context->rowset_ids, &loc, + auto st = _tablet->lookup_row_key(key, false, specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is<NOT_FOUND>()) { if (!_tablet_schema->allow_key_not_exist_in_partial_update()) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 6a3749ca4f..79d0237931 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -303,10 +303,6 @@ Status Tablet::_init_once_action() { _stale_rs_version_map[version] = std::move(rowset); } - if (_schema->keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - _rowset_tree = std::make_unique<RowsetTree>(); - res = _rowset_tree->Init(rowset_vec); - } return res; } @@ -331,9 +327,6 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add, // reconstruct from tablet meta _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas()); if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - auto new_rowset_tree = std::make_unique<RowsetTree>(); - ModifyRowSetTree(*_rowset_tree, to_delete, to_add, new_rowset_tree.get()); - _rowset_tree = std::move(new_rowset_tree); std::vector<RowsetSharedPtr> calc_delete_bitmap_rowsets; int64_t to_add_min_version = INT64_MAX; int64_t to_add_max_version = INT64_MIN; @@ -422,13 +415,6 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) { _rs_version_map[rowset->version()] = rowset; _timestamped_version_tracker.add_version(rowset->version()); - // Update rowset tree - if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - auto new_rowset_tree = std::make_unique<RowsetTree>(); - ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get()); - _rowset_tree = std::move(new_rowset_tree); - } - std::vector<RowsetSharedPtr> rowsets_to_delete; // yiguolei: temp code, should remove the rowset contains by this rowset // but it should be removed in multi path version @@ -521,13 +507,6 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, _tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete, same_version); - // Update rowset tree - if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - auto new_rowset_tree = std::make_unique<RowsetTree>(); - ModifyRowSetTree(*_rowset_tree, to_delete, to_add, new_rowset_tree.get()); - _rowset_tree = std::move(new_rowset_tree); - } - if (!same_version) { // add rs_metas_to_delete to tracker _timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete); @@ -658,13 +637,6 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { RETURN_IF_ERROR(_tablet_meta->add_rs_meta(rowset->rowset_meta())); _rs_version_map[rowset->version()] = rowset; - // Update rowset tree - if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - auto new_rowset_tree = std::make_unique<RowsetTree>(); - ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get()); - _rowset_tree = std::move(new_rowset_tree); - } - _timestamped_version_tracker.add_version(rowset->version()); ++_newly_created_rowset_num; @@ -1190,11 +1162,6 @@ void Tablet::delete_all_files() { it.second->remove(); } _stale_rs_version_map.clear(); - - if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) { - // clear rowset_tree - _rowset_tree = std::make_unique<RowsetTree>(); - } } void Tablet::check_tablet_path_exists() { @@ -2722,71 +2689,71 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_ return Status::OK(); } -// ATTN: caller should hold the meta lock. Status Tablet::lookup_row_key( - const Slice& encoded_key, bool with_seq_col, const RowsetIdUnorderedSet* rowset_ids, - RowLocation* row_location, uint32_t version, + const Slice& encoded_key, bool with_seq_col, + const std::vector<RowsetSharedPtr>& specified_rowsets, RowLocation* row_location, + uint32_t version, std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId>& segment_caches, RowsetSharedPtr* rowset) { - std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs; size_t seq_col_length = 0; if (_schema->has_sequence_col() && with_seq_col) { seq_col_length = _schema->column(_schema->sequence_col_idx()).length() + 1; } Slice key_without_seq = Slice(encoded_key.get_data(), encoded_key.get_size() - seq_col_length); - _rowset_tree->FindRowsetsWithKeyInRange(key_without_seq, rowset_ids, &selected_rs); - if (selected_rs.empty()) { - return Status::NotFound("No rowsets contains the key in key range"); - } - // Usually newly written data has a higher probability of being modified, so prefer - // to search the key in the rowset with larger version. - std::sort(selected_rs.begin(), selected_rs.end(), - [](std::pair<RowsetSharedPtr, int32_t>& a, std::pair<RowsetSharedPtr, int32_t>& b) { - if (a.first->end_version() == b.first->end_version()) { - return a.second > b.second; - } - return a.first->end_version() > b.first->end_version(); - }); RowLocation loc; - for (auto& rs : selected_rs) { - if (rs.first->end_version() > version) { + + for (auto& rs : specified_rowsets) { + auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds(); + int num_segments = rs->num_segments(); + DCHECK_EQ(segments_key_bounds.size(), num_segments); + std::vector<uint32_t> picked_segments; + for (int i = num_segments - 1; i >= 0; i--) { + if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 || + key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) { + continue; + } + picked_segments.emplace_back(i); + } + if (picked_segments.empty()) { continue; } - auto iter = segment_caches.find(rs.first->rowset_id()); + + auto iter = segment_caches.find(rs->rowset_id()); if (iter == segment_caches.end()) { SegmentCacheHandle segment_cache_handle; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - std::static_pointer_cast<BetaRowset>(rs.first), &segment_cache_handle, true)); - iter = segment_caches.emplace(rs.first->rowset_id(), std::move(segment_cache_handle)) - .first; + std::static_pointer_cast<BetaRowset>(rs), &segment_cache_handle, true)); + iter = segment_caches.emplace(rs->rowset_id(), std::move(segment_cache_handle)).first; } auto& segments = iter->second.get_segments(); - DCHECK_GT(segments.size(), rs.second); - Status s = segments[rs.second]->lookup_row_key(encoded_key, with_seq_col, &loc); - if (s.is<NOT_FOUND>()) { - continue; - } - if (!s.ok()) { - return s; - } - loc.rowset_id = rs.first->rowset_id(); - if (_tablet_meta->delete_bitmap().contains_agg_without_cache( - {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { - // if has sequence col, we continue to compare the sequence_id of - // all rowsets, util we find an existing key. - if (_schema->has_sequence_col()) { + DCHECK_EQ(segments.size(), num_segments); + + for (auto id : picked_segments) { + Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, &loc); + if (s.is<NOT_FOUND>()) { continue; } - // The key is deleted, we don't need to search for it any more. - break; - } - *row_location = loc; - if (rowset) { - // return it's rowset - *rowset = rs.first; + if (!s.ok()) { + return s; + } + if (_tablet_meta->delete_bitmap().contains_agg_without_cache( + {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { + // if has sequence col, we continue to compare the sequence_id of + // all rowsets, util we find an existing key. + if (_schema->has_sequence_col()) { + continue; + } + // The key is deleted, we don't need to search for it any more. + break; + } + *row_location = loc; + if (rowset) { + // return it's rowset + *rowset = rs; + } + // find it and return + return s; } - // find it and return - return s; } return Status::NotFound("can't find key in all rowsets"); } @@ -2837,7 +2804,7 @@ void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_b Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, const segment_v2::SegmentSharedPtr& seg, - const RowsetIdUnorderedSet* specified_rowset_ids, + const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t end_version, RowsetWriter* rowset_writer) { OlapStopWatch watch; @@ -2900,52 +2867,49 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, continue; } - if (specified_rowset_ids != nullptr && !specified_rowset_ids->empty()) { - RowsetSharedPtr rowset_find; - auto st = lookup_row_key(key, true, specified_rowset_ids, &loc, - dummy_version.first - 1, segment_caches, &rowset_find); - bool expected_st = st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>(); - DCHECK(expected_st) << "unexpected error status while lookup_row_key:" << st; - if (!expected_st) { - return st; - } - if (st.is<NOT_FOUND>()) { - ++row_id; - continue; - } + RowsetSharedPtr rowset_find; + auto st = lookup_row_key(key, true, specified_rowsets, &loc, dummy_version.first - 1, + segment_caches, &rowset_find); + bool expected_st = st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>(); + DCHECK(expected_st) << "unexpected error status while lookup_row_key:" << st; + if (!expected_st) { + return st; + } + if (st.is<NOT_FOUND>()) { + ++row_id; + continue; + } - // sequence id smaller than the previous one, so delete current row - if (st.is<ALREADY_EXIST>()) { - delete_bitmap->add({rowset_id, seg->id(), 0}, row_id); - continue; - } else if (is_partial_update && rowset_writer != nullptr) { - // In publish version, record rows to be deleted for concurrent update - // For example, if version 5 and 6 update a row, but version 6 only see - // version 4 when write, and when publish version, version 5's value will - // be marked as deleted and it's update is losed. - // So here we should read version 5's columns and build a new row, which is - // consists of version 6's update columns and version 5's origin columns - // here we build 2 read plan for ori values and update values - prepare_to_read(loc, pos, &read_plan_ori); - prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, - &read_plan_update); - rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; - ++pos; - // delete bitmap will be calculate when memtable flush and - // publish. The two stages may see different versions. - // When there is sequence column, the currently imported data - // of rowset may be marked for deletion at memtablet flush or - // publish because the seq column is smaller than the previous - // rowset. - // just set 0 as a unified temporary version number, and update to - // the real version number later. - delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id); - delete_bitmap->add({rowset_id, seg->id(), 0}, row_id); - continue; - } - // when st = ok + // sequence id smaller than the previous one, so delete current row + if (st.is<ALREADY_EXIST>()) { + delete_bitmap->add({rowset_id, seg->id(), 0}, row_id); + continue; + } else if (is_partial_update && rowset_writer != nullptr) { + // In publish version, record rows to be deleted for concurrent update + // For example, if version 5 and 6 update a row, but version 6 only see + // version 4 when write, and when publish version, version 5's value will + // be marked as deleted and it's update is losed. + // So here we should read version 5's columns and build a new row, which is + // consists of version 6's update columns and version 5's origin columns + // here we build 2 read plan for ori values and update values + prepare_to_read(loc, pos, &read_plan_ori); + prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update); + rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; + ++pos; + // delete bitmap will be calculate when memtable flush and + // publish. The two stages may see different versions. + // When there is sequence column, the currently imported data + // of rowset may be marked for deletion at memtablet flush or + // publish because the seq column is smaller than the previous + // rowset. + // just set 0 as a unified temporary version number, and update to + // the real version number later. delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id); + delete_bitmap->add({rowset_id, seg->id(), 0}, row_id); + continue; } + // when st = ok + delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id); ++row_id; } remaining -= num_read; @@ -2959,18 +2923,19 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, } LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id << " seg_id: " << seg->id() << " dummy_version: " << end_version + 1 - << " rows: " << seg->num_rows() << " cost: " << watch.get_elapse_time_us() << "(us)"; + << " rows: " << seg->num_rows() + << " bimap num: " << delete_bitmap->delete_bitmap.size() + << " cost: " << watch.get_elapse_time_us() << "(us)"; return Status::OK(); } -// caller should hold meta_lock Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, - const RowsetIdUnorderedSet* specified_rowset_ids, + const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t end_version, RowsetWriter* rowset_writer) { auto rowset_id = rowset->rowset_id(); - if (specified_rowset_ids == nullptr || specified_rowset_ids->empty() || segments.empty()) { + if (specified_rowsets.empty() || segments.empty()) { LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id() << " rowset: " << rowset_id; return Status::OK(); @@ -2987,8 +2952,8 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); seg_delete_bitmaps.push_back(seg_delete_bitmap); RETURN_IF_ERROR(token->submit_func([=, &calc_status, this]() { - auto st = calc_segment_delete_bitmap(rowset, seg, specified_rowset_ids, - seg_delete_bitmap, end_version, rowset_writer); + auto st = calc_segment_delete_bitmap(rowset, seg, specified_rowsets, seg_delete_bitmap, + end_version, rowset_writer); if (!st.ok()) { LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: " << tablet_id() << " rowset: " << rowset_id << " seg_id: " << seg->id() @@ -2999,7 +2964,7 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, } // this thread calc delete bitmap of segment 0 - RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], specified_rowset_ids, + RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], specified_rowsets, delete_bitmap, end_version, rowset_writer)); token->wait(); auto code = calc_status.load(); @@ -3012,6 +2977,21 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, return Status::OK(); } +std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids( + const RowsetIdUnorderedSet* specified_rowset_ids) { + std::vector<RowsetSharedPtr> rowsets; + for (auto& rs : _rs_version_map) { + if (!specified_rowset_ids || + specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) { + rowsets.push_back(rs.second); + } + } + std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { + return lhs->end_version() > rhs->end_version(); + }); + return rowsets; +} + Status Tablet::generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, @@ -3166,9 +3146,10 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); + std::vector<RowsetSharedPtr> specified_rowsets = get_rowset_by_ids(&cur_rowset_ids); OlapStopWatch watch; - RETURN_IF_ERROR( - calc_delete_bitmap(rowset, segments, &cur_rowset_ids, delete_bitmap, cur_version - 1)); + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, + cur_version - 1)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); @@ -3176,7 +3157,6 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; - for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { _tablet_meta->delete_bitmap().merge( @@ -3187,23 +3167,33 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) } Status Tablet::commit_phase_update_delete_bitmap( - const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, - const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, - RowsetWriter* rowset_writer) { + const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, const std::vector<segment_v2::SegmentSharedPtr>& segments, + int64_t txn_id, RowsetWriter* rowset_writer) { RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; + int64_t cur_version; - std::shared_lock meta_rlock(_meta_lock); - cur_rowset_ids = all_rs_id(cur_version); - _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); + std::vector<RowsetSharedPtr> specified_rowsets; + { + std::shared_lock meta_rlock(_meta_lock); + cur_version = max_version().second; + cur_rowset_ids = all_rs_id(cur_version); + _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, + &rowset_ids_to_del); + if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { + LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size() + << ", rowset_ids_to_del: " << rowset_ids_to_del.size(); + } + specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); + } for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } OlapStopWatch watch; - RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version, rowset_writer)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, @@ -3213,6 +3203,7 @@ Status Tablet::commit_phase_update_delete_bitmap( << ", rowset_ids to del: " << rowset_ids_to_del.size() << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; + pre_rowset_ids = cur_rowset_ids; return Status::OK(); } @@ -3229,22 +3220,30 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, _load_rowset_segments(rowset, &segments); std::lock_guard<std::mutex> rwlock(_rowset_update_lock); - std::shared_lock meta_rlock(_meta_lock); - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (tablet_state() == TABLET_NOTREADY && - SchemaChangeHandler::tablet_in_converting(tablet_id())) { - LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" - << tablet_id(); - return Status::OK(); + { + std::shared_lock meta_rlock(_meta_lock); + // tablet is under alter process. The delete bitmap will be calculated after conversion. + if (tablet_state() == TABLET_NOTREADY && + SchemaChangeHandler::tablet_in_converting(tablet_id())) { + LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" + << tablet_id(); + return Status::OK(); + } + cur_rowset_ids = all_rs_id(cur_version - 1); } - cur_rowset_ids = all_rs_id(cur_version - 1); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } + std::vector<RowsetSharedPtr> specified_rowsets; + { + std::shared_lock meta_rlock(_meta_lock); + specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add); + } + OlapStopWatch watch; - RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version - 1, rowset_writer)); size_t total_rows = std::accumulate( segments.begin(), segments.end(), 0, diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 3c3a010397..7cccb03a31 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -47,7 +47,6 @@ #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" #include "olap/rowset/rowset_reader.h" -#include "olap/rowset/rowset_tree.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_meta.h" #include "olap/tablet_schema.h" @@ -401,8 +400,9 @@ public: // NOTE: the method only works in unique key model with primary key index, you will got a // not supported error in other data model. Status lookup_row_key( - const Slice& encoded_key, bool with_seq_col, const RowsetIdUnorderedSet* rowset_ids, - RowLocation* row_location, uint32_t version, + const Slice& encoded_key, bool with_seq_col, + const std::vector<RowsetSharedPtr>& specified_rowsets, RowLocation* row_location, + uint32_t version, std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId>& segment_caches, RowsetSharedPtr* rowset = nullptr); @@ -429,14 +429,19 @@ public: // and build newly generated rowset's delete_bitmap Status calc_delete_bitmap(RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, - const RowsetIdUnorderedSet* specified_rowset_ids, + const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t version, RowsetWriter* rowset_writer = nullptr); + + std::vector<RowsetSharedPtr> get_rowset_by_ids( + const RowsetIdUnorderedSet* specified_rowset_ids); + Status calc_segment_delete_bitmap(RowsetSharedPtr rowset, const segment_v2::SegmentSharedPtr& seg, - const RowsetIdUnorderedSet* specified_rowset_ids, + const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, int64_t end_version, RowsetWriter* rowset_writer); + Status calc_delete_bitmap_between_segments( RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap); @@ -456,8 +461,8 @@ public: Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset); Status commit_phase_update_delete_bitmap( - const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, + const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, RowsetWriter* rowset_writer = nullptr); @@ -605,9 +610,6 @@ private: // These _stale rowsets are been removed when rowsets' pathVersion is expired, // this policy is judged and computed by TimestampedVersionTracker. std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map; - // RowsetTree is used to locate rowsets containing a key or a key range quickly. - // It's only used in UNIQUE_KEYS data model. - std::unique_ptr<RowsetTree> _rowset_tree; // if this tablet is broken, set to true. default is false std::atomic<bool> _is_bad; // timestamp of last cumu compaction failure diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 4f08e50a79..9641e6e61a 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -942,6 +942,7 @@ bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const { } bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row_id) const { + std::shared_lock l(lock); DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0}; for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) { auto& [k, bm] = *it; diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 0e8acf62fe..255d4c9a2c 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -246,6 +246,11 @@ Status PointQueryExecutor::_lookup_row_key() { // 2. lookup row location Status st; std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches; + std::vector<RowsetSharedPtr> specified_rowsets; + { + std::shared_lock rlock(_tablet->get_header_lock()); + specified_rowsets = _tablet->get_rowset_by_ids(nullptr); + } for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { RowLocation location; if (!config::disable_storage_row_cache) { @@ -260,8 +265,9 @@ Status PointQueryExecutor::_lookup_row_key() { } // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); - st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, true, nullptr, &location, - INT32_MAX /*rethink?*/, segment_caches, rowset_ptr.get())); + st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, true, specified_rowsets, + &location, INT32_MAX /*rethink?*/, segment_caches, + rowset_ptr.get())); if (st.is_not_found()) { continue; } diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index f2ee6b2d96..3ae55f9743 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -392,66 +392,4 @@ TEST_F(TestTablet, cooldown_policy) { } } -TEST_F(TestTablet, rowset_tree_update) { - TTabletSchema tschema; - tschema.keys_type = TKeysType::UNIQUE_KEYS; - TabletMetaSharedPtr tablet_meta = new_tablet_meta(tschema, true); - TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr)); - RowsetIdUnorderedSet rowset_ids; - tablet->init(); - - RowsetMetaSharedPtr rsm1(new RowsetMeta()); - init_rs_meta(rsm1, 6, 7, convert_key_bounds({{"100", "200"}, {"300", "400"}})); - RowsetId id1; - id1.init(10010); - RowsetSharedPtr rs_ptr1; - MockRowset::create_rowset(tablet->tablet_schema(), "", rsm1, &rs_ptr1, false); - tablet->add_inc_rowset(rs_ptr1); - rowset_ids.insert(id1); - - RowsetMetaSharedPtr rsm2(new RowsetMeta()); - init_rs_meta(rsm2, 8, 8, convert_key_bounds({{"500", "999"}})); - RowsetId id2; - id2.init(10086); - rsm2->set_rowset_id(id2); - RowsetSharedPtr rs_ptr2; - MockRowset::create_rowset(tablet->tablet_schema(), "", rsm2, &rs_ptr2, false); - tablet->add_inc_rowset(rs_ptr2); - rowset_ids.insert(id2); - - RowsetId id3; - id3.init(540081); - rowset_ids.insert(id3); - - std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches; - RowLocation loc; - // Key not in range. - ASSERT_TRUE(tablet->lookup_row_key("99", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::NOT_FOUND>()); - // Version too low. - ASSERT_TRUE(tablet->lookup_row_key("101", true, &rowset_ids, &loc, 3, segment_caches) - .is<ErrorCode::NOT_FOUND>()); - // Hit a segment, but since we don't have real data, return an internal error when loading the - // segment. - LOG(INFO) << tablet->lookup_row_key("101", true, &rowset_ids, &loc, 7, segment_caches) - .to_string(); - ASSERT_TRUE(tablet->lookup_row_key("101", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::IO_ERROR>()); - // Key not in range. - ASSERT_TRUE(tablet->lookup_row_key("201", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::NOT_FOUND>()); - ASSERT_TRUE(tablet->lookup_row_key("300", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::IO_ERROR>()); - // Key not in range. - ASSERT_TRUE(tablet->lookup_row_key("499", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::NOT_FOUND>()); - // Version too low. - ASSERT_TRUE(tablet->lookup_row_key("500", true, &rowset_ids, &loc, 7, segment_caches) - .is<ErrorCode::NOT_FOUND>()); - // Hit a segment, but since we don't have real data, return an internal error when loading the - // segment. - ASSERT_TRUE(tablet->lookup_row_key("500", true, &rowset_ids, &loc, 8, segment_caches) - .is<ErrorCode::IO_ERROR>()); -} - } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org