This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b6d876280 [feature](move-memtable)[6/7] add options to enable memtable on sink node (#23470)
2b6d876280 is described below

commit 2b6d876280cd52e3b1389559876edad6fe2ed060
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 25 22:32:22 2023 +0800

    [feature](move-memtable)[6/7] add options to enable memtable on sink node (#23470)
    
    
    Co-authored-by: Siyang Tang <[email protected]>
---
 be/src/exec/data_sink.cpp                          | 13 +++++-
 be/src/http/action/stream_load.cpp                 |  4 ++
 be/src/http/http_common.h                          |  1 +
 be/src/pipeline/exec/olap_table_sink_v2_operator.h | 49 ++++++++++++++++++++++
 be/src/pipeline/pipeline_fragment_context.cpp      | 10 ++++-
 docs/en/docs/advanced/variables.md                 |  9 ++++
 .../import/import-way/stream-load-manual.md        |  9 ++++
 docs/zh-CN/docs/advanced/variables.md              |  9 ++++
 .../import/import-way/stream-load-manual.md        |  9 ++++
 .../apache/doris/planner/StreamLoadPlanner.java    |  2 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  7 ++++
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  4 ++
 .../java/org/apache/doris/task/StreamLoadTask.java | 14 +++++++
 gensrc/thrift/FrontendService.thrift               |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  2 +
 15 files changed, 139 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 886233ad01..53f44dd590 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -37,6 +37,7 @@
 #include "vec/sink/vresult_sink.h"
 #include "vec/sink/vtable_sink.h"
 #include "vec/sink/vtablet_sink.h"
+#include "vec/sink/vtablet_sink_v2.h"
 
 namespace doris {
 class DescriptorTbl;
@@ -148,7 +149,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         Status status;
         DCHECK(thrift_sink.__isset.olap_table_sink);
-        sink->reset(new stream_load::VOlapTableSink(pool, row_desc, 
output_exprs, &status));
+        if (state->query_options().enable_memtable_on_sink_node) {
+            sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
+        } else {
+            sink->reset(new stream_load::VOlapTableSink(pool, row_desc, 
output_exprs, &status));
+        }
         RETURN_IF_ERROR(status);
         break;
     }
@@ -286,7 +291,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink
     case TDataSinkType::OLAP_TABLE_SINK: {
         Status status;
         DCHECK(thrift_sink.__isset.olap_table_sink);
-        sink->reset(new stream_load::VOlapTableSink(pool, row_desc, 
output_exprs, &status));
+        if (state->query_options().enable_memtable_on_sink_node) {
+            sink->reset(new stream_load::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
+        } else {
+            sink->reset(new stream_load::VOlapTableSink(pool, row_desc, 
output_exprs, &status));
+        }
         RETURN_IF_ERROR(status);
         break;
     }
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 6f5e1ed91d..85f1fb80a9 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -547,6 +547,10 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
             request.__set_partial_update(false);
         }
     }
+    if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+        bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), 
"true");
+        request.__set_memtable_on_sink_node(value);
+    }
 
 #ifndef BE_TEST
     // plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index ed6a77eb5d..bcbfa33e10 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -62,5 +62,6 @@ static const std::string HTTP_SQL = "sql";
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
 static const std::string HTTP_TXN_OPERATION_KEY = "txn_operation";
+static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
 
 } // namespace doris
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
new file mode 100644
index 0000000000..75f56f6ba6
--- /dev/null
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "vec/sink/vtablet_sink_v2.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class OlapTableSinkV2OperatorBuilder final
+        : public DataSinkOperatorBuilder<stream_load::VOlapTableSinkV2> {
+public:
+    OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
+    // Defined after OlapTableSinkV2Operator, whose full definition it needs.
+    OperatorPtr build_operator() override;
+};
+
+class OlapTableSinkV2Operator final : public DataSinkOperator<OlapTableSinkV2OperatorBuilder> {
+public:
+    OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink* sink)
+            : DataSinkOperator(operator_builder, sink) {}
+    // Unconditionally reports the sink as writable.
+    bool can_write() override { return true; } // TODO: need use mem_limit
+};
+
+inline OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { // inline: header-defined
+    return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 871c554781..c2a5080eed 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -62,6 +62,7 @@
 #include "pipeline/exec/nested_loop_join_build_operator.h"
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
@@ -753,8 +754,13 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
         break;
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
-        sink_ = 
std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
-                                                               _sink.get());
+        if (state->query_options().enable_memtable_on_sink_node) {
+            sink_ = 
std::make_shared<OlapTableSinkV2OperatorBuilder>(next_operator_builder_id(),
+                                                                     
_sink.get());
+        } else {
+            sink_ = 
std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
+                                                                   
_sink.get());
+        }
         break;
     }
     case TDataSinkType::MYSQL_TABLE_SINK:
diff --git a/docs/en/docs/advanced/variables.md 
b/docs/en/docs/advanced/variables.md
index 5829d63cf6..97dee5752c 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -683,6 +683,15 @@ Translated with www.DeepL.com/Translator (free version)
   
   It is used for the ReplacingMergeTree table engine of ClickHouse to deduplicate queries.
 
+* `enable_memtable_on_sink_node`
+
+  <version since="2.1.0">
+  Whether to enable MemTable on DataSink node when loading data, default is false.
+  </version>
+
+  Build MemTable on DataSink node, and send segments to other backends through brpc streaming.
+  It reduces duplicate work among replicas, and saves time in data serialization & deserialization.
+
 ***
 
 #### Supplementary instructions on statement execution timeout control
diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md 
b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
index 530cadf317..f18bccc435 100644
--- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md
@@ -231,6 +231,15 @@ Stream load uses HTTP protocol, so all parameters related 
to import tasks are se
 
   <version since="1.2.7">When `enable_profile` is true, the Stream Load 
profile will be printed to logs (be.INFO).</version>
 
++ memtable_on_sink_node
+
+  <version since="2.1.0">
+  Whether to enable MemTable on DataSink node when loading data, default is false.
+  </version>
+
+  Build MemTable on DataSink node, and send segments to other backends through brpc streaming.
+  It reduces duplicate work among replicas, and saves time in data serialization & deserialization.
+
 ### Use stream load with SQL
 
 You can add a `sql` parameter to the `Header` to replace the `column_separator`, `line_delimiter`, `where`, `columns` in the previous parameter, which is convenient to use.
diff --git a/docs/zh-CN/docs/advanced/variables.md 
b/docs/zh-CN/docs/advanced/variables.md
index 315fc04355..c07185dc1f 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -670,6 +670,15 @@ try (Connection conn = 
DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
 
   用于 ClickHouse 的 ReplacingMergeTree 表引擎查询去重
 
+* `enable_memtable_on_sink_node`
+
+  <version since="2.1.0">
+  是否在数据导入中启用 MemTable 前移,默认为 false
+  </version>
+
+  在 DataSink 节点上构建 MemTable,并通过 brpc streaming 发送 segment 到其他 BE。
+  该方法减少了多副本之间的重复工作,并且节省了数据序列化和反序列化的时间。
+
 ***
 
 #### 关于语句执行超时控制的补充说明
diff --git 
a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md 
b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
index 7e5d6a4dbf..115cc954b8 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md
@@ -242,6 +242,15 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的
 
   <version since="1.2.7">当 `enable_profile` 为 true 时,Stream Load profile 
将会被打印到 be.INFO 日志中。</version>
 
+- memtable_on_sink_node
+
+  <version since="2.1.0">
+  是否在数据导入中启用 MemTable 前移,默认为 false
+  </version>
+
+  在 DataSink 节点上构建 MemTable,并通过 brpc streaming 发送 segment 到其他 BE。
+  该方法减少了多副本之间的重复工作,并且节省了数据序列化和反序列化的时间。
+
 ### 使用SQL表达Stream Load的参数
 
 
可以在Header中添加一个`sql`的参数,去替代之前参数中的`column_separator`、`line_delimiter`、`where`、`columns`参数,方便使用。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index b0e145dd71..5bf07f1f72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -298,6 +298,7 @@ public class StreamLoadPlanner {
         queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
         queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+        
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
 
         params.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
@@ -504,6 +505,7 @@ public class StreamLoadPlanner {
         queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
         queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
+        
queryOptions.setEnableMemtableOnSinkNode(taskInfo.isMemtableOnSinkNode());
 
         pipParams.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a61b49781a..ee66fa3eb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -383,6 +383,9 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String JDBC_CLICKHOUSE_QUERY_FINAL = 
"jdbc_clickhouse_query_final";
 
+    public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
+            "enable_memtable_on_sink_node";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -1119,6 +1122,9 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true)
     public boolean truncateCharOrVarcharColumns = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = 
true)
+    public boolean enableMemtableOnSinkNode = false;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
@@ -2200,6 +2206,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableOrcLazyMat(enableOrcLazyMat);
 
         tResult.setTruncateCharOrVarcharColumns(truncateCharOrVarcharColumns);
+        tResult.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
 
         return tResult;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 4fe306d3d8..3174e4d5c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -121,6 +121,10 @@ public interface LoadTaskInfo {
         return false;
     }
 
+    default boolean isMemtableOnSinkNode() {
+        return false;
+    }
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 222ca2dd21..c99c720ee0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -88,6 +88,8 @@ public class StreamLoadTask implements LoadTaskInfo {
     private int skipLines = 0;
     private boolean enableProfile = false;
 
+    private boolean memtableOnSinkNode = false;
+
     private byte enclose = 0;
 
     private byte escape = 0;
@@ -297,6 +299,15 @@ public class StreamLoadTask implements LoadTaskInfo {
         return isPartialUpdate;
     }
 
+    @Override
+    public boolean isMemtableOnSinkNode() {
+        return memtableOnSinkNode;
+    }
+
+    public void setMemtableOnSinkNode(boolean memtableOnSinkNode) {
+        this.memtableOnSinkNode = memtableOnSinkNode;
+    }
+
     public static StreamLoadTask 
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new 
StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType(),
@@ -430,6 +441,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetPartialUpdate()) {
             isPartialUpdate = request.isPartialUpdate();
         }
+        if (request.isSetMemtableOnSinkNode()) {
+            this.memtableOnSinkNode = request.isMemtableOnSinkNode();
+        }
     }
 
     // used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index d916cfe9c1..c1023331a2 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -614,6 +614,7 @@ struct TStreamLoadPutRequest {
     51: optional i8 enclose
     // only valid when file type is CSV
     52: optional i8 escape
+    53: optional bool memtable_on_sink_node;
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index afc1b11a5d..ba204f474e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -233,6 +233,8 @@ struct TQueryOptions {
   78: optional bool enable_hash_join_early_start_probe = false
 
   79: optional bool enable_pipeline_x_engine = false;
+
+  80: optional bool enable_memtable_on_sink_node = false;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to