This is an automated email from the ASF dual-hosted git repository.

jiangmaolin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0731b44c62a Fix the connection leak caused by rollback failure in 
Proxy (#35867)
0731b44c62a is described below

commit 0731b44c62a6a013988359458b530b57d855e954
Author: Raigor <raigor.ji...@gmail.com>
AuthorDate: Thu Jul 3 17:08:45 2025 +0800

    Fix the connection leak caused by rollback failure in Proxy (#35867)
    
    * Fix the connection leak caused by rollback failure in Proxy
    
    * Update RELEASE-NOTES.md
---
 RELEASE-NOTES.md                                   |  1 +
 .../connector/ProxyDatabaseConnectionManager.java  | 38 ++++++++++++++++++----
 .../netty/FrontendChannelInboundHandler.java       | 23 ++++++++++++-
 3 files changed, 55 insertions(+), 7 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index e19fd37e292..abe4f860882 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -72,6 +72,7 @@
 1. Encrypt: Resolve rewrite issue in nested concat function - 
[35815](https://github.com/apache/shardingsphere/pull/35815)
 1. JDBC: Fix the issue where cached connections in 
DriverDatabaseConnectionManager were not released in time - 
[35834](https://github.com/apache/shardingsphere/pull/35834)
 1. Proxy: Fix column length for PostgreSQL string binary protocol value - 
[35840](https://github.com/apache/shardingsphere/pull/35840)
+1. Proxy: Fix the connection leak caused by rollback failure in Proxy - 
[35867](https://github.com/apache/shardingsphere/pull/35867)
 
 ### Change Logs
 
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
index c82dd4c01ab..517df05ef4c 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java
@@ -22,6 +22,7 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
@@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Database connection manager of ShardingSphere-Proxy.
  */
+@Slf4j
 @RequiredArgsConstructor
 @Getter
 public final class ProxyDatabaseConnectionManager implements 
DatabaseConnectionManager<Connection> {
@@ -71,6 +73,8 @@ public final class ProxyDatabaseConnectionManager implements 
DatabaseConnectionM
     
     private final AtomicBoolean closed = new AtomicBoolean(false);
     
+    private final Object closeLock = new Object();
+    
     @SuppressWarnings("rawtypes")
     private final Map<ShardingSphereRule, TransactionHook> transactionHooks = 
OrderedSPILoader.getServices(
             TransactionHook.class, 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
@@ -259,7 +263,7 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
      * @throws BackendConnectionException backend connection exception
      */
     public void closeExecutionResources() throws BackendConnectionException {
-        synchronized (this) {
+        synchronized (closeLock) {
             Collection<Exception> result = new 
LinkedList<>(closeHandlers(false));
             if 
(!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext())))
 {
                 result.addAll(closeHandlers(true));
@@ -277,12 +281,16 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
     
     /**
      * Close all resources.
+     *
+     * @return exceptions occurred during closing resources
      */
-    public void closeAllResources() {
-        synchronized (this) {
+    public Collection<SQLException> closeAllResources() {
+        synchronized (closeLock) {
             closed.set(true);
-            closeHandlers(true);
-            closeConnections(true);
+            Collection<SQLException> result = new LinkedList<>();
+            result.addAll(closeHandlers(true));
+            result.addAll(closeConnections(true));
+            return result;
         }
     }
     
@@ -326,9 +334,17 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
                     if (forceRollback && 
connectionSession.getTransactionStatus().isInTransaction()) {
                         each.rollback();
                     }
-                    each.close();
                 } catch (final SQLException ex) {
                     result.add(ex);
+                } finally {
+                    try {
+                        each.close();
+                    } catch (final SQLException ex) {
+                        if (!isClosed(each)) {
+                            log.warn("Close connection {} failed.", each, ex);
+                        }
+                        result.add(ex);
+                    }
                 }
             }
             cachedConnections.clear();
@@ -339,6 +355,16 @@ public final class ProxyDatabaseConnectionManager 
implements DatabaseConnectionM
         return result;
     }
     
+    private boolean isClosed(final Connection connection) {
+        try {
+            if (connection.isClosed()) {
+                return true;
+            }
+        } catch (final SQLException ignored) {
+        }
+        return false;
+    }
+    
     private void resetSessionVariablesIfNecessary(final Collection<Connection> 
values, final Collection<SQLException> exceptions) {
         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() 
|| values.isEmpty()) {
             return;
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d3ea1b3a4b0..4896d220373 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -33,6 +33,8 @@ import 
org.apache.shardingsphere.proxy.frontend.executor.UserExecutorGroup;
 import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext;
 
+import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -105,11 +107,30 @@ public final class FrontendChannelInboundHandler extends 
ChannelInboundHandlerAd
     
     private void closeAllResources() {
         
ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId());
-        connectionSession.getDatabaseConnectionManager().closeAllResources();
+        
processCloseExceptions(connectionSession.getDatabaseConnectionManager().closeAllResources());
         
Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect);
         databaseProtocolFrontendEngine.release(connectionSession);
     }
     
+    private void processCloseExceptions(final Collection<SQLException> 
exceptions) {
+        if (exceptions.isEmpty()) {
+            return;
+        }
+        SQLException ex = new SQLException("");
+        for (SQLException each : exceptions) {
+            ex.setNextException(each);
+        }
+        processException(ex);
+    }
+    
+    private void processException(final Exception cause) {
+        if (ExpectedExceptions.isExpected(cause.getClass())) {
+            log.debug("Exception occur: ", cause);
+        } else {
+            log.error("Exception occur: ", cause);
+        }
+    }
+    
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext context) 
{
         if (context.channel().isWritable()) {

Reply via email to