This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ce9a20a375 [enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853)
ce9a20a375 is described below

commit ce9a20a375001372881b3d7080397d57548aa4ec
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Sat Jun 17 23:14:28 2023 +0800

    [enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853)
---
 be/src/agent/task_worker_pool.cpp                  |  5 ++-
 be/src/olap/delta_writer.cpp                       | 13 +++---
 be/src/olap/memtable.cpp                           | 10 +++++
 be/src/olap/olap_common.h                          |  6 ++-
 be/src/olap/tablet.cpp                             | 47 +++++++++++++++-------
 be/src/olap/tablet.h                               |  4 +-
 be/src/olap/task/engine_publish_version_task.cpp   | 26 ++++++++----
 be/src/olap/task/engine_publish_version_task.h     | 23 ++++++++++-
 be/src/olap/txn_manager.cpp                        | 38 ++++++++++++-----
 be/src/olap/txn_manager.h                          | 12 +++---
 be/test/olap/delta_writer_test.cpp                 |  7 +++-
 .../olap/engine_storage_migration_task_test.cpp    |  4 +-
 be/test/olap/remote_rowset_gc_test.cpp             |  4 +-
 be/test/olap/tablet_cooldown_test.cpp              |  4 +-
 be/test/olap/txn_manager_test.cpp                  |  7 +++-
 15 files changed, 151 insertions(+), 59 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index fd2c8a17dd..100ac2fd4a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1469,7 +1469,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
                     _tasks.push_back(agent_task_req);
                     _worker_thread_condition_variable.notify_one();
                 }
-                LOG_EVERY_SECOND(INFO) << "wait for previous publish version 
task to be done"
+                LOG_EVERY_SECOND(INFO) << "wait for previous publish version 
task to be done, "
                                        << "transaction_id: " << 
publish_version_req.transaction_id;
                 break;
             } else {
@@ -1521,7 +1521,8 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
             LOG_INFO("successfully publish version")
                     .tag("signature", agent_task_req.signature)
                     .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("tablets_num", succ_tablet_ids.size());
+                    .tag("tablets_num", succ_tablet_ids.size())
+                    .tag("cost(s)", time(nullptr) - agent_task_req.recv_time);
         }
 
         status.to_thrift(&finish_task_request.task_status);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 6789e12e19..394950a193 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -203,8 +203,8 @@ Status DeltaWriter::init() {
     context.tablet_id = _tablet->table_id();
     context.tablet = _tablet;
     context.write_type = DataWriteType::TYPE_DIRECT;
-    context.mow_context =
-            std::make_shared<MowContext>(_cur_max_version, _rowset_ids, 
_delete_bitmap);
+    context.mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
+                                                       _delete_bitmap);
     RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
 
     _schema.reset(new Schema(_tablet_schema));
@@ -347,7 +347,8 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
-    auto mow_context = std::make_shared<MowContext>(_cur_max_version, 
_rowset_ids, _delete_bitmap);
+    auto mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
+                                                    _delete_bitmap);
     _mem_table.reset(new MemTable(_tablet, _schema.get(), 
_tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), 
mow_context,
                                   mem_table_insert_tracker, 
mem_table_flush_tracker));
@@ -454,9 +455,9 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
                                                                          
_delete_bitmap));
         }
         int64_t cur_max_version = _tablet->max_version().second;
-        
RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(_cur_rowset, 
_rowset_ids,
-                                                                   
_delete_bitmap, cur_max_version,
-                                                                   segments, 
_rowset_writer.get()));
+        RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
+                _cur_rowset, _rowset_ids, _delete_bitmap, cur_max_version, 
segments, _req.txn_id,
+                _rowset_writer.get()));
         _rowset_ids = _tablet->all_rs_id(cur_max_version);
     }
     Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, 
_tablet, _req.txn_id,
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 9a6e5687d2..2cdc440848 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -459,9 +459,19 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
         SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
         return Status::OK();
     }
