This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 3febac1d916 [fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616)
3febac1d916 is described below

commit 3febac1d916c0219d91c61b8c65feba2b64383fa
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Thu Jun 20 22:27:01 2024 +0800

    [fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616)
    
    bp #36559
---
 .../apache/doris/common/ConnectionException.java   | 35 ++++++++++++++++++++++
 .../java/org/apache/doris/mysql/MysqlChannel.java  |  7 +++--
 .../java/org/apache/doris/qe/ConnectProcessor.java | 17 +++++++----
 .../org/apache/doris/qe/MysqlConnectProcessor.java |  5 ++--
 .../arrowflight/FlightSqlConnectProcessor.java     |  3 +-
 5 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java
new file mode 100644
index 00000000000..3f1de2ae2b8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import java.io.IOException;
+
+/**
+ * This is a special exception.
+ * If this exception is thrown, it means that the connection to the server is abnormal.
+ * We need to kill the connection actively.
+ */
+public class ConnectionException extends IOException {
+    public ConnectionException(String message) {
+        super(message);
+    }
+
+    public ConnectionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index d22ba393699..392b0587585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.mysql;
 
+import org.apache.doris.common.ConnectionException;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ConnectProcessor;
@@ -401,11 +402,13 @@ public class MysqlChannel implements BytesChannel {
     protected void realNetSend(ByteBuffer buffer) throws IOException {
         buffer = encryptData(buffer);
         long bufLen = buffer.remaining();
+        long start = System.currentTimeMillis();
         long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, 
context.getNetWriteTimeout(),
                 TimeUnit.SECONDS);
         if (bufLen != writeLen) {
-            throw new IOException("Write mysql packet failed.[write=" + 
writeLen
-                    + ", needToWrite=" + bufLen + "]");
+            long duration = System.currentTimeMillis() - start;
+            throw new ConnectionException("Write mysql packet failed.[write=" 
+ writeLen
+                    + ", needToWrite=" + bufLen + "], duration: " + duration + 
" ms");
         }
         Channels.flushBlocking(conn.getSinkChannel(), 
context.getNetWriteTimeout(), TimeUnit.SECONDS);
         isSend = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index d54b708e818..51911c03333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ConnectionException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.NotImplementedException;
@@ -198,9 +199,11 @@ public abstract class ConnectProcessor {
     }
 
     // only throw an exception when there is a problem interacting with the 
requesting client
-    protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
+    protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) 
throws ConnectionException {
         try {
             executeQuery(mysqlCommand, originStmt);
+        } catch (ConnectionException exception) {
+            throw exception;
         } catch (Exception ignored) {
             // saved use handleQueryException
         }
@@ -414,14 +417,18 @@ public abstract class ConnectProcessor {
 
     // Use a handler for exception to avoid big try catch block which is a 
little hard to understand
     protected void handleQueryException(Throwable throwable, String origStmt,
-            StatementBase parsedStmt, Data.PQueryStatistics statistics) {
+            StatementBase parsedStmt, Data.PQueryStatistics statistics) throws 
ConnectionException {
         if (ctx.getMinidump() != null) {
             MinidumpUtils.saveMinidumpString(ctx.getMinidump(), 
DebugUtil.printId(ctx.queryId()));
         }
-        if (throwable instanceof IOException) {
+        if (throwable instanceof ConnectionException) {
+            // Throw this exception to close the connection outside.
+            LOG.warn("Process one query failed because ConnectionException: ", 
throwable);
+            throw (ConnectionException) throwable;
+        } else if (throwable instanceof IOException) {
             // Client failed.
             LOG.warn("Process one query failed because IOException: ", 
throwable);
-            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris 
process failed");
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris 
process failed: " + throwable.getMessage());
         } else if (throwable instanceof UserException) {
             LOG.warn("Process one query failed because.", throwable);
             ctx.getState().setError(((UserException) 
throwable).getMysqlErrorCode(), throwable.getMessage());
@@ -479,7 +486,7 @@ public abstract class ConnectProcessor {
 
     // Get the column definitions of a table
     @SuppressWarnings("rawtypes")
-    protected void handleFieldList(String tableName) {
+    protected void handleFieldList(String tableName) throws 
ConnectionException {
         // Already get command code.
         if (Strings.isNullOrEmpty(tableName)) {
             ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty 
tableName");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
index fc19330268c..6637f62f1ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.PrepareStmt;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.catalog.MysqlColType;
+import org.apache.doris.common.ConnectionException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.mysql.MysqlChannel;
@@ -245,7 +246,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
     }
 
     // Process COM_QUERY statement,
-    private void handleQuery(MysqlCommand mysqlCommand) {
+    private void handleQuery(MysqlCommand mysqlCommand) throws 
ConnectionException {
         // convert statement to Java string
         byte[] bytes = packetBuf.array();
         int ending = packetBuf.limit() - 1;
@@ -307,7 +308,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
         }
     }
 
-    private void handleFieldList() {
+    private void handleFieldList() throws ConnectionException {
         String tableName = new 
String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
         handleFieldList(tableName);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 6f51c0391af..a4aa5a88c8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.service.arrowflight;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.ConnectionException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Status;
@@ -81,7 +82,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
         ctx.setStartTime();
     }
 
-    public void handleQuery(String query) {
+    public void handleQuery(String query) throws ConnectionException {
         MysqlCommand command = MysqlCommand.COM_QUERY;
         prepare(command);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to