This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new baefa134dc8 Add heldByConnection to avoid MySQL JDBC memory leak (#28162)
baefa134dc8 is described below

commit baefa134dc8961ac3bdfdbe18edbe2f4b5f9902f
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Fri Aug 18 17:57:07 2023 +0800

    Add heldByConnection to avoid MySQL JDBC memory leak (#28162)
    
    * Add heldByConnection to avoid MySQL JDBC memory leak
    
    * fix unit test
    
    * optimize checkstyle
---
 .../infra/executor/sql/process/Process.java             | 13 ++++++++-----
 .../infra/executor/sql/process/ProcessEngine.java       | 17 +++++++++++------
 .../infra/executor/sql/process/yaml/YamlProcess.java    |  2 ++
 .../sql/process/yaml/swapper/YamlProcessSwapper.java    |  3 ++-
 .../yaml/swapper/YamlProcessListSwapperTest.java        |  5 ++++-
 .../process/yaml/swapper/YamlProcessSwapperTest.java    |  5 ++++-
 .../subscriber/ProcessListChangedSubscriberTest.java    |  3 ++-
 .../admin/executor/ShowProcessListExecutorTest.java     |  2 +-
 8 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index a5f43d2ec3a..8597a563fc4 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -57,15 +57,17 @@ public final class Process {
     
     private final boolean idle;
     
-    public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext) {
-        this("", executionGroupContext, true);
+    private final boolean heldByConnection;
+    
+    public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext, final boolean heldByConnection) {
+        this("", executionGroupContext, true, heldByConnection);
     }
     
-    public Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext) {
-        this(sql, executionGroupContext, false);
+    public Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean heldByConnection) {
+        this(sql, executionGroupContext, false, heldByConnection);
     }
     
-    private Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean idle) {
+    private Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean idle, final boolean 
heldByConnection) {
         id = executionGroupContext.getReportContext().getProcessId();
         startMillis = System.currentTimeMillis();
         this.sql = sql;
@@ -77,6 +79,7 @@ public final class Process {
         processStatements = getProcessStatements(executionGroupContext);
         completedUnitCount = new AtomicInteger(0);
         this.idle = idle;
+        this.heldByConnection = heldByConnection;
     }
     
     private int getTotalUnitCount(final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext) {
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 154b83f3aa0..62d708cb0a7 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.infra.executor.sql.process;
 
-import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -43,7 +43,7 @@ public final class ProcessEngine {
      */
     public String connect(final Grantee grantee, final String databaseName) {
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
new ExecutionGroupReportContext(databaseName, grantee));
-        Process process = new Process(executionGroupContext);
+        Process process = new Process(executionGroupContext, true);
         ProcessRegistry.getInstance().add(process);
         return executionGroupContext.getReportContext().getProcessId();
     }
@@ -66,9 +66,10 @@ public final class ProcessEngine {
      */
     public void executeSQL(final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
         if 
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
 {
-            
ProcessIdContext.set(executionGroupContext.getReportContext().getProcessId());
-            Process process = new Process(queryContext.getSql(), 
executionGroupContext);
-            ProcessRegistry.getInstance().add(process);
+            String processId = 
executionGroupContext.getReportContext().getProcessId();
+            boolean heldByConnection = null != 
ProcessRegistry.getInstance().get(processId) && 
ProcessRegistry.getInstance().get(processId).isHeldByConnection();
+            ProcessIdContext.set(processId);
+            ProcessRegistry.getInstance().add(new 
Process(queryContext.getSql(), executionGroupContext, heldByConnection));
         }
     }
     
@@ -95,7 +96,11 @@ public final class ProcessEngine {
         }
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(
                 Collections.emptyList(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(), 
new Grantee(process.getUsername(), process.getHostname())));
-        ProcessRegistry.getInstance().add(new Process(executionGroupContext));
+        if (process.isHeldByConnection()) {
+            ProcessRegistry.getInstance().add(new 
Process(executionGroupContext, true));
+        } else {
+            ProcessRegistry.getInstance().remove(ProcessIdContext.get());
+        }
         ProcessIdContext.remove();
     }
     
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index d72828dff96..47dba29e7c6 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -45,4 +45,6 @@ public final class YamlProcess implements YamlConfiguration {
     private int completedUnitCount;
     
     private boolean idle;
+    
+    private boolean heldByConnection;
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index fd98124fd53..e32f9e63b4e 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -41,12 +41,13 @@ public final class YamlProcessSwapper implements YamlConfigurationSwapper<YamlPr
         result.setTotalUnitCount(data.getTotalUnitCount());
         result.setCompletedUnitCount(data.getCompletedUnitCount());
         result.setIdle(data.isIdle());
+        result.setHeldByConnection(data.isHeldByConnection());
         return result;
     }
     
     @Override
     public Process swapToObject(final YamlProcess yamlConfig) {
         return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(), 
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(), 
yamlConfig.getHostname(),
-                yamlConfig.getTotalUnitCount(), Collections.emptyList(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle());
+                yamlConfig.getTotalUnitCount(), Collections.emptyList(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), 
yamlConfig.isHeldByConnection());
     }
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index c1ab8e8ed06..ab6a9f0935e 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -42,7 +42,7 @@ class YamlProcessListSwapperTest {
     void assertSwapToYamlConfiguration() {
         ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
reportContext);
-        Process process = new Process("SELECT 1", executionGroupContext);
+        Process process = new Process("SELECT 1", executionGroupContext, 
false);
         YamlProcessList actual = new 
YamlProcessListSwapper().swapToYamlConfiguration(Collections.singleton(process));
         assertThat(actual.getProcesses().size(), is(1));
         assertYamlProcessContext(actual.getProcesses().iterator().next());
@@ -57,6 +57,7 @@ class YamlProcessListSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getCompletedUnitCount(), is(0));
         assertThat(actual.getTotalUnitCount(), is(0));
+        assertThat(actual.isHeldByConnection(), is(false));
         assertFalse(actual.isIdle());
     }
     