+
+    OlapStopWatch watch;
     RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, 
&_mow_context->rowset_ids,
                                                 _mow_context->delete_bitmap,
                                                 _mow_context->max_version));
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), 0,
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
+    LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << 
tablet_id()
+              << ", rowset_ids: " << _mow_context->rowset_ids.size()
+              << ", cur max_version: " << _mow_context->max_version
+              << ", transaction_id: " << _mow_context->txn_id
+              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
     return Status::OK();
 }
 
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index d3d9bf3122..1ebf306b41 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -461,9 +461,11 @@ using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>;
 class DeleteBitmap;
 // merge on write context
 struct MowContext {
-    MowContext(int64_t version, const RowsetIdUnorderedSet& ids, 
std::shared_ptr<DeleteBitmap> db)
-            : max_version(version), rowset_ids(ids), delete_bitmap(db) {}
+    MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+               std::shared_ptr<DeleteBitmap> db)
+            : max_version(version), txn_id(txnid), rowset_ids(ids), 
delete_bitmap(db) {}
     int64_t max_version;
+    int64_t txn_id;
     const RowsetIdUnorderedSet& rowset_ids;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
 };
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index fb9d0bdadd..6a3749ca4f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3009,11 +3009,6 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
     for (auto seg_delete_bitmap : seg_delete_bitmaps) {
         delete_bitmap->merge(*seg_delete_bitmap);
     }
-
-    LOG(INFO) << "construct delete bitmap tablet: " << tablet_id() << " 
rowset: " << rowset_id
-              << " dummy_version: " << end_version + 1
-              << " bitmap num: " << delete_bitmap->delete_bitmap.size()
-              << " cost: " << watch.get_elapse_time_us() << "(us)";
     return Status::OK();
 }
 
@@ -3170,8 +3165,18 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
     RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
     DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
     RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, 
delete_bitmap));
+
+    OlapStopWatch watch;
     RETURN_IF_ERROR(
             calc_delete_bitmap(rowset, segments, &cur_rowset_ids, 
delete_bitmap, cur_version - 1));
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), 0,
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
+    LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " 
<< tablet_id()
+              << ", rowset_ids: " << cur_rowset_ids.size() << ", cur 
max_version: " << cur_version
+              << ", transaction_id: " << -1 << ", cost: " << 
watch.get_elapse_time_us()
+              << "(us), total rows: " << total_rows;
+
     for (auto iter = delete_bitmap->delete_bitmap.begin();
          iter != delete_bitmap->delete_bitmap.end(); ++iter) {
         _tablet_meta->delete_bitmap().merge(
@@ -3184,7 +3189,8 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
 Status Tablet::commit_phase_update_delete_bitmap(
         const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& 
pre_rowset_ids,
         DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
-        const std::vector<segment_v2::SegmentSharedPtr>& segments, 
RowsetWriter* rowset_writer) {
+        const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t 
txn_id,
+        RowsetWriter* rowset_writer) {
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
     RowsetIdUnorderedSet rowset_ids_to_del;
@@ -3192,22 +3198,28 @@ Status Tablet::commit_phase_update_delete_bitmap(
     std::shared_lock meta_rlock(_meta_lock);
     cur_rowset_ids = all_rs_id(cur_version);
     _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, 
&rowset_ids_to_del);
-    if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
-        LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
-                  << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
-    }
     for (const auto& to_del : rowset_ids_to_del) {
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
     }
 
+    OlapStopWatch watch;
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, 
delete_bitmap,
                                        cur_version, rowset_writer));
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), 0,
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
+    LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << 
tablet_id()
+              << ", rowset_ids to add: " << rowset_ids_to_add.size()
+              << ", rowset_ids to del: " << rowset_ids_to_del.size()
+              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id
+              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
     return Status::OK();
 }
 
 Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
                                     const RowsetIdUnorderedSet& pre_rowset_ids,
