This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ab7ecb7740e [pick](statistics) pick loaded rows statistics to 2.0 (#25531)
ab7ecb7740e is described below

commit ab7ecb7740eb19f88a99dcd16263b4a68c54ae3b
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Fri Oct 20 13:35:55 2023 +0800

    [pick](statistics) pick loaded rows statistics to 2.0 (#25531)
    
    * [feature](load) collect loaded rows at table level after the txn is published (#24346)
    
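    Collect the number of rows loaded into each table after a transaction is published.
    
    Example: stream load 20 rows, then check the FE debug log and the table's row count: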
    
    ```
    2023-09-14 11:40:04,186 DEBUG (PUBLISH_VERSION|23) [DatabaseTransactionMgr.updateCatalogAfterVisible():1769] table id to loaded rows:{51016=20}
    ```
    
    ```
    mysql> select count(*) from dup_tbl_basic;
    +----------+
    | count(*) |
    +----------+
    |       20 |
    +----------+
    1 row in set (0.05 sec)
    ```
    
    * [enhancement](statistics) collect table-level loaded rows on BE to make the RPC lightweight (#24609)
    
    * [fix](statistics) use replication_num as replica num (#25325)
---
 be/src/agent/task_worker_pool.cpp                  |  7 ++++--
 be/src/olap/task/engine_publish_version_task.cpp   | 25 ++++++++++++++++++----
 be/src/olap/task/engine_publish_version_task.h     | 11 +++++++---
 .../java/org/apache/doris/master/MasterImpl.java   |  3 +++
 .../org/apache/doris/task/PublishVersionTask.java  | 14 ++++++++++++
 .../doris/transaction/DatabaseTransactionMgr.java  | 16 ++++++++++++++
 .../doris/transaction/PublishVersionDaemon.java    | 25 ++++++++++++++++++++--
 .../apache/doris/transaction/TransactionState.java | 13 +++++++++++
 gensrc/thrift/MasterService.thrift                 |  1 +
 9 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index b288731d09a..09c46461aca 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1528,14 +1528,17 @@ void 
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
         std::map<TTabletId, TVersion> succ_tablets;
         // partition_id, tablet_id, publish_version
         std::vector<std::tuple<int64_t, int64_t, int64_t>> 
discontinuous_version_tablets;
+        std::map<TTableId, int64_t> table_id_to_num_delta_rows;
         uint32_t retry_time = 0;
         Status status;
         bool is_task_timeout = false;
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
             succ_tablets.clear();
             error_tablet_ids.clear();
+            table_id_to_num_delta_rows.clear();
             EnginePublishVersionTask engine_task(publish_version_req, 
&error_tablet_ids,
-                                                 &succ_tablets, 
&discontinuous_version_tablets);
+                                                 &succ_tablets, 
&discontinuous_version_tablets,
+                                                 &table_id_to_num_delta_rows);
             status = StorageEngine::instance()->execute_task(&engine_task);
             if (status.ok()) {
                 break;
@@ -1620,7 +1623,7 @@ void 
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
         finish_task_request.__set_succ_tablets(succ_tablets);
         finish_task_request.__set_error_tablet_ids(
                 std::vector<TTabletId>(error_tablet_ids.begin(), 
error_tablet_ids.end()));
-
+        
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
         _finish_task(finish_task_request);
         _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
     }
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 24593c40bad..702c4386f11 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -29,6 +29,7 @@
 #include <set>
 #include <shared_mutex>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
 #include "common/logging.h"
@@ -72,11 +73,13 @@ void TabletPublishStatistics::record_in_bvar() {
 EnginePublishVersionTask::EnginePublishVersionTask(
         const TPublishVersionRequest& publish_version_req, 
std::set<TTabletId>* error_tablet_ids,
         std::map<TTabletId, TVersion>* succ_tablets,
-        std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinuous_version_tablets)
+        std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinuous_version_tablets,
+        std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
         : _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
           _succ_tablets(succ_tablets),
-          _discontinuous_version_tablets(discontinuous_version_tablets) {}
+          _discontinuous_version_tablets(discontinuous_version_tablets),
+          _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {}
 
 void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
     std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
@@ -91,7 +94,7 @@ Status EnginePublishVersionTask::finish() {
     std::unique_ptr<ThreadPoolToken> token =
             
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
                     ThreadPool::ExecutionMode::CONCURRENT);
-
+    std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;
     // each partition
     for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
         int64_t partition_id = par_ver_info.partition_id;
@@ -188,6 +191,11 @@ Status EnginePublishVersionTask::finish() {
                     continue;
                 }
             }
+
+            auto rowset_meta_ptr = rowset->rowset_meta();
+            tablet_id_to_num_delta_rows.insert(
+                    {rowset_meta_ptr->tablet_id(), 
rowset_meta_ptr->num_rows()});
+
             auto tablet_publish_txn_ptr = 
std::make_shared<TabletPublishTxnTask>(
                     this, tablet, rowset, partition_id, transaction_id, 
version, tablet_info);
             auto submit_st = token->submit_func([=]() { 
tablet_publish_txn_ptr->handle(); });
@@ -204,7 +212,6 @@ Status EnginePublishVersionTask::finish() {
         std::set<TabletInfo> partition_related_tablet_infos;
         
StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
                 partition_id, &partition_related_tablet_infos);
-
         Version version(par_ver_info.version, par_ver_info.version);
         for (auto& tablet_info : partition_related_tablet_infos) {
             TabletSharedPtr tablet =
@@ -241,6 +248,7 @@ Status EnginePublishVersionTask::finish() {
             }
         }
     }
+    _calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows);
 
     if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
         LOG(INFO) << "finish to publish version on transaction."
@@ -252,6 +260,22 @@ Status EnginePublishVersionTask::finish() {
     return res;
 }
 
+// Folds the per-tablet loaded-row counts gathered in finish() into per-table totals
+// stored in _table_id_to_num_delta_rows, which the caller sends back to the FE.
+// get_tablet() can return null for a tablet that is no longer registered (e.g. it was
+// dropped while the publish was running); such tablets are skipped, not dereferenced.
+void EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
+        const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows) {
+    for (const auto& [tablet_id, num_delta_rows] : tablet_id_to_num_delta_rows) {
+        TabletSharedPtr tablet =
+                StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            LOG(WARNING) << "can not find tablet, tablet_id=" << tablet_id;
+            continue;
+        }
+        (*_table_id_to_num_delta_rows)[tablet->get_table_id()] += num_delta_rows;
+    }
+}
+
 TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task,
                                            TabletSharedPtr tablet, RowsetSharedPtr rowset,
                                            int64_t partition_id, int64_t transaction_id,
diff --git a/be/src/olap/task/engine_publish_version_task.h 
b/be/src/olap/task/engine_publish_version_task.h
index 0a270c93d2a..8f3790574a2 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -87,21 +87,26 @@ public:
     EnginePublishVersionTask(
             const TPublishVersionRequest& publish_version_req,
             std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, 
TVersion>* succ_tablets,
-            std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinous_version_tablets);
-    ~EnginePublishVersionTask() {}
+            std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinous_version_tablets,
+            std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
+    ~EnginePublishVersionTask() override = default;
 
-    virtual Status finish() override;
+    Status finish() override;
 
     void add_error_tablet_id(int64_t tablet_id);
 
     int64_t finish_task();
 
 private:
+    void _calculate_tbl_num_delta_rows(
+            const std::unordered_map<int64_t, int64_t>& 
tablet_id_to_num_delta_rows);
+
     const TPublishVersionRequest& _publish_version_req;
     std::mutex _tablet_ids_mutex;
     std::set<TTabletId>* _error_tablet_ids;
     std::map<TTabletId, TVersion>* _succ_tablets;
     std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
_discontinuous_version_tablets;
+    std::map<TTableId, int64_t>* _table_id_to_num_delta_rows;
 };
 
 class AsyncTabletPublishTask {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 23118647cca..2833eff5f3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -491,6 +491,9 @@ public class MasterImpl {
             // not remove the task from queue and be will retry
             return;
         }
+        if (request.isSetTableIdToDeltaNumRows()) {
+            
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
+        }
         AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
                                   publishVersionTask.getTaskType(),
                                   publishVersionTask.getSignature());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 8461b1db4f5..74cff551b4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TPublishVersionRequest;
 import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,6 +39,11 @@ public class PublishVersionTask extends AgentTask {
     // tabletId => version, current version = 0
     private Map<Long, Long> succTablets;
 
+    /**
+     * To collect loaded rows for each table from each BE
+     */
+    private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap();
+
     public PublishVersionTask(long backendId, long transactionId, long dbId,
             List<TPartitionVersionInfo> partitionVersionInfos, long 
createTime) {
         super(null, backendId, TTaskType.PUBLISH_VERSION, dbId, -1L, -1L, -1L, 
-1L, transactionId, createTime);
@@ -81,4 +87,15 @@ public class PublishVersionTask extends AgentTask {
         }
         this.errorTablets.addAll(errorTablets);
     }
+
+    /**
+     * Records the per-table loaded-row counts reported by the BE; existing entries are overwritten.
+     */
+    public void setTableIdToDeltaNumRows(Map<Long, Long> tableIdToDeltaNumRows) {
+        this.tableIdToDeltaNumRows.putAll(tableIdToDeltaNumRows);
+    }
+
+    public Map<Long, Long> getTableIdToDeltaNumRows() {
+        return tableIdToDeltaNumRows;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 7677c1684f6..b74f902358f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -51,6 +51,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.AnalysisManager;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.ClearTransactionTask;
@@ -1791,6 +1792,21 @@ public class DatabaseTransactionMgr {
                 }
             }
         }
+        AnalysisManager analysisManager = 
Env.getCurrentEnv().getAnalysisManager();
+        Map<Long, Long> tableIdToTotalNumDeltaRows = 
transactionState.getTableIdToTotalNumDeltaRows();
+        Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
+        tableIdToTotalNumDeltaRows
+                        .forEach((tableId, numRows) -> {
+                            OlapTable table = (OlapTable) 
db.getTableNullable(tableId);
+                            if (table != null) {
+                                short replicaNum = table.getTableProperty()
+                                        .getReplicaAllocation()
+                                        .getTotalReplicaNum();
+                                tableIdToNumDeltaRows.put(tableId, numRows / 
replicaNum);
+                            }
+                        });
+        LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
+        tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows);
         return true;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 33ea8de07eb..747508d2118 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -29,13 +29,17 @@ import org.apache.doris.task.PublishVersionTask;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TTaskType;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Stream;
 
 public class PublishVersionDaemon extends MasterDaemon {
 
@@ -121,12 +125,27 @@ public class PublishVersionDaemon extends MasterDaemon {
             AgentTaskExecutor.submit(batchTask);
         }
 
         // try to finish the transaction, if failed just retry in next loop
         for (TransactionState transactionState : readyTransactionStates) {
-            boolean hasBackendAliveAndUnfinishTask = transactionState.getPublishVersionTasks().values().stream()
+            // Sum the rows loaded into each table by every task of THIS transaction that finished
+            // without error tablets. Done eagerly rather than via peek() on the stream below,
+            // because anyMatch() short-circuits and would skip the remaining tasks.
+            Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
+            transactionState.getPublishVersionTasks().values().forEach(task -> {
+                if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
+                    task.getTableIdToDeltaNumRows().forEach(
+                            (tableId, numRows) -> tableIdToNumDeltaRows.merge(tableId, numRows, Long::sum));
+                }
+            });
+            Stream<PublishVersionTask> publishVersionTaskStream = transactionState
+                    .getPublishVersionTasks()
+                    .values()
+                    .stream();
+            boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
                     .anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
+            transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows);
 
-            boolean shouldFinishTxn = !hasBackendAliveAndUnfinishTask || transactionState.isPublishTimeout();
+            boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout();
             if (shouldFinishTxn) {
                 try {
                     // one transaction exception should not affect other transaction
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 897bc3b63b8..5d95917e58d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -253,6 +253,11 @@ public class TransactionState implements Writable {
     // tbl id -> (index ids)
     private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
 
+    /**
+     * the value is the num delta rows of all replicas in each table
+     */
+    private final Map<Long, Long> tableIdToTotalNumDeltaRows = 
Maps.newHashMap();
+
     private String errorLogUrl = null;
 
     // record some error msgs during the transaction operation.
@@ -701,6 +706,14 @@ public class TransactionState implements Writable {
         }
     }
 
+    public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
+        return tableIdToTotalNumDeltaRows;
+    }
+
+    public void setTableIdToTotalNumDeltaRows(Map<Long, Long> 
tableIdToTotalNumDeltaRows) {
+        this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows);
+    }
+
     public void setErrorMsg(String errMsg) {
         this.errMsg = errMsg;
     }
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index dedc454d33f..9acd3f85f7b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -66,6 +66,7 @@ struct TFinishTaskRequest {
     15: optional i64 copy_size
     16: optional i64 copy_time_ms
     17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
+    18: optional map<i64, i64> table_id_to_delta_num_rows
 }
 
 struct TTablet {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to