This is an automated email from the ASF dual-hosted git repository. duanzhengqiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new a086b39  Revise #10186 (#10189)
a086b39 is described below

commit a086b3942fc3601870d69c9312cea1787f746dff
Author: 吴伟杰 <wuwei...@apache.org>
AuthorDate: Sun Apr 25 16:53:36 2021 +0800

    Revise #10186 (#10189)

    * Revise #10186

    * Add count before execute and catch exception
---
 .../backend/communication/jdbc/connection/BackendConnection.java | 2 +-
 .../proxy/frontend/command/CommandExecutorTask.java | 3 +--
 .../shardingsphere/proxy/frontend/state/impl/OKProxyState.java | 8 +++++++-
 .../proxy/frontend/postgresql/PostgreSQLFrontendEngine.java | 2 +-
 4 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 7d1ebad..2d983e5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -73,7 +73,7 @@ public final class BackendConnection implements ExecutorJDBCManager {
     @Setter
     private volatile CalciteExecutor calciteExecutor;
 
-    private final AtomicInteger runningTaskCount = new AtomicInteger(0);
+    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
 
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
 
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 23d8839..0af07b0 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -61,7 +61,6 @@ public final class CommandExecutorTask implements Runnable {
      */
     @Override
     public void run() {
-        backendConnection.getRunningTaskCount().incrementAndGet();
         boolean isNeedFlush = false;
         try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
             ConnectionStatus connectionStatus = backendConnection.getConnectionStatus();
@@ -75,7 +74,7 @@ public final class CommandExecutorTask implements Runnable {
             // CHECKSTYLE:ON
             processException(ex);
         } finally {
-            backendConnection.getRunningTaskCount().decrementAndGet();
+            backendConnection.getSubmittedTaskCount().decrementAndGet();
             Collection<SQLException> exceptions = closeExecutionResources();
             if (isNeedFlush) {
                 context.flush();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
index 562b30b..2cad095 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/OKProxyState.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 
 /**
  * OK proxy state.
@@ -39,6 +40,11 @@ public final class OKProxyState implements ProxyState {
         boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
         ExecutorService executorService = CommandExecutorSelector.getExecutorService(
                 isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), context.channel().id());
-        executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+        backendConnection.getSubmittedTaskCount().incrementAndGet();
+        try {
+            executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
+        } catch (final RejectedExecutionException ignored) {
+            backendConnection.getSubmittedTaskCount().decrementAndGet();
+        }
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
index 33f6e88..2f64869 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/PostgreSQLFrontendEngine.java
@@ -51,7 +51,7 @@ public final class PostgreSQLFrontendEngine implements DatabaseProtocolFrontendE
     }
 
     private void waitingForFinish(final BackendConnection backendConnection) {
-        while (backendConnection.getRunningTaskCount().get() > 0) {
+        while (backendConnection.getSubmittedTaskCount().get() > 0) {
             try {
                 Thread.sleep(500L);
             } catch (final InterruptedException ex) {