This is an automated email from the ASF dual-hosted git repository.
adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f5d958ccf9 [fix](MTMV) Reset insert timeout in handleInsert (#17249)
f5d958ccf9 is described below
commit f5d958ccf9127e13385bfbbaff8fa2ac1afac2b6
Author: huangzhaowei <[email protected]>
AuthorDate: Fri Mar 3 11:32:50 2023 +0800
[fix](MTMV) Reset insert timeout in handleInsert (#17249)
In #16343, we split the timeout variable into two (one for queries
and another for insertions).
The function `ConnectProcessor::handleQuery` uses the corresponding session
variable to change the timeout for the queries requested by MySQL client.
However, the function `StmtExecutor::handleInsert` doesn't use the session
variable to change the timeout, so the timeout for CTAS and MTMV insertion
jobs can't be changed.
---
.../main/java/org/apache/doris/analysis/InsertStmt.java | 2 +-
.../main/java/org/apache/doris/qe/ConnectContext.java | 17 +++++++++--------
.../main/java/org/apache/doris/qe/ConnectProcessor.java | 2 --
.../src/main/java/org/apache/doris/qe/StmtExecutor.java | 4 +++-
.../java/org/apache/doris/qe/ConnectContextTest.java | 4 ++--
5 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 79173f42e8..10cffc53d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -305,7 +305,7 @@ public class InsertStmt extends DdlStmt {
db =
analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
// create label and begin transaction
- long timeoutSecond = ConnectContext.get().getExecTimeout();
+ long timeoutSecond = ConnectContext.get().resetExecTimeoutByInsert();
if (Strings.isNullOrEmpty(label)) {
label = "insert_" +
DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index fb8c650004..1a5143a5f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -155,8 +155,8 @@ public class ConnectContext {
* the global execution timeout in seconds, currently set according to
query_timeout and insert_timeout.
* <p>
* when a connection is established, exec_timeout is set by query_timeout,
when the statement is an insert stmt,
- * then it is set to max(query_timeout, insert_timeout) with {@link
#resetExecTimeout()} in
- * after the StmtExecutor is specified.
+ * then it is set to max(executionTimeoutS, insert_timeout) using {@link
#setExecTimeout(int timeout)} at
+ * {@link StmtExecutor}.
*/
private int executionTimeoutS;
@@ -640,12 +640,13 @@ public class ConnectContext {
return currentConnectedFEIp;
}
- public void resetExecTimeout() {
- if (executor != null && executor.isInsertStmt()) {
- // particular timeout for insert stmt, we can make other
particular timeout in the same way.
- // set the execution timeout as max(insert_timeout,query_timeout)
to be compatible with older versions
- executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(),
executionTimeoutS);
- }
+ public void setExecTimeout(int timeout) {
+ executionTimeoutS = timeout;
+ }
+
+ public long resetExecTimeoutByInsert() {
+ executionTimeoutS = Math.max(executionTimeoutS,
sessionVariable.getInsertTimeoutS());
+ return executionTimeoutS;
}
public int getExecTimeout() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 6574929d7d..b45564ec74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -408,8 +408,6 @@ public class ConnectProcessor {
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
- // reset the executionTimeout corresponding with the StmtExecutor
- ctx.resetExecTimeout();
try {
executor.execute();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5c032c28db..bf34dd03a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1489,7 +1489,7 @@ public class StmtExecutor implements ProfileWriter {
InterruptedException, ExecutionException, TimeoutException {
TransactionEntry txnEntry = context.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
- long timeoutSecond =
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
+ long timeoutSecond = ConnectContext.get().getExecTimeout();
TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(dbName, s -> new TException("database is
invalid for dbName: " + s));
@@ -1550,6 +1550,8 @@ public class StmtExecutor implements ProfileWriter {
}
analyzeVariablesInStmt(insertStmt.getQueryStmt());
+ // reset the executionTimeout since query hint maybe change the
insert_timeout again
+ context.resetExecTimeoutByInsert();
long createTime = System.currentTimeMillis();
Throwable throwable = null;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index a0f3a942a4..68e4394f67 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -176,14 +176,14 @@ public class ConnectContextTest {
// sleep no time out
Assert.assertFalse(ctx.isKilled());
ctx.setExecutor(executor);
- ctx.resetExecTimeout();
+ ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
long now = ctx.getExecTimeout() * 1000L - 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
// Timeout
ctx.setExecutor(executor);
- ctx.resetExecTimeout();
+ ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
now = ctx.getExecTimeout() * 1000L + 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]