This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ce9a20a375 [enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853) ce9a20a375 is described below commit ce9a20a375001372881b3d7080397d57548aa4ec Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Sat Jun 17 23:14:28 2023 +0800 [enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853) --- be/src/agent/task_worker_pool.cpp | 5 ++- be/src/olap/delta_writer.cpp | 13 +++--- be/src/olap/memtable.cpp | 10 +++++ be/src/olap/olap_common.h | 6 ++- be/src/olap/tablet.cpp | 47 +++++++++++++++------- be/src/olap/tablet.h | 4 +- be/src/olap/task/engine_publish_version_task.cpp | 26 ++++++++---- be/src/olap/task/engine_publish_version_task.h | 23 ++++++++++- be/src/olap/txn_manager.cpp | 38 ++++++++++++----- be/src/olap/txn_manager.h | 12 +++--- be/test/olap/delta_writer_test.cpp | 7 +++- .../olap/engine_storage_migration_task_test.cpp | 4 +- be/test/olap/remote_rowset_gc_test.cpp | 4 +- be/test/olap/tablet_cooldown_test.cpp | 4 +- be/test/olap/txn_manager_test.cpp | 7 +++- 15 files changed, 151 insertions(+), 59 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index fd2c8a17dd..100ac2fd4a 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1469,7 +1469,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { _tasks.push_back(agent_task_req); _worker_thread_condition_variable.notify_one(); } - LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done" + LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, " << "transaction_id: " << publish_version_req.transaction_id; break; } else { @@ -1521,7 +1521,8 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { LOG_INFO("successfully publish version") .tag("signature", agent_task_req.signature) .tag("transaction_id", publish_version_req.transaction_id) - .tag("tablets_num", succ_tablet_ids.size()); + .tag("tablets_num", succ_tablet_ids.size()) + .tag("cost(s)", time(nullptr) - agent_task_req.recv_time); } status.to_thrift(&finish_task_request.task_status); diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 6789e12e19..394950a193 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -203,8 +203,8 @@ Status DeltaWriter::init() { context.tablet_id = _tablet->table_id(); context.tablet = _tablet; context.write_type = DataWriteType::TYPE_DIRECT; - context.mow_context = - std::make_shared<MowContext>(_cur_max_version, _rowset_ids, _delete_bitmap); + context.mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, + _delete_bitmap); RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer)); _schema.reset(new Schema(_tablet_schema)); @@ -347,7 +347,8 @@ void DeltaWriter::_reset_mem_table() { _mem_table_insert_trackers.push_back(mem_table_insert_tracker); _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } - auto mow_context = std::make_shared<MowContext>(_cur_max_version, _rowset_ids, _delete_bitmap); + auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, + _delete_bitmap); _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _rowset_writer.get(), mow_context, mem_table_insert_tracker, mem_table_flush_tracker)); @@ -454,9 +455,9 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, _delete_bitmap)); } int64_t cur_max_version = _tablet->max_version().second; - RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(_cur_rowset, _rowset_ids, - _delete_bitmap, cur_max_version, - segments, _rowset_writer.get())); + RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap( + _cur_rowset, _rowset_ids, _delete_bitmap, cur_max_version, segments, _req.txn_id, + _rowset_writer.get())); _rowset_ids = _tablet->all_rs_id(cur_max_version); } Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 9a6e5687d2..2cdc440848 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -459,9 +459,19 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) { SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) { return Status::OK(); } + + OlapStopWatch watch; RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, &_mow_context->rowset_ids, _mow_context->delete_bitmap, _mow_context->max_version)); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << tablet_id() + << ", rowset_ids: " << _mow_context->rowset_ids.size() + << ", cur max_version: " << _mow_context->max_version + << ", transaction_id: " << _mow_context->txn_id + << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; return Status::OK(); } diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index d3d9bf3122..1ebf306b41 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -461,9 +461,11 @@ using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>; class DeleteBitmap; // merge on write context struct MowContext { - MowContext(int64_t version, const RowsetIdUnorderedSet& ids, std::shared_ptr<DeleteBitmap> db) - : max_version(version), rowset_ids(ids), delete_bitmap(db) {} + MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, + std::shared_ptr<DeleteBitmap> db) + : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {} int64_t max_version; + int64_t txn_id; const RowsetIdUnorderedSet& rowset_ids; std::shared_ptr<DeleteBitmap> delete_bitmap; }; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index fb9d0bdadd..6a3749ca4f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3009,11 +3009,6 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, for (auto seg_delete_bitmap : seg_delete_bitmaps) { delete_bitmap->merge(*seg_delete_bitmap); } - - LOG(INFO) << "construct delete bitmap tablet: " << tablet_id() << " rowset: " << rowset_id - << " dummy_version: " << end_version + 1 - << " bitmap num: " << delete_bitmap->delete_bitmap.size() - << " cost: " << watch.get_elapse_time_us() << "(us)"; return Status::OK(); } @@ -3170,8 +3165,18 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1); DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap)); + + OlapStopWatch watch; RETURN_IF_ERROR( calc_delete_bitmap(rowset, segments, &cur_rowset_ids, delete_bitmap, cur_version - 1)); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << tablet_id() + << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version + << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() + << "(us), total rows: " << total_rows; + for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { _tablet_meta->delete_bitmap().merge( @@ -3184,7 +3189,8 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) Status Tablet::commit_phase_update_delete_bitmap( const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, - const std::vector<segment_v2::SegmentSharedPtr>& segments, RowsetWriter* rowset_writer) { + const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, + RowsetWriter* rowset_writer) { RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; @@ -3192,22 +3198,28 @@ Status Tablet::commit_phase_update_delete_bitmap( std::shared_lock meta_rlock(_meta_lock); cur_rowset_ids = all_rs_id(cur_version); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); - if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { - LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size() - << ", rowset_ids_to_del: " << rowset_ids_to_del.size(); - } for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } + OlapStopWatch watch; RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, cur_version, rowset_writer)); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << tablet_id() + << ", rowset_ids to add: " << rowset_ids_to_add.size() + << ", rowset_ids to del: " << rowset_ids_to_del.size() + << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id + << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; return Status::OK(); } Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer) { + DeleteBitmapPtr delete_bitmap, int64_t txn_id, + RowsetWriter* rowset_writer) { RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; @@ -3227,16 +3239,21 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, } cur_rowset_ids = all_rs_id(cur_version - 1); _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del); - if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) { - LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size() - << ", rowset_ids_to_del: " << rowset_ids_to_del.size(); - } for (const auto& to_del : rowset_ids_to_del) { delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); } + OlapStopWatch watch; RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap, cur_version - 1, rowset_writer)); + size_t total_rows = std::accumulate( + segments.begin(), segments.end(), 0, + [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); + LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id() + << ", rowset_ids to add: " << rowset_ids_to_add.size() + << ", rowset_ids to del: " << rowset_ids_to_del.size() + << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id + << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; // update version without write lock, compaction and publish_txn // will update delete bitmap, handle compaction with _rowset_update_lock diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 50eb3469f1..3c3a010397 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -458,12 +458,12 @@ public: Status commit_phase_update_delete_bitmap( const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, const int64_t& cur_version, - const std::vector<segment_v2::SegmentSharedPtr>& segments, + const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, RowsetWriter* rowset_writer = nullptr); Status update_delete_bitmap(const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, + DeleteBitmapPtr delete_bitmap, int64_t txn_id, RowsetWriter* rowset_writer = nullptr); void calc_compaction_output_rowset_delete_bitmap( const std::vector<RowsetSharedPtr>& input_rowsets, diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index d8fd5c1bbe..bca689960a 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -142,11 +142,12 @@ Status EnginePublishVersionTask::finish() { } if (tablet_state == TabletState::TABLET_RUNNING && version.first != max_version.second + 1) { - VLOG_NOTICE << "uniq key with merge-on-write version not continuous, current " - "max " - "version=" - << max_version.second << ", publish_version=" << version.first - << " tablet_id=" << tablet->tablet_id(); + LOG_EVERY_SECOND(INFO) + << "uniq key with merge-on-write version not continuous, " + "current max version=" + << max_version.second << ", publish_version=" << version.first + << ", tablet_id=" << tablet->tablet_id() + << ", transaction_id=" << _publish_version_req.transaction_id; // If a tablet migrates out and back, the previously failed // publish task may retry on the new tablet, so check // whether the version exists. if not exist, then set @@ -219,16 +220,19 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task _partition_id(partition_id), _transaction_id(transaction_id), _version(version), - _tablet_info(tablet_info) {} + _tablet_info(tablet_info) { + _stats.submit_time_us = MonotonicMicros(); +} void TabletPublishTxnTask::handle() { + _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us; Defer defer {[&] { if (_engine_publish_version_task->finish_task() == 1) { _engine_publish_version_task->notify(); } }}; auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( - _partition_id, _tablet, _transaction_id, _version); + _partition_id, _tablet, _transaction_id, _version, &_stats); if (publish_status != Status::OK()) { LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id() << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id @@ -238,7 +242,9 @@ void TabletPublishTxnTask::handle() { } // add visible rowset to tablet + int64_t t1 = MonotonicMicros(); publish_status = _tablet->add_inc_rowset(_rowset); + _stats.add_inc_rowset_us = MonotonicMicros() - t1; if (publish_status != Status::OK() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) { LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id() << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id @@ -247,10 +253,14 @@ void TabletPublishTxnTask::handle() { return; } _engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id); + int64_t cost_us = MonotonicMicros() - _stats.submit_time_us; + // print stats if publish cost > 500ms LOG(INFO) << "publish version successfully on tablet" << ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->full_name() << ", transaction_id=" << _transaction_id << ", version=" << _version.first - << ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status; + << ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status + << ", cost: " << cost_us << "(us) " + << (cost_us > 500 * 1000 ? _stats.to_string() : ""); } } // namespace doris diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 7c163839bd..efc7c8a4cf 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -31,18 +31,38 @@ #include "olap/rowset/rowset.h" #include "olap/tablet.h" #include "olap/task/engine_task.h" +#include "util/time.h" namespace doris { class EnginePublishVersionTask; class TPublishVersionRequest; +struct TabletPublishStatistics { + int64_t submit_time_us = 0; + int64_t schedule_time_us = 0; + int64_t lock_wait_time_us = 0; + int64_t save_meta_time_us = 0; + int64_t calc_delete_bitmap_time_us = 0; + int64_t partial_update_write_segment_us = 0; + int64_t add_inc_rowset_us = 0; + + std::string to_string() { + return fmt::format( + "[Publish Statistics: schedule time(us): {}, lock wait time(us): {}, save meta " + "time(us): {}, calc delete bitmap time(us): {}, partial update write segment " + "time(us): {}, add inc rowset time(us): {}]", + schedule_time_us, lock_wait_time_us, save_meta_time_us, calc_delete_bitmap_time_us, + partial_update_write_segment_us, add_inc_rowset_us); + } +}; + class TabletPublishTxnTask { public: TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, Version version, const TabletInfo& tablet_info); - ~TabletPublishTxnTask() {} + ~TabletPublishTxnTask() = default; void handle(); @@ -55,6 +75,7 @@ private: int64_t _transaction_id; Version _version; TabletInfo _tablet_info; + TabletPublishStatistics _stats; }; class EnginePublishVersionTask : public EngineTask { diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index f7e56fd785..ad799868aa 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -42,6 +42,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/task/engine_publish_version_task.h" #include "util/time.h" namespace doris { @@ -78,7 +79,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size) _txn_map_locks = new std::shared_mutex[_txn_map_shard_size]; _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; - _txn_mutex = new std::mutex[_txn_shard_size]; + _txn_mutex = new std::shared_mutex[_txn_shard_size]; _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; } @@ -165,9 +166,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr& } Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, - TTransactionId transaction_id, const Version& version) { + TTransactionId transaction_id, const Version& version, + TabletPublishStatistics* stats) { return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, - tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid(), version); + tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid(), version, + stats); } // delete the txn from manager if it is not committed(not have a valid rowset) @@ -192,7 +195,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); - std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id)); + std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); { // get tx std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); @@ -236,7 +239,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, return Status::Error<ROWSET_INVALID>(); } - std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id)); + std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); // this while loop just run only once, just for if break do { // get tx @@ -322,21 +325,24 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, // remove a txn from txn manager Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, - SchemaHash schema_hash, TabletUid tablet_uid, - const Version& version) { + SchemaHash schema_hash, TabletUid tablet_uid, const Version& version, + TabletPublishStatistics* stats) { auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id); if (tablet == nullptr) { return Status::OK(); } + DCHECK(stats != nullptr); pair<int64_t, int64_t> key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); RowsetSharedPtr rowset = nullptr; TabletTxnInfo tablet_txn_info; + int64_t t1 = MonotonicMicros(); /// Step 1: get rowset, tablet_txn_info by key { - std::unique_lock<std::mutex> txn_rlock(_get_txn_lock(transaction_id)); + std::shared_lock txn_rlock(_get_txn_lock(transaction_id)); std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id)); + stats->lock_wait_time_us += MonotonicMicros() - t1; txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { @@ -368,9 +374,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, std::unique_ptr<RowsetWriter> rowset_writer; _create_transient_rowset_writer(tablet, rowset, &rowset_writer); + int64_t t2 = MonotonicMicros(); RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids, - tablet_txn_info.delete_bitmap, + tablet_txn_info.delete_bitmap, transaction_id, rowset_writer.get())); + int64_t t3 = MonotonicMicros(); + stats->calc_delete_bitmap_time_us = t3 - t2; if (rowset->tablet_schema()->is_partial_update()) { // build rowset writer and merge transient rowset RETURN_IF_ERROR(rowset_writer->flush()); @@ -380,8 +389,11 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // erase segment cache cause we will add a segment to rowset SegmentLoader::instance()->erase_segment(rowset->rowset_id()); } + stats->partial_update_write_segment_us = MonotonicMicros() - t3; + int64_t t4 = MonotonicMicros(); std::shared_lock rlock(tablet->get_header_lock()); tablet->save_meta(); + stats->save_meta_time_us = MonotonicMicros() - t4; } /// Step 3: add to binlog @@ -397,8 +409,10 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, } /// Step 4: save meta + int64_t t5 = MonotonicMicros(); auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(), rowset->rowset_meta()->get_rowset_pb(), enable_binlog); + stats->save_meta_time_us += MonotonicMicros() - t5; if (!status.ok()) { LOG(WARNING) << "save committed rowset failed. when publish txn rowset_id:" << rowset->rowset_id() << ", tablet id: " << tablet_id @@ -415,14 +429,16 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, /// Step 5: remove tablet_info from tnx_tablet_map // txn_tablet_map[key] empty, remove key from txn_tablet_map - std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id)); + int64_t t6 = MonotonicMicros(); + std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); + stats->lock_wait_time_us += MonotonicMicros() - t6; txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { it->second.erase(tablet_info); VLOG_NOTICE << "publish txn successfully." << " partition_id: " << key.first << ", txn_id: " << key.second - << ", tablet: " << tablet_info.to_string() + << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id() << ", version: " << version.first << "," << version.second; if (it->second.empty()) { diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index e2ed4290f9..36be3b03f5 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -48,6 +48,7 @@ namespace doris { class DeltaWriter; class OlapMeta; +struct TabletPublishStatistics; struct TabletTxnInfo { PUniqueId load_id; @@ -106,7 +107,8 @@ public: const RowsetSharedPtr& rowset_ptr, bool is_recovery); Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, - TTransactionId transaction_id, const Version& version); + TTransactionId transaction_id, const Version& version, + TabletPublishStatistics* stats); // delete the txn from manager if it is not committed(not have a valid rowset) Status rollback_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, @@ -124,7 +126,7 @@ public: // not persist rowset meta because Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, - const Version& version); + const Version& version, TabletPublishStatistics* stats); // delete the txn from manager if it is not committed(not have a valid rowset) Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, @@ -201,7 +203,7 @@ private: txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId); - inline std::mutex& _get_txn_lock(TTransactionId transactionId); + inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId); std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId); @@ -231,7 +233,7 @@ private: std::shared_mutex* _txn_map_locks; - std::mutex* _txn_mutex; + std::shared_mutex* _txn_mutex; txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map; std::shared_mutex* _txn_tablet_delta_writer_map_locks; @@ -251,7 +253,7 @@ inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map( return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)]; } -inline std::mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { +inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { return _txn_mutex[transactionId & (_txn_shard_size - 1)]; } diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp index 1085a09879..f1d5f49619 100644 --- a/be/test/olap/delta_writer_test.cpp +++ b/be/test/olap/delta_writer_test.cpp @@ -47,6 +47,7 @@ #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" +#include "olap/task/engine_publish_version_task.h" #include "olap/txn_manager.h" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" @@ -610,9 +611,10 @@ TEST_F(TestDeltaWriter, vec_write) { for (auto& tablet_rs : tablet_related_rs) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_rs.second; + TabletPublishStatistics stats; res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet_rs.first.tablet_uid, version, &stats); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first @@ -725,9 +727,10 @@ TEST_F(TestDeltaWriter, vec_sequence_col) { std::cout << "start to publish txn" << std::endl; RowsetSharedPtr rowset = tablet_related_rs.begin()->second; + TabletPublishStatistics pstats; res = k_engine->txn_manager()->publish_txn( meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, - write_req.schema_hash, tablet_related_rs.begin()->first.tablet_uid, version); + write_req.schema_hash, tablet_related_rs.begin()->first.tablet_uid, version, &pstats); ASSERT_TRUE(res.ok()); std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp index e4dfe784d2..79fbf26203 100644 --- a/be/test/olap/engine_storage_migration_task_test.cpp +++ b/be/test/olap/engine_storage_migration_task_test.cpp @@ -46,6 +46,7 @@ #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" +#include "olap/task/engine_publish_version_task.h" #include "olap/txn_manager.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptor_helper.h" @@ -206,9 +207,10 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) { write_req.txn_id, write_req.partition_id, &tablet_related_rs); for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; + TabletPublishStatistics stats; res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, tablet->tablet_id(), tablet->schema_hash(), - tablet->tablet_uid(), version); + tablet->tablet_uid(), version, &stats); EXPECT_EQ(Status::OK(), res); res = tablet->add_inc_rowset(rowset); EXPECT_EQ(Status::OK(), res); diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp index 49a70b56a5..a3b21a52c9 100644 --- a/be/test/olap/remote_rowset_gc_test.cpp +++ b/be/test/olap/remote_rowset_gc_test.cpp @@ -50,6 +50,7 @@ #include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/task/engine_publish_version_task.h" #include "olap/txn_manager.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptor_helper.h" @@ -212,9 +213,10 @@ TEST_F(RemoteRowsetGcTest, normal) { write_req.txn_id, write_req.partition_id, &tablet_related_rs); for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; + TabletPublishStatistics stats; st = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id, write_req.schema_hash, - tablet_rs.first.tablet_uid, version); + tablet_rs.first.tablet_uid, version, &stats); ASSERT_EQ(Status::OK(), st); st = tablet->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index 52b23c68f2..038605f7cb 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -57,6 +57,7 @@ #include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/task/engine_publish_version_task.h" #include "olap/txn_manager.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptor_helper.h" @@ -405,9 +406,10 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* tablet, int64_t replic &tablet_related_rs); for (auto& tablet_rs : tablet_related_rs) { RowsetSharedPtr rowset = tablet_rs.second; + TabletPublishStatistics stats; st = engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id, (*tablet)->tablet_id(), (*tablet)->schema_hash(), - (*tablet)->tablet_uid(), version); + (*tablet)->tablet_uid(), version, &stats); ASSERT_EQ(Status::OK(), st); st = (*tablet)->add_inc_rowset(rowset); ASSERT_EQ(Status::OK(), st); diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp index 2932ae4aa8..146836cdc2 100644 --- a/be/test/olap/txn_manager_test.cpp +++ b/be/test/olap/txn_manager_test.cpp @@ -40,6 +40,7 @@ #include "olap/rowset/rowset_meta_manager.h" #include "olap/storage_engine.h" #include "olap/tablet_schema.h" +#include "olap/task/engine_publish_version_task.h" #include "util/uid_util.h" using ::testing::_; @@ -280,8 +281,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) { schema_hash, _tablet_uid, load_id, _rowset, false); EXPECT_TRUE(status == Status::OK()); Version new_version(10, 11); + TabletPublishStatistics stats; status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id, tablet_id, schema_hash, - _tablet_uid, new_version); + _tablet_uid, new_version, &stats); EXPECT_TRUE(status == Status::OK()); RowsetMetaSharedPtr rowset_meta(new RowsetMeta()); @@ -298,8 +300,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) { TEST_F(TxnManagerTest, PublishNotExistedTxn) { Version new_version(10, 11); auto not_exist_txn = transaction_id + 1000; + TabletPublishStatistics stats; Status status = _txn_mgr->publish_txn(_meta, partition_id, not_exist_txn, tablet_id, - schema_hash, _tablet_uid, new_version); + schema_hash, _tablet_uid, new_version, &stats); EXPECT_EQ(status, Status::OK()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org