This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a1799e5506 [improve](point query) reuse rowset from lookup_row_key to eliminate tablet lock (#16770)
a1799e5506 is described below

commit a1799e5506d7d5f07b12f860d61c490c57f24cf2
Author: lihangyu <15605149...@163.com>
AuthorDate: Mon Feb 20 18:38:11 2023 +0800

    [improve](point query) reuse rowset from lookup_row_key to eliminate tablet lock (#16770)
    
    Reuse the rowset found by lookup_row_key, for two reasons:
    1. It eliminates taking the tablet lock on this path. If another thread holds
       that lock for too long, point query latency suffers.
    2. The rowset should be acquired during the lookup procedure itself, so that it
       stays held until the row data has been read.
---
 be/src/olap/tablet.cpp                  | 14 ++++++----
 be/src/olap/tablet.h                    |  7 ++---
 be/src/service/point_query_executor.cpp | 46 +++++++++++++++++++--------------
 be/src/service/point_query_executor.h   | 24 ++++++++++++++---
 4 files changed, 60 insertions(+), 31 deletions(-)

diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 01087a2a01..8cbafa7f6e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2223,11 +2223,10 @@ TabletSchemaSPtr 
Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
 }
 
 Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& 
row_location,
-                               const TupleDescriptor* desc, vectorized::Block* 
block,
-                               bool write_to_cache) {
+                               RowsetSharedPtr input_rowset, const 
TupleDescriptor* desc,
+                               vectorized::Block* block, bool write_to_cache) {
     // read row data
-    BetaRowsetSharedPtr rowset =
-            
std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
+    BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(input_rowset);
     if (!rowset) {
         return Status::NotFound(
                 fmt::format("rowset {} not found", 
row_location.rowset_id.to_string()));
@@ -2286,7 +2285,8 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, 
const RowLocation& row_
 }
 
 Status Tablet::lookup_row_key(const Slice& encoded_key, const 
RowsetIdUnorderedSet* rowset_ids,
-                              RowLocation* row_location, uint32_t version) {
+                              RowLocation* row_location, uint32_t version,
+                              RowsetSharedPtr* rowset) {
     std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
     size_t seq_col_length = 0;
     if (_schema->has_sequence_col()) {
@@ -2335,6 +2335,10 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, 
const RowsetIdUnorderedS
             break;
         }
         *row_location = loc;
+        if (rowset) {
+            // return it's rowset
+            *rowset = rs.first;
+        }
         // find it and return
         return s;
     }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 34fe9c32fe..82f42dccdd 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -341,12 +341,13 @@ public:
     // NOTE: the method only works in unique key model with primary key index, 
you will got a
     //       not supported error in other data model.
     Status lookup_row_key(const Slice& encoded_key, const 
RowsetIdUnorderedSet* rowset_ids,
-                          RowLocation* row_location, uint32_t version);
+                          RowLocation* row_location, uint32_t version,
+                          RowsetSharedPtr* rowset = nullptr);
 
     // Lookup a row with TupleDescriptor and fill Block
     Status lookup_row_data(const Slice& encoded_key, const RowLocation& 
row_location,
-                           const TupleDescriptor* desc, vectorized::Block* 
block,
-                           bool write_to_cache = false);
+                           RowsetSharedPtr rowset, const TupleDescriptor* desc,
+                           vectorized::Block* block, bool write_to_cache = 
false);
 
     // calc delete bitmap when flush memtable, use a fake version to calc
     // For example, cur max version is 5, and we use version 6 to calc but
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index 9c70e08061..e33a7a186b 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -195,7 +195,7 @@ std::string PointQueryExecutor::print_profile() {
             "",
             total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, 
output_data_us,
             _hit_lookup_cache, _binary_row_format, 
_reusable->output_exprs().size(),
-            _primary_keys.size(), _row_cache_hits);
+            _row_read_ctxs.size(), _row_cache_hits);
 }
 
 Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
@@ -209,43 +209,49 @@ Status PointQueryExecutor::_init_keys(const 
PTabletKeyLookupRequest* request) {
             olap_tuples[i].add_value(key_col);
         }
     }
-    _primary_keys.resize(olap_tuples.size());
+    _row_read_ctxs.resize(olap_tuples.size());
     // get row cursor and encode keys
     for (size_t i = 0; i < olap_tuples.size(); ++i) {
         RowCursor cursor;
         RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), 
olap_tuples[i].values()));
         RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i]));
