This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new cf739e7496 [Enhancement](Stmt) Set insert_into timeout session variable separately (#16343) cf739e7496 is described below commit cf739e74962c544a84208cf9c3121c219ddb392f Author: 奕冷 <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Sun Feb 12 16:56:10 2023 +0800 [Enhancement](Stmt) Set insert_into timeout session variable separately (#16343) --- be/src/exprs/runtime_filter.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/result_buffer_mgr.cpp | 4 +- be/src/runtime/result_buffer_mgr.h | 2 +- be/src/runtime/runtime_state.h | 2 + be/src/vec/sink/vdata_stream_sender.cpp | 2 +- be/src/vec/sink/vresult_file_sink.cpp | 2 +- be/src/vec/sink/vresult_sink.cpp | 2 +- be/src/vec/sink/vtablet_sink.cpp | 6 +-- docs/en/docs/advanced/variables.md | 7 ++- .../import/import-way/insert-into-manual.md | 4 +- .../Manipulation/INSERT.md | 2 +- .../SET-VARIABLE.md | 1 + docs/zh-CN/docs/advanced/variables.md | 6 ++- .../import/import-way/insert-into-manual.md | 4 +- .../Manipulation/INSERT.md | 2 +- .../SET-VARIABLE.md | 1 + .../java/org/apache/doris/analysis/InsertStmt.java | 2 +- .../java/org/apache/doris/qe/ConnectContext.java | 55 ++++++++++++++++------ .../java/org/apache/doris/qe/ConnectProcessor.java | 2 + .../main/java/org/apache/doris/qe/Coordinator.java | 7 +-- .../java/org/apache/doris/qe/SessionVariable.java | 15 ++++++ .../java/org/apache/doris/qe/StmtExecutor.java | 9 +++- .../org/apache/doris/qe/ConnectContextTest.java | 8 +++- .../org/apache/doris/qe/SessionVariablesTest.java | 2 + .../java/org/apache/doris/qe/VariableMgrTest.java | 3 ++ gensrc/thrift/PaloInternalService.thrift | 4 ++ .../suites/datev2/ssb_sf0.1_p1/load.groovy | 4 +- .../suites/datev2/ssb_sf1_p2/load.groovy | 4 +- regression-test/suites/ssb_sf0.1_p1/load.groovy | 4 +- regression-test/suites/ssb_sf100_p2/load.groovy | 4 +- regression-test/suites/ssb_sf1_p2/load.groovy | 4 +- 
tools/ssb-tools/bin/load-ssb-data.sh | 7 +-- 33 files changed, 129 insertions(+), 60 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c8db62c3b6..7ec1f5a21c 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1159,7 +1159,7 @@ bool IRuntimeFilter::await() { DCHECK(is_consumer()); // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER - ? _state->query_options().query_timeout + ? _state->execution_timeout() : _state->runtime_filter_wait_time_ms(); if (_state->enable_pipeline_exec()) { auto expected = _rf_state_atomic.load(std::memory_order_acquire); @@ -1209,7 +1209,7 @@ bool IRuntimeFilter::is_ready_or_timeout() { auto cur_state = _rf_state_atomic.load(std::memory_order_acquire); // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER - ? _state->query_options().query_timeout + ? 
_state->execution_timeout() : _state->runtime_filter_wait_time_ms(); int64_t ms_since_registration = MonotonicMillis() - registration_time_; if (!_state->enable_pipeline_exec()) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 654c252d58..c18f3e9866 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -192,7 +192,7 @@ FragmentExecState::FragmentExecState(const TUniqueId& query_id, Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { if (params.__isset.query_options) { - _timeout_second = params.query_options.query_timeout; + _timeout_second = params.query_options.execution_timeout; } if (_fragments_ctx == nullptr) { @@ -648,7 +648,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled( pipeline_engine_enabled); - fragments_ctx->timeout_second = params.query_options.query_timeout; + fragments_ctx->timeout_second = params.query_options.execution_timeout; _set_scan_concurrency(params, fragments_ctx.get()); bool has_query_mem_tracker = diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 35d533e4a2..19fb2522b8 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -55,7 +55,7 @@ Status ResultBufferMgr::init() { Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size, std::shared_ptr<BufferControlBlock>* sender, - bool enable_pipeline, int query_timeout) { + bool enable_pipeline, int exec_timout) { *sender = find_control_block(query_id); if (*sender != nullptr) { LOG(WARNING) << "already have buffer control block for this instance " << query_id; @@ -78,7 +78,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size // otherwise in some case may block all fragment handle threads // details see issue 
https://github.com/apache/doris/issues/16203 // add extra 5s for avoid corner case - int64_t max_timeout = time(nullptr) + query_timeout + 5; + int64_t max_timeout = time(nullptr) + exec_timout + 5; cancel_at_time(max_timeout, query_id); } *sender = control_block; diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index 426af75047..250551f42a 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -48,7 +48,7 @@ public: // sender is not used when call cancel or unregister Status create_sender(const TUniqueId& query_id, int buffer_size, std::shared_ptr<BufferControlBlock>* sender, bool enable_pipeline, - int query_timeout); + int exec_timeout); void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 7a555e0336..eb185472c7 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -86,6 +86,8 @@ public: } int max_errors() const { return _query_options.max_errors; } int query_timeout() const { return _query_options.query_timeout; } + int insert_timeout() const { return _query_options.insert_timeout; } + int execution_timeout() const { return _query_options.execution_timeout; } int max_io_buffers() const { return _query_options.max_io_buffers; } int num_scanner_threads() const { return _query_options.num_scanner_threads; } TQueryType::type query_type() const { return _query_options.query_type; } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c7a9e62335..4260882c3c 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -59,7 +59,7 @@ Status Channel::init(RuntimeState* state) { _brpc_request.set_sender_id(_parent->_sender_id); _brpc_request.set_be_number(_be_number); - _brpc_timeout_ms = std::min(3600, state->query_options().query_timeout) * 1000; + _brpc_timeout_ms = std::min(3600, 
state->execution_timeout()) * 1000; if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) { _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client( diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 5f7e874b09..d85318b297 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -102,7 +102,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(), - state->query_timeout())); + state->execution_timeout())); // create writer _writer.reset(new (std::nothrow) VFileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index af71b76280..b5840b22fd 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -63,7 +63,7 @@ Status VResultSink::prepare(RuntimeState* state) { // create sender RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), _buf_size, &_sender, state->enable_pipeline_exec(), - state->query_timeout())); + state->execution_timeout())); // create writer based on sink type switch (_sink_type) { diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 54f761955b..d21b4b4bf5 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -222,7 +222,7 @@ Status VNodeChannel::init(RuntimeState* state) { return Status::InternalError("get rpc stub failed"); } - _rpc_timeout_ms = state->query_options().query_timeout * 1000; + _rpc_timeout_ms = state->execution_timeout() * 1000; _timeout_watch.start(); // Initialize _cur_add_block_request @@ -821,8 +821,8 @@ Status VOlapTableSink::prepare(RuntimeState* state) { _sender_id = 
state->per_fragment_instance_idx(); _num_senders = state->num_per_fragment_instances(); - _is_high_priority = (state->query_options().query_timeout <= - config::load_task_high_priority_threshold_second); + _is_high_priority = + (state->execution_timeout() <= config::load_task_high_priority_threshold_second); // profile must add to state's object pool _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index b995bc3d44..cc434c6ff6 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -69,6 +69,7 @@ Variables that support both session-level and global-level setting include: * `sql_mode` * `enable_profile` * `query_timeout` +* `insert_timeout` * `exec_mem_limit` * `batch_size` * `parallel_fragment_exec_instance_num` @@ -358,7 +359,11 @@ Translated with www.DeepL.com/Translator (free version) * `query_timeout` - Used to set the query timeout. This variable applies to all query statements in the current connection, as well as INSERT statements. The default is 5 minutes, in seconds. + Used to set the query timeout. This variable applies to all query statements in the current connection. For INSERT statements, it is recommended to use `insert_timeout` (described below) instead. The default is 5 minutes, in seconds. + +* `insert_timeout` + Used to set the timeout of INSERT statements in the current connection, and is recommended for long-running INSERT operations. The default is 4 hours, in seconds. If query_timeout is + greater than insert_timeout, query_timeout takes effect instead, so that users of older versions who rely on query_timeout to control the timeout of INSERT statements keep their existing behavior.
* `resource_group` diff --git a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md index bd00cbb55c..5337322588 100644 --- a/docs/en/docs/data-operate/import/import-way/insert-into-manual.md +++ b/docs/en/docs/data-operate/import/import-way/insert-into-manual.md @@ -189,7 +189,7 @@ This command returns the insert results and the details of the corresponding tra At present, Insert Into does not support custom import timeout time. All Insert Into imports have a uniform timeout time. The default timeout time is 1 hour. If the imported source file cannot complete the import within the specified time, the parameter `insert_load_default_timeout_second` of FE needs to be adjusted. - At the same time, the Insert Into statement receives the restriction of the Session variable `query_timeout`. You can increase the timeout time by `SET query_timeout = xxx;` in seconds. + At the same time, the Insert Into statement is also limited by the Session variable `insert_timeout`. You can increase the timeout with `SET insert_timeout = xxx;` (value in seconds). ### Session Variables @@ -199,7 +199,7 @@ This command returns the insert results and the details of the corresponding tra - query u timeout - Insert Into itself is also an SQL command, so the Insert Into statement is also restricted by the Session variable `query_timeout`. You can increase the timeout time by `SET query_timeout = xxx;` in seconds. + Insert Into is itself an SQL command, and its timeout is controlled by the Session variable `insert_timeout`. You can increase the timeout with `SET insert_timeout = xxx;` (value in seconds).
## Best Practices diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md index eefd424c14..08cc77ab21 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md @@ -226,7 +226,7 @@ Since the previous import methods of Doris are all asynchronous import methods, 2. Timeout time - The timeout for INSERT operations is controlled by [session variable](../../../../advanced/variables.md) `query_timeout`. The default is 5 minutes. If it times out, the job will be canceled. + The timeout for INSERT operations is controlled by [session variable](../../../../advanced/variables.md) `insert_timeout`. The default is 4 hours. If it times out, the job will be canceled. 3. Label and atomicity diff --git a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md index 4467c4aaff..41fa568ca8 100644 --- a/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md +++ b/docs/en/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md @@ -58,6 +58,7 @@ Variables that support both the current session and the global effect include: - `sql_mode` - `enable_profile` - `query_timeout` +- `insert_timeout` - `exec_mem_limit` - `batch_size` - `allow_partition_column_nullable` diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index dcbdb501e3..6bd438af60 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -71,6 +71,7 @@ SET GLOBAL exec_mem_limit = 137438953472 - `sql_mode` - `enable_profile` - `query_timeout` +- `insert_timeout` - `exec_mem_limit` - `batch_size` 
- `allow_partition_column_nullable` @@ -355,7 +356,10 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); - `query_timeout` - 用于设置查询超时。该变量会作用于当前连接中所有的查询语句,以及 INSERT 语句。默认为 5 分钟,单位为秒。 + 用于设置查询超时。该变量会作用于当前连接中所有的查询语句,对于 INSERT 语句推荐使用insert_timeout。默认为 5 分钟,单位为秒。 + +- `insert_timeout` + 用于设置针对 INSERT 语句的超时。该变量仅作用于 INSERT 语句,建议在 INSERT 行为易持续较长时间的场景下设置。默认为 4 小时,单位为秒。由于旧版本用户会通过延长 query_timeout 来防止 INSERT 语句超时,insert_timeout 在 query_timeout 大于自身的情况下将会失效, 以兼容旧版本用户的习惯。 - `resource_group` diff --git a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md index 198e79e02e..ff2f75e54c 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/insert-into-manual.md @@ -192,7 +192,7 @@ TransactionStatus: VISIBLE 目前 Insert Into 并不支持自定义导入的 timeout 时间,所有 Insert Into 导入的超时时间是统一的,默认的 timeout 时间为1小时。如果导入的源文件无法在规定时间内完成导入,则需要调整 FE 的参数```insert_load_default_timeout_second```。 - 同时 Insert Into 语句受到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。 + 同时 Insert Into 语句受到 Session 变量 `insert_timeout` 的限制。可以通过 `SET insert_timeout = xxx;` 来增加超时时间,单位是秒。 ### Session 变量 @@ -208,7 +208,7 @@ TransactionStatus: VISIBLE + query\_timeout - Insert Into 本身也是一个 SQL 命令,因此 Insert Into 语句也受到 Session 变量 `query_timeout` 的限制。可以通过 `SET query_timeout = xxx;` 来增加超时时间,单位是秒。 + Insert Into 本身也是一个 SQL 命令,Insert Into 语句受到 Session 变量 `insert_timeout` 的限制。可以通过 `SET insert_timeout = xxx;` 来增加超时时间,单位是秒。 ## 最佳实践 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md index d19592319c..33018fcd0d 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md +++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md @@ -226,7 +226,7 @@ INSERT INTO test WITH LABEL `label1` (c1, c2) SELECT * from test2; 2. 超时时间 - INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `query_timeout` 控制。默认为5分钟。超时则作业会被取消。 + INSERT 操作的超时时间由 [会话变量](../../../../advanced/variables.md) `insert_timeout` 控制。默认为4小时。超时则作业会被取消。 3. Label 和原子性 diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md index 796ecf6660..6e77762319 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Database-Administration-Statements/SET-VARIABLE.md @@ -58,6 +58,7 @@ SET variable_assignment [, variable_assignment] ... - `sql_mode` - `enable_profile` - `query_timeout` +- `insert_timeout` - `exec_mem_limit` - `batch_size` - `allow_partition_column_nullable` diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index a9269db687..bf587ab97d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -304,7 +304,7 @@ public class InsertStmt extends DdlStmt { db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); // create label and begin transaction - long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); + long timeoutSecond = ConnectContext.get().getExecTimeout(); if (Strings.isNullOrEmpty(label)) { label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 9f4be5fdde..1b79fa6771 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -152,6 +152,15 @@ public class ConnectContext { private long userQueryTimeout; + /** + * the global execution timeout in seconds, currently set according to query_timeout and insert_timeout. + * <p> + * when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt, + * then it is set to max(query_timeout, insert_timeout) with {@link #resetExecTimeout()} in + * {@link ConnectProcessor#handleQuery()} after the StmtExecutor is specified. + */ + private int executionTimeoutS; + public void setUserQueryTimeout(long queryTimeout) { this.userQueryTimeout = queryTimeout; } @@ -164,7 +173,7 @@ public class ConnectContext { } public void setOrUpdateInsertResult(long txnId, String label, String db, String tbl, - TransactionStatus txnStatus, long loadedRows, int filteredRows) { + TransactionStatus txnStatus, long loadedRows, int filteredRows) { if (isTxnModel() && insertResult != null) { insertResult.updateResult(txnStatus, loadedRows, filteredRows); } else { @@ -220,6 +229,8 @@ public class ConnectContext { if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); } + // initialize executionTimeoutS to default to queryTimeout + executionTimeoutS = sessionVariable.getQueryTimeoutS(); } public boolean isTxnModel() { @@ -568,7 +579,7 @@ public class ConnectContext { boolean killFlag = false; boolean killConnection = false; if (command == MysqlCommand.COM_SLEEP) { - if (delta > sessionVariable.getWaitTimeoutS() * 1000) { + if (delta > sessionVariable.getWaitTimeoutS() * 1000L) { // Need kill this connection. 
LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}", getMysqlChannel().getRemoteHostPortString(), sessionVariable.getWaitTimeoutS()); @@ -577,25 +588,27 @@ public class ConnectContext { killConnection = true; } } else { + long timeout; + String timeoutTag = "query"; if (userQueryTimeout > 0) { // user set query_timeout property - if (delta > userQueryTimeout * 1000) { - LOG.warn("kill query timeout, remote: {}, query timeout: {}", - getMysqlChannel().getRemoteHostPortString(), userQueryTimeout); - - killFlag = true; - } + timeout = userQueryTimeout * 1000L; } else { - // default use session query_timeout - if (delta > sessionVariable.getQueryTimeoutS() * 1000) { - LOG.warn("kill query timeout, remote: {}, query timeout: {}", - getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS()); + //to ms + timeout = executionTimeoutS * 1000L; + } + //deal with insert stmt particularly + if (executor != null && executor.isInsertStmt()) { + timeoutTag = "insert"; + } - // Only kill - killFlag = true; - } + if (delta > timeout) { + LOG.warn("kill {} timeout, remote: {}, query timeout: {}", + timeoutTag, getMysqlChannel().getRemoteHostPortString(), timeout); + killFlag = true; } } + if (killFlag) { kill(killConnection); } @@ -635,6 +648,18 @@ public class ConnectContext { return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp(); } + public void resetExecTimeout() { + if (executor != null && executor.isInsertStmt()) { + // particular timeout for insert stmt, we can make other particular timeout in the same way. 
+ // set the execution timeout as max(insert_timeout,query_timeout) to be compatible with older versions + executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(), executionTimeoutS); + } + } + + public int getExecTimeout() { + return executionTimeoutS; + } + public class ThreadInfo { public boolean isFull; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 00f047b318..2493ed2010 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -408,6 +408,8 @@ public class ConnectProcessor { parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); executor = new StmtExecutor(ctx, parsedStmt); ctx.setExecutor(executor); + // reset the executionTimeout corresponding with the StmtExecutor + ctx.resetExecTimeout(); try { executor.execute(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 77775c713c..4d29d94333 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -235,7 +235,7 @@ public class Coordinator { private final TUniqueId nextInstanceId; // a timestamp represent the absolute timeout - // eg, System.currentTimeMillis() + query_timeout * 1000 + // eg, System.currentTimeMillis() + executeTimeoutS * 1000 private long timeoutDeadline; private boolean enableShareHashTableForBroadcastJoin = false; @@ -389,6 +389,7 @@ public class Coordinator { this.queryOptions.setEnableVectorizedEngine(VectorizedUtil.isVectorized()); this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline()); this.queryOptions.setBeExecVersion(Config.be_exec_version); + this.queryOptions.setExecutionTimeout(context.getExecTimeout()); } public long getJobId() { @@ -585,7 +586,7 @@ public class Coordinator { 
PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); DataSink topDataSink = topParams.fragment.getSink(); - this.timeoutDeadline = System.currentTimeMillis() + queryOptions.query_timeout * 1000L; + this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L; if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) { TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; receiver = new ResultReceiver(topParams.instanceExecParams.get(0).instanceId, @@ -785,7 +786,7 @@ public class Coordinator { String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { throw new UserException("timeout before waiting for " + operation + " RPC. Elapse(sec): " + ( - (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.query_timeout)); + (System.currentTimeMillis() - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout())); } long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 28c2028226..5be52b1097 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -57,6 +57,7 @@ public class SessionVariable implements Serializable, Writable { public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; public static final String QUERY_TIMEOUT = "query_timeout"; + public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_PROFILE = "enable_profile"; public static final String SQL_MODE = "sql_mode"; public static final String RESOURCE_VARIABLE = "resource_group"; @@ -293,6 +294,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = QUERY_TIMEOUT) public int 
queryTimeoutS = 300; + @VariableMgr.VarAttr(name = INSERT_TIMEOUT) + public int insertTimeoutS = 14400; + // if true, need report to coordinator when plan fragment execute successfully. @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; @@ -820,6 +824,14 @@ public class SessionVariable implements Serializable, Writable { return queryTimeoutS; } + public int getInsertTimeoutS() { + return insertTimeoutS; + } + + public void setInsertTimeoutS(int insertTimeoutS) { + this.insertTimeoutS = insertTimeoutS; + } + public boolean enableProfile() { return enableProfile; } @@ -1674,6 +1686,9 @@ public class SessionVariable implements Serializable, Writable { if (queryOptions.isSetQueryTimeout()) { setQueryTimeoutS(queryOptions.getQueryTimeout()); } + if (queryOptions.isSetInsertTimeout()) { + setInsertTimeoutS(queryOptions.getInsertTimeout()); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 86bf8945a6..bef4253baf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -393,6 +393,10 @@ public class StmtExecutor implements ProfileWriter { return parsedStmt != null && parsedStmt instanceof QueryStmt; } + public boolean isInsertStmt() { + return parsedStmt != null && parsedStmt instanceof InsertStmt; + } + /** * Used for audit in ConnectProcessor. 
* <p> @@ -1562,8 +1566,9 @@ public class StmtExecutor implements ProfileWriter { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); coord.exec(); - - boolean notTimeout = coord.join(context.getSessionVariable().getQueryTimeoutS()); + int execTimeout = context.getExecTimeout(); + LOG.debug("Insert execution timeout:{}", execTimeout); + boolean notTimeout = coord.join(execTimeout); if (!coord.isDone()) { coord.cancel(); if (notTimeout) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index f8c94694d4..fdee7a7a37 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -196,12 +196,16 @@ public class ConnectContextTest { // sleep no time out Assert.assertFalse(ctx.isKilled()); - long now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 - 1; + ctx.setExecutor(executor); + ctx.resetExecTimeout(); + long now = ctx.getExecTimeout() * 1000L - 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); // Timeout - now = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + 1; + ctx.setExecutor(executor); + ctx.resetExecTimeout(); + now = ctx.getExecTimeout() * 1000L + 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index 947e83d95d..5193dd9a82 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -79,8 +79,10 @@ public class SessionVariablesTest extends TestWithFeService { Assertions.assertTrue(queryOptions.isSetQueryTimeout()); queryOptions.setQueryTimeout(123); + queryOptions.setInsertTimeout(123); sessionVariable.setForwardedSessionVariables(queryOptions); 
Assertions.assertEquals(123, sessionVariable.getQueryTimeoutS()); + Assertions.assertEquals(123, sessionVariable.getInsertTimeoutS()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java index 1067a0c1a3..71740f061f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/VariableMgrTest.java @@ -64,6 +64,7 @@ public class VariableMgrTest { long originExecMemLimit = var.getMaxExecMemByte(); boolean originEnableProfile = var.enableProfile(); long originQueryTimeOut = var.getQueryTimeoutS(); + final int originInsertTimeout = var.getInsertTimeoutS(); List<List<String>> rows = VariableMgr.dump(SetType.SESSION, var, null); Assert.assertTrue(rows.size() > 5); @@ -76,6 +77,8 @@ public class VariableMgrTest { Assert.assertEquals(String.valueOf(originQueryTimeOut), row.get(1)); } else if (row.get(0).equalsIgnoreCase("sql_mode")) { Assert.assertEquals("", row.get(1)); + } else if (row.get(0).equalsIgnoreCase("insert_timeout")) { + Assert.assertEquals(String.valueOf(originInsertTimeout), row.get(1)); } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 65471d8f5c..f3befbb80a 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -194,6 +194,10 @@ struct TQueryOptions { 59: optional i32 partitioned_hash_agg_rows_threshold = 0 60: optional bool enable_file_cache = true + + 61: optional i32 insert_timeout = 14400 + + 62: optional i32 execution_timeout = 3600 } diff --git a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy index b851c13c4d..6ed3a68b91 100644 --- a/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy +++ b/regression-test/suites/datev2/ssb_sf0.1_p1/load.groovy @@ -88,8 +88,8 @@ suite("load") { def rowCount = sql "select 
count(*) from ${table}" if (rowCount[0][0] != table_rows) { sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text - sql "set global query_timeout=3600" - def r = sql "select @@query_timeout" + sql "set global insert_timeout=3600" + def r = sql "select @@insert_timeout" assertEquals(3600, r[0][0]) year_cons = [ 'lo_orderdate<19930101', diff --git a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy index fe0cead6f4..5006e1a384 100644 --- a/regression-test/suites/datev2/ssb_sf1_p2/load.groovy +++ b/regression-test/suites/datev2/ssb_sf1_p2/load.groovy @@ -88,8 +88,8 @@ suite("load") { def rowCount = sql "select count(*) from ${table}" if (rowCount[0][0] != table_rows) { sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text - sql "set global query_timeout=3600" - def r = sql "select @@query_timeout" + sql "set global insert_timeout=3600" + def r = sql "select @@insert_timeout" assertEquals(3600, r[0][0]) year_cons = [ 'lo_orderdate<19930101', diff --git a/regression-test/suites/ssb_sf0.1_p1/load.groovy b/regression-test/suites/ssb_sf0.1_p1/load.groovy index 6baf4d2bb0..0c81fb4a6f 100644 --- a/regression-test/suites/ssb_sf0.1_p1/load.groovy +++ b/regression-test/suites/ssb_sf0.1_p1/load.groovy @@ -87,8 +87,8 @@ suite("load") { def rowCount = sql "select count(*) from ${table}" if (rowCount[0][0] != table_rows) { sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text - sql "set global query_timeout=3600" - def r = sql "select @@query_timeout" + sql "set global insert_timeout=3600" + def r = sql "select @@insert_timeout" assertEquals(3600, r[0][0]) year_cons = [ 'lo_orderdate<19930101', diff --git a/regression-test/suites/ssb_sf100_p2/load.groovy b/regression-test/suites/ssb_sf100_p2/load.groovy index 492e121027..bcf423f7d4 100644 --- a/regression-test/suites/ssb_sf100_p2/load.groovy +++ b/regression-test/suites/ssb_sf100_p2/load.groovy @@ -77,9 +77,9 @@ 
suite('load') { def rowCount = sql "select count(*) from ${table}" if (rowCount[0][0] != table_rows) { sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text - sql "set global query_timeout=3600" + sql "set global insert_timeout=3600" sql "sync" - def r = sql "select @@query_timeout" + def r = sql "select @@insert_timeout" assertEquals(3600, r[0][0]) year_cons = [ 'lo_orderdate<19930101', diff --git a/regression-test/suites/ssb_sf1_p2/load.groovy b/regression-test/suites/ssb_sf1_p2/load.groovy index fdaaad5c3b..ee0163b641 100644 --- a/regression-test/suites/ssb_sf1_p2/load.groovy +++ b/regression-test/suites/ssb_sf1_p2/load.groovy @@ -89,8 +89,8 @@ suite("load") { def rowCount = sql "select count(*) from ${table}" if (rowCount[0][0] != table_rows) { sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text - sql "set global query_timeout=3600" - def r = sql "select @@query_timeout" + sql "set global insert_timeout=3600" + def r = sql "select @@insert_timeout" assertEquals(3600, r[0][0]) year_cons = [ 'lo_orderdate<19930101', diff --git a/tools/ssb-tools/bin/load-ssb-data.sh b/tools/ssb-tools/bin/load-ssb-data.sh index be5c550f93..db958b3921 100755 --- a/tools/ssb-tools/bin/load-ssb-data.sh +++ b/tools/ssb-tools/bin/load-ssb-data.sh @@ -257,16 +257,12 @@ date echo "==========Start to insert data into ssb flat table==========" echo "change some session variables before load, and then restore after load." -origin_query_timeout=$( - set -e - run_sql 'select @@query_timeout;' | sed -n '3p' -) origin_parallel=$( set -e run_sql 'select @@parallel_fragment_exec_instance_num;' | sed -n '3p' ) # set parallel_fragment_exec_instance_num=1, loading maybe slow but stable. 
-run_sql "set global query_timeout=7200;" +run_sql "set global insert_timeout=7200;" run_sql "set global parallel_fragment_exec_instance_num=1;" echo '============================================' date @@ -274,7 +270,6 @@ load_lineitem_flat date echo '============================================' echo "restore session variables" -run_sql "set global query_timeout=${origin_query_timeout};" run_sql "set global parallel_fragment_exec_instance_num=${origin_parallel};" echo '============================================' --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org