This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6d0c1ad6328b4443934dfa489609cc9c8148c669 Author: meiyi <myime...@gmail.com> AuthorDate: Thu Mar 7 11:13:31 2024 +0800 [fix](http stream) http stream support memtable_on_sink_node header (#31866) --- be/src/http/action/http_stream.cpp | 4 ++++ .../apache/doris/service/FrontendServiceImpl.java | 22 +++++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index bb5ce729df8..272ef928a4e 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -294,6 +294,10 @@ Status HttpStreamAction::process_put(HttpRequest* http_req, TStreamLoadPutRequest request; if (http_req != nullptr) { request.__set_load_sql(http_req->header(HTTP_SQL)); + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { + bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); + request.__set_memtable_on_sink_node(value); + } } else { request.__set_token(ctx->auth.token); request.__set_load_sql(ctx->sql_str); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1316425df4b..97b6f52e331 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2036,6 +2036,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); } + if (request.isSetMemtableOnSinkNode()) { + ctx.getSessionVariable().enableMemtableOnSinkNode = request.isMemtableOnSinkNode(); + } else { + ctx.getSessionVariable().enableMemtableOnSinkNode = Config.stream_load_default_memtable_on_sink_node; + } SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); try { @@ -2057,6 +2062,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { Coordinator coord = new Coordinator(ctx, analyzer, executor.planner()); coord.setLoadMemLimit(request.getExecMemLimit()); coord.setQueryType(TQueryType.LOAD); + Table table = parsedStmt.getTargetTable(); + if (table instanceof OlapTable) { + boolean isEnableMemtableOnSinkNode = + ((OlapTable) table).getTableProperty().getUseSchemaLightChange() + ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false; + coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + } TExecPlanFragmentParams plan = coord.getStreamLoadPlan(); int loadStreamPerNode = 20; @@ -2073,11 +2085,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { // The txn_id here is obtained from the NativeInsertStmt result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id)); result.getParams().setImportLabel(parsedStmt.getLabel()); - result.setDbId(parsedStmt.getTargetTable().getDatabase().getId()); - result.setTableId(parsedStmt.getTargetTable().getId()); - result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion()); - result.setGroupCommitIntervalMs(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitIntervalMs()); - result.setGroupCommitDataBytes(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitDataBytes()); + result.setDbId(table.getDatabase().getId()); + result.setTableId(table.getId()); + result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion()); + result.setGroupCommitIntervalMs(((OlapTable) table).getGroupCommitIntervalMs()); + result.setGroupCommitDataBytes(((OlapTable) table).getGroupCommitDataBytes()); result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish); } catch (UserException e) { LOG.warn("exec sql error", e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org