-        encode_key_with_padding<RowCursor, true, true>(
-                &_primary_keys[i], cursor, 
_tablet->tablet_schema()->num_key_columns(), true);
+        encode_key_with_padding<RowCursor, true, 
true>(&_row_read_ctxs[i]._primary_key, cursor,
+                                                       
_tablet->tablet_schema()->num_key_columns(),
+                                                       true);
     }
     return Status::OK();
 }
 
 Status PointQueryExecutor::_lookup_row_key() {
     SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
-    _row_locations.resize(_primary_keys.size());
-    _cached_row_data.resize(_primary_keys.size());
     // 2. lookup row location
     Status st;
-    for (size_t i = 0; i < _primary_keys.size(); ++i) {
+    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
         RowLocation location;
         if (!config::disable_storage_row_cache) {
             RowCache::CacheHandle cache_handle;
-            auto hit_cache = 
RowCache::instance()->lookup({_tablet->tablet_id(), _primary_keys[i]},
-                                                          &cache_handle);
+            auto hit_cache = RowCache::instance()->lookup(
+                    {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key}, 
&cache_handle);
             if (hit_cache) {
-                _cached_row_data[i] = std::move(cache_handle);
+                _row_read_ctxs[i]._cached_row_data = std::move(cache_handle);
                 ++_row_cache_hits;
                 continue;
             }
         }
-        st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location,
-                                      INT32_MAX /*rethink?*/));
+        // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this 
ptr
+        auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
+        st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr, 
&location,
+                                      INT32_MAX /*rethink?*/, 
rowset_ptr.get()));
         if (st.is_not_found()) {
             continue;
         }
         RETURN_IF_ERROR(st);
-        _row_locations[i] = location;
+        _row_read_ctxs[i]._row_location = location;
+        // acquire and wrap this rowset
+        (*rowset_ptr)->acquire();
+        VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id();
+        _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr, 
decltype(&release_rowset)>(
+                rowset_ptr.release(), &release_rowset);
     }
     return Status::OK();
 }
@@ -253,19 +259,19 @@ Status PointQueryExecutor::_lookup_row_key() {
 Status PointQueryExecutor::_lookup_row_data() {
     // 3. get values
     SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
-    for (size_t i = 0; i < _row_locations.size(); ++i) {
-        if (_cached_row_data[i].valid()) {
+    for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
+        if (_row_read_ctxs[i]._cached_row_data.valid()) {
             vectorized::JsonbSerializeUtil::jsonb_to_block(
-                    *_reusable->tuple_desc(), _cached_row_data[i].data().data,
-                    _cached_row_data[i].data().size, *_result_block);
+                    *_reusable->tuple_desc(), 
_row_read_ctxs[i]._cached_row_data.data().data,
+                    _row_read_ctxs[i]._cached_row_data.data().size, 
*_result_block);
             continue;
         }
-        if (!_row_locations[i].has_value()) {
+        if (!_row_read_ctxs[i]._row_location.has_value()) {
             continue;
         }
         RETURN_IF_ERROR(_tablet->lookup_row_data(
-                _primary_keys[i], _row_locations[i].value(), 
_reusable->tuple_desc(),
-                _result_block.get(),
+                _row_read_ctxs[i]._primary_key, 
_row_read_ctxs[i]._row_location.value(),
+                *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(), 
_result_block.get(),
                 !config::disable_storage_row_cache /*whether write row 
cache*/));
     }
     return Status::OK();
diff --git a/be/src/service/point_query_executor.h 
b/be/src/service/point_query_executor.h
index 1169817019..9dd5a36822 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gutil/int128.h"
+#include "olap/rowset/rowset.h"
 #include "olap/tablet.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -253,11 +254,28 @@ private:
 
     Status _output_data();
 
+    static void release_rowset(RowsetSharedPtr* r) {
+        if (r && *r) {
+            VLOG_DEBUG << "release rowset " << (*r)->unique_id();
+            (*r)->release();
+        }
+        delete r;
+    }
+
+    // Read context for each row
+    struct RowReadContext {
+        RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {}
+        std::string _primary_key;
+        RowCache::CacheHandle _cached_row_data;
+        std::optional<RowLocation> _row_location;
+        // rowset will be aquired during read
+        // and released after used
+        std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)> 
_rowset_ptr;
+    };
+
     PTabletKeyLookupResponse* _response;
     TabletSharedPtr _tablet;
-    std::vector<std::string> _primary_keys;
-    std::vector<RowCache::CacheHandle> _cached_row_data;
-    std::vector<std::optional<RowLocation>> _row_locations;
+    std::vector<RowReadContext> _row_read_ctxs;
     std::shared_ptr<Reusable> _reusable;
     std::unique_ptr<vectorized::Block> _result_block;
     Metrics _profile_metrics;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to