@@ -80,6 +81,7 @@ class YamlProcessListSwapperTest {
         result.setTotalUnitCount(10);
         result.setCompletedUnitCount(5);
         result.setIdle(true);
+        result.setHeldByConnection(true);
         return result;
     }
     
@@ -92,6 +94,7 @@ class YamlProcessListSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getTotalUnitCount(), is(10));
         assertThat(actual.getCompletedUnitCount(), is(5));
+        assertThat(actual.isHeldByConnection(), is(true));
         assertTrue(actual.isIdle());
     }
 }
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 6d6fe1361e6..2debede78b8 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -40,7 +40,7 @@ class YamlProcessSwapperTest {
     void assertSwapToYamlConfiguration() {
         ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
reportContext);
-        Process process = new Process("SELECT 1", executionGroupContext);
+        Process process = new Process("SELECT 1", executionGroupContext, true);
         YamlProcess actual = new 
YamlProcessSwapper().swapToYamlConfiguration(process);
         assertNotNull(actual.getId());
         assertThat(actual.getStartMillis(), 
lessThanOrEqualTo(System.currentTimeMillis()));
@@ -50,6 +50,7 @@ class YamlProcessSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getCompletedUnitCount(), is(0));
         assertThat(actual.getTotalUnitCount(), is(0));
+        assertThat(actual.isHeldByConnection(), is(true));
         assertFalse(actual.isIdle());
     }
     
@@ -64,6 +65,7 @@ class YamlProcessSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getTotalUnitCount(), is(10));
         assertThat(actual.getCompletedUnitCount(), is(5));
+        assertThat(actual.isHeldByConnection(), is(false));
         assertTrue(actual.isIdle());
     }
     
@@ -78,6 +80,7 @@ class YamlProcessSwapperTest {
         result.setTotalUnitCount(10);
         result.setCompletedUnitCount(5);
         result.setIdle(true);
+        result.setHeldByConnection(false);
         return result;
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index c022e300b39..537b2b36714 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -119,7 +119,8 @@ class ProcessListChangedSubscriberTest {
         subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent(instanceId, processId));
         ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
-                "processes:" + System.lineSeparator() + "- completedUnitCount: 
0\n  id: foo_id\n  idle: false\n  startMillis: 0\n  totalUnitCount: 0" + 
System.lineSeparator());
+                "processes:" + System.lineSeparator() + "- completedUnitCount: 
0\n  heldByConnection: false\n  id: foo_id\n  idle: false\n  startMillis: 0\n  
totalUnitCount: 0"
+                        + System.lineSeparator());
         
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" + 
instanceId + ":foo_id");
     }
     
diff --git a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index f1d2b4c5540..158777ad094 100644
--- a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -66,7 +66,7 @@ class ShowProcessListExecutorTest {
     
     private void setupProcesses(final ShowProcessListExecutor 
showProcessListExecutor) throws ReflectiveOperationException {
         Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 
1617939785160L,
-                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new 
AtomicInteger(1), false);
+                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", 
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new 
AtomicInteger(1), false, false);
         Plugins.getMemberAccessor().set(
                 
showProcessListExecutor.getClass().getDeclaredField("processes"), 
showProcessListExecutor, Collections.singleton(process));
     }

Reply via email to