This is an automated email from the ASF dual-hosted git repository. jiangmaolin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 0731b44c62a Fix the connection leak caused by rollback failure in Proxy (#35867) 0731b44c62a is described below commit 0731b44c62a6a013988359458b530b57d855e954 Author: Raigor <raigor.ji...@gmail.com> AuthorDate: Thu Jul 3 17:08:45 2025 +0800 Fix the connection leak caused by rollback failure in Proxy (#35867) * Fix the connection leak caused by rollback failure in Proxy * Update RELEASE-NOTES.md --- RELEASE-NOTES.md | 1 + .../connector/ProxyDatabaseConnectionManager.java | 38 ++++++++++++++++++---- .../netty/FrontendChannelInboundHandler.java | 23 ++++++++++++- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index e19fd37e292..abe4f860882 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -72,6 +72,7 @@ 1. Encrypt: Resolve rewrite issue in nested concat function - [35815](https://github.com/apache/shardingsphere/pull/35815) 1. JDBC: Fix the issue where cached connections in DriverDatabaseConnectionManager were not released in time - [35834](https://github.com/apache/shardingsphere/pull/35834) 1. Proxy: Fix column length for PostgreSQL string binary protocol value - [35840](https://github.com/apache/shardingsphere/pull/35840) +1. Proxy: Fix the connection leak caused by rollback failure in Proxy - [35867](https://github.com/apache/shardingsphere/pull/35867) ### Change Logs diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java index c82dd4c01ab..517df05ef4c 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxyDatabaseConnectionManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Multimap; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager; @@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * Database connection manager of ShardingSphere-Proxy. */ +@Slf4j @RequiredArgsConstructor @Getter public final class ProxyDatabaseConnectionManager implements DatabaseConnectionManager<Connection> { @@ -71,6 +73,8 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM private final AtomicBoolean closed = new AtomicBoolean(false); + private final Object closeLock = new Object(); + @SuppressWarnings("rawtypes") private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices( TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules()); @@ -259,7 +263,7 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM * @throws BackendConnectionException backend connection exception */ public void closeExecutionResources() throws BackendConnectionException { - synchronized (this) { + synchronized (closeLock) { Collection<Exception> result = new LinkedList<>(closeHandlers(false)); if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) { result.addAll(closeHandlers(true)); @@ -277,12 +281,16 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM /** * Close all resources. + * + * @return exceptions occurred during closing resources */ - public void closeAllResources() { - synchronized (this) { + public Collection<SQLException> closeAllResources() { + synchronized (closeLock) { closed.set(true); - closeHandlers(true); - closeConnections(true); + Collection<SQLException> result = new LinkedList<>(); + result.addAll(closeHandlers(true)); + result.addAll(closeConnections(true)); + return result; } } @@ -326,9 +334,17 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) { each.rollback(); } - each.close(); } catch (final SQLException ex) { result.add(ex); + } finally { + try { + each.close(); + } catch (final SQLException ex) { + if (!isClosed(each)) { + log.warn("Close connection {} failed.", each, ex); + } + result.add(ex); + } } } cachedConnections.clear(); @@ -339,6 +355,16 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM return result; } + private boolean isClosed(final Connection connection) { + try { + if (connection.isClosed()) { + return true; + } + } catch (final SQLException ignored) { + } + return false; + } + private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) { if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) { return; diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java index d3ea1b3a4b0..4896d220373 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java @@ -33,6 +33,8 @@ import org.apache.shardingsphere.proxy.frontend.executor.UserExecutorGroup; import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; import org.apache.shardingsphere.proxy.frontend.state.ProxyStateContext; +import java.sql.SQLException; +import java.util.Collection; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -105,11 +107,30 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd private void closeAllResources() { ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId()); - connectionSession.getDatabaseConnectionManager().closeAllResources(); + processCloseExceptions(connectionSession.getDatabaseConnectionManager().closeAllResources()); Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect); databaseProtocolFrontendEngine.release(connectionSession); } + private void processCloseExceptions(final Collection<SQLException> exceptions) { + if (exceptions.isEmpty()) { + return; + } + SQLException ex = new SQLException(""); + for (SQLException each : exceptions) { + ex.setNextException(each); + } + processException(ex); + } + + private void processException(final Exception cause) { + if (ExpectedExceptions.isExpected(cause.getClass())) { + log.debug("Exception occur: ", cause); + } else { + log.error("Exception occur: ", cause); + } + } + @Override public void channelWritabilityChanged(final ChannelHandlerContext context) { if (context.channel().isWritable()) {