This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b9b028099d [enhancement](stream load pipe) using queryid or load id to
identify stream load pipe instead of fragment instance id (#17362)
b9b028099d is described below
commit b9b028099d00ec9d7c12515af66acf01192748e4
Author: yiguolei <[email protected]>
AuthorDate: Sat Mar 4 16:19:36 2023 +0800
[enhancement](stream load pipe) using queryid or load id to identify stream
load pipe instead of fragment instance id (#17362)
* [enhancement](stream load pipe) using queryid or load id to identify
stream load pipe instead of fragment instance id
NewLoadStreamMgr already holds the pipe and other info, so there is no need to
save the pipe in the fragment state; this also makes FragmentState clearer.
However, this PR changes the behaviour of BE.
I will cherry-pick this PR to Doris 1.2.3 and add load id support to FE, so
that users can upgrade from 1.2.3 to 2.x.
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 9 +++-
be/src/pipeline/pipeline_fragment_context.h | 5 --
be/src/runtime/fragment_mgr.cpp | 51 +++---------------
be/src/runtime/fragment_mgr.h | 5 --
be/src/runtime/query_fragments_ctx.h | 2 +
be/src/service/internal_service.cpp | 61 +++++++++++-----------
be/test/runtime/fragment_mgr_test.cpp | 2 +-
.../apache/doris/planner/StreamLoadPlanner.java | 1 +
.../apache/doris/qe/InsertStreamTxnExecutor.java | 16 ++++--
.../java/org/apache/doris/qe/StmtExecutor.java | 8 ++-
.../org/apache/doris/rpc/BackendServiceProxy.java | 14 +++--
.../apache/doris/transaction/TransactionEntry.java | 10 ++++
.../doris/load/sync/canal/CanalSyncDataTest.java | 13 ++---
gensrc/proto/internal_service.proto | 3 ++
14 files changed, 94 insertions(+), 106 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index df51da373a..727383d1a5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -64,6 +64,8 @@
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_state.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
#include "task_scheduler.h"
#include "util/container_util.hpp"
#include "vec/exec/join/vhash_join_node.h"
@@ -123,8 +125,11 @@ void PipelineFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
_exec_status = Status::Cancelled(msg);
}
_runtime_state->set_is_cancelled(true);
- if (_pipe != nullptr) {
- _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
+ // Get pipe from new load stream manager and send cancel to it or the
fragment may hang to wait read from pipe
+ // For stream load the fragment's query_id == load id, it is set in FE.
+ auto stream_load_ctx =
_exec_env->new_load_stream_mgr()->get(_query_id);
+ if (stream_load_ctx != nullptr) {
+
stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
}
_cancel_reason = reason;
_cancel_msg = msg;
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 0a756ed37f..5bd25e256d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,9 +100,6 @@ public:
void send_report(bool);
- void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
- std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
-
void report_profile();
Status update_status(Status status) {
@@ -171,8 +168,6 @@ private:
RuntimeProfile::Counter* _start_timer;
RuntimeProfile::Counter* _prepare_timer;
- std::shared_ptr<io::StreamLoadPipe> _pipe;
-
std::function<void(RuntimeState*, Status*)> _call_back;
std::once_flag _close_once_flag;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0b15d0a518..7599a707c4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -134,9 +134,6 @@ public:
std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return
_fragments_ctx; }
- void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
- std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
-
void set_need_wait_execution_trigger() { _need_wait_execution_trigger =
true; }
private:
@@ -167,8 +164,6 @@ private:
std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
std::shared_ptr<RuntimeFilterMergeControllerEntity>
_merge_controller_handler;
- // The pipe for data transfering, such as insert.
- std::shared_ptr<io::StreamLoadPipe> _pipe;
// If set the true, this plan fragment will be executed only after FE send
execution start rpc.
bool _need_wait_execution_trigger = false;
@@ -251,9 +246,14 @@ Status FragmentExecState::cancel(const
PPlanFragmentCancelReason& reason, const
_executor.set_is_report_on_cancel(false);
}
_executor.cancel(reason, msg);
- if (_pipe != nullptr) {
- _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
+#ifndef BE_TEST
+ // Get pipe from new load stream manager and send cancel to it or the
fragment may hang to wait read from pipe
+ // For stream load the fragment's query_id == load id, it is set in FE.
+ auto stream_load_ctx =
_fragments_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
+ if (stream_load_ctx != nullptr) {
+
stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
}
+#endif
_cancelled = true;
}
return Status::OK();
@@ -546,9 +546,6 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params) {
_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id,
stream_load_ctx));
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
- set_pipe(params.params.fragment_instance_id, pipe,
- params.txn_conf.__isset.enable_pipeline_txn_load &&
- params.txn_conf.enable_pipeline_txn_load);
return Status::OK();
} else {
return exec_plan_fragment(params, empty_function);
@@ -576,40 +573,6 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
return Status::OK();
}
-void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
- std::shared_ptr<io::StreamLoadPipe> pipe, bool
enable_pipeline_engine) {
- if (enable_pipeline_engine) {
- std::lock_guard<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(fragment_instance_id);
- if (iter != _pipeline_map.end()) {
- _pipeline_map[fragment_instance_id]->set_pipe(std::move(pipe));
- }
- } else {
- std::lock_guard<std::mutex> lock(_lock);
- auto iter = _fragment_map.find(fragment_instance_id);
- if (iter != _fragment_map.end()) {
- _fragment_map[fragment_instance_id]->set_pipe(std::move(pipe));
- }
- }
-}
-
-std::shared_ptr<io::StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId&
fragment_instance_id) {
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto pipeline_iter = _pipeline_map.find(fragment_instance_id);
- if (pipeline_iter != _pipeline_map.end()) {
- return _pipeline_map[fragment_instance_id]->get_pipe();
- } else {
- auto fragment_iter = _fragment_map.find(fragment_instance_id);
- if (fragment_iter != _fragment_map.end()) {
- return _fragment_map[fragment_instance_id]->get_pipe();
- } else {
- return nullptr;
- }
- }
- }
-}
-
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 455e81c8d5..9c066c58f3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -127,11 +127,6 @@ public:
Status merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
- void set_pipe(const TUniqueId& fragment_instance_id,
std::shared_ptr<io::StreamLoadPipe> pipe,
- bool enable_pipeline_engine);
-
- std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId&
fragment_instance_id);
-
std::string to_http_path(const std::string& file_name);
void coordinator_callback(const ReportStatusRequest& req);
diff --git a/be/src/runtime/query_fragments_ctx.h
b/be/src/runtime/query_fragments_ctx.h
index a01c3924c0..389cecb860 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -67,6 +67,8 @@ public:
// this may be a bug, bug <= 1 in theory it shouldn't cause any problems
at this stage.
bool countdown() { return fragment_num.fetch_sub(1) <= 1; }
+ ExecEnv* exec_env() { return _exec_env; }
+
bool is_timeout(const DateTimeValue& now) const {
if (timeout_second <= 0) {
return false;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index fdb7c52b7d..db76453d76 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -717,15 +717,16 @@ void
PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
google::protobuf::Closure* done) {
bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
+ TUniqueId load_id;
+ load_id.hi = request->load_id().hi();
+ load_id.lo = request->load_id().lo();
+ // On 1.2.3 we add load id to send data request and using load id to
get pipe
+ auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+ if (stream_load_ctx == nullptr) {
response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
+ response->mutable_status()->add_error_msgs("could not find stream
load context");
} else {
+ auto pipe = stream_load_ctx->pipe;
for (int i = 0; i < request->data_size(); ++i) {
PDataRow* row = new PDataRow();
row->CopyFrom(request->data(i));
@@ -748,16 +749,16 @@ void
PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
+ TUniqueId load_id;
+ load_id.hi = request->load_id().hi();
+ load_id.lo = request->load_id().lo();
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
+ auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+ if (stream_load_ctx == nullptr) {
response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
+ response->mutable_status()->add_error_msgs("could not find stream
load context");
} else {
- pipe->finish();
+ stream_load_ctx->pipe->finish();
response->mutable_status()->set_status_code(0);
}
});
@@ -774,16 +775,15 @@ void
PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
- TUniqueId fragment_instance_id;
- fragment_instance_id.hi = request->fragment_instance_id().hi();
- fragment_instance_id.lo = request->fragment_instance_id().lo();
-
- auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
- if (pipe == nullptr) {
+ TUniqueId load_id;
+ load_id.hi = request->load_id().hi();
+ load_id.lo = request->load_id().lo();
+ auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+ if (stream_load_ctx == nullptr) {
response->mutable_status()->set_status_code(1);
- response->mutable_status()->add_error_msgs("pipe is null");
+ response->mutable_status()->add_error_msgs("could not find stream
load context");
} else {
- pipe->cancel("rollback");
+ stream_load_ctx->pipe->cancel("rollback");
response->mutable_status()->set_status_code(0);
}
});
@@ -1083,10 +1083,11 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
// Check file length
uint64_t local_file_size =
std::filesystem::file_size(local_file_path);
if (local_file_size != file_size) {
- LOG(WARNING)
- << "failed to pull rowset for slave replica.
download file length error"
- << ", remote_path=" << remote_file_url << ",
file_size=" << file_size
- << ", local_file_size=" << local_file_size;
+ LOG(WARNING) << "failed to pull rowset for slave replica.
download file "
+ "length error"
+ << ", remote_path=" << remote_file_url
+ << ", file_size=" << file_size
+ << ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not
equal");
}
chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
@@ -1094,10 +1095,10 @@ void
PInternalServiceImpl::request_slave_tablet_pull_rowset(
};
auto st = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY,
1, download_cb);
if (!st.ok()) {
- LOG(WARNING)
- << "failed to pull rowset for slave replica. failed to
download file. url="
- << remote_file_url << ", local_path=" <<
local_file_path
- << ", txn_id=" << rowset_meta->txn_id();
+ LOG(WARNING) << "failed to pull rowset for slave replica.
failed to download "
+ "file. url="
+ << remote_file_url << ", local_path=" <<
local_file_path
+ << ", txn_id=" << rowset_meta->txn_id();
_response_pull_slave_rowset(host, brpc_port,
rowset_meta->txn_id(),
rowset_meta->tablet_id(), node_id,
false);
return;
diff --git a/be/test/runtime/fragment_mgr_test.cpp
b/be/test/runtime/fragment_mgr_test.cpp
index 3dfc8b0a37..9a3967c031 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -118,7 +118,7 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
config::fragment_pool_thread_num_min = 1;
config::fragment_pool_thread_num_max = 1;
config::fragment_pool_queue_size = 0;
- FragmentMgr mgr(nullptr);
+ FragmentMgr mgr(doris::ExecEnv::GetInstance());
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 9b5e15da75..16c6f38835 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -206,6 +206,7 @@ public class StreamLoadPlanner {
fileStatus.setIsDir(false);
fileStatus.setSize(-1); // must set to -1, means stream.
}
+ // The load id will pass to csv reader to find the stream load context
from new load stream manager
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable,
BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(),
taskInfo.getFileType(), taskInfo.getHiddenColumns());
scanNode = fileScanNode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 5f9e1e7972..324c6d523a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -61,9 +61,11 @@ public class InsertStreamTxnExecutor {
public void beginTransaction(TStreamLoadPutRequest request) throws
UserException, TException, TimeoutException,
InterruptedException, ExecutionException {
TTxnParams txnConf = txnEntry.getTxnConf();
+ // StreamLoadTask's id == request's load_id
StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(request);
StreamLoadPlanner planner = new StreamLoadPlanner(
txnEntry.getDb(), (OlapTable) txnEntry.getTable(),
streamLoadTask);
+ // Will using load id as query id in fragment
TExecPlanFragmentParams tRequest =
planner.plan(streamLoadTask.getId());
BeSelectionPolicy policy = new
BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
.needLoadAvailable().needQueryAvailable().build();
@@ -82,6 +84,10 @@ public class InsertStreamTxnExecutor {
}
}
txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+ this.loadId = request.getLoadId();
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
txnConf.setUserIp(backend.getIp());
@@ -109,11 +115,12 @@ public class InsertStreamTxnExecutor {
.setHi(txnConf.getFragmentInstanceId().getHi())
.setLo(txnConf.getFragmentInstanceId().getLo()).build();
+
Backend backend = txnEntry.getBackend();
TNetworkAddress address = new TNetworkAddress(backend.getIp(),
backend.getBrpcPort());
try {
Future<InternalService.PCommitResult> future = BackendServiceProxy
- .getInstance().commit(address, fragmentInstanceId);
+ .getInstance().commit(address, fragmentInstanceId,
this.txnEntry.getpLoadId());
InternalService.PCommitResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -135,7 +142,7 @@ public class InsertStreamTxnExecutor {
TNetworkAddress address = new TNetworkAddress(be.getIp(),
be.getBrpcPort());
try {
Future<InternalService.PRollbackResult> future =
BackendServiceProxy.getInstance().rollback(address,
- fragmentInstanceId);
+ fragmentInstanceId, this.txnEntry.getpLoadId());
InternalService.PRollbackResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -161,7 +168,7 @@ public class InsertStreamTxnExecutor {
TNetworkAddress address = new TNetworkAddress(backend.getIp(),
backend.getBrpcPort());
try {
Future<InternalService.PSendDataResult> future =
BackendServiceProxy.getInstance().sendData(
- address, fragmentInstanceId, txnEntry.getDataToSend());
+ address, fragmentInstanceId, this.txnEntry.getpLoadId(),
txnEntry.getDataToSend());
InternalService.PSendDataResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -180,6 +187,9 @@ public class InsertStreamTxnExecutor {
public void setLoadId(TUniqueId loadId) {
this.loadId = loadId;
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
}
public long getTxnId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0ba673baa9..73f7bcca55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -623,10 +623,8 @@ public class StmtExecutor implements ProfileWriter {
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getMessage());
throw e;
} catch (UserException e) {
- LOG.warn("", e);
// analysis exception only print message, not print the stack
- LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(),
- e.getMessage());
+ LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Exception e) {
@@ -1358,8 +1356,7 @@ public class StmtExecutor implements ProfileWriter {
if (context.getTxnEntry() == null) {
context.setTxnEntry(new TransactionEntry());
}
- TransactionEntry txnEntry = context.getTxnEntry();
- txnEntry.setTxnConf(txnParams);
+ context.getTxnEntry().setTxnConf(txnParams);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("',
'status':'")
.append(TransactionStatus.PREPARE.name());
@@ -1405,6 +1402,7 @@ public class StmtExecutor implements ProfileWriter {
.append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
context.getState().setOk(0, 0, sb.toString());
} catch (Exception e) {
+ LOG.warn("Txn commit failed", e);
throw new AnalysisException(e.getMessage());
} finally {
context.setTxnEntry(null);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 1a09b0721c..3c04380461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -294,11 +294,13 @@ public class BackendServiceProxy {
}
public Future<InternalService.PSendDataResult> sendData(
- TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
List<InternalService.PDataRow> data)
+ TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
+ Types.PUniqueId loadId, List<InternalService.PDataRow> data)
throws RpcException {
final InternalService.PSendDataRequest.Builder pRequest =
InternalService.PSendDataRequest.newBuilder();
pRequest.setFragmentInstanceId(fragmentInstanceId);
+ pRequest.setLoadId(loadId);
pRequest.addAllData(data);
try {
final BackendServiceClient client = getProxy(address);
@@ -309,10 +311,11 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PRollbackResult> rollback(TNetworkAddress
address, Types.PUniqueId fragmentInstanceId)
+ public Future<InternalService.PRollbackResult> rollback(TNetworkAddress
address,
+ Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
throws RpcException {
final InternalService.PRollbackRequest pRequest =
InternalService.PRollbackRequest.newBuilder()
- .setFragmentInstanceId(fragmentInstanceId).build();
+
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
try {
final BackendServiceClient client = getProxy(address);
return client.rollback(pRequest);
@@ -322,10 +325,11 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PCommitResult> commit(TNetworkAddress
address, Types.PUniqueId fragmentInstanceId)
+ public Future<InternalService.PCommitResult> commit(TNetworkAddress
address,
+ Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
throws RpcException {
final InternalService.PCommitRequest pRequest =
InternalService.PCommitRequest.newBuilder()
- .setFragmentInstanceId(fragmentInstanceId).build();
+
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
try {
final BackendServiceClient client = getProxy(address);
return client.commit(pRequest);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 4db596dc55..7136871579 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.transaction;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TTxnParams;
@@ -35,6 +36,7 @@ public class TransactionEntry {
private TTxnParams txnConf;
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
+ private Types.PUniqueId pLoadId;
public TransactionEntry() {
}
@@ -116,4 +118,12 @@ public class TransactionEntry {
public void setRowsInTransaction(long rowsInTransaction) {
this.rowsInTransaction = rowsInTransaction;
}
+
+ public Types.PUniqueId getpLoadId() {
+ return pLoadId;
+ }
+
+ public void setpLoadId(Types.PUniqueId pLoadId) {
+ this.pLoadId = pLoadId;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 430d8d204e..a1f31b0b34 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -263,11 +263,11 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = commitFuture;
- backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any,
+ backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any,
(List<InternalService.PDataRow>) any);
minTimes = 0;
result = sendDataFuture;
@@ -336,7 +336,7 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = abortFuture;
@@ -403,15 +403,16 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = commitFuture;
- backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = abortFuture;
- backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+ backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any,
+ (Types.PUniqueId) any,
(List<InternalService.PDataRow>) any);
minTimes = 0;
result = sendDataFuture;
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 05746715c3..3cff8fd733 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -385,6 +385,7 @@ message PDataRow {
message PSendDataRequest {
required PUniqueId fragment_instance_id = 1;
repeated PDataRow data = 2;
+ optional PUniqueId load_id = 3; // load_id == query_id in fragment exec
}
message PSendDataResult {
@@ -393,6 +394,7 @@ message PSendDataResult {
message PCommitRequest {
required PUniqueId fragment_instance_id = 1;
+ optional PUniqueId load_id = 2;
}
message PCommitResult {
@@ -401,6 +403,7 @@ message PCommitResult {
message PRollbackRequest {
required PUniqueId fragment_instance_id = 1;
+ optional PUniqueId load_id = 2;
}
message PRollbackResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]