This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new baefa134dc8 Add heldByConnection to avoid MySQL JDBC memory leak (#28162)
baefa134dc8 is described below
commit baefa134dc8961ac3bdfdbe18edbe2f4b5f9902f
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Fri Aug 18 17:57:07 2023 +0800
Add heldByConnection to avoid MySQL JDBC memory leak (#28162)
* Add heldByConnection to avoid MySQL JDBC memory leak
* Fix unit test
* Optimize checkstyle
---
.../infra/executor/sql/process/Process.java | 13 ++++++++-----
.../infra/executor/sql/process/ProcessEngine.java | 17 +++++++++++------
.../infra/executor/sql/process/yaml/YamlProcess.java | 2 ++
.../sql/process/yaml/swapper/YamlProcessSwapper.java | 3 ++-
.../yaml/swapper/YamlProcessListSwapperTest.java | 5 ++++-
.../process/yaml/swapper/YamlProcessSwapperTest.java | 5 ++++-
.../subscriber/ProcessListChangedSubscriberTest.java | 3 ++-
.../admin/executor/ShowProcessListExecutorTest.java | 2 +-
8 files changed, 34 insertions(+), 16 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index a5f43d2ec3a..8597a563fc4 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -57,15 +57,17 @@ public final class Process {
private final boolean idle;
- public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
- this("", executionGroupContext, true);
+ private final boolean heldByConnection;
+
+ public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext, final boolean heldByConnection) {
+ this("", executionGroupContext, true, heldByConnection);
}
- public Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
- this(sql, executionGroupContext, false);
+ public Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean heldByConnection) {
+ this(sql, executionGroupContext, false, heldByConnection);
}
- private Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean idle) {
+ private Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean idle, final boolean
heldByConnection) {
id = executionGroupContext.getReportContext().getProcessId();
startMillis = System.currentTimeMillis();
this.sql = sql;
@@ -77,6 +79,7 @@ public final class Process {
processStatements = getProcessStatements(executionGroupContext);
completedUnitCount = new AtomicInteger(0);
this.idle = idle;
+ this.heldByConnection = heldByConnection;
}
private int getTotalUnitCount(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 154b83f3aa0..62d708cb0a7 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.infra.executor.sql.process;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -43,7 +43,7 @@ public final class ProcessEngine {
*/
public String connect(final Grantee grantee, final String databaseName) {
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
new ExecutionGroupReportContext(databaseName, grantee));
- Process process = new Process(executionGroupContext);
+ Process process = new Process(executionGroupContext, true);
ProcessRegistry.getInstance().add(process);
return executionGroupContext.getReportContext().getProcessId();
}
@@ -66,9 +66,10 @@ public final class ProcessEngine {
*/
public void executeSQL(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
if
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
-
ProcessIdContext.set(executionGroupContext.getReportContext().getProcessId());
- Process process = new Process(queryContext.getSql(),
executionGroupContext);
- ProcessRegistry.getInstance().add(process);
+ String processId =
executionGroupContext.getReportContext().getProcessId();
+ boolean heldByConnection = null !=
ProcessRegistry.getInstance().get(processId) &&
ProcessRegistry.getInstance().get(processId).isHeldByConnection();
+ ProcessIdContext.set(processId);
+ ProcessRegistry.getInstance().add(new
Process(queryContext.getSql(), executionGroupContext, heldByConnection));
}
}
@@ -95,7 +96,11 @@ public final class ProcessEngine {
}
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(
Collections.emptyList(), new
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(),
new Grantee(process.getUsername(), process.getHostname())));
- ProcessRegistry.getInstance().add(new Process(executionGroupContext));
+ if (process.isHeldByConnection()) {
+ ProcessRegistry.getInstance().add(new
Process(executionGroupContext, true));
+ } else {
+ ProcessRegistry.getInstance().remove(ProcessIdContext.get());
+ }
ProcessIdContext.remove();
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index d72828dff96..47dba29e7c6 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -45,4 +45,6 @@ public final class YamlProcess implements YamlConfiguration {
private int completedUnitCount;
private boolean idle;
+
+ private boolean heldByConnection;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index fd98124fd53..e32f9e63b4e 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -41,12 +41,13 @@ public final class YamlProcessSwapper implements
YamlConfigurationSwapper<YamlPr
result.setTotalUnitCount(data.getTotalUnitCount());
result.setCompletedUnitCount(data.getCompletedUnitCount());
result.setIdle(data.isIdle());
+ result.setHeldByConnection(data.isHeldByConnection());
return result;
}
@Override
public Process swapToObject(final YamlProcess yamlConfig) {
return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(),
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(),
yamlConfig.getHostname(),
- yamlConfig.getTotalUnitCount(), Collections.emptyList(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle());
+ yamlConfig.getTotalUnitCount(), Collections.emptyList(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(),
yamlConfig.isHeldByConnection());
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index c1ab8e8ed06..ab6a9f0935e 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -42,7 +42,7 @@ class YamlProcessListSwapperTest {
void assertSwapToYamlConfiguration() {
ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- Process process = new Process("SELECT 1", executionGroupContext);
+ Process process = new Process("SELECT 1", executionGroupContext,
false);
YamlProcessList actual = new
YamlProcessListSwapper().swapToYamlConfiguration(Collections.singleton(process));
assertThat(actual.getProcesses().size(), is(1));
assertYamlProcessContext(actual.getProcesses().iterator().next());
@@ -57,6 +57,7 @@ class YamlProcessListSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
+ assertThat(actual.isHeldByConnection(), is(false));
assertFalse(actual.isIdle());
}
@@ -80,6 +81,7 @@ class YamlProcessListSwapperTest {
result.setTotalUnitCount(10);
result.setCompletedUnitCount(5);
result.setIdle(true);
+ result.setHeldByConnection(true);
return result;
}
@@ -92,6 +94,7 @@ class YamlProcessListSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getTotalUnitCount(), is(10));
assertThat(actual.getCompletedUnitCount(), is(5));
+ assertThat(actual.isHeldByConnection(), is(true));
assertTrue(actual.isIdle());
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 6d6fe1361e6..2debede78b8 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -40,7 +40,7 @@ class YamlProcessSwapperTest {
void assertSwapToYamlConfiguration() {
ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- Process process = new Process("SELECT 1", executionGroupContext);
+ Process process = new Process("SELECT 1", executionGroupContext, true);
YamlProcess actual = new
YamlProcessSwapper().swapToYamlConfiguration(process);
assertNotNull(actual.getId());
assertThat(actual.getStartMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
@@ -50,6 +50,7 @@ class YamlProcessSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
+ assertThat(actual.isHeldByConnection(), is(true));
assertFalse(actual.isIdle());
}
@@ -64,6 +65,7 @@ class YamlProcessSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getTotalUnitCount(), is(10));
assertThat(actual.getCompletedUnitCount(), is(5));
+ assertThat(actual.isHeldByConnection(), is(false));
assertTrue(actual.isIdle());
}
@@ -78,6 +80,7 @@ class YamlProcessSwapperTest {
result.setTotalUnitCount(10);
result.setCompletedUnitCount(5);
result.setIdle(true);
+ result.setHeldByConnection(false);
return result;
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index c022e300b39..537b2b36714 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -119,7 +119,8 @@ class ProcessListChangedSubscriberTest {
subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent(instanceId, processId));
ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
- "processes:" + System.lineSeparator() + "- completedUnitCount:
0\n id: foo_id\n idle: false\n startMillis: 0\n totalUnitCount: 0" +
System.lineSeparator());
+ "processes:" + System.lineSeparator() + "- completedUnitCount:
0\n heldByConnection: false\n id: foo_id\n idle: false\n startMillis: 0\n
totalUnitCount: 0"
+ + System.lineSeparator());
verify(repository).delete("/nodes/compute_nodes/show_process_list_trigger/" +
instanceId + ":foo_id");
}
diff --git
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index f1d2b4c5540..158777ad094 100644
---
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -66,7 +66,7 @@ class ShowProcessListExecutorTest {
private void setupProcesses(final ShowProcessListExecutor
showProcessListExecutor) throws ReflectiveOperationException {
Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80",
1617939785160L,
- "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new
AtomicInteger(1), false);
+ "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, Collections.emptyList(), new
AtomicInteger(1), false, false);
Plugins.getMemberAccessor().set(
showProcessListExecutor.getClass().getDeclaredField("processes"),
showProcessListExecutor, Collections.singleton(process));
}