This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2b6d876280 [feature](move-memtable)[6/7] add options to enable
memtable on sink node (#23470)
2b6d876280 is described below
commit 2b6d876280cd52e3b1389559876edad6fe2ed060
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 25 22:32:22 2023 +0800
[feature](move-memtable)[6/7] add options to enable memtable on sink node
(#23470)
Co-authored-by: Siyang Tang
<[email protected]>
---
be/src/exec/data_sink.cpp | 13 +++++-
be/src/http/action/stream_load.cpp | 4 ++
be/src/http/http_common.h | 1 +
be/src/pipeline/exec/olap_table_sink_v2_operator.h | 49 ++++++++++++++++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 10 ++++-
docs/en/docs/advanced/variables.md | 9 ++++
.../import/import-way/stream-load-manual.md | 9 ++++
docs/zh-CN/docs/advanced/variables.md | 9 ++++
.../import/import-way/stream-load-manual.md | 9 ++++
.../apache/doris/planner/StreamLoadPlanner.java | 2 +
.../java/org/apache/doris/qe/SessionVariable.java | 7 ++++
.../java/org/apache/doris/task/LoadTaskInfo.java | 4 ++
.../java/org/apache/doris/task/StreamLoadTask.java | 14 +++++++
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PaloInternalService.thrift | 2 +
15 files changed, 139 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 886233ad01..53f44dd590 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -37,6 +37,7 @@
#include "vec/sink/vresult_sink.h"
#include "vec/sink/vtable_sink.h"
#include "vec/sink/vtablet_sink.h"
+#include "vec/sink/vtablet_sink_v2.h"
namespace doris {
class DescriptorTbl;
@@ -148,7 +149,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
DCHECK(thrift_sink.__isset.olap_table_sink);
- sink->reset(new stream_load::VOlapTableSink(pool, row_desc,
output_exprs, &status));
+ if (state->query_options().enable_memtable_on_sink_node) {
+ sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
+ } else {
+ sink->reset(new stream_load::VOlapTableSink(pool, row_desc,
output_exprs, &status));
+ }
RETURN_IF_ERROR(status);
break;
}
@@ -286,7 +291,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
DCHECK(thrift_sink.__isset.olap_table_sink);
- sink->reset(new stream_load::VOlapTableSink(pool, row_desc,
output_exprs, &status));
+ if (state->query_options().enable_memtable_on_sink_node) {
+ sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
+ } else {
+ sink->reset(new stream_load::VOlapTableSink(pool, row_desc,
output_exprs, &status));
+ }
RETURN_IF_ERROR(status);
break;
}
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 6f5e1ed91d..85f1fb80a9 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -547,6 +547,10 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
request.__set_partial_update(false);
}
}
+ if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+ bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE),
"true");
+ request.__set_memtable_on_sink_node(value);
+ }
#ifndef BE_TEST
// plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index ed6a77eb5d..bcbfa33e10 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -62,5 +62,6 @@ static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
+static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
} // namespace doris
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
new file mode 100644
index 0000000000..75f56f6ba6
--- /dev/null
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/sink/vtablet_sink_v2.h"
+
+namespace doris {
+
+namespace pipeline {
+
+// Builds OlapTableSinkV2Operator instances around a VOlapTableSinkV2
+// data sink. pipeline_fragment_context.cpp uses this builder in place
+// of OlapTableSinkOperatorBuilder when the query option
+// enable_memtable_on_sink_node is true.
+class OlapTableSinkV2OperatorBuilder final
+        : public DataSinkOperatorBuilder<stream_load::VOlapTableSinkV2> {
+public:
+    OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
+
+    OperatorPtr build_operator() override;
+};
+
+// Pipeline sink operator produced by OlapTableSinkV2OperatorBuilder.
+// Apart from can_write(), all behavior is inherited from DataSinkOperator.
+class OlapTableSinkV2Operator final : public DataSinkOperator<OlapTableSinkV2OperatorBuilder> {
+public:
+    OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink* sink)
+            : DataSinkOperator(operator_builder, sink) {}
+
+    // Unconditionally reports the operator as writable; memory limits
+    // are not consulted yet (see TODO).
+    bool can_write() override { return true; } // TODO: need use mem_limit
+};
+
+// Defined in a header, so it must be `inline`: otherwise every translation
+// unit including this file emits an external definition, and linking two
+// such units fails with a multiple-definition (ODR) error.
+inline OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
+    return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 871c554781..c2a5080eed 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -62,6 +62,7 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
@@ -753,8 +754,13 @@ Status PipelineFragmentContext::_create_sink(int
sender_id, const TDataSink& thr
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
- sink_ =
std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
- _sink.get());
+ if (state->query_options().enable_memtable_on_sink_node) {
+ sink_ =
std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
+
_sink.get());
+ } else {
+ sink_ =
std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
+
_sink.get());
+ }
break;
}
case TDataSinkType::MYSQL_TABLE_SINK:
diff --git a/docs/en/docs/advanced/variables.md
b/docs/en/docs/advanced/variables.md
index 5829d63cf6..97dee5752c 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -683,6 +683,15 @@ Translated with www.DeepL.com/Translator (free version)
It is used for the ReplacingMergeTree table engine of ClickHouse to
deduplicate queries.
+* `enable_memtable_on_sink_node`
+
+ <version since="2.1.0">
+ Whether to enable MemTable on DataSink node when loading data, default is
false.
+ </version>
+
+ Builds MemTables on the DataSink node and sends the resulting segments to other backends via brpc streaming.
+ This avoids duplicating the same work on every replica and saves time on data serialization and deserialization.
+
***
#### Supplementary instructions on statement execution timeout control
diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
index 530cadf317..f18bccc435 100644
--- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -231,6 +231,15 @@ Stream load uses HTTP protocol, so all parameters related
to import tasks are se
<version since="1.2.7">When `enable_profile` is true, the Stream Load
profile will be printed to logs (be.INFO).</version>
++ memtable_on_sink_node
+
+ <version since="2.1.0">
+ Whether to enable MemTable on DataSink node when loading data, default is
false.
+ </version>
+
+ Builds MemTables on the DataSink node and sends the resulting segments to other backends via brpc streaming.
+ This avoids duplicating the same work on every replica and saves time on data serialization and deserialization.
+
### Use stream load with SQL
You can add a `sql` parameter to the `Header` to replace the
`column_separator`, `line_delimiter`, `where`, `columns` in the previous
parameter, which is convenient to use.
diff --git a/docs/zh-CN/docs/advanced/variables.md
b/docs/zh-CN/docs/advanced/variables.md
index 315fc04355..c07185dc1f 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -670,6 +670,15 @@ try (Connection conn =
DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
用于 ClickHouse 的 ReplacingMergeTree 表引擎查询去重
+* `enable_memtable_on_sink_node`
+
+ <version since="2.1.0">
+ 是否在数据导入中启用 MemTable 前移,默认为 false
+ </version>
+
+ 在 DataSink 节点上构建 MemTable,并通过 brpc streaming 发送 segment 到其他 BE。
+ 该方法减少了多副本之间的重复工作,并且节省了数据序列化和反序列化的时间。
+
***
#### 关于语句执行超时控制的补充说明
diff --git
a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index 7e5d6a4dbf..115cc954b8 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -242,6 +242,15 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的
<version since="1.2.7">当 `enable_profile` 为 true 时,Stream Load profile
将会被打印到 be.INFO 日志中。</version>
+- memtable_on_sink_node
+
+ <version since="2.1.0">
+ 是否在数据导入中启用 MemTable 前移,默认为 false
+ </version>
+
+ 在 DataSink 节点上构建 MemTable,并通过 brpc streaming 发送 segment 到其他 BE。
+ 该方法减少了多副本之间的重复工作,并且节省了数据序列化和反序列化的时间。
+
### 使用SQL表达Stream Load的参数
可以在Header中添加一个`sql`的参数,去替代之前参数中的`column_separator`、`line_delimiter`、`where`、`columns`参数,方便使用。
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index b0e145dd71..5bf07f1f72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -298,6 +298,7 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
@@ -504,6 +505,7 @@ public class StreamLoadPlanner {
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
pipParams.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a61b49781a..ee66fa3eb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -383,6 +383,9 @@ public class SessionVariable implements Serializable,
Writable {
public static final String JDBC_CLICKHOUSE_QUERY_FINAL =
"jdbc_clickhouse_query_final";
+ public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
+ "enable_memtable_on_sink_node";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -1119,6 +1122,9 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true)
public boolean truncateCharOrVarcharColumns = false;
+ @VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward =
true)
+ public boolean enableMemtableOnSinkNode = false;
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
@@ -2200,6 +2206,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableOrcLazyMat(enableOrcLazyMat);
tResult.setTruncateCharOrVarcharColumns(truncateCharOrVarcharColumns);
+ tResult.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
return tResult;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 4fe306d3d8..3174e4d5c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -121,6 +121,10 @@ public interface LoadTaskInfo {
return false;
}
+ default boolean isMemtableOnSinkNode() {
+ return false;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 222ca2dd21..c99c720ee0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -88,6 +88,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private int skipLines = 0;
private boolean enableProfile = false;
+ private boolean memtableOnSinkNode = false;
+
private byte enclose = 0;
private byte escape = 0;
@@ -297,6 +299,15 @@ public class StreamLoadTask implements LoadTaskInfo {
return isPartialUpdate;
}
+ @Override
+ public boolean isMemtableOnSinkNode() {
+ return memtableOnSinkNode;
+ }
+
+ public void setMemtableOnSinkNode(boolean memtableOnSinkNode) {
+ this.memtableOnSinkNode = memtableOnSinkNode;
+ }
+
public static StreamLoadTask
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new
StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -430,6 +441,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetPartialUpdate()) {
isPartialUpdate = request.isPartialUpdate();
}
+ if (request.isSetMemtableOnSinkNode()) {
+ this.memtableOnSinkNode = request.isMemtableOnSinkNode();
+ }
}
// used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index d916cfe9c1..c1023331a2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -614,6 +614,7 @@ struct TStreamLoadPutRequest {
51: optional i8 enclose
// only valid when file type is CSV
52: optional i8 escape
+ 53: optional bool memtable_on_sink_node;
}
struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index afc1b11a5d..ba204f474e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -233,6 +233,8 @@ struct TQueryOptions {
78: optional bool enable_hash_join_early_start_probe = false
79: optional bool enable_pipeline_x_engine = false;
+
+ 80: optional bool enable_memtable_on_sink_node = false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]