-                                    DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer) {
+                                    DeleteBitmapPtr delete_bitmap, int64_t 
txn_id,
+                                    RowsetWriter* rowset_writer) {
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
     RowsetIdUnorderedSet rowset_ids_to_del;
@@ -3227,16 +3239,21 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
     }
     cur_rowset_ids = all_rs_id(cur_version - 1);
     _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, 
&rowset_ids_to_del);
-    if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
-        LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
-                  << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
-    }
     for (const auto& to_del : rowset_ids_to_del) {
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
     }
 
+    OlapStopWatch watch;
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, 
delete_bitmap,
                                        cur_version - 1, rowset_writer));
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), 0,
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
+    LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
+              << ", rowset_ids to add: " << rowset_ids_to_add.size()
+              << ", rowset_ids to del: " << rowset_ids_to_del.size()
+              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id
+              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
 
     // update version without write lock, compaction and publish_txn
     // will update delete bitmap, handle compaction with _rowset_update_lock
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 50eb3469f1..3c3a010397 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -458,12 +458,12 @@ public:
     Status commit_phase_update_delete_bitmap(
             const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& 
pre_rowset_ids,
             DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
-            const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t 
txn_id,
             RowsetWriter* rowset_writer = nullptr);
 
     Status update_delete_bitmap(const RowsetSharedPtr& rowset,
                                 const RowsetIdUnorderedSet& pre_rowset_ids,
-                                DeleteBitmapPtr delete_bitmap,
+                                DeleteBitmapPtr delete_bitmap, int64_t txn_id,
                                 RowsetWriter* rowset_writer = nullptr);
     void calc_compaction_output_rowset_delete_bitmap(
             const std::vector<RowsetSharedPtr>& input_rowsets,
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index d8fd5c1bbe..bca689960a 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -142,11 +142,12 @@ Status EnginePublishVersionTask::finish() {
                 }
                 if (tablet_state == TabletState::TABLET_RUNNING &&
                     version.first != max_version.second + 1) {
-                    VLOG_NOTICE << "uniq key with merge-on-write version not 
continuous, current "
-                                   "max "
-                                   "version="
-                                << max_version.second << ", publish_version=" 
<< version.first
-                                << " tablet_id=" << tablet->tablet_id();
+                    LOG_EVERY_SECOND(INFO)
+                            << "uniq key with merge-on-write version not 
continuous, "
+                               "current max version="
+                            << max_version.second << ", publish_version=" << 
version.first
+                            << ", tablet_id=" << tablet->tablet_id()
+                            << ", transaction_id=" << 
_publish_version_req.transaction_id;
                     // If a tablet migrates out and back, the previously failed
                     // publish task may retry on the new tablet, so check
                     // whether the version exists. if not exist, then set
@@ -219,16 +220,19 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
           _partition_id(partition_id),
           _transaction_id(transaction_id),
           _version(version),
-          _tablet_info(tablet_info) {}
+          _tablet_info(tablet_info) {
+    _stats.submit_time_us = MonotonicMicros();
+}
 
 void TabletPublishTxnTask::handle() {
+    _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
     Defer defer {[&] {
         if (_engine_publish_version_task->finish_task() == 1) {
             _engine_publish_version_task->notify();
         }
     }};
     auto publish_status = 
StorageEngine::instance()->txn_manager()->publish_txn(
-            _partition_id, _tablet, _transaction_id, _version);
+            _partition_id, _tablet, _transaction_id, _version, &_stats);
     if (publish_status != Status::OK()) {
         LOG(WARNING) << "failed to publish version. rowset_id=" << 
_rowset->rowset_id()
                      << ", tablet_id=" << _tablet_info.tablet_id << ", 
txn_id=" << _transaction_id
@@ -238,7 +242,9 @@ void TabletPublishTxnTask::handle() {
     }
 
     // add visible rowset to tablet
+    int64_t t1 = MonotonicMicros();
     publish_status = _tablet->add_inc_rowset(_rowset);
+    _stats.add_inc_rowset_us = MonotonicMicros() - t1;
     if (publish_status != Status::OK() && 
!publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
         LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << 
_rowset->rowset_id()
                      << ", tablet_id=" << _tablet_info.tablet_id << ", 
txn_id=" << _transaction_id
@@ -247,10 +253,14 @@ void TabletPublishTxnTask::handle() {
         return;
     }
     _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
+    int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
+    // print stats if publish cost > 500ms
     LOG(INFO) << "publish version successfully on tablet"
               << ", table_id=" << _tablet->table_id() << ", tablet=" << 
_tablet->full_name()
               << ", transaction_id=" << _transaction_id << ", version=" << 
_version.first
-              << ", num_rows=" << _rowset->num_rows() << ", res=" << 
publish_status;
+              << ", num_rows=" << _rowset->num_rows() << ", res=" << 
publish_status
+              << ", cost: " << cost_us << "(us) "
+              << (cost_us > 500 * 1000 ? _stats.to_string() : "");
 }
 
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 7c163839bd..efc7c8a4cf 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -31,18 +31,38 @@
 #include "olap/rowset/rowset.h"
 #include "olap/tablet.h"
 #include "olap/task/engine_task.h"
+#include "util/time.h"
 
 namespace doris {
 
 class EnginePublishVersionTask;
 class TPublishVersionRequest;
 
+struct TabletPublishStatistics {
+    int64_t submit_time_us = 0;
+    int64_t schedule_time_us = 0;
+    int64_t lock_wait_time_us = 0;
+    int64_t save_meta_time_us = 0;
+    int64_t calc_delete_bitmap_time_us = 0;
+    int64_t partial_update_write_segment_us = 0;
+    int64_t add_inc_rowset_us = 0;
+
+    std::string to_string() {
+        return fmt::format(
+                "[Publish Statistics: schedule time(us): {}, lock wait 
time(us): {}, save meta "
+                "time(us): {}, calc delete bitmap time(us): {}, partial update 
write segment "
+                "time(us): {}, add inc rowset time(us): {}]",
+                schedule_time_us, lock_wait_time_us, save_meta_time_us, 
calc_delete_bitmap_time_us,
+                partial_update_write_segment_us, add_inc_rowset_us);
+    }
+};
+
 class TabletPublishTxnTask {
 public:
     TabletPublishTxnTask(EnginePublishVersionTask* engine_task, 
TabletSharedPtr tablet,
                          RowsetSharedPtr rowset, int64_t partition_id, int64_t 
transaction_id,
                          Version version, const TabletInfo& tablet_info);
-    ~TabletPublishTxnTask() {}
+    ~TabletPublishTxnTask() = default;
 
     void handle();
 
@@ -55,6 +75,7 @@ private:
     int64_t _transaction_id;
     Version _version;
     TabletInfo _tablet_info;
+    TabletPublishStatistics _stats;
 };
 
 class EnginePublishVersionTask : public EngineTask {
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index f7e56fd785..ad799868aa 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -42,6 +42,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "util/time.h"
 
 namespace doris {
@@ -78,7 +79,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
     _txn_map_locks = new std::shared_mutex[_txn_map_shard_size];
     _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
     _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
-    _txn_mutex = new std::mutex[_txn_shard_size];
+    _txn_mutex = new std::shared_mutex[_txn_shard_size];
     _txn_tablet_delta_writer_map = new 
txn_tablet_delta_writer_map_t[_txn_map_shard_size];
     _txn_tablet_delta_writer_map_locks = new 
std::shared_mutex[_txn_map_shard_size];
 }
@@ -165,9 +166,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr&
 }
 
 Status TxnManager::publish_txn(TPartitionId partition_id, const 
TabletSharedPtr& tablet,
-                               TTransactionId transaction_id, const Version& 
version) {
+                               TTransactionId transaction_id, const Version& 
version,
+                               TabletPublishStatistics* stats) {
     return publish_txn(tablet->data_dir()->get_meta(), partition_id, 
transaction_id,
-                       tablet->tablet_id(), tablet->schema_hash(), 
tablet->tablet_uid(), version);
+                       tablet->tablet_id(), tablet->schema_hash(), 
tablet->tablet_uid(), version,
+                       stats);
 }
 
 // delete the txn from manager if it is not committed(not have a valid rowset)
@@ -192,7 +195,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
 
-    std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
     {
         // get tx
         std::lock_guard<std::shared_mutex> 
wrlock(_get_txn_map_lock(transaction_id));
@@ -236,7 +239,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
         return Status::Error<ROWSET_INVALID>();
     }
 
-    std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
     // this while loop just run only once, just for if break
     do {
         // get tx
@@ -322,21 +325,24 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
 // remove a txn from txn manager
 Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                                TTransactionId transaction_id, TTabletId 
tablet_id,
-                               SchemaHash schema_hash, TabletUid tablet_uid,
-                               const Version& version) {
+                               SchemaHash schema_hash, TabletUid tablet_uid, 
const Version& version,
+                               TabletPublishStatistics* stats) {
     auto tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
     if (tablet == nullptr) {
         return Status::OK();
     }
+    DCHECK(stats != nullptr);
 
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
     RowsetSharedPtr rowset = nullptr;
     TabletTxnInfo tablet_txn_info;
+    int64_t t1 = MonotonicMicros();
     /// Step 1: get rowset, tablet_txn_info by key
     {
-        std::unique_lock<std::mutex> txn_rlock(_get_txn_lock(transaction_id));
+        std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
         std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
+        stats->lock_wait_time_us += MonotonicMicros() - t1;
 
         txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
         if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
@@ -368,9 +374,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
         std::unique_ptr<RowsetWriter> rowset_writer;
         _create_transient_rowset_writer(tablet, rowset, &rowset_writer);
 
+        int64_t t2 = MonotonicMicros();
         RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, 
tablet_txn_info.rowset_ids,
-                                                     
tablet_txn_info.delete_bitmap,
+                                                     
tablet_txn_info.delete_bitmap, transaction_id,
                                                      rowset_writer.get()));
+        int64_t t3 = MonotonicMicros();
+        stats->calc_delete_bitmap_time_us = t3 - t2;
         if (rowset->tablet_schema()->is_partial_update()) {
             // build rowset writer and merge transient rowset
             RETURN_IF_ERROR(rowset_writer->flush());
@@ -380,8 +389,11 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
             // erase segment cache cause we will add a segment to rowset
             SegmentLoader::instance()->erase_segment(rowset->rowset_id());
         }
+        stats->partial_update_write_segment_us = MonotonicMicros() - t3;
+        int64_t t4 = MonotonicMicros();
         std::shared_lock rlock(tablet->get_header_lock());
         tablet->save_meta();
+        stats->save_meta_time_us = MonotonicMicros() - t4;
     }
 
     /// Step 3:  add to binlog
@@ -397,8 +409,10 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
     }
 
     /// Step 4: save meta
+    int64_t t5 = MonotonicMicros();
     auto status = RowsetMetaManager::save(meta, tablet_uid, 
rowset->rowset_id(),
                                           
rowset->rowset_meta()->get_rowset_pb(), enable_binlog);
+    stats->save_meta_time_us += MonotonicMicros() - t5;
     if (!status.ok()) {
         LOG(WARNING) << "save committed rowset failed. when publish txn 
rowset_id:"
                      << rowset->rowset_id() << ", tablet id: " << tablet_id
@@ -415,14 +429,16 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
 
     /// Step 5: remove tablet_info from tnx_tablet_map
     // txn_tablet_map[key] empty, remove key from txn_tablet_map
-    std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+    int64_t t6 = MonotonicMicros();
+    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
     std::lock_guard<std::shared_mutex> 
wrlock(_get_txn_map_lock(transaction_id));
+    stats->lock_wait_time_us += MonotonicMicros() - t6;
     txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
     if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
         it->second.erase(tablet_info);
         VLOG_NOTICE << "publish txn successfully."
                     << " partition_id: " << key.first << ", txn_id: " << 
key.second
-                    << ", tablet: " << tablet_info.to_string()
+                    << ", tablet_id: " << tablet_info.tablet_id
                     << ", rowsetid: " << rowset->rowset_id() << ", version: " 
<< version.first
                     << "," << version.second;
         if (it->second.empty()) {
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index e2ed4290f9..36be3b03f5 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -48,6 +48,7 @@
 namespace doris {
 class DeltaWriter;
 class OlapMeta;
+struct TabletPublishStatistics;
 
 struct TabletTxnInfo {
     PUniqueId load_id;
@@ -106,7 +107,8 @@ public:
                       const RowsetSharedPtr& rowset_ptr, bool is_recovery);
 
     Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& 
tablet,
-                       TTransactionId transaction_id, const Version& version);
+                       TTransactionId transaction_id, const Version& version,
+                       TabletPublishStatistics* stats);
 
     // delete the txn from manager if it is not committed(not have a valid 
rowset)
     Status rollback_txn(TPartitionId partition_id, const TabletSharedPtr& 
tablet,
@@ -124,7 +126,7 @@ public:
     // not persist rowset meta because
     Status publish_txn(OlapMeta* meta, TPartitionId partition_id, 
TTransactionId transaction_id,
                        TTabletId tablet_id, SchemaHash schema_hash, TabletUid 
tablet_uid,
-                       const Version& version);
+                       const Version& version, TabletPublishStatistics* stats);
 
     // delete the txn from manager if it is not committed(not have a valid 
rowset)
     Status rollback_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
