This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a1799e5506 [improve](point query) reuse rowset from lookup_row_key to eliminate tablet lock (#16770) a1799e5506 is described below commit a1799e5506d7d5f07b12f860d61c490c57f24cf2 Author: lihangyu <15605149...@163.com> AuthorDate: Mon Feb 20 18:38:11 2023 +0800 [improve](point query) reuse rowset from lookup_row_key to eliminate tablet lock (#16770) Reuse the rowset for two reasons: 1. eliminate the tablet lock to avoid a performance issue: if another thread holds the lock too long, point query latency could be affected; 2. the rowset should be acquired during the lookup procedure --- be/src/olap/tablet.cpp | 14 ++++++---- be/src/olap/tablet.h | 7 ++--- be/src/service/point_query_executor.cpp | 46 +++++++++++++++++++-------------- be/src/service/point_query_executor.h | 24 ++++++++++++++--- 4 files changed, 60 insertions(+), 31 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 01087a2a01..8cbafa7f6e 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2223,11 +2223,10 @@ TabletSchemaSPtr Tablet::get_max_version_schema(std::lock_guard<std::shared_mute } Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, - const TupleDescriptor* desc, vectorized::Block* block, - bool write_to_cache) { + RowsetSharedPtr input_rowset, const TupleDescriptor* desc, + vectorized::Block* block, bool write_to_cache) { // read row data - BetaRowsetSharedPtr rowset = - std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id)); + BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); if (!rowset) { return Status::NotFound( fmt::format("rowset {} not found", row_location.rowset_id.to_string())); @@ -2286,7 +2285,8 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_ } Status Tablet::lookup_row_key(const Slice& encoded_key, const RowsetIdUnorderedSet* rowset_ids, - 
RowLocation* row_location, uint32_t 
version) { + RowLocation* row_location, uint32_t version, + RowsetSharedPtr* rowset) { std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs; size_t seq_col_length = 0; if (_schema->has_sequence_col()) { @@ -2335,6 +2335,10 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, const RowsetIdUnorderedS break; } *row_location = loc; + if (rowset) { + // return it's rowset + *rowset = rs.first; + } // find it and return return s; } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 34fe9c32fe..82f42dccdd 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -341,12 +341,13 @@ public: // NOTE: the method only works in unique key model with primary key index, you will got a // not supported error in other data model. Status lookup_row_key(const Slice& encoded_key, const RowsetIdUnorderedSet* rowset_ids, - RowLocation* row_location, uint32_t version); + RowLocation* row_location, uint32_t version, + RowsetSharedPtr* rowset = nullptr); // Lookup a row with TupleDescriptor and fill Block Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, - const TupleDescriptor* desc, vectorized::Block* block, - bool write_to_cache = false); + RowsetSharedPtr rowset, const TupleDescriptor* desc, + vectorized::Block* block, bool write_to_cache = false); // calc delete bitmap when flush memtable, use a fake version to calc // For example, cur max version is 5, and we use version 6 to calc but diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 9c70e08061..e33a7a186b 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -195,7 +195,7 @@ std::string PointQueryExecutor::print_profile() { "", total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, output_data_us, _hit_lookup_cache, _binary_row_format, _reusable->output_exprs().size(), - _primary_keys.size(), _row_cache_hits); + _row_read_ctxs.size(), _row_cache_hits); } Status 
PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { @@ -209,43 +209,49 @@ Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) { olap_tuples[i].add_value(key_col); } } - _primary_keys.resize(olap_tuples.size()); + _row_read_ctxs.resize(olap_tuples.size()); // get row cursor and encode keys for (size_t i = 0; i < olap_tuples.size(); ++i) { RowCursor cursor; RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), olap_tuples[i].values())); RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i])); - encode_key_with_padding<RowCursor, true, true>( - &_primary_keys[i], cursor, _tablet->tablet_schema()->num_key_columns(), true); + encode_key_with_padding<RowCursor, true, true>(&_row_read_ctxs[i]._primary_key, cursor, + _tablet->tablet_schema()->num_key_columns(), + true); } return Status::OK(); } Status PointQueryExecutor::_lookup_row_key() { SCOPED_TIMER(&_profile_metrics.lookup_key_ns); - _row_locations.resize(_primary_keys.size()); - _cached_row_data.resize(_primary_keys.size()); // 2. 
lookup row location Status st; - for (size_t i = 0; i < _primary_keys.size(); ++i) { + for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { RowLocation location; if (!config::disable_storage_row_cache) { RowCache::CacheHandle cache_handle; - auto hit_cache = RowCache::instance()->lookup({_tablet->tablet_id(), _primary_keys[i]}, - &cache_handle); + auto hit_cache = RowCache::instance()->lookup( + {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, &cache_handle); if (hit_cache) { - _cached_row_data[i] = std::move(cache_handle); + _row_read_ctxs[i]._cached_row_data = std::move(cache_handle); ++_row_cache_hits; continue; } } - st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location, - INT32_MAX /*rethink?*/)); + // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr + auto rowset_ptr = std::make_unique<RowsetSharedPtr>(); + st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, &location, + INT32_MAX /*rethink?*/, rowset_ptr.get())); if (st.is_not_found()) { continue; } RETURN_IF_ERROR(st); - _row_locations[i] = location; + _row_read_ctxs[i]._row_location = location; + // acquire and wrap this rowset + (*rowset_ptr)->acquire(); + VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id(); + _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>( + rowset_ptr.release(), &release_rowset); } return Status::OK(); } @@ -253,19 +259,19 @@ Status PointQueryExecutor::_lookup_row_key() { Status PointQueryExecutor::_lookup_row_data() { // 3. 
get values SCOPED_TIMER(&_profile_metrics.lookup_data_ns); - for (size_t i = 0; i < _row_locations.size(); ++i) { - if (_cached_row_data[i].valid()) { + for (size_t i = 0; i < _row_read_ctxs.size(); ++i) { + if (_row_read_ctxs[i]._cached_row_data.valid()) { vectorized::JsonbSerializeUtil::jsonb_to_block( - *_reusable->tuple_desc(), _cached_row_data[i].data().data, - _cached_row_data[i].data().size, *_result_block); + *_reusable->tuple_desc(), _row_read_ctxs[i]._cached_row_data.data().data, + _row_read_ctxs[i]._cached_row_data.data().size, *_result_block); continue; } - if (!_row_locations[i].has_value()) { + if (!_row_read_ctxs[i]._row_location.has_value()) { continue; } RETURN_IF_ERROR(_tablet->lookup_row_data( - _primary_keys[i], _row_locations[i].value(), _reusable->tuple_desc(), - _result_block.get(), + _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(), + *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(), _result_block.get(), !config::disable_storage_row_cache /*whether write row cache*/)); } return Status::OK(); diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 1169817019..9dd5a36822 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "gen_cpp/internal_service.pb.h" #include "gutil/int128.h" +#include "olap/rowset/rowset.h" #include "olap/tablet.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -253,11 +254,28 @@ private: Status _output_data(); + static void release_rowset(RowsetSharedPtr* r) { + if (r && *r) { + VLOG_DEBUG << "release rowset " << (*r)->unique_id(); + (*r)->release(); + } + delete r; + } + + // Read context for each row + struct RowReadContext { + RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {} + std::string _primary_key; + RowCache::CacheHandle _cached_row_data; + std::optional<RowLocation> _row_location; + // rowset will be aquired during 
read + // and released after used + std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> _rowset_ptr; + }; + PTabletKeyLookupResponse* _response; TabletSharedPtr _tablet; - std::vector<std::string> _primary_keys; - std::vector<RowCache::CacheHandle> _cached_row_data; - std::vector<std::optional<RowLocation>> _row_locations; + std::vector<RowReadContext> _row_read_ctxs; std::shared_ptr<Reusable> _reusable; std::unique_ptr<vectorized::Block> _result_block; Metrics _profile_metrics; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org