This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new ab7ecb7740e [pick](statistics) pick loaded rows statistics to 2.0 (#25531) ab7ecb7740e is described below commit ab7ecb7740eb19f88a99dcd16263b4a68c54ae3b Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Fri Oct 20 13:35:55 2023 +0800 [pick](statistics) pick loaded rows statistics to 2.0 (#25531) * [feature](load) collect loaded rows on table level after txn published (#24346) As title. Stream load 20 lines ``` 2023-09-14 11:40:04,186 DEBUG (PUBLISH_VERSION|23) [DatabaseTransactionMgr.updateCatalogAfterVisible():1769] table id to loaded rows:{51016=20} ``` ``` mysql> select count(*) from dup_tbl_basic; +----------+ | count(*) | +----------+ | 20 | +----------+ 1 row in set (0.05 sec) ``` * [enhancement](statistics) collect table level loaded rows on BE to make RPC light weight (#24609) * [fix](statistics) use replication_num as replica num (#25325) --- be/src/agent/task_worker_pool.cpp | 7 ++++-- be/src/olap/task/engine_publish_version_task.cpp | 25 ++++++++++++++++++---- be/src/olap/task/engine_publish_version_task.h | 11 +++++++--- .../java/org/apache/doris/master/MasterImpl.java | 3 +++ .../org/apache/doris/task/PublishVersionTask.java | 14 ++++++++++++ .../doris/transaction/DatabaseTransactionMgr.java | 16 ++++++++++++++ .../doris/transaction/PublishVersionDaemon.java | 25 ++++++++++++++++++++-- .../apache/doris/transaction/TransactionState.java | 13 +++++++++++ gensrc/thrift/MasterService.thrift | 1 + 9 files changed, 104 insertions(+), 11 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b288731d09a..09c46461aca 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1528,14 +1528,17 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { std::map<TTabletId, TVersion> succ_tablets; // partition_id, tablet_id, publish_version std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets; + std::map<TTableId, int64_t> table_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; bool is_task_timeout = false; while (retry_time < PUBLISH_VERSION_MAX_RETRY) { succ_tablets.clear(); error_tablet_ids.clear(); + table_id_to_num_delta_rows.clear(); EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids, - &succ_tablets, &discontinuous_version_tablets); + &succ_tablets, &discontinuous_version_tablets, + &table_id_to_num_delta_rows); status = StorageEngine::instance()->execute_task(&engine_task); if (status.ok()) { break; @@ -1620,7 +1623,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() { finish_task_request.__set_succ_tablets(succ_tablets); finish_task_request.__set_error_tablet_ids( std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end())); - + finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows); _finish_task(finish_task_request); _remove_task_info(agent_task_req.task_type, agent_task_req.signature); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 24593c40bad..702c4386f11 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -29,6 +29,7 @@ #include <set> #include <shared_mutex> #include <string> +#include <unordered_map> #include <utility> #include "common/logging.h" @@ -72,11 +73,13 @@ void TabletPublishStatistics::record_in_bvar() { EnginePublishVersionTask::EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets, - std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets) + std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets, + std::map<TTableId, int64_t>* table_id_to_num_delta_rows) : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablets(succ_tablets), - _discontinuous_version_tablets(discontinuous_version_tablets) {} + _discontinuous_version_tablets(discontinuous_version_tablets), + _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {} void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) { std::lock_guard<std::mutex> lck(_tablet_ids_mutex); @@ -91,7 +94,7 @@ Status EnginePublishVersionTask::finish() { std::unique_ptr<ThreadPoolToken> token = StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); - + std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows; // each partition for (auto& par_ver_info : _publish_version_req.partition_version_infos) { int64_t partition_id = par_ver_info.partition_id; @@ -188,6 +191,11 @@ Status EnginePublishVersionTask::finish() { continue; } } + + auto rowset_meta_ptr = rowset->rowset_meta(); + tablet_id_to_num_delta_rows.insert( + {rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()}); + auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>( this, tablet, rowset, partition_id, transaction_id, version, tablet_info); auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); @@ -204,7 +212,6 @@ Status EnginePublishVersionTask::finish() { std::set<TabletInfo> partition_related_tablet_infos; StorageEngine::instance()->tablet_manager()->get_partition_related_tablets( partition_id, &partition_related_tablet_infos); - Version version(par_ver_info.version, par_ver_info.version); for (auto& tablet_info : partition_related_tablet_infos) { TabletSharedPtr tablet = @@ -241,6 +248,7 @@ Status EnginePublishVersionTask::finish() { } } } + _calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows); if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) { LOG(INFO) << "finish to publish version on transaction." @@ -252,6 +260,15 @@ Status EnginePublishVersionTask::finish() { return res; } +void EnginePublishVersionTask::_calculate_tbl_num_delta_rows( + const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows) { + for (const auto& kv : tablet_id_to_num_delta_rows) { + auto table_id = + StorageEngine::instance()->tablet_manager()->get_tablet(kv.first)->get_table_id(); + (*_table_id_to_num_delta_rows)[table_id] += kv.second; + } +} + TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task, TabletSharedPtr tablet, RowsetSharedPtr rowset, int64_t partition_id, int64_t transaction_id, diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 0a270c93d2a..8f3790574a2 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -87,21 +87,26 @@ public: EnginePublishVersionTask( const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets, - std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets); - ~EnginePublishVersionTask() {} + std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets, + std::map<TTableId, int64_t>* table_id_to_num_delta_rows); + ~EnginePublishVersionTask() override = default; - virtual Status finish() override; + Status finish() override; void add_error_tablet_id(int64_t tablet_id); int64_t finish_task(); private: + void _calculate_tbl_num_delta_rows( + const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows); + const TPublishVersionRequest& _publish_version_req; std::mutex _tablet_ids_mutex; std::set<TTabletId>* _error_tablet_ids; std::map<TTabletId, TVersion>* _succ_tablets; std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets; + std::map<TTableId, int64_t>* _table_id_to_num_delta_rows; }; class AsyncTabletPublishTask { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 23118647cca..2833eff5f3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -491,6 +491,9 @@ public class MasterImpl { // not remove the task from queue and be will retry return; } + if (request.isSetTableIdToDeltaNumRows()) { + publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows()); + } AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), publishVersionTask.getTaskType(), publishVersionTask.getSignature()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 8461b1db4f5..74cff551b4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -21,6 +21,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TPublishVersionRequest; import org.apache.doris.thrift.TTaskType; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask { // tabletId => version, current version = 0 private Map<Long, Long> succTablets; + /** + * To collect loaded rows for each table from each BE + */ + private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap(); + public PublishVersionTask(long backendId, long transactionId, long dbId, List<TPartitionVersionInfo> partitionVersionInfos, long createTime) { super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, -1L, transactionId, createTime); @@ -81,4 +87,12 @@ public class PublishVersionTask extends AgentTask { } this.errorTablets.addAll(errorTablets); } + + public void setTableIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) { + this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows); + } + + public Map<Long, Long> getTableIdToDeltaNumRows() { + return tableIdToDeltaNumRows; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 7677c1684f6..b74f902358f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -51,6 +51,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.ClearTransactionTask; @@ -1791,6 +1792,21 @@ public class DatabaseTransactionMgr { } } } + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + Map<Long, Long> tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows(); + Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap(); + tableIdToTotalNumDeltaRows + .forEach((tableId, numRows) -> { + OlapTable table = (OlapTable) db.getTableNullable(tableId); + if (table != null) { + short replicaNum = table.getTableProperty() + .getReplicaAllocation() + .getTotalReplicaNum(); + tableIdToNumDeltaRows.put(tableId, numRows / replicaNum); + } + }); + LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows); + tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 33ea8de07eb..747508d2118 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -29,13 +29,17 @@ import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Stream; public class PublishVersionDaemon extends MasterDaemon { @@ -121,12 +125,29 @@ public class PublishVersionDaemon extends MasterDaemon { AgentTaskExecutor.submit(batchTask); } + Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap(); // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { - boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream() + Stream<PublishVersionTask> publishVersionTaskStream = transactionState + .getPublishVersionTasks() + .values() + .stream() + .peek(task -> { + if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) { + Map<Long, Long> tableIdToDeltaNumRows = + task.getTableIdToDeltaNumRows(); + tableIdToDeltaNumRows.forEach((tableId, numRows) -> { + tableIdToDeltaNumRows + .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); + tableIdToNumDeltaRows.putIfAbsent(tableId, numRows); + }); + } + }); + boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId())); + transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows); - boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout(); + boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout(); if (shouldFinishTxn) { try { // one transaction exception should not affect other transaction diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 897bc3b63b8..5d95917e58d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -253,6 +253,11 @@ public class TransactionState implements Writable { // tbl id -> (index ids) private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap(); + /** + * the value is the num delta rows of all replicas in each table + */ + private final Map<Long, Long> tableIdToTotalNumDeltaRows = Maps.newHashMap(); + private String errorLogUrl = null; // record some error msgs during the transaction operation. @@ -701,6 +706,14 @@ public class TransactionState implements Writable { } } + public Map<Long, Long> getTableIdToTotalNumDeltaRows() { + return tableIdToTotalNumDeltaRows; + } + + public void setTableIdToTotalNumDeltaRows(Map<Long, Long> tableIdToTotalNumDeltaRows) { + this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows); + } + public void setErrorMsg(String errMsg) { this.errMsg = errMsg; } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index dedc454d33f..9acd3f85f7b 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -66,6 +66,7 @@ struct TFinishTaskRequest { 15: optional i64 copy_size 16: optional i64 copy_time_ms 17: optional map<Types.TTabletId, Types.TVersion> succ_tablets + 18: optional map<i64, i64> table_id_to_delta_num_rows } struct TTablet { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org