@@ -201,7 +203,7 @@ private:
 
     txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
 
-    inline std::mutex& _get_txn_lock(TTransactionId transactionId);
+    inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId);
 
     std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId 
transactionId);
 
@@ -231,7 +233,7 @@ private:
 
     std::shared_mutex* _txn_map_locks;
 
-    std::mutex* _txn_mutex;
+    std::shared_mutex* _txn_mutex;
 
     txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map;
     std::shared_mutex* _txn_tablet_delta_writer_map_locks;
@@ -251,7 +253,7 @@ inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(
     return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)];
 }
 
-inline std::mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
+inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId 
transactionId) {
     return _txn_mutex[transactionId & (_txn_shard_size - 1)];
 }
 
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 1085a09879..f1d5f49619 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -47,6 +47,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/txn_manager.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
@@ -610,9 +611,10 @@ TEST_F(TestDeltaWriter, vec_write) {
     for (auto& tablet_rs : tablet_related_rs) {
         std::cout << "start to publish txn" << std::endl;
         RowsetSharedPtr rowset = tablet_rs.second;
+        TabletPublishStatistics stats;
         res = k_engine->txn_manager()->publish_txn(meta, 
write_req.partition_id, write_req.txn_id,
                                                    write_req.tablet_id, 
write_req.schema_hash,
-                                                   tablet_rs.first.tablet_uid, 
version);
+                                                   tablet_rs.first.tablet_uid, 
version, &stats);
         ASSERT_TRUE(res.ok());
         std::cout << "start to add inc rowset:" << rowset->rowset_id()
                   << ", num rows:" << rowset->num_rows() << ", version:" << 
rowset->version().first
@@ -725,9 +727,10 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
 
     std::cout << "start to publish txn" << std::endl;
     RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
+    TabletPublishStatistics pstats;
     res = k_engine->txn_manager()->publish_txn(
             meta, write_req.partition_id, write_req.txn_id, 
write_req.tablet_id,
-            write_req.schema_hash, 
tablet_related_rs.begin()->first.tablet_uid, version);
+            write_req.schema_hash, 
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
     ASSERT_TRUE(res.ok());
     std::cout << "start to add inc rowset:" << rowset->rowset_id()
               << ", num rows:" << rowset->num_rows() << ", version:" << 
rowset->version().first
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index e4dfe784d2..79fbf26203 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -46,6 +46,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/txn_manager.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptor_helper.h"
@@ -206,9 +207,10 @@ TEST_F(TestEngineStorageMigrationTask, 
write_and_migration) {
             write_req.txn_id, write_req.partition_id, &tablet_related_rs);
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
+        TabletPublishStatistics stats;
         res = k_engine->txn_manager()->publish_txn(meta, 
write_req.partition_id, write_req.txn_id,
                                                    tablet->tablet_id(), 
tablet->schema_hash(),
-                                                   tablet->tablet_uid(), 
version);
+                                                   tablet->tablet_uid(), 
version, &stats);
         EXPECT_EQ(Status::OK(), res);
         res = tablet->add_inc_rowset(rowset);
         EXPECT_EQ(Status::OK(), res);
