This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a6bf8c1 [Feature](Transaction) Support two phase commit (2PC) for stream load (#7473) a6bf8c1 is described below commit a6bf8c13eb2e2ba05b866db02f872df1ac775638 Author: weizuo93 <wei...@apache.org> AuthorDate: Wed Feb 16 11:55:04 2022 +0800 [Feature](Transaction) Support two phase commit (2PC) for stream load (#7473) The two phase batch commit means: During Stream load, after data is written, the message will be returned to the client, the data is invisible at this point and the transaction status is PRECOMMITTED. The data will be visible only after COMMIT is triggered by client. 1. User can invoke the following interface to trigger commit operations for transaction: curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \ http://fe_host:http_port/api/{db}/_stream_load_2pc or curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \ http://be_host:webserver_port/api/{db}/_stream_load_2pc 2.User can invoke the following interface to trigger abort operations for transaction: curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \ http://fe_host:http_port/api/{db}/_stream_load_2pc or curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \ http://be_host:webserver_port/api/{db}/_stream_load_2pc --- be/src/common/config.h | 1 + be/src/http/CMakeLists.txt | 1 + be/src/http/action/stream_load.cpp | 21 +- be/src/http/action/stream_load_2pc.cpp | 102 ++++++++ .../action/stream_load_2pc.h} | 26 +- be/src/http/http_common.h | 4 + be/src/runtime/stream_load/stream_load_context.cpp | 4 + be/src/runtime/stream_load/stream_load_context.h | 4 + .../runtime/stream_load/stream_load_executor.cpp | 70 +++++- be/src/runtime/stream_load/stream_load_executor.h | 11 + be/src/service/http_service.cpp | 5 + .../load-data/stream-load-manual.md | 22 ++ 
.../load-data/stream-load-manual.md | 22 ++ .../org/apache/doris/clone/TabletScheduler.java | 15 ++ .../main/java/org/apache/doris/common/Config.java | 6 + .../org/apache/doris/common/FeMetaVersion.java | 4 +- .../doris/common/LabelAlreadyUsedException.java | 13 +- .../org/apache/doris/httpv2/rest/LoadAction.java | 58 +++++ .../doris/httpv2/rest/RestBaseController.java | 2 + .../apache/doris/service/FrontendServiceImpl.java | 131 ++++++++++ .../doris/transaction/DatabaseTransactionMgr.java | 273 ++++++++++++++++++--- .../doris/transaction/GlobalTransactionMgr.java | 64 ++++- .../doris/transaction/PartitionCommitInfo.java | 12 + .../apache/doris/transaction/TransactionState.java | 19 +- .../doris/transaction/TransactionStatus.java | 7 +- .../doris/transaction/TransactionStateTest.java | 2 +- gensrc/thrift/FrontendService.thrift | 19 ++ 27 files changed, 844 insertions(+), 74 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index cc34489..c5eed39 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -354,6 +354,7 @@ CONF_mInt32(stream_load_record_batch_size, "50"); CONF_Int32(stream_load_record_expire_time_secs, "28800"); // time interval to clean expired stream load records CONF_mInt64(clean_stream_load_record_interval_secs, "1800"); +CONF_mBool(disable_stream_load_2pc, "true"); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. 
diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index e004b01..8ca052d 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -47,6 +47,7 @@ add_library(Webserver STATIC action/pprof_actions.cpp action/metrics_action.cpp action/stream_load.cpp + action/stream_load_2pc.cpp action/meta_action.cpp action/compaction_action.cpp action/config_action.cpp diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 19bcff2..a9da838 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -193,11 +193,16 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { // wait stream load finish RETURN_IF_ERROR(ctx->future.get()); - // If put file success we need commit this load - int64_t commit_and_publish_start_time = MonotonicNanos(); - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); - ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; - + if (ctx->two_phase_commit) { + int64_t pre_commit_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx)); + ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; + } else { + // If put file success we need commit this load + int64_t commit_and_publish_start_time = MonotonicNanos(); + RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); + ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; + } return Status::OK(); } @@ -218,6 +223,8 @@ int StreamLoadAction::on_header(HttpRequest* req) { ctx->label = generate_uuid_string(); } + ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false; + LOG(INFO) << "new income streaming load request." 
<< ctx->brief() << ", db=" << ctx->db << ", tbl=" << ctx->table; @@ -266,6 +273,10 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct return Status::InternalError(ss.str()); } + if (ctx->two_phase_commit && config::disable_stream_load_2pc) { + return Status::InternalError("Two phase commit (2PC) for stream load was disabled"); + } + // check content length ctx->body_bytes = 0; size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; diff --git a/be/src/http/action/stream_load_2pc.cpp b/be/src/http/action/stream_load_2pc.cpp new file mode 100644 index 0000000..511bce9 --- /dev/null +++ b/be/src/http/action/stream_load_2pc.cpp @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "http/action/stream_load_2pc.h" + +#include <rapidjson/prettywriter.h> +#include <rapidjson/stringbuffer.h> + +#include "common/status.h" +#include "http/http_channel.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "http/utils.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "util/json_util.h" + +namespace doris { + +const static std::string HEADER_JSON = "application/json"; + +StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env) : _exec_env(exec_env) {} + +void StreamLoad2PCAction::handle(HttpRequest* req) { + Status status = Status::OK(); + std::string status_result; + + if (config::disable_stream_load_2pc) { + status = Status::InternalError("Two phase commit (2PC) for stream load was disabled"); + status_result = to_json(status); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + return; + } + + StreamLoadContext* ctx = new StreamLoadContext(_exec_env); + ctx->ref(); + req->set_handler_ctx(ctx); + ctx->db = req->param(HTTP_DB_KEY); + std::string req_txn_id = req->header(HTTP_TXN_ID_KEY); + try { + ctx->txn_id = std::stoull(req_txn_id); + } catch (const std::exception& e) { + status = Status::InternalError("convert txn_id [" + req_txn_id + "] failed"); + status_result = to_json(status); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + return; + } + ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY); + if (ctx->txn_operation.compare("commit") != 0 && ctx->txn_operation.compare("abort") != 0) { + status = Status::InternalError("transaction operation should be \'commit\' or \'abort\'"); + status_result = to_json(status); + HttpChannel::send_reply(req, HttpStatus::OK, status_result); + return; + } + + if (!parse_basic_auth(*req, &ctx->auth)) { + LOG(WARNING) << "parse basic authorization failed."; + status = Status::InternalError("no valid Basic authorization"); + } + + status 
=_exec_env->stream_load_executor()->operate_txn_2pc(ctx); + + if (!status.ok()) { + status_result = to_json(status); + } else { + status_result = get_success_info(req_txn_id, ctx->txn_operation); + } + HttpChannel::send_reply(req, HttpStatus::OK, status_result); +} + +std::string StreamLoad2PCAction::get_success_info(const std::string txn_id, const std::string txn_operation) { + rapidjson::StringBuffer s; + rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s); + + writer.StartObject(); + // status + writer.Key("status"); + writer.String("Success"); + // msg + std::string msg = "transaction [" + txn_id + "] " + txn_operation + " successfully."; + writer.Key("msg"); + writer.String(msg.c_str()); + writer.EndObject(); + return s.GetString(); +} + +} // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/http/action/stream_load_2pc.h similarity index 56% copy from be/src/runtime/stream_load/stream_load_executor.h copy to be/src/http/action/stream_load_2pc.h index 4ba791c..7043b52 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/http/action/stream_load_2pc.h @@ -17,33 +17,21 @@ #pragma once -#include <memory> +#include <string> +#include "http/http_handler.h" namespace doris { class ExecEnv; -class StreamLoadContext; -class Status; -class TTxnCommitAttachment; -class StreamLoadPipe; -class StreamLoadExecutor { +class StreamLoad2PCAction : public HttpHandler { public: - StreamLoadExecutor(ExecEnv* exec_env) : _exec_env(exec_env) {} + StreamLoad2PCAction(ExecEnv* exec_env); - Status begin_txn(StreamLoadContext* ctx); + virtual ~StreamLoad2PCAction(){}; - Status commit_txn(StreamLoadContext* ctx); - - void rollback_txn(StreamLoadContext* ctx); - - Status execute_plan_fragment(StreamLoadContext* ctx); - - Status execute_plan_fragment(StreamLoadContext* ctx, std::shared_ptr<StreamLoadPipe> pipe); -private: - // collect the load statistics from context and set them to stat - // return true if stat is set, 
otherwise, return false - bool collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attachment); + void handle(HttpRequest* req) override; + std::string get_success_info(const std::string txn_id, const std::string txn_operation); private: ExecEnv* _exec_env; diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 4ade62f..a35b241 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -54,4 +54,8 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; static const std::string HTTP_100_CONTINUE = "100-continue"; +static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; +static const std::string HTTP_TXN_ID_KEY = "txn_id"; +static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation"; + } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index a7ab92f..172b32f 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -34,6 +34,10 @@ std::string StreamLoadContext::to_json() const { writer.Key("Label"); writer.String(label.c_str()); + writer.Key("TwoPhaseCommit"); + std::string need_two_phase_commit = two_phase_commit ? "true" : "false"; + writer.String(need_two_phase_commit.c_str()); + // status writer.Key("Status"); switch (status.code()) { diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index cd3b458..ac7338b 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -134,6 +134,7 @@ public: double max_filter_ratio = 0.0; int32_t timeout_second = -1; AuthInfo auth; + bool two_phase_commit = false; // the following members control the max progress of a consuming // process. if any of them reach, the consuming will finish. 
@@ -152,6 +153,8 @@ public: int64_t txn_id = -1; + std::string txn_operation = ""; + bool need_rollback = false; // when use_streaming is true, we use stream_pipe to send source data, // otherwise we save source data to file first, then process it. @@ -180,6 +183,7 @@ public: int64_t begin_txn_cost_nanos = 0; int64_t stream_load_put_cost_nanos = 0; int64_t commit_and_publish_txn_cost_nanos = 0; + int64_t pre_commit_txn_cost_nanos = 0; int64_t read_data_cost_nanos = 0; int64_t write_data_cost_nanos = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 55f590c..7fa2245 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -19,10 +19,6 @@ #include "common/status.h" #include "common/utils.h" -#include "gen_cpp/FrontendService.h" -#include "gen_cpp/FrontendService_types.h" -#include "gen_cpp/HeartbeatService_types.h" -#include "gen_cpp/Types_types.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -174,10 +170,65 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { return Status::OK(); } -Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_commit_request_total->increment(1); +Status StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { TLoadTxnCommitRequest request; + get_commit_request(ctx, request); + + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TLoadTxnCommitResult result; +#ifndef BE_TEST + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnPreCommit(result, request); + }, + config::txn_commit_rpc_timeout_ms)); +#else + result = k_stream_load_commit_result; +#endif + // Return if this transaction is precommitted successful; otherwise, we need try + 
// to + // rollback this transaction + Status status(result.status); + if (!status.ok()) { + LOG(WARNING) << "precommit transaction failed, errmsg=" << status.get_error_msg() + << ctx->brief(); + if (status.code() == TStatusCode::PUBLISH_TIMEOUT) { + ctx->need_rollback = false; + } + return status; + } + // precommit success, set need_rollback to false + ctx->need_rollback = false; + return Status::OK(); +} + +Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { + TLoadTxn2PCRequest request; + set_request_auth(&request, ctx->auth); + request.__set_db(ctx->db); + request.__set_txnId(ctx->txn_id); + request.__set_operation(ctx->txn_operation); + request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); + + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + TLoadTxn2PCResult result; + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxn2PC(result, request); + }, + config::txn_commit_rpc_timeout_ms)); + Status status(result.status); + if (!status.ok()) { + LOG(WARNING) << "2PC commit transaction failed, errmsg=" << status.get_error_msg(); + return status; + } + return Status::OK(); +} + +void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& request) { set_request_auth(&request, ctx->auth); request.db = ctx->db; if (ctx->db_id > 0) { @@ -197,6 +248,13 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { request.txnCommitAttachment = std::move(attachment); request.__isset.txnCommitAttachment = true; } +} + +Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DorisMetrics::instance()->txn_commit_request_total->increment(1); + + TLoadTxnCommitRequest request; + get_commit_request(ctx, request); TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnCommitResult result; diff --git 
a/be/src/runtime/stream_load/stream_load_executor.h b/be/src/runtime/stream_load/stream_load_executor.h index 4ba791c..8adcde1 100644 --- a/be/src/runtime/stream_load/stream_load_executor.h +++ b/be/src/runtime/stream_load/stream_load_executor.h @@ -19,6 +19,11 @@ #include <memory> +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "gen_cpp/Types_types.h" + namespace doris { class ExecEnv; @@ -33,8 +38,14 @@ public: Status begin_txn(StreamLoadContext* ctx); + Status pre_commit_txn(StreamLoadContext* ctx); + + Status operate_txn_2pc(StreamLoadContext* ctx); + Status commit_txn(StreamLoadContext* ctx); + void get_commit_request(StreamLoadContext* ctx, TLoadTxnCommitRequest& request); + void rollback_txn(StreamLoadContext* ctx); Status execute_plan_fragment(StreamLoadContext* ctx); diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index e19427f..be7519f 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -33,6 +33,7 @@ #include "http/action/restore_tablet_action.h" #include "http/action/snapshot_action.h" #include "http/action/stream_load.h" +#include "http/action/stream_load_2pc.h" #include "http/action/tablet_migration_action.h" #include "http/action/tablets_distribution_action.h" #include "http/action/tablets_info_action.h" @@ -62,6 +63,10 @@ Status HttpService::start() { StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env)); _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load", streamload_action); + StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env)); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/_stream_load_2pc", + streamload_2pc_action); + // register download action std::vector<std::string> allow_paths; diff --git a/docs/en/administrator-guide/load-data/stream-load-manual.md 
b/docs/en/administrator-guide/load-data/stream-load-manual.md index ab4bda2..b91b339 100644 --- a/docs/en/administrator-guide/load-data/stream-load-manual.md +++ b/docs/en/administrator-guide/load-data/stream-load-manual.md @@ -169,6 +169,28 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL` + merge\_type The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND is the default value, which means that all this batch of data needs to be appended to the existing data. DELETE means to delete all rows with the same key as this batch of data. MERGE semantics Need to be used in conjunction with the delete condition, which means that the data that meets the delete condition is processed according to DELETE semantics and the rest is processed according to APPEND semantics ++ two\_phase\_commit + + Stream load supports the two-phase commit mode. The mode can be enabled by declaring ```two_phase_commit=true``` in the HTTP header. This mode is disabled by default. + The two-phase commit mode means: during Stream load, after the data is written, the message is returned to the client. At this point the data is invisible and the transaction status is PRECOMMITTED. The data becomes visible only after COMMIT is triggered by the client. + + 1. User can invoke the following interface to trigger commit operations for transaction: + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc + ``` + or + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc + ``` + + 2. 
User can invoke the following interface to trigger abort operations for transaction: + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc + ``` + or + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc + ``` ### Return results diff --git a/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md b/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md index 4bb0c08..a5ba62a 100644 --- a/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md +++ b/docs/zh-CN/administrator-guide/load-data/stream-load-manual.md @@ -179,6 +179,28 @@ Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的 3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。 + merge\_type 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理 + ++ two\_phase\_commit + + Stream load 导入可以开启两阶段事务提交模式。开启方式为在 HEADER 中声明 ```two_phase_commit=true``` 。默认的两阶段批量事务提交为关闭。 + 两阶段批量事务提交模式的意思是:Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。 + + 1. 用户可以调用如下接口对stream load事务触发commit操作: + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc + ``` + 或 + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc + ``` + 2. 
用户可以调用如下接口对stream load事务触发abort操作: + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc + ``` + 或 + ``` + curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc + ``` #### strict mode 与 source data 的导入关系 diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 39573e1..ae96bde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -63,6 +63,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Table; +import org.apache.doris.transaction.DatabaseTransactionMgr; +import org.apache.doris.transaction.TransactionState; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -526,6 +528,19 @@ public class TabletScheduler extends MasterDaemon { throw new SchedException(Status.UNRECOVERABLE, "table's state is not NORMAL"); } + if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE) { + try { + DatabaseTransactionMgr dbTransactionMgr = Catalog.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId()); + for (TransactionState transactionState : dbTransactionMgr.getPreCommittedTxnList()) { + if(transactionState.getTableIdList().contains(tbl.getId())) { + // If table releate to transaction with precommitted status, do not allow to do balance. 
+ throw new SchedException(Status.UNRECOVERABLE, "There exists PRECOMMITTED transaction releated to table"); + } + } + } catch (AnalysisException e) { + } + } + if (statusPair.first != TabletStatus.VERSION_INCOMPLETE && (partition.getState() != PartitionState.NORMAL || tableState != OlapTableState.NORMAL) && tableState != OlapTableState.WAITING_STABLE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index a5a55b0..1ebe53b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -563,6 +563,12 @@ public class Config extends ConfigBase { public static int stream_load_default_timeout_second = 600; // 600s /** + * Default stream load pre-commit status timeout + */ + @ConfField(mutable = true, masterOnly = true) + public static int stream_load_default_precommit_timeout_second = 3600; // 3600s + + /** * Max load timeout applicable to all type of load except for stream load */ @ConfField(mutable = true, masterOnly = true) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index e05ba86..6d5661e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -238,6 +238,8 @@ public final class FeMetaVersion { public static final int VERSION_105 = 105; // add ldap info public static final int VERSION_106 = 106; + // support stream load 2PC + public static final int VERSION_107 = 107; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_106; + public static final int VERSION_CURRENT = VERSION_107; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index 6a5ff58..d723c88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -17,7 +17,7 @@ package org.apache.doris.common; -import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionState; import com.google.common.base.Preconditions; @@ -33,19 +33,22 @@ public class LabelAlreadyUsedException extends DdlException { super("Label [" + label + "] has already been used."); } - public LabelAlreadyUsedException(String label, TransactionStatus txnStatus) { - super("Label [" + label + "] has already been used."); - switch (txnStatus) { + public LabelAlreadyUsedException(TransactionState txn) { + super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + "]"); + switch (txn.getTransactionStatus()) { case UNKNOWN: case PREPARE: jobStatus = "RUNNING"; break; + case PRECOMMITTED: + jobStatus = "PRECOMMITTED"; + break; case COMMITTED: case VISIBLE: jobStatus = "FINISHED"; break; default: - Preconditions.checkState(false, txnStatus); + Preconditions.checkState(false, txn.getTransactionStatus()); break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index b674834..8cacb08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -73,6 +73,15 @@ public class LoadAction extends RestBaseController { return executeWithoutPassword(request, response, db, table); } + @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) + public Object streamLoad2PC(HttpServletRequest request, + HttpServletResponse response, + @PathVariable(value = DB_KEY) 
String db) { + this.isStreamLoad = true; + executeCheckPassword(request, response); + return executeStreamLoad2PC(request, db); + } + // Same as Multi load, to be compatible with http v1's response body, // we return error by using RestBaseResult. private Object executeWithoutPassword(HttpServletRequest request, @@ -154,4 +163,53 @@ public class LoadAction extends RestBaseController { return new RestBaseResult(e.getMessage()); } } + + private Object executeStreamLoad2PC(HttpServletRequest request, String db) { + try { + String dbName = db; + + final String clusterName = ConnectContext.get().getClusterName(); + if (Strings.isNullOrEmpty(clusterName)) { + return new RestBaseResult("No cluster selected."); + } + + if (Strings.isNullOrEmpty(dbName)) { + return new RestBaseResult("No database selected."); + } + + if (Strings.isNullOrEmpty(request.getHeader(TXN_ID_KEY))) { + return new RestBaseResult("No transaction id selected."); + } + + String txnOperation = request.getHeader(TXN_OPERATION_KEY); + if (Strings.isNullOrEmpty(txnOperation)) { + return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); + } + + // Choose a backend sequentially. 
+ SystemInfoService.BeAvailablePredicate beAvailablePredicate = + new SystemInfoService.BeAvailablePredicate(false, false, true); + List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag( + 1, beAvailablePredicate, false, clusterName, null, null); + if (backendIds == null) { + return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } + + Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG); + } + + TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + + LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", + redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); + + RedirectView redirectView = redirectTo(request, redirectAddr); + return redirectView; + + } catch (Exception e) { + return new RestBaseResult(e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java index db24c6b..e5988fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java @@ -49,6 +49,8 @@ public class RestBaseController extends BaseController { protected static final String DB_KEY = "db"; protected static final String TABLE_KEY = "table"; protected static final String LABEL_KEY = "label"; + protected static final String TXN_ID_KEY = "txn_id"; + protected static final String TXN_OPERATION_KEY = "txn_operation"; private static final Logger LOG = LogManager.getLogger(RestBaseController.class); public ActionAuthorizationInfo executeCheckPassword(HttpServletRequest request, diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index ac8ee57..94ca3bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -76,6 +76,8 @@ import org.apache.doris.thrift.TIsMethodSupportedRequest; import org.apache.doris.thrift.TListPrivilegesResult; import org.apache.doris.thrift.TListTableStatusResult; import org.apache.doris.thrift.TLoadCheckRequest; +import org.apache.doris.thrift.TLoadTxn2PCRequest; +import org.apache.doris.thrift.TLoadTxn2PCResult; import org.apache.doris.thrift.TLoadTxnBeginRequest; import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TLoadTxnCommitRequest; @@ -107,6 +109,7 @@ import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusResult; +import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.TxnCoordinator; @@ -805,6 +808,134 @@ public class FrontendServiceImpl implements FrontendService.Iface { } @Override + public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive txn pre-commit request: {}, backend: {}", request, clientAddr); + + TLoadTxnCommitResult result = new TLoadTxnCommitResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + loadTxnPreCommitImpl(request); + } catch (UserException e) { + LOG.warn("failed to pre-commit txn: {}: {}", request.getTxnId(), e.getMessage()); + 
status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + return result; + } + + private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws UserException { + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + if (request.isSetAuthCode()) { + } else if (request.isSetAuthCodeUuid()) { + checkAuthCodeUuid(request.getDb(), request.getTxnId(), request.getAuthCodeUuid()); + } else { + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); + } + + // get database + Catalog catalog = Catalog.getCurrentCatalog(); + String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); + Database db; + if (request.isSetDbId() && request.getDbId() > 0) { + db = catalog.getDbNullable(request.getDbId()); + } else { + db = catalog.getDbNullable(fullDbName); + } + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + + long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() / 2 : 5000; + Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); + Catalog.getCurrentGlobalTransactionMgr().preCommitTransaction2PC( + db, Lists.newArrayList(table), request.getTxnId(), + TabletCommitInfo.fromThrift(request.getCommitInfos()), + timeoutMs, TxnCommitAttachment.fromThrift(request.txnCommitAttachment)); + } + + @Override + public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws TException { + String clientAddr = getClientAddrAsString(); + LOG.debug("receive txn 2PC request: {}, backend: {}", request, clientAddr); + + TLoadTxn2PCResult result = new TLoadTxn2PCResult(); + TStatus status = new TStatus(TStatusCode.OK); + result.setStatus(status); + try { + loadTxn2PCImpl(request); + } catch (UserException e) { + LOG.warn("failed to {} txn {}: {}", request.getOperation(), request.getTxnId(), e.getMessage()); + status.setStatusCode(TStatusCode.ANALYSIS_ERROR); + status.addToErrorMsgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatusCode(TStatusCode.INTERNAL_ERROR); + status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); + return result; + } + return result; + } + + private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws UserException { + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + String dbName = request.getDb(); + if (Strings.isNullOrEmpty(dbName)) { + throw new UserException("No database selected."); + } + + String fullDbName = ClusterNamespace.getFullName(cluster, dbName); + + // get database + Catalog catalog = Catalog.getCurrentCatalog(); + Database database = catalog.getDbNullable(fullDbName); + if (database == null) { + throw new UserException("unknown database, database=" + fullDbName); + } + + DatabaseTransactionMgr dbTransactionMgr = Catalog.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(database.getId()); + 
TransactionState transactionState = dbTransactionMgr.getTransactionState(request.getTxnId()); + if (transactionState == null) { + throw new UserException("transaction [" + request.getTxnId() + "] not found"); + } + List<Long> tableIdList = transactionState.getTableIdList(); + List<Table> tableList = database.getTablesOnIdOrderWithIgnoringWrongTableId(tableIdList); + for (Table table : tableList) { + // check auth + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), + table.getName(), request.getUserIp(), PrivPredicate.LOAD); + } + + String txnOperation = request.getOperation().trim(); + if (txnOperation.equalsIgnoreCase("commit")) { + Catalog.getCurrentGlobalTransactionMgr().commitTransaction2PC(database, tableList, request.getTxnId(), 5000); + } else if (txnOperation.equalsIgnoreCase("abort")) { + Catalog.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId()); + } else { + throw new UserException("transaction operation should be \'commit\' or \'abort\'"); + } + } + + @Override public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException { String clientAddr = getClientAddrAsString(); LOG.debug("receive txn commit request: {}, backend: {}", request, clientAddr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 87cd981..0876965 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -51,10 +51,7 @@ import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; -import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.ClearTransactionTask; -import 
org.apache.doris.task.PublishVersionTask; -import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUniqueId; import com.google.common.annotations.VisibleForTesting; @@ -284,17 +281,18 @@ public class DatabaseTransactionMgr { notAbortedTxns.add(txn); } } - // there should be at most 1 txn in PREPARE/COMMITTED/VISIBLE status + // there should be at most 1 txn in PREPARE/PRECOMMITTED/COMMITTED/VISIBLE status Preconditions.checkState(notAbortedTxns.size() <= 1, notAbortedTxns); if (!notAbortedTxns.isEmpty()) { TransactionState notAbortedTxn = notAbortedTxns.get(0); - if (requestId != null && notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE + if (requestId != null && (notAbortedTxn.getTransactionStatus() == TransactionStatus.PREPARE + || notAbortedTxn.getTransactionStatus() == TransactionStatus.PRECOMMITTED) && notAbortedTxn.getRequestId() != null && notAbortedTxn.getRequestId().equals(requestId)) { // this may be a retry request for same job, just return existing txn id. throw new DuplicatedRequestException(DebugUtil.printId(requestId), notAbortedTxn.getTransactionId(), ""); } - throw new LabelAlreadyUsedException(label, notAbortedTxn.getTransactionStatus()); + throw new LabelAlreadyUsedException(notAbortedTxn); } } @@ -342,19 +340,10 @@ public class DatabaseTransactionMgr { this.usedQuotaDataBytes = usedQuotaDataBytes; } - /** - * commit transaction process as follows: - * 1. validate whether `Load` is cancelled - * 2. validate whether `Table` is deleted - * 3. validate replicas consistency - * 4. update transaction state version - * 5. persistent transactionState - * 6. 
update nextVersion because of the failure of persistent transaction resulting in error version - */ - public void commitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, + public void preCommitTransaction2PC(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { - // 1. check status + // check status // the caller method already own db lock, we do not obtain db lock here Database db = catalog.getDbOrMetaException(dbId); TransactionState transactionState; @@ -372,12 +361,36 @@ public class DatabaseTransactionMgr { if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { LOG.debug("transaction is already visible: {}", transactionId); - return; + throw new TransactionCommitFailedException("transaction is already visible"); } + if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { LOG.debug("transaction is already committed: {}", transactionId); + throw new TransactionCommitFailedException("transaction is already committed"); + } + + if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { + LOG.debug("transaction is already pre-committed: {}", transactionId); return; } + + Set<Long> errorReplicaIds = Sets.newHashSet(); + Set<Long> totalInvolvedBackends = Sets.newHashSet(); + Map<Long, Set<Long>> tableToPartition = new HashMap<>(); + + checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds, + tableToPartition, totalInvolvedBackends); + + unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); + LOG.info("transaction:[{}] successfully pre-committed", transactionState); + } + + private void checkCommitStatus(List<Table> tableList, TransactionState transactionState, + List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, + Set<Long> 
errorReplicaIds, Map<Long, Set<Long>> tableToPartition, + Set<Long> totalInvolvedBackends) throws UserException { + Database db = catalog.getDbOrMetaException(dbId); + // update transaction state extra if exists if (txnCommitAttachment != null) { transactionState.setTxnCommitAttachment(txnCommitAttachment); @@ -385,12 +398,11 @@ public class DatabaseTransactionMgr { TabletInvertedIndex tabletInvertedIndex = catalog.getTabletInvertedIndex(); Map<Long, Set<Long>> tabletToBackends = new HashMap<>(); - Map<Long, Set<Long>> tableToPartition = new HashMap<>(); Map<Long, Table> idToTable = new HashMap<>(); for (int i = 0; i < tableList.size(); i++) { idToTable.put(tableList.get(i).getId(), tableList.get(i)); } - // 2. validate potential exists problem: db->table->partition + // validate potential exists problem: db->table->partition // guarantee exist exception during a transaction // if index is dropped, it does not matter. // if table or partition is dropped during load, just ignore that tablet, @@ -433,8 +445,6 @@ public class DatabaseTransactionMgr { } tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId()); } - Set<Long> errorReplicaIds = Sets.newHashSet(); - Set<Long> totalInvolvedBackends = Sets.newHashSet(); for (long tableId : tableToPartition.keySet()) { OlapTable table = (OlapTable) db.getTableOrMetaException(tableId); for (Partition partition : table.getAllPartitions()) { @@ -511,9 +521,9 @@ public class DatabaseTransactionMgr { LOG.warn("Failed to commit txn [{}]. 
" + "Tablet [{}] success replica num is {} < quorum replica num {} " + "while error backends {}", - transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, + transactionState.getTransactionId(), tablet.getId(), successReplicaNum, quorumReplicaNum, Joiner.on(",").join(errorBackendIdsForTablet)); - throw new TabletQuorumFailedException(transactionId, tablet.getId(), + throw new TabletQuorumFailedException(transactionState.getTransactionId(), tablet.getId(), successReplicaNum, quorumReplicaNum, errorBackendIdsForTablet); } @@ -521,6 +531,72 @@ public class DatabaseTransactionMgr { } } } + } + + /** + * commit transaction process as follows: + * 1. validate whether `Load` is cancelled + * 2. validate whether `Table` is deleted + * 3. validate replicas consistency + * 4. update transaction state version + * 5. persistent transactionState + * 6. update nextVersion because of the failure of persistent transaction resulting in error version + */ + public void commitTransaction(List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, + TxnCommitAttachment txnCommitAttachment, Boolean is2PC) + throws UserException { + // check status + // the caller method already own db lock, we do not obtain db lock here + Database db = catalog.getDbOrMetaException(dbId); + TransactionState transactionState; + readLock(); + try { + transactionState = unprotectedGetTransactionState(transactionId); + } finally { + readUnlock(); + } + + if (transactionState == null) { + LOG.debug("transaction not found: {}", transactionId); + throw new TransactionCommitFailedException("transaction [" + transactionId + "] not found."); + } + + if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + LOG.debug("transaction is already aborted: {}", transactionId); + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] is already aborted. 
abort reason: " + transactionState.getReason()); + } + + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + LOG.debug("transaction is already visible: {}", transactionId); + if (is2PC) { + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] is already visible, not pre-committed."); + } + return; + } + if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { + LOG.debug("transaction is already committed: {}", transactionId); + if (is2PC) { + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] is already committed, not pre-committed."); + } + return; + } + + if (is2PC && transactionState.getTransactionStatus() == TransactionStatus.PREPARE) { + LOG.debug("transaction is prepare, not pre-committed: {}", transactionId); + throw new TransactionCommitFailedException("transaction [" + transactionId + + "] is prepare, not pre-committed."); + } + + Set<Long> errorReplicaIds = Sets.newHashSet(); + Set<Long> totalInvolvedBackends = Sets.newHashSet(); + Map<Long, Set<Long>> tableToPartition = new HashMap<>(); + if (!is2PC) { + checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds, + tableToPartition, totalInvolvedBackends); + } // before state transform transactionState.beforeStateTransform(TransactionStatus.COMMITTED); @@ -528,8 +604,11 @@ public class DatabaseTransactionMgr { boolean txnOperated = false; writeLock(); try { - unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, - db); + if (is2PC) { + unprotectedCommitTransaction2PC(transactionState, db); + } else { + unprotectedCommitTransaction(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); + } txnOperated = true; } finally { writeUnlock(); @@ -537,7 +616,7 @@ public class DatabaseTransactionMgr { transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated); } - // 6. 
update nextVersion because of the failure of persistent transaction resulting in error version + // update nextVersion because of the failure of persistent transaction resulting in error version updateCatalogAfterCommitted(transactionState, db); LOG.info("transaction:[{}] successfully committed", transactionState); } @@ -646,6 +725,19 @@ public class DatabaseTransactionMgr { } } + public List<TransactionState> getPreCommittedTxnList() { + readLock(); + try { + // only send task to preCommitted transaction + return idToRunningTransactionState.values().stream() + .filter(transactionState -> (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED)) + .sorted(Comparator.comparing(TransactionState::getPreCommitTime)) + .collect(Collectors.toList()); + } finally { + readUnlock(); + } + } + public List<TransactionState> getCommittedTxnList() { readLock(); try { @@ -837,6 +929,35 @@ public class DatabaseTransactionMgr { LOG.info("finish transaction {} successfully", transactionState); } + protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set<Long> errorReplicaIds, + Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, + Database db) { + // transaction state is modified during check if the transaction could committed + if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) { + return; + } + // update transaction state version + transactionState.setPreCommitTime(System.currentTimeMillis()); + transactionState.setTransactionStatus(TransactionStatus.PRECOMMITTED); + transactionState.setErrorReplicas(errorReplicaIds); + for (long tableId : tableToPartition.keySet()) { + TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId); + for (long partitionId : tableToPartition.get(tableId)) { + PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo(partitionId,-1, -1, -1); + tableCommitInfo.addPartitionCommitInfo(partitionCommitInfo); + } + 
transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); + } + // persist transactionState + unprotectUpsertTransactionState(transactionState, false); + + // add publish version tasks. set task to null as a placeholder. + // tasks will be created when publishing version. + for (long backendId : totalInvolvedBackends) { + transactionState.addPublishVersionTask(backendId, null); + } + } + protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds, Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends, Database db) { @@ -870,6 +991,52 @@ public class DatabaseTransactionMgr { } } + protected void unprotectedCommitTransaction2PC(TransactionState transactionState, Database db) { + // transaction state is modified during check if the transaction could committed + if (transactionState.getTransactionStatus() != TransactionStatus.PRECOMMITTED) { + LOG.warn("Unknow exception. state of transaction [{}] changed, failed to commit transaction", + transactionState.getTransactionId()); + return; + } + // update transaction state version + transactionState.setCommitTime(System.currentTimeMillis()); + transactionState.setTransactionStatus(TransactionStatus.COMMITTED); + + Iterator<TableCommitInfo> tableCommitInfoIterator = transactionState.getIdToTableCommitInfos().values().iterator(); + while (tableCommitInfoIterator.hasNext()) { + TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next(); + long tableId = tableCommitInfo.getTableId(); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + // table maybe dropped between commit and publish, ignore this error + if (table == null) { + tableCommitInfoIterator.remove(); + LOG.warn("table {} is dropped, skip and remove it from transaction state {}", + tableId, + transactionState); + continue; + } + Iterator<PartitionCommitInfo> partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); + while 
(partitionCommitInfoIterator.hasNext()) { + PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next(); + long partitionId = partitionCommitInfo.getPartitionId(); + Partition partition = table.getPartition(partitionId); + // partition maybe dropped between commit and publish version, ignore this error + if (partition == null) { + partitionCommitInfoIterator.remove(); + LOG.warn("partition {} is dropped, skip and remove it from transaction state {}", + partitionId, + transactionState); + continue; + } + partitionCommitInfo.setVersion(partition.getNextVersion()); + partitionCommitInfo.setVersionHash(partition.getNextVersionHash()); + partitionCommitInfo.setVersionTime(System.currentTimeMillis()); + } + } + // persist transactionState + editLog.logInsertTransactionState(transactionState); + } + // for add/update/delete TransactionState protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) { // if this is a replay operation, we should not log it @@ -992,27 +1159,65 @@ public class DatabaseTransactionMgr { LOG.info("abort transaction: {} successfully", transactionState); } + public void abortTransaction2PC(long transactionId) throws UserException { + LOG.info("begin to abort txn {}", transactionId); + if (transactionId < 0) { + LOG.info("transaction id is {}, less than 0, maybe this is an old type load job, ignore abort operation", transactionId); + return; + } + TransactionState transactionState; + readLock(); + try { + transactionState = unprotectedGetTransactionState(transactionId); + } finally { + readUnlock(); + } + + if (transactionState == null) { + throw new TransactionNotFoundException("transaction [" + transactionId + "] not found"); + } + + // before state transform + transactionState.beforeStateTransform(TransactionStatus.ABORTED); + boolean txnOperated = false; + writeLock(); + try { + txnOperated = unprotectAbortTransaction(transactionId, "User Abort"); + } finally { + writeUnlock(); + 
transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); + } + + // send clear txn task to BE to clear the transactions on BE. + // This is because parts of a txn may succeed in some BE, and these parts of txn should be cleared + // explicitly, or it will be remained on BE forever + // (However the report process will do the diff and send clear txn tasks to BE, but that is our + // last defense) + if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { + clearBackendTransactions(transactionState); + } + LOG.info("abort transaction: {} successfully", transactionState); + } + private boolean unprotectAbortTransaction(long transactionId, String reason) throws UserException { TransactionState transactionState = unprotectedGetTransactionState(transactionId); if (transactionState == null) { - throw new TransactionNotFoundException("transaction not found", transactionId); + throw new TransactionNotFoundException("transaction [" + transactionId + "] not found."); } if (transactionState.getTransactionStatus() == TransactionStatus.ABORTED) { - return false; + throw new TransactionNotFoundException("transaction [" + transactionId + "] is already aborted, " + + "abort reason: " + transactionState.getReason()); } if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED || transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - throw new UserException("transaction's state is already " - + transactionState.getTransactionStatus() + ", could not abort"); + throw new UserException("transaction [" + transactionId + "] is already " + + transactionState.getTransactionStatus() + ", could not abort."); } transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); transactionState.setTransactionStatus(TransactionStatus.ABORTED); unprotectUpsertTransactionState(transactionState, false); - for (PublishVersionTask task : 
transactionState.getPublishVersionTasks().values()) { - AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature()); - } return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 6aedf62..2a8975e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -173,6 +173,32 @@ public class GlobalTransactionMgr implements Writable { } } + public void preCommitTransaction2PC(Database db, List<Table> tableList, long transactionId, + List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis, + TxnCommitAttachment txnCommitAttachment) + throws UserException { + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + StringUtils.join(tableList, ",") + ")"); + } + try { + preCommitTransaction2PC(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + } + + public void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, + TxnCommitAttachment txnCommitAttachment) + throws UserException { + if (Config.disable_load_job) { + throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); + } + + LOG.debug("try to pre-commit transaction: {}", transactionId); + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + } + public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> 
tabletCommitInfos) throws UserException { commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, null); @@ -196,7 +222,17 @@ public class GlobalTransactionMgr implements Writable { LOG.debug("try to commit transaction: {}", transactionId); DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment); + dbTransactionMgr.commitTransaction(tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); + } + + private void commitTransaction2PC(long dbId, long transactionId) + throws UserException { + if (Config.disable_load_job) { + throw new TransactionCommitFailedException("disable_load_job is set to true, all load jobs are prevented"); + } + + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + dbTransactionMgr.commitTransaction(null, transactionId, null, null, true); } public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId, @@ -230,6 +266,23 @@ public class GlobalTransactionMgr implements Writable { return dbTransactionMgr.publishTransaction(db, transactionId, publishTimeoutMillis); } + public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis) + throws UserException { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + if (!MetaLockUtils.tryWriteLockTablesOrMetaException(tableList, timeoutMillis, TimeUnit.MILLISECONDS)) { + throw new UserException("get tableList write lock timeout, tableList=(" + StringUtils.join(tableList, ",") + ")"); + } + try { + commitTransaction2PC(db.getId(), transactionId); + } finally { + MetaLockUtils.writeUnlockTables(tableList); + } + stopWatch.stop(); + LOG.info("stream load tasks are committed successfully. txns: {}. time cost: {} ms." 
+ + " data will be visable later.", transactionId, stopWatch.getTime()); + } + public void abortTransaction(long dbId, long transactionId, String reason) throws UserException { abortTransaction(dbId, transactionId, reason, null); } @@ -245,6 +298,11 @@ public class GlobalTransactionMgr implements Writable { dbTransactionMgr.abortTransaction(label, reason); } + public void abortTransaction2PC(Long dbId, long transactionId) throws UserException { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + dbTransactionMgr.abortTransaction2PC(transactionId); + } + /* * get all txns which is ready to publish * a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version. @@ -498,6 +556,10 @@ public class GlobalTransactionMgr implements Writable { for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) { try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); + TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); + if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { + continue; + } dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java index b88d3e6..08064c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PartitionCommitInfo.java @@ -85,6 +85,18 @@ public class PartitionCommitInfo implements Writable { public long getVersionHash() { return versionHash; } + + public void setVersion(long version) { + this.version = version; + } + + public void setVersionTime(long versionTime) { + this.versionTime = 
versionTime; + } + + public void setVersionHash(long versionHash) { + this.versionHash = versionHash; + } @Override public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 918a962..4b7309e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -183,6 +183,7 @@ public class TransactionState implements Writable { private TransactionStatus transactionStatus; private LoadJobSourceType sourceType; private long prepareTime; + private long preCommitTime; private long commitTime; private long finishTime; private String reason = ""; @@ -208,6 +209,7 @@ public class TransactionState implements Writable { // 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released. private TxnStateChangeCallback callback = null; private long timeoutMs = Config.stream_load_default_timeout_second; + private long preCommittedTimeoutMs = Config.stream_load_default_precommit_timeout_second * 1000; private String authCode = ""; // is set to true, we will double the publish timeout @@ -238,6 +240,7 @@ public class TransactionState implements Writable { this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; + this.preCommitTime = -1; this.commitTime = -1; this.finishTime = -1; this.reason = ""; @@ -260,6 +263,7 @@ public class TransactionState implements Writable { this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = sourceType; this.prepareTime = -1; + this.preCommitTime = -1; this.commitTime = -1; this.finishTime = -1; this.reason = ""; @@ -334,6 +338,10 @@ public class TransactionState implements Writable { return prepareTime; } + public long getPreCommitTime() { + return preCommitTime; + } + public long 
getCommitTime() { return commitTime; } @@ -463,6 +471,10 @@ public class TransactionState implements Writable { this.prepareTime = prepareTime; } + public void setPreCommitTime(long preCommitTime) { + this.preCommitTime = preCommitTime; + } + public void setCommitTime(long commitTime) { this.commitTime = commitTime; } @@ -528,7 +540,8 @@ public class TransactionState implements Writable { // return true if txn is running but timeout public boolean isTimeout(long currentMillis) { - return transactionStatus == TransactionStatus.PREPARE && currentMillis - prepareTime > timeoutMs; + return (transactionStatus == TransactionStatus.PREPARE && currentMillis - prepareTime > timeoutMs) || + (transactionStatus == TransactionStatus.PRECOMMITTED && currentMillis - preCommitTime > preCommittedTimeoutMs); } public synchronized void addTableIndexes(OlapTable table) { @@ -606,6 +619,7 @@ public class TransactionState implements Writable { out.writeInt(transactionStatus.value()); out.writeInt(sourceType.value()); out.writeLong(prepareTime); + out.writeLong(preCommitTime); out.writeLong(commitTime); out.writeLong(finishTime); Text.writeString(out, reason); @@ -663,6 +677,9 @@ public class TransactionState implements Writable { transactionStatus = TransactionStatus.valueOf(in.readInt()); sourceType = LoadJobSourceType.valueOf(in.readInt()); prepareTime = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_107) { + preCommitTime = in.readLong(); + } commitTime = in.readLong(); finishTime = in.readLong(); reason = Text.readString(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java index efbe0b1..31287d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionStatus.java @@ -22,7 +22,8 @@ public enum TransactionStatus { 
PREPARE(1), COMMITTED(2), VISIBLE(3), - ABORTED(4); + ABORTED(4), + PRECOMMITTED(5); private final int flag; @@ -46,6 +47,8 @@ public enum TransactionStatus { return VISIBLE; case 4: return ABORTED; + case 5: + return PRECOMMITTED; default: return null; } @@ -68,6 +71,8 @@ public enum TransactionStatus { return "VISIBLE"; case ABORTED: return "ABORTED"; + case PRECOMMITTED: + return "PRECOMMITTED"; default: return "UNKNOWN"; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java index 93ad565..dc4b372 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -51,7 +51,7 @@ public class TransactionStateTest { @Test public void testSerDe() throws IOException { MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_83); + metaContext.setMetaVersion(FeMetaVersion.VERSION_107); metaContext.setThreadLocalInfo(); // 1. 
Write objects to file diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 2818196..a6db982 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -656,6 +656,23 @@ struct TLoadTxnCommitResult { 1: required Status.TStatus status } +struct TLoadTxn2PCRequest { + 1: optional string cluster + 2: required string user + 3: required string passwd + 4: optional string db + 5: optional string user_ip + 6: optional i64 txnId + 7: optional string operation + 8: optional i64 auth_code + 9: optional string auth_code_uuid + 10: optional i64 thrift_rpc_timeout_ms +} + +struct TLoadTxn2PCResult { + 1: required Status.TStatus status +} + struct TLoadTxnRollbackRequest { 1: optional string cluster 2: required string user @@ -749,6 +766,8 @@ service FrontendService { TFeResult updateExportTaskStatus(1: TUpdateExportTaskStatusRequest request) TLoadTxnBeginResult loadTxnBegin(1: TLoadTxnBeginRequest request) + TLoadTxnCommitResult loadTxnPreCommit(1: TLoadTxnCommitRequest request) + TLoadTxn2PCResult loadTxn2PC(1: TLoadTxn2PCRequest request) TLoadTxnCommitResult loadTxnCommit(1: TLoadTxnCommitRequest request) TLoadTxnRollbackResult loadTxnRollback(1: TLoadTxnRollbackRequest request) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org