This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 145683ccdbc [improvement](group commit) make get column function more 
reliable when replaying wal (#28900)
145683ccdbc is described below

commit 145683ccdbc9ad37f23b2a8b91888f7b24aaa688
Author: huanghaibin <[email protected]>
AuthorDate: Sun Dec 24 21:17:39 2023 +0800

    [improvement](group commit) make get column function more reliable when 
replaying wal (#28900)
---
 be/src/olap/wal_table.cpp                          | 46 +++++++++++++---------
 be/src/olap/wal_table.h                            |  1 +
 .../apache/doris/service/FrontendServiceImpl.java  | 38 +++++++++---------
 gensrc/thrift/FrontendService.thrift               |  7 +++-
 4 files changed, 53 insertions(+), 39 deletions(-)

diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index bde0e8dd69d..7f98c410b07 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -204,7 +204,17 @@ Status WalTable::_replay_wal_internal(const std::string& 
wal) {
     if (!st.ok()) {
         LOG(WARNING) << "abort txn " << wal_id << " fail";
     }
-    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
+    auto get_st = _get_column_info(_db_id, _table_id);
+    if (!get_st.ok()) {
+        if (get_st.is<ErrorCode::NOT_FOUND>()) {
+            {
+                std::lock_guard<std::mutex> lock(_replay_wal_lock);
+                _replay_wal_map.erase(wal);
+            }
+            RETURN_IF_ERROR(_delete_wal(wal_id));
+        }
+        return get_st;
+    }
 #endif
     RETURN_IF_ERROR(_send_request(wal_id, wal, label));
     return Status::OK();
@@ -354,8 +364,7 @@ Status WalTable::_send_request(int64_t wal_id, const 
std::string& wal, const std
         }
     } else {
         LOG(INFO) << "success to replay wal =" << wal;
-        RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
-        
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, 
wal_id));
+        RETURN_IF_ERROR(_delete_wal(wal_id));
         std::lock_guard<std::mutex> lock(_replay_wal_lock);
         if (_replay_wal_map.erase(wal)) {
             LOG(INFO) << "erase " << wal << " from _replay_wal_map";
@@ -414,26 +423,21 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t 
tb_id) {
                 [&request, &result](FrontendServiceConnection& client) {
                     client->getColumnInfo(result, request);
                 }));
-        std::string columns_str = result.column_info;
-        std::vector<std::string> column_element;
-        doris::vectorized::WalReader::string_split(columns_str, ",", 
column_element);
+        status = Status::create(result.status);
+        if (!status.ok()) {
+            return status;
+        }
+        std::vector<TColumnInfo> column_element = result.columns;
         int64_t column_index = 1;
         _column_id_name_map.clear();
         _column_id_index_map.clear();
         for (auto column : column_element) {
-            auto pos = column.find(":");
-            try {
-                auto column_name = column.substr(0, pos);
-                int64_t column_id = std::strtoll(column.substr(pos + 
1).c_str(), NULL, 10);
-                _column_id_name_map.emplace(column_id, column_name);
-                _column_id_index_map.emplace(column_id, column_index);
-                column_index++;
-            } catch (const std::invalid_argument& e) {
-                return Status::InvalidArgument("Invalid format, {}", e.what());
-            }
+            auto column_name = column.columnName;
+            auto column_id = column.columnId;
+            _column_id_name_map.emplace(column_id, column_name);
+            _column_id_index_map.emplace(column_id, column_index);
+            column_index++;
         }
-
-        status = Status::create(result.status);
     }
     return status;
 }
@@ -447,4 +451,10 @@ Status WalTable::_read_wal_header(const std::string& 
wal_path, std::string& colu
     return Status::OK();
 }
 
+Status WalTable::_delete_wal(int64_t wal_id) {
+    RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id));
+    RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, 
wal_id));
+    return Status::OK();
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h
index 354f4f16b05..e3d66d577a2 100644
--- a/be/src/olap/wal_table.h
+++ b/be/src/olap/wal_table.h
@@ -50,6 +50,7 @@ private:
     Status _read_wal_header(const std::string& wal, std::string& columns);
     bool _need_replay(const replay_wal_info& info);
     Status _replay_wal_internal(const std::string& wal);
+    Status _delete_wal(int64_t wal_id);
 
 private:
     ExecEnv* _exec_env;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4f88d22c836..75405854cab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -114,6 +114,7 @@ import org.apache.doris.thrift.TCheckAuthRequest;
 import org.apache.doris.thrift.TCheckAuthResult;
 import org.apache.doris.thrift.TColumnDef;
 import org.apache.doris.thrift.TColumnDesc;
+import org.apache.doris.thrift.TColumnInfo;
 import org.apache.doris.thrift.TCommitTxnRequest;
 import org.apache.doris.thrift.TCommitTxnResult;
 import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
@@ -240,7 +241,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -3280,40 +3280,38 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     @Override
     public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
         TGetColumnInfoResult result = new TGetColumnInfoResult();
-        TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
         long dbId = request.getDbId();
         long tableId = request.getTableId();
         if (!Env.getCurrentEnv().isMaster()) {
-            errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
-            errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+            status.setStatusCode(TStatusCode.NOT_MASTER);
+            status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
             LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
             return result;
         }
 
-        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (db == null) {
-            errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d 
is not exists", dbId)));
-            result.setStatus(errorStatus);
+            status.setStatusCode(TStatusCode.NOT_FOUND);
+            status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is 
not exists", dbId)));
             return result;
         }
-
-        Table table;
-        try {
-            table = db.getTable(tableId).get();
-        } catch (NoSuchElementException e) {
-            errorStatus.setErrorMsgs(
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            status.setStatusCode(TStatusCode.NOT_FOUND);
+            status.setErrorMsgs(
                     (Lists.newArrayList(String.format("dbId=%d tableId=%d is 
not exists", dbId, tableId))));
-            result.setStatus(errorStatus);
             return result;
         }
-        StringBuilder sb = new StringBuilder();
+        List<TColumnInfo> columnsResult = Lists.newArrayList();
         for (Column column : table.getBaseSchema(true)) {
-            sb.append(column.getName() + ":" + column.getUniqueId() + ",");
+            final TColumnInfo info = new TColumnInfo();
+            info.setColumnName(column.getName());
+            info.setColumnId(column.getUniqueId());
+            columnsResult.add(info);
         }
-        String columnInfo = sb.toString();
-        columnInfo = columnInfo.substring(0, columnInfo.length() - 1);
-        result.setStatus(new TStatus(TStatusCode.OK));
-        result.setColumnInfo(columnInfo);
+        result.setColumns(columnsResult);
         return result;
     }
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index d8a53d35662..d672597a0db 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1302,6 +1302,11 @@ struct TGetBackendMetaResult {
     3: optional Types.TNetworkAddress master_address
 }
 
+struct TColumnInfo {
+  1: optional string columnName
+  2: optional i64 columnId
+}
+
 struct TGetColumnInfoRequest {
     1: optional i64 db_id
     2: optional i64 table_id
@@ -1309,7 +1314,7 @@ struct TGetColumnInfoRequest {
 
 struct TGetColumnInfoResult {
     1: optional Status.TStatus status
-    2: optional string column_info
+    2: optional list<TColumnInfo> columns
 }
 
 service FrontendService {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to