This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cf739e7496 [Enhancement](Stmt) Set insert_into timeout session 
variable separately (#16343)
cf739e7496 is described below

commit cf739e74962c544a84208cf9c3121c219ddb392f
Author: 奕冷 <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Sun Feb 12 16:56:10 2023 +0800

    [Enhancement](Stmt) Set insert_into timeout session variable separately 
(#16343)
---
 be/src/exprs/runtime_filter.cpp                    |  4 +-
 be/src/runtime/fragment_mgr.cpp                    |  4 +-
 be/src/runtime/result_buffer_mgr.cpp               |  4 +-
 be/src/runtime/result_buffer_mgr.h                 |  2 +-
 be/src/runtime/runtime_state.h                     |  2 +
 be/src/vec/sink/vdata_stream_sender.cpp            |  2 +-
 be/src/vec/sink/vresult_file_sink.cpp              |  2 +-
 be/src/vec/sink/vresult_sink.cpp                   |  2 +-
 be/src/vec/sink/vtablet_sink.cpp                   |  6 +--
 docs/en/docs/advanced/variables.md                 |  7 ++-
 .../import/import-way/insert-into-manual.md        |  4 +-
 .../Manipulation/INSERT.md                         |  2 +-
 .../SET-VARIABLE.md                                |  1 +
 docs/zh-CN/docs/advanced/variables.md              |  6 ++-
 .../import/import-way/insert-into-manual.md        |  4 +-
 .../Manipulation/INSERT.md                         |  2 +-
 .../SET-VARIABLE.md                                |  1 +
 .../java/org/apache/doris/analysis/InsertStmt.java |  2 +-
 .../java/org/apache/doris/qe/ConnectContext.java   | 55 ++++++++++++++++------
 .../java/org/apache/doris/qe/ConnectProcessor.java |  2 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  7 +--
 .../java/org/apache/doris/qe/SessionVariable.java  | 15 ++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     |  9 +++-
 .../org/apache/doris/qe/ConnectContextTest.java    |  8 +++-
 .../org/apache/doris/qe/SessionVariablesTest.java  |  2 +
 .../java/org/apache/doris/qe/VariableMgrTest.java  |  3 ++
 gensrc/thrift/PaloInternalService.thrift           |  4 ++
 .../suites/datev2/ssb_sf0.1_p1/load.groovy         |  4 +-
 .../suites/datev2/ssb_sf1_p2/load.groovy           |  4 +-
 regression-test/suites/ssb_sf0.1_p1/load.groovy    |  4 +-
 regression-test/suites/ssb_sf100_p2/load.groovy    |  4 +-
 regression-test/suites/ssb_sf1_p2/load.groovy      |  4 +-
 tools/ssb-tools/bin/load-ssb-data.sh               |  7 +--
 33 files changed, 129 insertions(+), 60 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c8db62c3b6..7ec1f5a21c 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1159,7 +1159,7 @@ bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
     // bitmap filter is precise filter and only filter once, so it must be 
applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
-                                    ? _state->query_options().query_timeout
+                                    ? _state->execution_timeout()
                                     : _state->runtime_filter_wait_time_ms();
     if (_state->enable_pipeline_exec()) {
         auto expected = _rf_state_atomic.load(std::memory_order_acquire);
@@ -1209,7 +1209,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
     auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
     // bitmap filter is precise filter and only filter once, so it must be 
applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
-                                    ? _state->query_options().query_timeout
+                                    ? _state->execution_timeout()
                                     : _state->runtime_filter_wait_time_ms();
     int64_t ms_since_registration = MonotonicMillis() - registration_time_;
     if (!_state->enable_pipeline_exec()) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 654c252d58..c18f3e9866 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -192,7 +192,7 @@ FragmentExecState::FragmentExecState(const TUniqueId& 
query_id,
 
 Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
     if (params.__isset.query_options) {
-        _timeout_second = params.query_options.query_timeout;
+        _timeout_second = params.query_options.execution_timeout;
     }
 
     if (_fragments_ctx == nullptr) {
@@ -648,7 +648,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
 
         
fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
                 pipeline_engine_enabled);
-        fragments_ctx->timeout_second = params.query_options.query_timeout;
+        fragments_ctx->timeout_second = params.query_options.execution_timeout;
         _set_scan_concurrency(params, fragments_ctx.get());
 
         bool has_query_mem_tracker =
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index 35d533e4a2..19fb2522b8 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -55,7 +55,7 @@ Status ResultBufferMgr::init() {
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int 
buffer_size,
                                       std::shared_ptr<BufferControlBlock>* 
sender,
-                                      bool enable_pipeline, int query_timeout) 
{
+                                      bool enable_pipeline, int exec_timout) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " 
<< query_id;
@@ -78,7 +78,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& 
query_id, int buffer_size
         // otherwise in some case may block all fragment handle threads
         // details see issue https://github.com/apache/doris/issues/16203
         // add extra 5s for avoid corner case
-        int64_t max_timeout = time(nullptr) + query_timeout + 5;
+        int64_t max_timeout = time(nullptr) + exec_timout + 5;
         cancel_at_time(max_timeout, query_id);
     }
     *sender = control_block;
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index 426af75047..250551f42a 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -48,7 +48,7 @@ public:
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
                          std::shared_ptr<BufferControlBlock>* sender, bool 
enable_pipeline,
-                         int query_timeout);
+                         int exec_timeout);
 
     void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7a555e0336..eb185472c7 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -86,6 +86,8 @@ public:
     }
     int max_errors() const { return _query_options.max_errors; }
     int query_timeout() const { return _query_options.query_timeout; }
