This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 3febac1d916 [fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616) 3febac1d916 is described below commit 3febac1d916c0219d91c61b8c65feba2b64383fa Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu Jun 20 22:27:01 2024 +0800 [fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616) bp #36559 --- .../apache/doris/common/ConnectionException.java | 35 ++++++++++++++++++++++ .../java/org/apache/doris/mysql/MysqlChannel.java | 7 +++-- .../java/org/apache/doris/qe/ConnectProcessor.java | 17 +++++++---- .../org/apache/doris/qe/MysqlConnectProcessor.java | 5 ++-- .../arrowflight/FlightSqlConnectProcessor.java | 3 +- 5 files changed, 57 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java new file mode 100644 index 00000000000..3f1de2ae2b8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import java.io.IOException; + +/** + * This is a special exception. + * If this exception is thrown, it means that the connection to the server is abnormal. + * We need to kill the connection actively. + */ +public class ConnectionException extends IOException { + public ConnectionException(String message) { + super(message); + } + + public ConnectionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index d22ba393699..392b0587585 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -17,6 +17,7 @@ package org.apache.doris.mysql; +import org.apache.doris.common.ConnectionException; import org.apache.doris.common.util.NetUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; @@ -401,11 +402,13 @@ public class MysqlChannel implements BytesChannel { protected void realNetSend(ByteBuffer buffer) throws IOException { buffer = encryptData(buffer); long bufLen = buffer.remaining(); + long start = System.currentTimeMillis(); long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(), TimeUnit.SECONDS); if (bufLen != writeLen) { - throw new IOException("Write mysql packet failed.[write=" + writeLen - + ", needToWrite=" + bufLen + "]"); + long duration = System.currentTimeMillis() - start; + throw new ConnectionException("Write mysql packet failed.[write=" + writeLen + + ", needToWrite=" + bufLen + "], duration: " + duration + " ms"); } Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS); isSend = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index d54b708e818..51911c03333 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.ConnectionException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.NotImplementedException; @@ -198,9 +199,11 @@ public abstract class ConnectProcessor { } // only throw an exception when there is a problem interacting with the requesting client - protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) { + protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws ConnectionException { try { executeQuery(mysqlCommand, originStmt); + } catch (ConnectionException exception) { + throw exception; } catch (Exception ignored) { // saved use handleQueryException } @@ -414,14 +417,18 @@ public abstract class ConnectProcessor { // Use a handler for exception to avoid big try catch block which is a little hard to understand protected void handleQueryException(Throwable throwable, String origStmt, - StatementBase parsedStmt, Data.PQueryStatistics statistics) { + StatementBase parsedStmt, Data.PQueryStatistics statistics) throws ConnectionException { if (ctx.getMinidump() != null) { MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId())); } - if (throwable instanceof IOException) { + if (throwable instanceof ConnectionException) { + // Throw this exception to close the connection outside. + LOG.warn("Process one query failed because ConnectionException: ", throwable); + throw (ConnectionException) throwable; + } else if (throwable instanceof IOException) { // Client failed. LOG.warn("Process one query failed because IOException: ", throwable); - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed"); + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + throwable.getMessage()); } else if (throwable instanceof UserException) { LOG.warn("Process one query failed because.", throwable); ctx.getState().setError(((UserException) throwable).getMysqlErrorCode(), throwable.getMessage()); @@ -479,7 +486,7 @@ public abstract class ConnectProcessor { // Get the column definitions of a table @SuppressWarnings("rawtypes") - protected void handleFieldList(String tableName) { + protected void handleFieldList(String tableName) throws ConnectionException { // Already get command code. if (Strings.isNullOrEmpty(tableName)) { ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index fc19330268c..6637f62f1ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.PrepareStmt; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.StatementBase; import org.apache.doris.catalog.MysqlColType; +import org.apache.doris.common.ConnectionException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.MysqlChannel; @@ -245,7 +246,7 @@ public class MysqlConnectProcessor extends ConnectProcessor { } // Process COM_QUERY statement, - private void handleQuery(MysqlCommand mysqlCommand) { + private void handleQuery(MysqlCommand mysqlCommand) throws ConnectionException { // convert statement to Java string byte[] bytes = packetBuf.array(); int ending = packetBuf.limit() - 1; @@ -307,7 +308,7 @@ public class MysqlConnectProcessor extends ConnectProcessor { } } - private void handleFieldList() { + private void handleFieldList() throws ConnectionException { String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8); handleFieldList(tableName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index 6f51c0391af..a4aa5a88c8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -18,6 +18,7 @@ package org.apache.doris.service.arrowflight; import org.apache.doris.analysis.Expr; +import org.apache.doris.common.ConnectionException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Status; @@ -81,7 +82,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC ctx.setStartTime(); } - public void handleQuery(String query) { + public void handleQuery(String query) throws ConnectionException { MysqlCommand command = MysqlCommand.COM_QUERY; prepare(command); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org