diff --git a/be/test/olap/remote_rowset_gc_test.cpp 
b/be/test/olap/remote_rowset_gc_test.cpp
index 49a70b56a5..a3b21a52c9 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -50,6 +50,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/txn_manager.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptor_helper.h"
@@ -212,9 +213,10 @@ TEST_F(RemoteRowsetGcTest, normal) {
             write_req.txn_id, write_req.partition_id, &tablet_related_rs);
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
+        TabletPublishStatistics stats;
         st = k_engine->txn_manager()->publish_txn(meta, 
write_req.partition_id, write_req.txn_id,
                                                   write_req.tablet_id, 
write_req.schema_hash,
-                                                  tablet_rs.first.tablet_uid, 
version);
+                                                  tablet_rs.first.tablet_uid, 
version, &stats);
         ASSERT_EQ(Status::OK(), st);
         st = tablet->add_inc_rowset(rowset);
         ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 52b23c68f2..038605f7cb 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -57,6 +57,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/txn_manager.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptor_helper.h"
@@ -405,9 +406,10 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* 
tablet, int64_t replic
                                                    &tablet_related_rs);
     for (auto& tablet_rs : tablet_related_rs) {
         RowsetSharedPtr rowset = tablet_rs.second;
+        TabletPublishStatistics stats;
         st = engine->txn_manager()->publish_txn(meta, write_req.partition_id, 
write_req.txn_id,
                                                 (*tablet)->tablet_id(), 
(*tablet)->schema_hash(),
-                                                (*tablet)->tablet_uid(), 
version);
+                                                (*tablet)->tablet_uid(), 
version, &stats);
         ASSERT_EQ(Status::OK(), st);
         st = (*tablet)->add_inc_rowset(rowset);
         ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/txn_manager_test.cpp 
b/be/test/olap/txn_manager_test.cpp
index 2932ae4aa8..146836cdc2 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -40,6 +40,7 @@
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "util/uid_util.h"
 
 using ::testing::_;
@@ -280,8 +281,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
                                          schema_hash, _tablet_uid, load_id, 
_rowset, false);
     EXPECT_TRUE(status == Status::OK());
     Version new_version(10, 11);
+    TabletPublishStatistics stats;
     status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, 
tablet_id, schema_hash,
-                                   _tablet_uid, new_version);
+                                   _tablet_uid, new_version, &stats);
     EXPECT_TRUE(status == Status::OK());
 
     RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
@@ -298,8 +300,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
 TEST_F(TxnManagerTest, PublishNotExistedTxn) {
     Version new_version(10, 11);
     auto not_exist_txn = transaction_id + 1000;
+    TabletPublishStatistics stats;
     Status status = _txn_mgr->publish_txn(_meta, partition_id, not_exist_txn, 
tablet_id,
-                                          schema_hash, _tablet_uid, 
new_version);
+                                          schema_hash, _tablet_uid, 
new_version, &stats);
     EXPECT_EQ(status, Status::OK());
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to