This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6bf8c1  [Feature](Transaction) Support two phase commit (2PC) for stream load (#7473)
a6bf8c1 is described below

commit a6bf8c13eb2e2ba05b866db02f872df1ac775638
Author: weizuo93 <wei...@apache.org>
AuthorDate: Wed Feb 16 11:55:04 2022 +0800

    [Feature](Transaction) Support two phase commit (2PC) for stream load (#7473)
    
    Two-phase commit for stream load means:
    During a Stream load, a response is returned to the client as soon as the
    data has been written. At this point the data is not yet visible and the
    transaction status is PRECOMMITTED. The data becomes visible only after the
    client triggers a COMMIT.
    
    1. The user can invoke the following interface to trigger a commit operation
    for the transaction:
    
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \
    http://fe_host:http_port/api/{db}/_stream_load_2pc
    
    or
    
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \
    http://be_host:webserver_port/api/{db}/_stream_load_2pc
    
    2. The user can invoke the following interface to trigger an abort operation
    for the transaction:
    
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \
    http://fe_host:http_port/api/{db}/_stream_load_2pc
    
    or
    
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \
    http://be_host:webserver_port/api/{db}/_stream_load_2pc
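    
    For illustration, a minimal end-to-end sketch of the flow might look like the
    following. Host, port, credentials, database/table names and the data file are
    placeholders, and it is assumed that the stream load response carries the
    transaction id (TxnId), which is then passed back via the txn_id header:
    
    # 1. Start a stream load with two-phase commit enabled; the data stays PRECOMMITTED
    curl --location-trusted -u user:passwd -H "two_phase_commit:true" \
        -T data.csv http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # 2. Make the data visible by committing the returned transaction
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" \
        -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc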
---
 be/src/common/config.h                             |   1 +
 be/src/http/CMakeLists.txt                         |   1 +
 be/src/http/action/stream_load.cpp                 |  21 +-
 be/src/http/action/stream_load_2pc.cpp             | 102 ++++++++
 .../action/stream_load_2pc.h}                      |  26 +-
 be/src/http/http_common.h                          |   4 +
 be/src/runtime/stream_load/stream_load_context.cpp |   4 +
 be/src/runtime/stream_load/stream_load_context.h   |   4 +
 .../runtime/stream_load/stream_load_executor.cpp   |  70 +++++-
 be/src/runtime/stream_load/stream_load_executor.h  |  11 +
 be/src/service/http_service.cpp                    |   5 +
 .../load-data/stream-load-manual.md                |  22 ++
 .../load-data/stream-load-manual.md                |  22 ++
 .../org/apache/doris/clone/TabletScheduler.java    |  15 ++
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../doris/common/LabelAlreadyUsedException.java    |  13 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  58 +++++
 .../doris/httpv2/rest/RestBaseController.java      |   2 +
 .../apache/doris/service/FrontendServiceImpl.java  | 131 ++++++++++
 .../doris/transaction/DatabaseTransactionMgr.java  | 273 ++++++++++++++++++---
 .../doris/transaction/GlobalTransactionMgr.java    |  64 ++++-
 .../doris/transaction/PartitionCommitInfo.java     |  12 +
 .../apache/doris/transaction/TransactionState.java |  19 +-
 .../doris/transaction/TransactionStatus.java       |   7 +-
 .../doris/transaction/TransactionStateTest.java    |   2 +-
 gensrc/thrift/FrontendService.thrift               |  19 ++
 27 files changed, 844 insertions(+), 74 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index cc34489..c5eed39 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -354,6 +354,7 @@ CONF_mInt32(stream_load_record_batch_size, "50");
 CONF_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
+CONF_mBool(disable_stream_load_2pc, "true");
 
 // OlapTableSink sender's send interval, should be less than the real response 
time of a tablet writer rpc.
 // You may need to lower the speed when the sink receiver bes are too busy.
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt
index e004b01..8ca052d 100644
--- a/be/src/http/CMakeLists.txt
+++ b/be/src/http/CMakeLists.txt
@@ -47,6 +47,7 @@ add_library(Webserver STATIC
   action/pprof_actions.cpp
   action/metrics_action.cpp
   action/stream_load.cpp
+  action/stream_load_2pc.cpp
   action/meta_action.cpp
   action/compaction_action.cpp
   action/config_action.cpp
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 19bcff2..a9da838 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -193,11 +193,16 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
     // wait stream load finish
     RETURN_IF_ERROR(ctx->future.get());
 
-    // If put file success we need commit this load
-    int64_t commit_and_publish_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
-    ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
-
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
+    } else {
+        // If put file success we need commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
+    }
     return Status::OK();
 }
 
@@ -218,6 +223,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         ctx->label = generate_uuid_string();
     }
 
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? 
true : false;
+
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
               << ", tbl=" << ctx->table;
 
@@ -266,6 +273,10 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, 
StreamLoadContext* ct
         return Status::InternalError(ss.str());
     }
 
+    if (ctx->two_phase_commit && config::disable_stream_load_2pc) {
+        return Status::InternalError("Two phase commit (2PC) for stream load 
was disabled");
+    }
+
     // check content length
     ctx->body_bytes = 0;
     size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
diff --git a/be/src/http/action/stream_load_2pc.cpp 
b/be/src/http/action/stream_load_2pc.cpp
new file mode 100644
index 0000000..511bce9
--- /dev/null
+++ b/be/src/http/action/stream_load_2pc.cpp
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_2pc.h"
+
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "common/status.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "http/utils.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "util/json_util.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env) : 
_exec_env(exec_env) {}
+
+void StreamLoad2PCAction::handle(HttpRequest* req) {
+    Status status = Status::OK();
+    std::string status_result;
+
+    if (config::disable_stream_load_2pc) {
+        status = Status::InternalError("Two phase commit (2PC) for stream load 
was disabled");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+
+    StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
+    ctx->ref();
+    req->set_handler_ctx(ctx);
+    ctx->db = req->param(HTTP_DB_KEY);
+    std::string req_txn_id = req->header(HTTP_TXN_ID_KEY);
+    try {
+        ctx->txn_id = std::stoull(req_txn_id);
+    } catch (const std::exception& e) {
+        status = Status::InternalError("convert txn_id [" + req_txn_id + "] 
failed");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+    ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY);
+    if (ctx->txn_operation.compare("commit") != 0 && 
ctx->txn_operation.compare("abort") != 0) {
+        status = Status::InternalError("transaction operation should be 
\'commit\' or \'abort\'");
+        status_result = to_json(status);
+        HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+        return;
+    }
+
+    if (!parse_basic_auth(*req, &ctx->auth)) {
+        LOG(WARNING) << "parse basic authorization failed.";
+        status = Status::InternalError("no valid Basic authorization");
+    }
+
+    status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx);
+
+    if (!status.ok()) {
+        status_result = to_json(status);
+    } else {
+        status_result = get_success_info(req_txn_id, ctx->txn_operation);
+    }
+    HttpChannel::send_reply(req, HttpStatus::OK, status_result);
+}
+
+std::string StreamLoad2PCAction::get_success_info(const std::string txn_id, 
const std::string txn_operation) {
+    rapidjson::StringBuffer s;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
+
+    writer.StartObject();
+    // status
+    writer.Key("status");
+    writer.String("Success");
+    // msg
+    std::string msg = "transaction [" + txn_id + "] " + txn_operation + " 
successfully.";
+    writer.Key("msg");
+    writer.String(msg.c_str());
+    writer.EndObject();
+    return s.GetString();
+}
+
+} // namespace doris
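
For reference, a sketch of calling this BE handler directly, together with the shape
of the success body built by get_success_info() above; host, port, credentials and
the transaction id are placeholders:

    # Commit a precommitted transaction via the BE endpoint registered in http_service.cpp below
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:1234" \
        -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc
    # Expected success response body (pretty-printed JSON):
    # {
    #     "status": "Success",
    #     "msg": "transaction [1234] commit successfully."
    # }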
diff --git a/be/src/runtime/stream_load/stream_load_executor.h 
b/be/src/http/action/stream_load_2pc.h
similarity index 56%
copy from be/src/runtime/stream_load/stream_load_executor.h
copy to be/src/http/action/stream_load_2pc.h
index 4ba791c..7043b52 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/http/action/stream_load_2pc.h
@@ -17,33 +17,21 @@
 
 #pragma once
 
-#include <memory>
+#include <string>
+#include "http/http_handler.h"
 
 namespace doris {
 
 class ExecEnv;
-class StreamLoadContext;
-class Status;
-class TTxnCommitAttachment;
-class StreamLoadPipe;
 
-class StreamLoadExecutor {
+class StreamLoad2PCAction : public HttpHandler {
 public:
-    StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {}
+    StreamLoad2PCAction(ExecEnv* exec_env);
 
-    Status begin_txn(StreamLoadContext* ctx);
+    virtual ~StreamLoad2PCAction(){};
 
-    Status commit_txn(StreamLoadContext* ctx);
-
-    void rollback_txn(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx);
-
-    Status execute_plan_fragment(StreamLoadContext* ctx, 
std::shared_ptr<StreamLoadPipe> pipe);
-private:
-    // collect the load statistics from context and set them to stat
-    // return true if stat is set, otherwise, return false
-    bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* 
attachment);
+    void handle(HttpRequest* req) override;
+    std::string get_success_info(const std::string txn_id, const std::string 
txn_operation);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 4ade62f..a35b241 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -54,4 +54,8 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
 
 static const std::string HTTP_100_CONTINUE = "100-continue";
 
+static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
+static const std::string HTTP_TXN_ID_KEY = "txn_id";
+static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
+
 } // namespace doris
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp 
b/be/src/runtime/stream_load/stream_load_context.cpp
index a7ab92f..172b32f 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -34,6 +34,10 @@ std::string StreamLoadContext::to_json() const {
     writer.Key("Label");
     writer.String(label.c_str());
 
+    writer.Key("TwoPhaseCommit");
+    std::string need_two_phase_commit = two_phase_commit ? "true" : "false";
+    writer.String(need_two_phase_commit.c_str());
+
     // status
     writer.Key("Status");
     switch (status.code()) {
diff --git a/be/src/runtime/stream_load/stream_load_context.h 
b/be/src/runtime/stream_load/stream_load_context.h
index cd3b458..ac7338b 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -134,6 +134,7 @@ public:
     double max_filter_ratio = 0.0;
     int32_t timeout_second = -1;
     AuthInfo auth;
+    bool two_phase_commit = false;
 
     // the following members control the max progress of a consuming
     // process. if any of them reach, the consuming will finish.
@@ -152,6 +153,8 @@ public:
 
     int64_t txn_id = -1;
 
+    std::string txn_operation = "";
+
     bool need_rollback = false;
     // when use_streaming is true, we use stream_pipe to send source data,
     // otherwise we save source data to file first, then process it.
@@ -180,6 +183,7 @@ public:
     int64_t begin_txn_cost_nanos = 0;
     int64_t stream_load_put_cost_nanos = 0;
     int64_t commit_and_publish_txn_cost_nanos = 0;
+    int64_t pre_commit_txn_cost_nanos = 0;
     int64_t read_data_cost_nanos = 0;
     int64_t write_data_cost_nanos = 0;
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 55f590c..7fa2245 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -19,10 +19,6 @@
 
 #include "common/status.h"
 #include "common/utils.h"
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/FrontendService_types.h"
-#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/Types_types.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -174,10 +170,65 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
     return Status::OK();
 }
 
-Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
-    DorisMetrics::instance()->txn_commit_request_total->increment(1);
+Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
 
     TLoadTxnCommitRequest request;
+    get_commit_request(ctx, request);
+
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TLoadTxnCommitResult result;
+#ifndef BE_TEST
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+              client->loadTxnPreCommit(result, request);
+            },
+            config::txn_commit_rpc_timeout_ms));
+#else
+    result = k_stream_load_commit_result;
+#endif
+    // Return if this transaction was precommitted successfully; otherwise, we
+    // need to try to roll back this transaction.
+    Status status(result.status);
+    if (!status.ok()) {
+        LOG(WARNING) << "precommit transaction failed, errmsg=" << 
status.get_error_msg()
+                     << ctx->brief();
+        if (status.code() == TStatusCode::PUBLISH_TIMEOUT) {
+            ctx->need_rollback = false;
+        }
+        return status;
+    }
+    // precommit success, set need_rollback to false
+    ctx->need_rollback = false;
+    return Status::OK();
+}
+
+Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
+    TLoadTxn2PCRequest request;
+    set_request_auth(&request, ctx->auth);
+    request.__set_db(ctx->db);
+    request.__set_txnId(ctx->txn_id);
+    request.__set_operation(ctx->txn_operation);
+    request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
+
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    TLoadTxn2PCResult result;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+              client->loadTxn2PC(result, request);
+            },
+            config::txn_commit_rpc_timeout_ms));
+    Status status(result.status);
+    if (!status.ok()) {
+        LOG(WARNING) << "2PC commit transaction failed, errmsg=" << 
status.get_error_msg();
+        return status;
+    }
+    return Status::OK();
+}
+
+void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, 
TLoadTxnCommitRequest& request) {
     set_request_auth(&request, ctx->auth);
     request.db = ctx->db;
     if (ctx->db_id > 0) {
@@ -197,6 +248,13 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* 
ctx) {
         request.txnCommitAttachment = std::move(attachment);
         request.__isset.txnCommitAttachment = true;
     }
+}
+
+Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
+    DorisMetrics::instance()->txn_commit_request_total->increment(1);
+
+    TLoadTxnCommitRequest request;
+    get_commit_request(ctx, request);
 
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
     TLoadTxnCommitResult result;
diff --git a/be/src/runtime/stream_load/stream_load_executor.h 
b/be/src/runtime/stream_load/stream_load_executor.h
index 4ba791c..8adcde1 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -19,6 +19,11 @@
 
 #include <memory>
 
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "gen_cpp/Types_types.h"
+
 namespace doris {
 
 class ExecEnv;
@@ -33,8 +38,14 @@ public:
 
     Status begin_txn(StreamLoadContext* ctx);
 
+    Status pre_commit_txn(StreamLoadContext* ctx);
+
+    Status operate_txn_2pc(StreamLoadContext* ctx);
+
     Status commit_txn(StreamLoadContext* ctx);
 
+    void get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& 
request);
+
     void rollback_txn(StreamLoadContext* ctx);
 
     Status execute_plan_fragment(StreamLoadContext* ctx);
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index e19427f..be7519f 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -33,6 +33,7 @@
 #include "http/action/restore_tablet_action.h"
 #include "http/action/snapshot_action.h"
 #include "http/action/stream_load.h"
+#include "http/action/stream_load_2pc.h"
 #include "http/action/tablet_migration_action.h"
 #include "http/action/tablets_distribution_action.h"
 #include "http/action/tablets_info_action.h"
@@ -62,6 +63,10 @@ Status HttpService::start() {
     StreamLoadAction* streamload_action = _pool.add(new 
StreamLoadAction(_env));
     _ev_http_server->register_handler(HttpMethod::PUT, 
"/api/{db}/{table}/_stream_load",
                                       streamload_action);
+    StreamLoad2PCAction* streamload_2pc_action = _pool.add(new 
StreamLoad2PCAction(_env));
+    _ev_http_server->register_handler(HttpMethod::PUT, 
"/api/{db}/_stream_load_2pc",
+                                      streamload_2pc_action);
+
 
     // register download action
     std::vector<std::string> allow_paths;
diff --git a/docs/en/administrator-guide/load-data/stream-load-manual.md 
b/docs/en/administrator-guide/load-data/stream-load-manual.md
index ab4bda2..b91b339 100644
--- a/docs/en/administrator-guide/load-data/stream-load-manual.md
+++ b/docs/en/administrator-guide/load-data/stream-load-manual.md
@@ -169,6 +169,28 @@ The number of rows in the original file = `dpp.abnorm.ALL 
+ dpp.norm.ALL`
 + merge\_type
      The type of data merging supports three types: APPEND, DELETE, and MERGE. 
APPEND is the default value, which means that all this batch of data needs to 
be appended to the existing data. DELETE means to delete all rows with the same 
key as this batch of data. MERGE semantics Need to be used in conjunction with 
the delete condition, which means that the data that meets the delete condition 
is processed according to DELETE semantics and the rest is processed according 
to APPEND semantics
 
++ two\_phase\_commit
+
+    Stream load supports the two-phase commit mode. It can be enabled by declaring ```two_phase_commit=true``` in the HTTP header. This mode is disabled by default.
+    The two-phase commit mode means: during a Stream load, a response is returned to the client as soon as the data has been written. At this point the data is not yet visible and the transaction status is PRECOMMITTED. The data becomes visible only after the client triggers a COMMIT.
+
+    1. The user can invoke the following interface to trigger a commit operation for the transaction:
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
+    ```
+    or
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc
+    ```
+
+    2. The user can invoke the following interface to trigger an abort operation for the transaction:
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
+    ```
+    or
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc
+    ```
 
 ### Return results
 
diff --git a/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md 
b/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
index 4bb0c08..a5ba62a 100644
--- a/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md
@@ -179,6 +179,28 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的
     3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 
对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 
对其不产生影响。
 + merge\_type
     数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 
其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 
条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
+    
++ two\_phase\_commit
+
+    Stream load 导入可以开启两阶段事务提交模式。开启方式为在 HEADER 中声明 ```two_phase_commit=true``` 
。默认的两阶段批量事务提交为关闭。
+    两阶段批量事务提交模式的意思是:Stream 
load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
+    
+    1. 用户可以调用如下接口对stream load事务触发commit操作:
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H 
"txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
+    ```
+    或
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H 
"txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc
+    ```
+    2. 用户可以调用如下接口对stream load事务触发abort操作:
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H 
"txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
+    ```
+    或
+    ```
+    curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H 
"txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc
+    ```
 
 #### strict mode 与 source data 的导入关系
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 39573e1..ae96bde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -63,6 +63,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 
+import org.apache.doris.transaction.DatabaseTransactionMgr;
+import org.apache.doris.transaction.TransactionState;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -526,6 +528,19 @@ public class TabletScheduler extends MasterDaemon {
                 throw new SchedException(Status.UNRECOVERABLE, "table's state 
is not NORMAL");
             }
 
+            if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) {
+                try {
+                    DatabaseTransactionMgr dbTransactionMgr = 
Catalog.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId());
+                    for (TransactionState transactionState : 
dbTransactionMgr.getPreCommittedTxnList()) {
+                        
if(transactionState.getTableIdList().contains(tbl.getId())) {
+                            // If the table relates to a transaction in PRECOMMITTED status, do not allow balancing.
+                            throw new SchedException(Status.UNRECOVERABLE, "There exists a PRECOMMITTED transaction related to the table");
+                        }
+                    }
+                } catch (AnalysisException e) {
+                }
+            }
+
             if (statusPair.first != TabletStatus.VERSION_INCOMPLETE
                     && (partition.getState() != PartitionState.NORMAL || 
tableState != OlapTableState.NORMAL)
                     && tableState != OlapTableState.WAITING_STABLE) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index a5a55b0..1ebe53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -563,6 +563,12 @@ public class Config extends ConfigBase {
     public static int stream_load_default_timeout_second = 600; // 600s
 
     /**
+     * Default stream load pre-commit status timeout
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int stream_load_default_precommit_timeout_second = 3600; // 
3600s
+
+    /**
      * Max load timeout applicable to all type of load except for stream load
      */
     @ConfField(mutable = true, masterOnly = true)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index e05ba86..6d5661e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -238,6 +238,8 @@ public final class FeMetaVersion {
     public static final int VERSION_105 = 105;
     // add ldap info
     public static final int VERSION_106 = 106;
+    // support stream load 2PC
+    public static final int VERSION_107 = 107;
     // note: when increment meta version, should assign the latest version to 
VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_106;
+    public static final int VERSION_CURRENT = VERSION_107;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index 6a5ff58..d723c88 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.common;
 
-import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.base.Preconditions;
 
@@ -33,19 +33,22 @@ public class LabelAlreadyUsedException extends DdlException 
{
         super("Label [" + label + "] has already been used.");
     }
 
-    public LabelAlreadyUsedException(String label, TransactionStatus 
txnStatus) {
-        super("Label [" + label + "] has already been used.");
-        switch (txnStatus) {
+    public LabelAlreadyUsedException(TransactionState txn) {
+        super("Label [" + txn.getLabel() + "] has already been used, relate to 
txn [" + txn.getTransactionId() + "]");
+        switch (txn.getTransactionStatus()) {
         case UNKNOWN:
         case PREPARE:
             jobStatus = "RUNNING";
             break;
+        case PRECOMMITTED:
+            jobStatus = "PRECOMMITTED";
+            break;
         case COMMITTED:
         case VISIBLE:
             jobStatus = "FINISHED";
             break;
         default:
-            Preconditions.checkState(false, txnStatus);
+            Preconditions.checkState(false, txn.getTransactionStatus());
             break;
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index b674834..8cacb08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -73,6 +73,15 @@ public class LoadAction extends RestBaseController {
         return executeWithoutPassword(request, response, db, table);
     }
 
+    @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = 
RequestMethod.PUT)
+    public Object streamLoad2PC(HttpServletRequest request,
+                                   HttpServletResponse response,
+                                   @PathVariable(value = DB_KEY) String db) {
+        this.isStreamLoad = true;
+        executeCheckPassword(request, response);
+        return executeStreamLoad2PC(request, db);
+    }
+
     // Same as Multi load, to be compatible with http v1's response body,
     // we return error by using RestBaseResult.
     private Object executeWithoutPassword(HttpServletRequest request,
@@ -154,4 +163,53 @@ public class LoadAction extends RestBaseController {
             return new RestBaseResult(e.getMessage());
         }
     }
+
+    private Object executeStreamLoad2PC(HttpServletRequest request, String db) 
{
+        try {
+            String dbName = db;
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(request.getHeader(TXN_ID_KEY))) {
+                return new RestBaseResult("No transaction id selected.");
+            }
+
+            String txnOperation = request.getHeader(TXN_OPERATION_KEY);
+            if (Strings.isNullOrEmpty(txnOperation)) {
+                return new RestBaseResult("No transaction operation(\'commit\' 
or \'abort\') selected.");
+            }
+
+            // Choose a backend sequentially.
+            SystemInfoService.BeAvailablePredicate beAvailablePredicate =
+                    new SystemInfoService.BeAvailablePredicate(false, false, 
true);
+            List<Long> backendIds = 
Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
+                    1, beAvailablePredicate, false, clusterName, null, null);
+            if (backendIds == null) {
+                return new 
RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+            }
+
+            Backend backend = 
Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            if (backend == null) {
+                return new 
RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+            }
+
+            TNetworkAddress redirectAddr = new 
TNetworkAddress(backend.getHost(), backend.getHttpPort());
+
+            LOG.info("redirect stream load 2PC action to destination={}, db: 
{}, txn: {}, operation: {}",
+                    redirectAddr.toString(), dbName, 
request.getHeader(TXN_ID_KEY), txnOperation);
+
+            RedirectView redirectView = redirectTo(request, redirectAddr);
+            return redirectView;
+
+        } catch (Exception e) {
+            return new RestBaseResult(e.getMessage());
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
index db24c6b..e5988fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java
@@ -49,6 +49,8 @@ public class RestBaseController extends BaseController {
     protected static final String DB_KEY = "db";
     protected static final String TABLE_KEY = "table";
     protected static final String LABEL_KEY = "label";
+    protected static final String TXN_ID_KEY = "txn_id";
+    protected static final String TXN_OPERATION_KEY = "txn_operation";
     private static final Logger LOG = 
LogManager.getLogger(RestBaseController.class);
 
     public ActionAuthorizationInfo executeCheckPassword(HttpServletRequest 
request,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ac8ee57..94ca3bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -76,6 +76,8 @@ import org.apache.doris.thrift.TIsMethodSupportedRequest;
 import org.apache.doris.thrift.TListPrivilegesResult;
 import org.apache.doris.thrift.TListTableStatusResult;
 import org.apache.doris.thrift.TLoadCheckRequest;
+import org.apache.doris.thrift.TLoadTxn2PCRequest;
+import org.apache.doris.thrift.TLoadTxn2PCResult;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
 import org.apache.doris.thrift.TLoadTxnBeginResult;
 import org.apache.doris.thrift.TLoadTxnCommitRequest;
@@ -107,6 +109,7 @@ import 
org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
 import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -805,6 +808,134 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     }
 
     @Override
+    public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest 
request) throws TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive txn pre-commit request: {}, backend: {}", request, 
clientAddr);
+
+        TLoadTxnCommitResult result = new TLoadTxnCommitResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            loadTxnPreCommitImpl(request);
+        } catch (UserException e) {
+            LOG.warn("failed to pre-commit txn: {}: {}", request.getTxnId(), 
e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+        return result;
+    }
+
+    private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws 
UserException {
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        if (request.isSetAuthCode()) {
+        } else if (request.isSetAuthCodeUuid()) {
+            checkAuthCodeUuid(request.getDb(), request.getTxnId(), 
request.getAuthCodeUuid());
+        } else {
+            checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(),
+                    request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
+        }
+
+        // get database
+        Catalog catalog = Catalog.getCurrentCatalog();
+        String fullDbName = ClusterNamespace.getFullName(cluster, 
request.getDb());
+        Database db;
+        if (request.isSetDbId() && request.getDbId() > 0) {
+            db = catalog.getDbNullable(request.getDbId());
+        } else {
+            db = catalog.getDbNullable(fullDbName);
+        }
+        if (db == null) {
+            String dbName = fullDbName;
+            if (Strings.isNullOrEmpty(request.getCluster())) {
+                dbName = request.getDb();
+            }
+            throw new UserException("unknown database, database=" + dbName);
+        }
+
+        long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() / 2 : 5000;
+        Table table = db.getTableOrMetaException(request.getTbl(), 
TableType.OLAP);
+        Catalog.getCurrentGlobalTransactionMgr().preCommitTransaction2PC(
+                db, Lists.newArrayList(table), request.getTxnId(),
+                TabletCommitInfo.fromThrift(request.getCommitInfos()),
+                timeoutMs, 
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
+    }
+
+    @Override
+    public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws 
TException {
+        String clientAddr = getClientAddrAsString();
+        LOG.debug("receive txn 2PC request: {}, backend: {}", request, 
clientAddr);
+
+        TLoadTxn2PCResult result = new TLoadTxn2PCResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+        try {
+            loadTxn2PCImpl(request);
+        } catch (UserException e) {
+            LOG.warn("failed to {} txn {}: {}", request.getOperation(), 
request.getTxnId(), e.getMessage());
+            status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+            status.addToErrorMsgs(e.getMessage());
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            return result;
+        }
+        return result;
+    }
+
+    private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws 
UserException {
+        String cluster = request.getCluster();
+        if (Strings.isNullOrEmpty(cluster)) {
+            cluster = SystemInfoService.DEFAULT_CLUSTER;
+        }
+
+        String dbName = request.getDb();
+        if (Strings.isNullOrEmpty(dbName)) {
+            throw new UserException("No database selected.");
+        }
+
+        String fullDbName = ClusterNamespace.getFullName(cluster, dbName);
+
+        // get database
+        Catalog catalog = Catalog.getCurrentCatalog();
+        Database database = catalog.getDbNullable(fullDbName);
+        if (database == null) {
+            throw new UserException("unknown database, database=" + 
fullDbName);
+        }
+
+        DatabaseTransactionMgr dbTransactionMgr = 
Catalog.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(database.getId());
+        TransactionState transactionState = 
dbTransactionMgr.getTransactionState(request.getTxnId());
+        if (transactionState == null) {
+            throw new UserException("transaction [" + request.getTxnId() + "] 
not found");
+        }
+        List<Long> tableIdList = transactionState.getTableIdList();
+        List<Table> tableList = 
database.getTablesOnIdOrderWithIgnoringWrongTableId(tableIdList);
+        for (Table table : tableList) {
+            // check auth
+            checkPasswordAndPrivs(cluster, request.getUser(), 
request.getPasswd(), request.getDb(),
+                    table.getName(), request.getUserIp(), PrivPredicate.LOAD);
+        }
+
+        String txnOperation = request.getOperation().trim();
+        if (txnOperation.equalsIgnoreCase("commit")) {
+            
Catalog.getCurrentGlobalTransactionMgr().commitTransaction2PC(database, 
tableList, request.getTxnId(), 5000);
+        } else if (txnOperation.equalsIgnoreCase("abort")) {
+            
Catalog.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), 
request.getTxnId());
+        } else {
+            throw new UserException("transaction operation should be 
\'commit\' or \'abort\'");
+        }
+    }
+
+    @Override
     public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) 
throws TException {
         String clientAddr = getClientAddrAsString();
         LOG.debug("receive txn commit request: {}, backend: {}", request, 
clientAddr);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 87cd981..0876965 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -51,10 +51,7 @@ import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.ClearTransactionTask;
-import org.apache.doris.task.PublishVersionTask;
-import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -284,17 +281,18 @@ public class DatabaseTransactionMgr {
                         notAbortedTxns.add(txn);
                     }
                 }
-                // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE status
+                // there should be at most 1 txn in PREPARE/PRECOMMITTED/COMMITTED/VISIBLE status
                 Preconditions.checkState(notAbortedTxns.size() <= 1, 
notAbortedTxns);
                 if (!notAbortedTxns.isEmpty()) {
                     TransactionState notAbortedTxn = notAbortedTxns.get(0);
-                    if (requestId != null && 
notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
+                    if (requestId != null && 
(notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE
+                            || notAbortedTxn.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED)
                             && notAbortedTxn.getRequestId() != null && 
notAbortedTxn.getRequestId().equals(requestId)) {
                         // this may be a retry request for same job, just 
return existing txn id.
                         throw new 
DuplicatedRequestException(DebugUtil.printId(requestId),
                                 notAbortedTxn.getTransactionId(), "");
                     }
-                    throw new LabelAlreadyUsedException(label, 
notAbortedTxn.getTransactionStatus());
+                    throw new LabelAlreadyUsedException(notAbortedTxn);
                 }
             }
 
@@ -342,19 +340,10 @@ public class DatabaseTransactionMgr {
         this.usedQuotaDataBytes = usedQuotaDataBytes;
     }
 
-    /**
-     * commit transaction process as follows:
-     * 1. validate whether `Load` is cancelled
-     * 2. validate whether `Table` is deleted
-     * 3. validate replicas consistency
-     * 4. update transaction state version
-     * 5. persistent transactionState
-     * 6. update nextVersion because of the failure of persistent transaction 
resulting in error version
-     */
-    public void commitTransaction(List<Table> tableList, long transactionId, 
List<TabletCommitInfo> tabletCommitInfos,
+    public void preCommitTransaction2PC(List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
                                   TxnCommitAttachment txnCommitAttachment)
             throws UserException {
-        // 1. check status
+        // check status
         // the caller method already own db lock, we do not obtain db lock here
         Database db = catalog.getDbOrMetaException(dbId);
         TransactionState transactionState;
@@ -372,12 +361,36 @@ public class DatabaseTransactionMgr {
 
         if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
             LOG.debug("transaction is already visible: {}", transactionId);
-            return;
+            throw new TransactionCommitFailedException("transaction is already 
visible");
         }
+
         if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
             LOG.debug("transaction is already committed: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction is already 
committed");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+            LOG.debug("transaction is already pre-committed: {}", 
transactionId);
             return;
         }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+
+        checkCommitStatus(tableList, transactionState, tabletCommitInfos, 
txnCommitAttachment, errorReplicaIds,
+                          tableToPartition, totalInvolvedBackends);
+
+        unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends, db);
+        LOG.info("transaction:[{}] successfully pre-committed", 
transactionState);
+    }
+
+    private void checkCommitStatus(List<Table> tableList, TransactionState 
transactionState,
+                                   List<TabletCommitInfo> tabletCommitInfos, 
TxnCommitAttachment txnCommitAttachment,
+                                   Set<Long> errorReplicaIds, Map<Long, 
Set<Long>> tableToPartition,
+                                    Set<Long> totalInvolvedBackends) throws 
UserException {
+        Database db = catalog.getDbOrMetaException(dbId);
+
         // update transaction state extra if exists
         if (txnCommitAttachment != null) {
             transactionState.setTxnCommitAttachment(txnCommitAttachment);
@@ -385,12 +398,11 @@ public class DatabaseTransactionMgr {
 
         TabletInvertedIndex tabletInvertedIndex = 
catalog.getTabletInvertedIndex();
         Map<Long, Set<Long>> tabletToBackends = new HashMap<>();
-        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
         Map<Long, Table> idToTable = new HashMap<>();
         for (int i = 0; i < tableList.size(); i++) {
             idToTable.put(tableList.get(i).getId(), tableList.get(i));
         }
-        // 2. validate potential exists problem: db->table->partition
+        // validate potential exists problem: db->table->partition
         // guarantee exist exception during a transaction
         // if index is dropped, it does not matter.
         // if table or partition is dropped during load, just ignore that 
tablet,
@@ -433,8 +445,6 @@ public class DatabaseTransactionMgr {
             }
             
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
         }
-        Set<Long> errorReplicaIds = Sets.newHashSet();
-        Set<Long> totalInvolvedBackends = Sets.newHashSet();
         for (long tableId : tableToPartition.keySet()) {
             OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
             for (Partition partition : table.getAllPartitions()) {
@@ -511,9 +521,9 @@ public class DatabaseTransactionMgr {
                             LOG.warn("Failed to commit txn [{}]. "
                                             + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
                                             + "while error backends {}",
-                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum,
+                                    transactionState.getTransactionId(), 
tablet.getId(), successReplicaNum, quorumReplicaNum,
                                     
Joiner.on(",").join(errorBackendIdsForTablet));
-                            throw new 
TabletQuorumFailedException(transactionId, tablet.getId(),
+                            throw new 
TabletQuorumFailedException(transactionState.getTransactionId(), tablet.getId(),
                                     successReplicaNum, quorumReplicaNum,
                                     errorBackendIdsForTablet);
                         }
@@ -521,6 +531,72 @@ public class DatabaseTransactionMgr {
                 }
             }
         }
+    }
+
+    /**
+     * commit transaction process as follows:
+     * 1. validate whether `Load` is cancelled
+     * 2. validate whether `Table` is deleted
+     * 3. validate replicas consistency
+     * 4. update transaction state version
+     * 5. persistent transactionState
+     * 6. update nextVersion because of the failure of persistent transaction 
resulting in error version
+     */
+    public void commitTransaction(List<Table> tableList, long transactionId, 
List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment, 
Boolean is2PC)
+            throws UserException {
+        // check status
+        // the caller method already own db lock, we do not obtain db lock here
+        Database db = catalog.getDbOrMetaException(dbId);
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            LOG.debug("transaction not found: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId + "] not found.");
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            LOG.debug("transaction is already aborted: {}", transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is already aborted. abort reason: " + 
transactionState.getReason());
+        }
+
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
+            LOG.debug("transaction is already visible: {}", transactionId);
+            if (is2PC) {
+                throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                        + "] is already visible, not pre-committed.");
+            }
+            return;
+        }
+        if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+            LOG.debug("transaction is already committed: {}", transactionId);
+            if (is2PC) {
+                throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                        + "] is already committed, not pre-committed.");
+            }
+            return;
+        }
+
+        if (is2PC && transactionState.getTransactionStatus() == 
TransactionStatus.PREPARE) {
+            LOG.debug("transaction is prepare, not pre-committed: {}", 
transactionId);
+            throw new TransactionCommitFailedException("transaction [" + 
transactionId
+                    + "] is prepare, not pre-committed.");
+        }
+
+        Set<Long> errorReplicaIds = Sets.newHashSet();
+        Set<Long> totalInvolvedBackends = Sets.newHashSet();
+        Map<Long, Set<Long>> tableToPartition = new HashMap<>();
+        if (!is2PC) {
+            checkCommitStatus(tableList, transactionState, tabletCommitInfos, 
txnCommitAttachment, errorReplicaIds,
+                    tableToPartition, totalInvolvedBackends);
+        }
 
         // before state transform
         transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
@@ -528,8 +604,11 @@ public class DatabaseTransactionMgr {
         boolean txnOperated = false;
         writeLock();
         try {
-            unprotectedCommitTransaction(transactionState, errorReplicaIds, 
tableToPartition, totalInvolvedBackends,
-                    db);
+            if (is2PC) {
+                unprotectedCommitTransaction2PC(transactionState, db);
+            } else {
+                unprotectedCommitTransaction(transactionState, 
errorReplicaIds, tableToPartition, totalInvolvedBackends, db);
+            }
             txnOperated = true;
         } finally {
             writeUnlock();
@@ -537,7 +616,7 @@ public class DatabaseTransactionMgr {
             transactionState.afterStateTransform(TransactionStatus.COMMITTED, 
txnOperated);
         }
 
-        // 6. update nextVersion because of the failure of persistent 
transaction resulting in error version
+        // update nextVersion because of the failure of persistent transaction 
resulting in error version
         updateCatalogAfterCommitted(transactionState, db);
         LOG.info("transaction:[{}] successfully committed", transactionState);
     }
@@ -646,6 +725,19 @@ public class DatabaseTransactionMgr {
         }
     }
 
+    public List<TransactionState> getPreCommittedTxnList() {
+        readLock();
+        try {
+            // only consider transactions in PRECOMMITTED status
+            return idToRunningTransactionState.values().stream()
+                    .filter(transactionState -> 
(transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED))
+                    
.sorted(Comparator.comparing(TransactionState::getPreCommitTime))
+                    .collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
     public List<TransactionState> getCommittedTxnList() {
         readLock();
         try {
@@ -837,6 +929,35 @@ public class DatabaseTransactionMgr {
         LOG.info("finish transaction {} successfully", transactionState);
     }
 
+    protected void unprotectedPreCommitTransaction2PC(TransactionState 
transactionState, Set<Long> errorReplicaIds,
+                                                Map<Long, Set<Long>> 
tableToPartition, Set<Long> totalInvolvedBackends,
+                                                Database db) {
+        // transaction state is modified while checking whether the transaction can be committed
+        if (transactionState.getTransactionStatus() != 
TransactionStatus.PREPARE) {
+            return;
+        }
+        // update transaction state version
+        transactionState.setPreCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.PRECOMMITTED);
+        transactionState.setErrorReplicas(errorReplicaIds);
+        for (long tableId : tableToPartition.keySet()) {
+            TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
+            for (long partitionId : tableToPartition.get(tableId)) {
+                PartitionCommitInfo partitionCommitInfo = new 
PartitionCommitInfo(partitionId,-1, -1, -1);
+                tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo);
+            }
+            transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo);
+        }
+        // persist transactionState
+        unprotectUpsertTransactionState(transactionState, false);
+
+        // add publish version tasks. set task to null as a placeholder.
+        // tasks will be created when publishing version.
+        for (long backendId : totalInvolvedBackends) {
+            transactionState.addPublishVersionTask(backendId, null);
+        }
+    }
+
     protected void unprotectedCommitTransaction(TransactionState 
transactionState, Set<Long> errorReplicaIds,
                                                 Map<Long, Set<Long>> 
tableToPartition, Set<Long> totalInvolvedBackends,
                                                 Database db) {
@@ -870,6 +991,52 @@ public class DatabaseTransactionMgr {
         }
     }
 
+    protected void unprotectedCommitTransaction2PC(TransactionState 
transactionState, Database db) {
+        // transaction state is modified during the check of whether the transaction could be committed
+        if (transactionState.getTransactionStatus() != 
TransactionStatus.PRECOMMITTED) {
+            LOG.warn("Unknow exception. state of transaction [{}] changed, 
failed to commit transaction",
+                    transactionState.getTransactionId());
+            return;
+        }
+        // update transaction state version
+        transactionState.setCommitTime(System.currentTimeMillis());
+        transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
+
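+        // fill in the real versions for the placeholder commit infos that were recorded at pre-commit time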
+        Iterator<TableCommitInfo> tableCommitInfoIterator = 
transactionState.getIdToTableCommitInfos().values().iterator();
+        while (tableCommitInfoIterator.hasNext()) {
+            TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+            long tableId = tableCommitInfo.getTableId();
+            OlapTable table = (OlapTable) db.getTableNullable(tableId);
+            // the table may have been dropped between commit and publish, ignore this error
+            if (table == null) {
+                tableCommitInfoIterator.remove();
+                LOG.warn("table {} is dropped, skip and remove it from 
transaction state {}",
+                        tableId,
+                        transactionState);
+                continue;
+            }
+            Iterator<PartitionCommitInfo> partitionCommitInfoIterator = 
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
+            while (partitionCommitInfoIterator.hasNext()) {
+                PartitionCommitInfo partitionCommitInfo = 
partitionCommitInfoIterator.next();
+                long partitionId = partitionCommitInfo.getPartitionId();
+                Partition partition = table.getPartition(partitionId);
+                // the partition may have been dropped between commit and publish version, ignore this error
+                if (partition == null) {
+                    partitionCommitInfoIterator.remove();
+                    LOG.warn("partition {} is dropped, skip and remove it from 
transaction state {}",
+                            partitionId,
+                            transactionState);
+                    continue;
+                }
+                partitionCommitInfo.setVersion(partition.getNextVersion());
+                
partitionCommitInfo.setVersionHash(partition.getNextVersionHash());
+                partitionCommitInfo.setVersionTime(System.currentTimeMillis());
+            }
+        }
+        // persist transactionState
+        editLog.logInsertTransactionState(transactionState);
+    }
+
     // for add/update/delete TransactionState
     protected void unprotectUpsertTransactionState(TransactionState 
transactionState, boolean isReplay) {
         // if this is a replay operation, we should not log it
@@ -992,27 +1159,65 @@ public class DatabaseTransactionMgr {
         LOG.info("abort transaction: {} successfully", transactionState);
     }
 
+    public void abortTransaction2PC(long transactionId) throws UserException {
+        LOG.info("begin to abort txn {}", transactionId);
+        if (transactionId < 0) {
+            LOG.info("transaction id is {}, less than 0, maybe this is an old 
type load job, ignore abort operation", transactionId);
+            return;
+        }
+        TransactionState transactionState;
+        readLock();
+        try {
+            transactionState = unprotectedGetTransactionState(transactionId);
+        } finally {
+            readUnlock();
+        }
+
+        if (transactionState == null) {
+            throw new TransactionNotFoundException("transaction [" + 
transactionId + "] not found");
+        }
+
+        // before state transform
+        transactionState.beforeStateTransform(TransactionStatus.ABORTED);
+        boolean txnOperated = false;
+        writeLock();
+        try {
+            txnOperated = unprotectAbortTransaction(transactionId, "User 
Abort");
+        } finally {
+            writeUnlock();
+            transactionState.afterStateTransform(TransactionStatus.ABORTED, 
txnOperated, "User Abort");
+        }
+
+        // send clear txn task to BE to clear the transactions on BE.
+        // This is because parts of a txn may have succeeded on some BEs, and those parts should be cleared
+        // explicitly, or they will remain on the BEs forever.
+        // (The report process will also diff and send clear txn tasks to BEs, but that is only our last defense.)
+        if (txnOperated && transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
+            clearBackendTransactions(transactionState);
+        }
+        LOG.info("abort transaction: {} successfully", transactionState);
+    }
+
     private boolean unprotectAbortTransaction(long transactionId, String 
reason)
             throws UserException {
         TransactionState transactionState = 
unprotectedGetTransactionState(transactionId);
         if (transactionState == null) {
-            throw new TransactionNotFoundException("transaction not found", 
transactionId);
+            throw new TransactionNotFoundException("transaction [" + 
transactionId + "] not found.");
         }
         if (transactionState.getTransactionStatus() == 
TransactionStatus.ABORTED) {
-            return false;
+            throw new TransactionNotFoundException("transaction [" + 
transactionId + "] is already aborted, " +
+                    "abort reason: " + transactionState.getReason());
         }
         if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED
                 || transactionState.getTransactionStatus() == 
TransactionStatus.VISIBLE) {
-            throw new UserException("transaction's state is already "
-                    + transactionState.getTransactionStatus() + ", could not 
abort");
+            throw new UserException("transaction [" + transactionId + "] is 
already "
+                    + transactionState.getTransactionStatus() + ", could not 
abort.");
         }
         transactionState.setFinishTime(System.currentTimeMillis());
         transactionState.setReason(reason);
         transactionState.setTransactionStatus(TransactionStatus.ABORTED);
         unprotectUpsertTransactionState(transactionState, false);
-        for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) {
-            AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUBLISH_VERSION, task.getSignature());
-        }
         return true;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 6aedf62..2a8975e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -173,6 +173,32 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
+    public void preCommitTransaction2PC(Database db, List<Table> tableList, 
long transactionId,
+                                               List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
+                                               TxnCommitAttachment 
txnCommitAttachment)
+            throws UserException {
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 
timeoutMillis, TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=(" + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            preCommitTransaction2PC(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
+    }
+
+    public void preCommitTransaction2PC(long dbId, List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos,
+                                  TxnCommitAttachment txnCommitAttachment)
+            throws UserException {
+        if (Config.disable_load_job) {
+            throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load jobs are prevented");
+        }
+
+        LOG.debug("try to pre-commit transaction: {}", transactionId);
+        DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
+        dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+    }
+
     public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId, List<TabletCommitInfo> tabletCommitInfos)
             throws UserException {
         commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
null);
@@ -196,7 +222,17 @@ public class GlobalTransactionMgr implements Writable {
 
         LOG.debug("try to commit transaction: {}", transactionId);
         DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
-        dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        dbTransactionMgr.commitTransaction(tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment, false);
+    }
+
+    private void commitTransaction2PC(long dbId, long transactionId)
+            throws UserException {
+        if (Config.disable_load_job) {
+            throw new TransactionCommitFailedException("disable_load_job is 
set to true, all load jobs are prevented");
+        }
+
+        DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
+        dbTransactionMgr.commitTransaction(null, transactionId, null, null, 
true);
     }
 
     public boolean commitAndPublishTransaction(Database db, List<Table> 
tableList, long transactionId,
@@ -230,6 +266,23 @@ public class GlobalTransactionMgr implements Writable {
         return dbTransactionMgr.publishTransaction(db, transactionId, 
publishTimeoutMillis);
     }
 
+    public void commitTransaction2PC(Database db, List<Table> tableList, long 
transactionId, long timeoutMillis)
+            throws UserException {
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, 
timeoutMillis, TimeUnit.MILLISECONDS)) {
+            throw new UserException("get tableList write lock timeout, 
tableList=(" + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            commitTransaction2PC(db.getId(), transactionId);
+        } finally {
+            MetaLockUtils.writeUnlockTables(tableList);
+        }
+        stopWatch.stop();
+        LOG.info("stream load tasks are committed successfully. txns: {}. time 
cost: {} ms." +
+                " data will be visable later.", transactionId, 
stopWatch.getTime());
+    }
+
     public void abortTransaction(long dbId, long transactionId, String reason) 
throws UserException {
         abortTransaction(dbId, transactionId, reason, null);
     }
@@ -245,6 +298,11 @@ public class GlobalTransactionMgr implements Writable {
         dbTransactionMgr.abortTransaction(label, reason);
     }
 
+    public void abortTransaction2PC(Long dbId, long transactionId) throws 
UserException {
+        DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
+        dbTransactionMgr.abortTransaction2PC(transactionId);
+    }
+
     /*
      * get all txns which is ready to publish
      * a ready-to-publish txn's partition's visible version should be ONE less 
than txn's commit version.
@@ -498,6 +556,10 @@ public class GlobalTransactionMgr implements Writable {
         for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
             try {
                 DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(txnInfo.first);
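+                // skip pre-committed txns: their final commit or abort is driven by the client through the
+                // 2PC interface, and a txn stuck in PRECOMMITTED is reclaimed by the pre-commit timeout (see isTimeout())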
+                TransactionState transactionState = 
dbTransactionMgr.getTransactionState(txnInfo.second);
+                if (transactionState.getTransactionStatus() == 
TransactionStatus.PRECOMMITTED) {
+                    continue;
+                }
                 dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate 
BE is down", null);
             } catch (UserException e) {
                 LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
index b88d3e6..08064c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java
@@ -85,6 +85,18 @@ public class PartitionCommitInfo implements Writable {
     public long getVersionHash() {
         return versionHash;
     }
+
+    public void setVersion(long version) {
+        this.version = version;
+    }
+
+    public void setVersionTime(long versionTime) {
+        this.versionTime = versionTime;
+    }
+
+    public void setVersionHash(long versionHash) {
+        this.versionHash = versionHash;
+    }
     
     @Override
     public String toString() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 918a962..4b7309e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -183,6 +183,7 @@ public class TransactionState implements Writable {
     private TransactionStatus transactionStatus;
     private LoadJobSourceType sourceType;
     private long prepareTime;
+    private long preCommitTime;
     private long commitTime;
     private long finishTime;
     private String reason = "";
@@ -208,6 +209,7 @@ public class TransactionState implements Writable {
     // 3. in afterStateTransform(), callback object can not be found, so the 
write lock can not be released.
     private TxnStateChangeCallback callback = null;
     private long timeoutMs = Config.stream_load_default_timeout_second;
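+    // how long a txn may stay in PRECOMMITTED before isTimeout() reports it as timed out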
+    private long preCommittedTimeoutMs = 
Config.stream_load_default_precommit_timeout_second * 1000;
     private String authCode = "";
 
     // is set to true, we will double the publish timeout
@@ -238,6 +240,7 @@ public class TransactionState implements Writable {
         this.transactionStatus = TransactionStatus.PREPARE;
         this.sourceType = LoadJobSourceType.FRONTEND;
         this.prepareTime = -1;
+        this.preCommitTime = -1;
         this.commitTime = -1;
         this.finishTime = -1;
         this.reason = "";
@@ -260,6 +263,7 @@ public class TransactionState implements Writable {
         this.transactionStatus = TransactionStatus.PREPARE;
         this.sourceType = sourceType;
         this.prepareTime = -1;
+        this.preCommitTime = -1;
         this.commitTime = -1;
         this.finishTime = -1;
         this.reason = "";
@@ -334,6 +338,10 @@ public class TransactionState implements Writable {
         return prepareTime;
     }
 
+    public long getPreCommitTime() {
+        return preCommitTime;
+    }
+
     public long getCommitTime() {
         return commitTime;
     }
@@ -463,6 +471,10 @@ public class TransactionState implements Writable {
         this.prepareTime = prepareTime;
     }
 
+    public void setPreCommitTime(long preCommitTime) {
+        this.preCommitTime = preCommitTime;
+    }
+
     public void setCommitTime(long commitTime) {
         this.commitTime = commitTime;
     }
@@ -528,7 +540,8 @@ public class TransactionState implements Writable {
 
     // return true if txn is running but timeout
     public boolean isTimeout(long currentMillis) {
-        return transactionStatus == TransactionStatus.PREPARE && currentMillis 
- prepareTime > timeoutMs;
+        return (transactionStatus == TransactionStatus.PREPARE && 
currentMillis - prepareTime > timeoutMs) ||
+                (transactionStatus == TransactionStatus.PRECOMMITTED && 
currentMillis - preCommitTime > preCommittedTimeoutMs);
     }
 
     public synchronized void addTableIndexes(OlapTable table) {
@@ -606,6 +619,7 @@ public class TransactionState implements Writable {
         out.writeInt(transactionStatus.value());
         out.writeInt(sourceType.value());
         out.writeLong(prepareTime);
+        out.writeLong(preCommitTime);
         out.writeLong(commitTime);
         out.writeLong(finishTime);
         Text.writeString(out, reason);
@@ -663,6 +677,9 @@ public class TransactionState implements Writable {
         transactionStatus = TransactionStatus.valueOf(in.readInt());
         sourceType = LoadJobSourceType.valueOf(in.readInt());
         prepareTime = in.readLong();
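+        // preCommitTime only exists in metadata written with FeMetaVersion.VERSION_107 or later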
+        if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_107) {
+            preCommitTime = in.readLong();
+        }
         commitTime = in.readLong();
         finishTime = in.readLong();
         reason = Text.readString(in);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java
index efbe0b1..31287d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java
@@ -22,7 +22,8 @@ public enum TransactionStatus {
     PREPARE(1),
     COMMITTED(2),
     VISIBLE(3),
-    ABORTED(4);
+    ABORTED(4),
+    PRECOMMITTED(5);
     
     private final int flag;
     
@@ -46,6 +47,8 @@ public enum TransactionStatus {
                 return VISIBLE;
             case 4:
                 return ABORTED;
+            case 5:
+                return PRECOMMITTED;
             default:
                 return null;
         }
@@ -68,6 +71,8 @@ public enum TransactionStatus {
                 return "VISIBLE";
             case ABORTED:
                 return "ABORTED";
+            case PRECOMMITTED:
+                return "PRECOMMITTED";
             default:
                 return "UNKNOWN";
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
index 93ad565..dc4b372 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java
@@ -51,7 +51,7 @@ public class TransactionStateTest {
     @Test
     public void testSerDe() throws IOException {
         MetaContext metaContext = new MetaContext();
-        metaContext.setMetaVersion(FeMetaVersion.VERSION_83);
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_107);
         metaContext.setThreadLocalInfo();
 
         // 1. Write objects to file
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 2818196..a6db982 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -656,6 +656,23 @@ struct TLoadTxnCommitResult {
     1: required Status.TStatus status
 }
 
+struct TLoadTxn2PCRequest {
+    1: optional string cluster
+    2: required string user
+    3: required string passwd
+    4: optional string db
+    5: optional string user_ip
+    6: optional i64 txnId
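+    // operation is either "commit" or "abort"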
+    7: optional string operation
+    8: optional i64 auth_code
+    9: optional string auth_code_uuid
+    10: optional i64 thrift_rpc_timeout_ms
+}
+
+struct TLoadTxn2PCResult {
+    1: required Status.TStatus status
+}
+
 struct TLoadTxnRollbackRequest {
     1: optional string cluster
     2: required string user
@@ -749,6 +766,8 @@ service FrontendService {
     TFeResult updateExportTaskStatus(1: TUpdateExportTaskStatusRequest request)
 
     TLoadTxnBeginResult loadTxnBegin(1: TLoadTxnBeginRequest request)
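+    // 2PC stream load: loadTxnPreCommit persists loaded data as PRECOMMITTED (not yet visible),
+    // and loadTxn2PC triggers the final commit or abort for the transaction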
+    TLoadTxnCommitResult loadTxnPreCommit(1: TLoadTxnCommitRequest request)
+    TLoadTxn2PCResult loadTxn2PC(1: TLoadTxn2PCRequest request)
     TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request)
     TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request)
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
