This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6d0c1ad6328b4443934dfa489609cc9c8148c669
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Mar 7 11:13:31 2024 +0800

    [fix](http stream) http stream support memtable_on_sink_node header (#31866)
---
 be/src/http/action/http_stream.cpp                 |  4 ++++
 .../apache/doris/service/FrontendServiceImpl.java  | 22 +++++++++++++++++-----
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index bb5ce729df8..272ef928a4e 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -294,6 +294,10 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,
     TStreamLoadPutRequest request;
     if (http_req != nullptr) {
         request.__set_load_sql(http_req->header(HTTP_SQL));
+        if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+            bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), 
"true");
+            request.__set_memtable_on_sink_node(value);
+        }
     } else {
         request.__set_token(ctx->auth.token);
         request.__set_load_sql(ctx->sql_str);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 1316425df4b..97b6f52e331 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2036,6 +2036,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     request.getTbl(),
                     request.getUserIp(), PrivPredicate.LOAD);
         }
+        if (request.isSetMemtableOnSinkNode()) {
+            ctx.getSessionVariable().enableMemtableOnSinkNode = 
request.isMemtableOnSinkNode();
+        } else {
+            ctx.getSessionVariable().enableMemtableOnSinkNode = 
Config.stream_load_default_memtable_on_sink_node;
+        }
         SqlScanner input = new SqlScanner(new StringReader(originStmt), 
ctx.getSessionVariable().getSqlMode());
         SqlParser parser = new SqlParser(input);
         try {
@@ -2057,6 +2062,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             Coordinator coord = new Coordinator(ctx, analyzer, 
executor.planner());
             coord.setLoadMemLimit(request.getExecMemLimit());
             coord.setQueryType(TQueryType.LOAD);
+            Table table = parsedStmt.getTargetTable();
+            if (table instanceof OlapTable) {
+                boolean isEnableMemtableOnSinkNode =
+                        ((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()
+                                ? 
coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+                
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+            }
 
             TExecPlanFragmentParams plan = coord.getStreamLoadPlan();
             int loadStreamPerNode = 20;
@@ -2073,11 +2085,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             // The txn_id here is obtained from the NativeInsertStmt
             result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
             result.getParams().setImportLabel(parsedStmt.getLabel());
-            result.setDbId(parsedStmt.getTargetTable().getDatabase().getId());
-            result.setTableId(parsedStmt.getTargetTable().getId());
-            result.setBaseSchemaVersion(((OlapTable) 
parsedStmt.getTargetTable()).getBaseSchemaVersion());
-            result.setGroupCommitIntervalMs(((OlapTable) 
parsedStmt.getTargetTable()).getGroupCommitIntervalMs());
-            result.setGroupCommitDataBytes(((OlapTable) 
parsedStmt.getTargetTable()).getGroupCommitDataBytes());
+            result.setDbId(table.getDatabase().getId());
+            result.setTableId(table.getId());
+            result.setBaseSchemaVersion(((OlapTable) 
table).getBaseSchemaVersion());
+            result.setGroupCommitIntervalMs(((OlapTable) 
table).getGroupCommitIntervalMs());
+            result.setGroupCommitDataBytes(((OlapTable) 
table).getGroupCommitDataBytes());
             
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
         } catch (UserException e) {
             LOG.warn("exec sql error", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to