+    int insert_timeout() const { return _query_options.insert_timeout; }
+    int execution_timeout() const { return _query_options.execution_timeout; }
     int max_io_buffers() const { return _query_options.max_io_buffers; }
     int num_scanner_threads() const { return 
_query_options.num_scanner_threads; }
     TQueryType::type query_type() const { return _query_options.query_type; }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index c7a9e62335..4260882c3c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -59,7 +59,7 @@ Status Channel::init(RuntimeState* state) {
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
 
-    _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 
1000;
+    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
 
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp
index 5f7e874b09..d85318b297 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -102,7 +102,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
                 state->fragment_instance_id(), _buf_size, &_sender, 
state->enable_pipeline_exec(),
-                state->query_timeout()));
+                state->execution_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, 
state->fragment_instance_id(), _output_vexpr_ctxs,
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index af71b76280..b5840b22fd 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -63,7 +63,7 @@ Status VResultSink::prepare(RuntimeState* state) {
     // create sender
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), _buf_size, &_sender, 
state->enable_pipeline_exec(),
-            state->query_timeout()));
+            state->execution_timeout()));
 
     // create writer based on sink type
     switch (_sink_type) {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 54f761955b..d21b4b4bf5 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -222,7 +222,7 @@ Status VNodeChannel::init(RuntimeState* state) {
         return Status::InternalError("get rpc stub failed");
     }
 
-    _rpc_timeout_ms = state->query_options().query_timeout * 1000;
+    _rpc_timeout_ms = state->execution_timeout() * 1000;
     _timeout_watch.start();
 
     // Initialize _cur_add_block_request
@@ -821,8 +821,8 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
 
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
-    _is_high_priority = (state->query_options().query_timeout <=
-                         config::load_task_high_priority_threshold_second);
+    _is_high_priority =
+            (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
 
     // profile must add to state's object pool
     _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
diff --git a/docs/en/docs/advanced/variables.md 
b/docs/en/docs/advanced/variables.md
index b995bc3d44..cc434c6ff6 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -69,6 +69,7 @@ Variables that support both session-level and global-level 
setting include:
 * `sql_mode`
 * `enable_profile`
 * `query_timeout`
+* `insert_timeout`
 * `exec_mem_limit`
 * `batch_size`
 * `parallel_fragment_exec_instance_num`
@@ -358,7 +359,11 @@ Translated with www.DeepL.com/Translator (free version)
     
 * `query_timeout`
 
-    Used to set the query timeout. This variable applies to all query 
statements in the current connection, as well as INSERT statements. The default 
is 5 minutes, in seconds.
+    Used to set the query timeout. This variable applies to all query 
statements in the current connection. For INSERT statements, it is recommended 
to manage the timeout with `insert_timeout` (described below) instead. The 
default is 5 minutes, in seconds.
+
+* `insert_timeout`
+    Used to set the timeout for INSERT statements. This variable applies only 
to INSERT statements in the current connection, and is recommended for 
managing long-running INSERT operations. The default is 4 hours, in seconds. It 
has no effect when query_timeout is
+    greater than insert_timeout, which keeps it compatible with users of older 
versions who rely on query_timeout to control the timeout of INSERT statements.
 
 * `resource_group`
 
diff --git a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md 
b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
index bd00cbb55c..5337322588 100644
--- a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md
@@ -189,7 +189,7 @@ This command returns the insert results and the details of 
the corresponding tra
 
   At present, Insert Into does not support custom import timeout time. All 
Insert Into imports have a uniform timeout time. The default timeout time is 1 
hour. If the imported source file cannot complete the import within the 
specified time, the parameter `insert_load_default_timeout_second` of FE needs 
to be adjusted.
 
-  At the same time, the Insert Into statement receives the restriction of the 
Session variable `query_timeout`. You can increase the timeout time by `SET 
query_timeout = xxx;` in seconds.
+  At the same time, the Insert Into statement is also subject to the 
Session variable `insert_timeout`. You can increase the timeout by `SET 
insert_timeout = xxx;` (in seconds).
 
 ### Session Variables
 
@@ -199,7 +199,7 @@ This command returns the insert results and the details of 
the corresponding tra
 
 - query\_timeout
 
-  Insert Into itself is also an SQL command, so the Insert Into statement is 
also restricted by the Session variable `query_timeout`. You can increase the 
timeout time by `SET query_timeout = xxx;` in seconds.
+  Insert Into is itself an SQL command, so the Insert Into statement is 
subject to the Session variable `insert_timeout`. You can increase the 
timeout by `SET insert_timeout = xxx;` (in seconds).
 
 ## Best Practices
 
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
index eefd424c14..08cc77ab21 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
@@ -226,7 +226,7 @@ Since the previous import methods of Doris are all 
asynchronous import methods,
 
 2. Timeout time
 
-   The timeout for INSERT operations is controlled by [session 
variable](../../../../advanced/variables.md) `query_timeout`. The default is 5 
minutes. If it times out, the job will be canceled.
+   The timeout for INSERT operations is controlled by [session 
variable](../../../../advanced/variables.md) `insert_timeout`. The default is 4 
hours. If it times out, the job will be canceled.
 
 3. Label and atomicity
 
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
 
b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
index 4467c4aaff..41fa568ca8 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
@@ -58,6 +58,7 @@ Variables that support both the current session and the 
global effect include:
 - `sql_mode`
 - `enable_profile`
 - `query_timeout`
+- `insert_timeout`
 - `exec_mem_limit`
 - `batch_size`
 - `allow_partition_column_nullable`
diff --git a/docs/zh-CN/docs/advanced/variables.md 
b/docs/zh-CN/docs/advanced/variables.md
index dcbdb501e3..6bd438af60 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -71,6 +71,7 @@ SET GLOBAL exec_mem_limit = 137438953472
 - `sql_mode`
 - `enable_profile`
 - `query_timeout`
+- `insert_timeout`
 - `exec_mem_limit`
 - `batch_size`
 - `allow_partition_column_nullable`
@@ -355,7 +356,10 @@ SELECT /*+ SET_VAR(query_timeout = 1, 
enable_partition_cache=true) */ sleep(3);
 
 - `query_timeout`
 
-  用于设置查询超时。该变量会作用于当前连接中所有的查询语句,以及 INSERT 语句。默认为 5 分钟,单位为秒。
+  用于设置查询超时。该变量会作用于当前连接中所有的查询语句,对于 INSERT 语句推荐使用insert_timeout。默认为 5 分钟,单位为秒。
+
+- `insert_timeout`
+  用于设置针对 INSERT 语句的超时。该变量仅作用于 INSERT 语句,建议在 INSERT 行为易持续较长时间的场景下设置。默认为 4 
小时,单位为秒。由于旧版本用户会通过延长 query_timeout 来防止 INSERT 语句超时,insert_timeout 在 
query_timeout 大于自身的情况下将会失效, 以兼容旧版本用户的习惯。
 
 - `resource_group`
 
diff --git 
a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md 
b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
index 198e79e02e..ff2f75e54c 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md
@@ -192,7 +192,7 @@ TransactionStatus: VISIBLE
 
   目前 Insert Into 并不支持自定义导入的 timeout 时间,所有 Insert Into 导入的超时时间是统一的,默认的 timeout 
时间为1小时。如果导入的源文件无法在规定时间内完成导入,则需要调整 FE 
的参数```insert_load_default_timeout_second```。
 
-  同时 Insert Into 语句受到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout = 
xxx;` 来增加超时时间,单位是秒。
+  同时 Insert Into 语句受到 Session 变量 `insert_timeout` 的限制。可以通过 `SET insert_timeout 
= xxx;` 来增加超时时间,单位是秒。
 
 ### Session 变量
 
@@ -208,7 +208,7 @@ TransactionStatus: VISIBLE
 
 + query\_timeout
 
-  Insert Into 本身也是一个 SQL 命令,因此 Insert Into 语句也受到 Session 变量 `query_timeout` 
的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。
+  Insert Into 本身也是一个 SQL 命令,Insert Into 语句受到 Session 变量 `insert_timeout` 
的限制。可以通过 `SET insert_timeout = xxx;` 来增加超时时间,单位是秒。
 
 ## 最佳实践
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
index d19592319c..33018fcd0d 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md
@@ -226,7 +226,7 @@ INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from 
test2;
 
 2. 超时时间
 
-   INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `query_timeout` 
控制。默认为5分钟。超时则作业会被取消。
+   INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `insert_timeout` 
控制。默认为4小时。超时则作业会被取消。
 
 3. Label 和原子性
 
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
index 796ecf6660..6e77762319 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md
@@ -58,6 +58,7 @@ SET variable_assignment [, variable_assignment] ...
 - `sql_mode`
 - `enable_profile`
 - `query_timeout`
+- `insert_timeout`
 - `exec_mem_limit`
 - `batch_size`
 - `allow_partition_column_nullable`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index a9269db687..bf587ab97d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -304,7 +304,7 @@ public class InsertStmt extends DdlStmt {
 
         db = 
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
         // create label and begin transaction
-        long timeoutSecond = 
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+        long timeoutSecond = ConnectContext.get().getExecTimeout();
         if (Strings.isNullOrEmpty(label)) {
             label = "insert_" + 
DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 9f4be5fdde..1b79fa6771 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -152,6 +152,15 @@ public class ConnectContext {
 
     private long userQueryTimeout;
 
+    /**
+     * the global execution timeout in seconds, currently set according to 
query_timeout and insert_timeout.
+     * <p>
+     * when a connection is established, exec_timeout is set by query_timeout, 
when the statement is an insert stmt,
+     * then it is set to max(query_timeout, insert_timeout) with {@link 
#resetExecTimeout()} in
+     * {@link ConnectProcessor#handleQuery()} after the StmtExecutor is 
specified.
+     */
+    private int executionTimeoutS;
+
     public void setUserQueryTimeout(long queryTimeout) {
         this.userQueryTimeout = queryTimeout;
     }
@@ -164,7 +173,7 @@ public class ConnectContext {
     }
 
     public void setOrUpdateInsertResult(long txnId, String label, String db, 
String tbl,
-                                        TransactionStatus txnStatus, long 
loadedRows, int filteredRows) {
+            TransactionStatus txnStatus, long loadedRows, int filteredRows) {
         if (isTxnModel() && insertResult != null) {
             insertResult.updateResult(txnStatus, loadedRows, filteredRows);
         } else {
@@ -220,6 +229,8 @@ public class ConnectContext {
         if (Config.use_fuzzy_session_variable) {
             sessionVariable.initFuzzyModeVariables();
         }
+        // initialize executionTimeoutS to default to queryTimeout
+        executionTimeoutS = sessionVariable.getQueryTimeoutS();
     }
 
     public boolean isTxnModel() {
@@ -568,7 +579,7 @@ public class ConnectContext {
         boolean killFlag = false;
         boolean killConnection = false;
         if (command == MysqlCommand.COM_SLEEP) {
-            if (delta > sessionVariable.getWaitTimeoutS() * 1000) {
+            if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
                 // Need kill this connection.
                 LOG.warn("kill wait timeout connection, remote: {}, wait 
timeout: {}",
                         getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getWaitTimeoutS());
@@ -577,25 +588,27 @@ public class ConnectContext {
                 killConnection = true;
             }
         } else {
+            long timeout;
+            String timeoutTag = "query";
             if (userQueryTimeout > 0) {
                 // user set query_timeout property
-                if (delta > userQueryTimeout * 1000) {
-                    LOG.warn("kill query timeout, remote: {}, query timeout: 
{}",
-                            getMysqlChannel().getRemoteHostPortString(), 
userQueryTimeout);
-
-                    killFlag = true;
-                }
+                timeout = userQueryTimeout * 1000L;
             } else {
-                // default use session query_timeout
-                if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
-                    LOG.warn("kill query timeout, remote: {}, query timeout: 
{}",
-                            getMysqlChannel().getRemoteHostPortString(), 
sessionVariable.getQueryTimeoutS());
+                //to ms
+                timeout = executionTimeoutS * 1000L;
+            }
+            //deal with insert stmt particularly
+            if (executor != null && executor.isInsertStmt()) {
+                timeoutTag = "insert";
+            }
 
-                    // Only kill
-                    killFlag = true;
-                }
+            if (delta > timeout) {
+                LOG.warn("kill {} timeout, remote: {}, query timeout: {}",
+                        timeoutTag, 
getMysqlChannel().getRemoteHostPortString(), timeout);
+                killFlag = true;
             }
         }
+
         if (killFlag) {
             kill(killConnection);
         }
@@ -635,6 +648,18 @@ public class ConnectContext {
         return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp();
     }
 
+    public void resetExecTimeout() {
+        if (executor != null && executor.isInsertStmt()) {
+            // particular timeout for insert stmt, we can make other 
particular timeout in the same way.
+            // set the execution timeout as max(insert_timeout,query_timeout) 
to be compatible with older versions
+            executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(), 
executionTimeoutS);
+        }
+    }
+
+    public int getExecTimeout() {
+        return executionTimeoutS;
+    }
+
     public class ThreadInfo {
         public boolean isFull;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 00f047b318..2493ed2010 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -408,6 +408,8 @@ public class ConnectProcessor {
             parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
             executor = new StmtExecutor(ctx, parsedStmt);
             ctx.setExecutor(executor);
+            // reset the executionTimeout corresponding with the StmtExecutor
+            ctx.resetExecTimeout();
 
             try {
                 executor.execute();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 77775c713c..4d29d94333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -235,7 +235,7 @@ public class Coordinator {
     private final TUniqueId nextInstanceId;
 
     // a timestamp represent the absolute timeout
-    // eg, System.currentTimeMillis() + query_timeout * 1000
+    // eg, System.currentTimeMillis() + executeTimeoutS * 1000
     private long timeoutDeadline;
 
     private boolean enableShareHashTableForBroadcastJoin = false;
@@ -389,6 +389,7 @@ public class Coordinator {
         
this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized());
         this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline());
         this.queryOptions.setBeExecVersion(Config.be_exec_version);
+        this.queryOptions.setExecutionTimeout(context.getExecTimeout());
     }
 
     public long getJobId() {
@@ -585,7 +586,7 @@ public class Coordinator {
         PlanFragmentId topId = fragments.get(0).getFragmentId();
         FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
         DataSink topDataSink = topParams.fragment.getSink();
-        this.timeoutDeadline = System.currentTimeMillis() + 
queryOptions.query_timeout * 1000L;
+        this.timeoutDeadline = System.currentTimeMillis() + 
queryOptions.getExecutionTimeout() * 1000L;
         if (topDataSink instanceof ResultSink || topDataSink instanceof 
ResultFileSink) {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
             receiver = new 
ResultReceiver(topParams.instanceExecParams.get(0).instanceId,
@@ -785,7 +786,7 @@ public class Coordinator {
             String operation) throws RpcException, UserException {
         if (leftTimeMs <= 0) {
             throw new UserException("timeout before waiting for " + operation 
+ " RPC. Elapse(sec): " + (
-                    (System.currentTimeMillis() - timeoutDeadline) / 1000 + 
queryOptions.query_timeout));
+                    (System.currentTimeMillis() - timeoutDeadline) / 1000 + 
queryOptions.getExecutionTimeout()));
         }
 
         long timeoutMs = Math.min(leftTimeMs, 
Config.remote_fragment_exec_timeout_ms);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 28c2028226..5be52b1097 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -57,6 +57,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String QUERY_TIMEOUT = "query_timeout";
+    public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
     public static final String SQL_MODE = "sql_mode";
     public static final String RESOURCE_VARIABLE = "resource_group";
@@ -293,6 +294,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = QUERY_TIMEOUT)
     public int queryTimeoutS = 300;
 
+    @VariableMgr.VarAttr(name = INSERT_TIMEOUT)
+    public int insertTimeoutS = 14400;
+
     // if true, need report to coordinator when plan fragment execute 
successfully.
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
@@ -820,6 +824,14 @@ public class SessionVariable implements Serializable, 
Writable {
         return queryTimeoutS;
     }
 
+    public int getInsertTimeoutS() {
+        return insertTimeoutS;
+    }
+
+    public void setInsertTimeoutS(int insertTimeoutS) {
+        this.insertTimeoutS = insertTimeoutS;
+    }
+
     public boolean enableProfile() {
         return enableProfile;
     }
@@ -1674,6 +1686,9 @@ public class SessionVariable implements Serializable, 
Writable {
         if (queryOptions.isSetQueryTimeout()) {
             setQueryTimeoutS(queryOptions.getQueryTimeout());
         }
+        if (queryOptions.isSetInsertTimeout()) {
+            setInsertTimeoutS(queryOptions.getInsertTimeout());
+        }
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 86bf8945a6..bef4253baf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -393,6 +393,10 @@ public class StmtExecutor implements ProfileWriter {
         return parsedStmt != null && parsedStmt instanceof QueryStmt;
     }
 
+    public boolean isInsertStmt() {
+        return parsedStmt != null && parsedStmt instanceof InsertStmt;
+    }
+
     /**
      * Used for audit in ConnectProcessor.
      * <p>
@@ -1562,8 +1566,9 @@ public class StmtExecutor implements ProfileWriter {
                 QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), 
coord);
 
                 coord.exec();
-
-                boolean notTimeout = 
coord.join(context.getSessionVariable().getQueryTimeoutS());
+                int execTimeout = context.getExecTimeout();
+                LOG.debug("Insert execution timeout:{}", execTimeout);
+                boolean notTimeout = coord.join(execTimeout);
                 if (!coord.isDone()) {
                     coord.cancel();
                     if (notTimeout) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index f8c94694d4..fdee7a7a37 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -196,12 +196,16 @@ public class ConnectContextTest {
 
         // sleep no time out
         Assert.assertFalse(ctx.isKilled());
-        long now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 - 1;
+        ctx.setExecutor(executor);
+        ctx.resetExecTimeout();
+        long now = ctx.getExecTimeout() * 1000L - 1;
         ctx.checkTimeout(now);
         Assert.assertFalse(ctx.isKilled());
 
         // Timeout
-        now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + 1;
+        ctx.setExecutor(executor);
+        ctx.resetExecTimeout();
+        now = ctx.getExecTimeout() * 1000L + 1;
         ctx.checkTimeout(now);
         Assert.assertFalse(ctx.isKilled());
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 947e83d95d..5193dd9a82 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -79,8 +79,10 @@ public class SessionVariablesTest extends TestWithFeService {
         Assertions.assertTrue(queryOptions.isSetQueryTimeout());
 
         queryOptions.setQueryTimeout(123);
+        queryOptions.setInsertTimeout(123);
         sessionVariable.setForwardedSessionVariables(queryOptions);
         Assertions.assertEquals(123, sessionVariable.getQueryTimeoutS());
+        Assertions.assertEquals(123, sessionVariable.getInsertTimeoutS());
     }
 
     @Test
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
index 1067a0c1a3..71740f061f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java
@@ -64,6 +64,7 @@ public class VariableMgrTest {
         long originExecMemLimit = var.getMaxExecMemByte();
         boolean originEnableProfile = var.enableProfile();
         long originQueryTimeOut = var.getQueryTimeoutS();
+        final int originInsertTimeout = var.getInsertTimeoutS();
 
         List<List<String>> rows = VariableMgr.dump(SetType.SESSION, var, null);
         Assert.assertTrue(rows.size() > 5);
@@ -76,6 +77,8 @@ public class VariableMgrTest {
                 Assert.assertEquals(String.valueOf(originQueryTimeOut), row.get(1));
             } else if (row.get(0).equalsIgnoreCase("sql_mode")) {
                 Assert.assertEquals("", row.get(1));
+            } else if (row.get(0).equalsIgnoreCase("insert_timeout")) {
+                Assert.assertEquals(String.valueOf(originInsertTimeout), row.get(1));
             }
         }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 65471d8f5c..f3befbb80a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -194,6 +194,10 @@ struct TQueryOptions {
   59: optional i32 partitioned_hash_agg_rows_threshold = 0
 
   60: optional bool enable_file_cache = true
+  
+  61: optional i32 insert_timeout = 14400
+
+  62: optional i32 execution_timeout = 3600
 }
     
 
diff --git a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
index b851c13c4d..6ed3a68b91 100644
--- a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
+++ b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy
@@ -88,8 +88,8 @@ suite("load") {
     def rowCount = sql "select count(*) from ${table}"
     if (rowCount[0][0] != table_rows) {
         sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
-        sql "set global query_timeout=3600"
-        def r = sql "select @@query_timeout"
+        sql "set global insert_timeout=3600"
+        def r = sql "select @@insert_timeout"
         assertEquals(3600, r[0][0])
         year_cons = [
             'lo_orderdate<19930101',
diff --git a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
index fe0cead6f4..5006e1a384 100644
--- a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
+++ b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy
@@ -88,8 +88,8 @@ suite("load") {
     def rowCount = sql "select count(*) from ${table}"
     if (rowCount[0][0] != table_rows) {
         sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
-        sql "set global query_timeout=3600"
-        def r = sql "select @@query_timeout"
+        sql "set global insert_timeout=3600"
+        def r = sql "select @@insert_timeout"
         assertEquals(3600, r[0][0])
         year_cons = [
             'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf0.1_p1/load.groovy b/regression-test/suites/ssb_sf0.1_p1/load.groovy
index 6baf4d2bb0..0c81fb4a6f 100644
--- a/regression-test/suites/ssb_sf0.1_p1/load.groovy
+++ b/regression-test/suites/ssb_sf0.1_p1/load.groovy
@@ -87,8 +87,8 @@ suite("load") {
     def rowCount = sql "select count(*) from ${table}"
     if (rowCount[0][0] != table_rows) {
         sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
-        sql "set global query_timeout=3600"
-        def r = sql "select @@query_timeout"
+        sql "set global insert_timeout=3600"
+        def r = sql "select @@insert_timeout"
         assertEquals(3600, r[0][0])
         year_cons = [
             'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf100_p2/load.groovy b/regression-test/suites/ssb_sf100_p2/load.groovy
index 492e121027..bcf423f7d4 100644
--- a/regression-test/suites/ssb_sf100_p2/load.groovy
+++ b/regression-test/suites/ssb_sf100_p2/load.groovy
@@ -77,9 +77,9 @@ suite('load') {
     def rowCount = sql "select count(*) from ${table}"
     if (rowCount[0][0] != table_rows) {
         sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
-        sql "set global query_timeout=3600"
+        sql "set global insert_timeout=3600"
         sql "sync"
-        def r = sql "select @@query_timeout"
+        def r = sql "select @@insert_timeout"
         assertEquals(3600, r[0][0])
         year_cons = [
             'lo_orderdate<19930101',
diff --git a/regression-test/suites/ssb_sf1_p2/load.groovy b/regression-test/suites/ssb_sf1_p2/load.groovy
index fdaaad5c3b..ee0163b641 100644
--- a/regression-test/suites/ssb_sf1_p2/load.groovy
+++ b/regression-test/suites/ssb_sf1_p2/load.groovy
@@ -89,8 +89,8 @@ suite("load") {
     def rowCount = sql "select count(*) from ${table}"
     if (rowCount[0][0] != table_rows) {
         sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
-        sql "set global query_timeout=3600"
-        def r = sql "select @@query_timeout"
+        sql "set global insert_timeout=3600"
+        def r = sql "select @@insert_timeout"
         assertEquals(3600, r[0][0])
         year_cons = [
             'lo_orderdate<19930101',
diff --git a/tools/ssb-tools/bin/load-ssb-data.sh b/tools/ssb-tools/bin/load-ssb-data.sh
index be5c550f93..db958b3921 100755
--- a/tools/ssb-tools/bin/load-ssb-data.sh
+++ b/tools/ssb-tools/bin/load-ssb-data.sh
@@ -257,16 +257,12 @@ date
 
 echo "==========Start to insert data into ssb flat table=========="
 echo "change some session variables before load, and then restore after load."
-origin_query_timeout=$(
-    set -e
-    run_sql 'select @@query_timeout;' | sed -n '3p'
-)
 origin_parallel=$(
     set -e
     run_sql 'select @@parallel_fragment_exec_instance_num;' | sed -n '3p'
 )
 # set parallel_fragment_exec_instance_num=1, loading maybe slow but stable.
-run_sql "set global query_timeout=7200;"
+run_sql "set global insert_timeout=7200;"
 run_sql "set global parallel_fragment_exec_instance_num=1;"
 echo '============================================'
 date
@@ -274,7 +270,6 @@ load_lineitem_flat
 date
 echo '============================================'
 echo "restore session variables"
-run_sql "set global query_timeout=${origin_query_timeout};"
 run_sql "set global parallel_fragment_exec_instance_num=${origin_parallel};"
 echo '============================================'
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to