This is an automated email from the ASF dual-hosted git repository. panjuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new bea3c69a112 Refactor ProcessContext.completeExecutionUnit (#25417) bea3c69a112 is described below commit bea3c69a112f42778a0971ef5b27c9f554bd7e9c Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Sun Apr 30 13:18:19 2023 +0800 Refactor ProcessContext.completeExecutionUnit (#25417) --- .../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +- .../raw/callback/RawSQLExecutorCallback.java | 4 +- .../infra/executor/sql/process/ProcessContext.java | 8 ++- .../infra/executor/sql/process/ProcessEngine.java | 6 +-- .../executor/sql/process/ProcessReporter.java | 5 +- .../sql/process/ProcessReporterCleaner.java | 2 +- .../sql/process/ShowProcessListManager.java | 58 +++++++++++----------- .../executor/sql/process/ProcessReporterTest.java | 2 +- .../sql/process/DriverProcessReporterCleaner.java | 2 +- .../subscriber/ProcessListChangedSubscriber.java | 2 +- .../sql/process/ProxyProcessReporterCleaner.java | 2 +- 11 files changed, 44 insertions(+), 49 deletions(-) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java index eb26d4003a5..ce41216f46a 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java @@ -84,7 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread); T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType); sqlExecutionHook.finishSuccess(); - new ProcessEngine().finishExecution(1); + new ProcessEngine().finishExecution(); return result; } catch (final SQLException ex) { if (!storageType.equals(protocolType)) { diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java index 9aedb8f4f1d..3adfcde0ac5 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java @@ -46,7 +46,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException { Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread); if (!ExecuteIDContext.isEmpty()) { - new ProcessEngine().finishExecution(inputs.size()); + for (int i = 0; i < inputs.size(); i++) { + new ProcessEngine().finishExecution(); + } } return result; } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java index dcb5b939475..f40ef34967f 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessContext.java @@ -90,12 +90,10 @@ public final class ProcessContext { } /** - * Complete execution units. - * - * @param completedExecutionUnitCount completed execution unit count + * Complete execution unit. */ - public void completeExecutionUnits(final int completedExecutionUnitCount) { - completedUnitCount.addAndGet(completedExecutionUnitCount); + public void completeExecutionUnit() { + completedUnitCount.incrementAndGet(); } /** diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java index 46ab40cb4dc..23cd0e1d098 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java @@ -68,14 +68,12 @@ public final class ProcessEngine { /** * Finish execution. - * - * @param completedExecutionUnitCount completed execution unit count */ - public void finishExecution(final int completedExecutionUnitCount) { + public void finishExecution() { if (ExecuteIDContext.isEmpty()) { return; } - reporter.reportComplete(ExecuteIDContext.get(), completedExecutionUnitCount); + reporter.reportComplete(ExecuteIDContext.get()); } /** diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java index e5c70a88e95..02d76033117 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java @@ -61,10 +61,9 @@ public final class ProcessReporter { * Report complete execution unit. * * @param executionID execution ID - * @param completedExecutionUnitCount completed execution unit count */ - public void reportComplete(final String executionID, final int completedExecutionUnitCount) { - ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnits(completedExecutionUnitCount); + public void reportComplete(final String executionID) { + ShowProcessListManager.getInstance().getProcessContext(executionID).completeExecutionUnit(); } /** diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java index c87fec8623a..6b7ff6dba91 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterCleaner.java @@ -28,7 +28,7 @@ public interface ProcessReporterCleaner { /** * Reset reporter. * - * @param context execute process context + * @param context process context */ void reset(ProcessContext context); } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java index 4075f48592a..7d14297168b 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ShowProcessListManager.java @@ -32,17 +32,15 @@ import java.util.concurrent.ConcurrentHashMap; * Show process list manager. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) +@Getter public final class ShowProcessListManager { private static final ShowProcessListManager INSTANCE = new ShowProcessListManager(); - @Getter private final Map<String, ProcessContext> processContexts = new ConcurrentHashMap<>(); - @Getter private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>(); - @Getter private final Map<String, ShowProcessListSimpleLock> locks = new ConcurrentHashMap<>(); /** @@ -55,72 +53,72 @@ public final class ShowProcessListManager { } /** - * Put execute process context. + * Put process context. * - * @param executionId execution id + * @param executionID execution ID * @param processContext process context */ - public void putProcessContext(final String executionId, final ProcessContext processContext) { - processContexts.put(executionId, processContext); + public void putProcessContext(final String executionID, final ProcessContext processContext) { + processContexts.put(executionID, processContext); } /** * Put process statements. * - * @param executionId execution id + * @param executionID execution ID * @param statements statements */ - public void putProcessStatement(final String executionId, final Collection<Statement> statements) { + public void putProcessStatement(final String executionID, final Collection<Statement> statements) { if (statements.isEmpty()) { return; } - processStatements.put(executionId, statements); + processStatements.put(executionID, statements); } /** - * Get execute process context. + * Get process context. * - * @param executionId execution id - * @return execute process context + * @param executionID execution ID + * @return process context */ - public ProcessContext getProcessContext(final String executionId) { - return processContexts.get(executionId); + public ProcessContext getProcessContext(final String executionID) { + return processContexts.get(executionID); } /** - * Get execute process statement. + * Get process statement. * - * @param executionId execution id + * @param executionID execution ID * @return execute statements */ - public Collection<Statement> getProcessStatement(final String executionId) { - return processStatements.getOrDefault(executionId, Collections.emptyList()); + public Collection<Statement> getProcessStatement(final String executionID) { + return processStatements.getOrDefault(executionID, Collections.emptyList()); } /** - * Remove execute process context. + * Remove process context. * - * @param executionId execution id + * @param executionID execution ID */ - public void removeProcessContext(final String executionId) { - processContexts.remove(executionId); + public void removeProcessContext(final String executionID) { + processContexts.remove(executionID); } /** - * Remove execute process statement. + * Remove process statement. * - * @param executionId execution id + * @param executionID execution ID */ - public void removeProcessStatement(final String executionId) { - processStatements.remove(executionId); + public void removeProcessStatement(final String executionID) { + processStatements.remove(executionID); } /** - * Get all execute process context. + * Get all process contexts. * - * @return collection execute process context + * @return all process contexts */ - public Collection<ProcessContext> getAllProcessContext() { + public Collection<ProcessContext> getAllProcessContexts() { return processContexts.values(); } } diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java index 5fdbf0890d5..4642b82ef2d 100644 --- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java +++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java @@ -67,7 +67,7 @@ class ProcessReporterTest { @Test void assertReportUnit() { when(showProcessListManager.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class)); - new ProcessReporter().reportComplete("foo_id", 1); + new ProcessReporter().reportComplete("foo_id"); verify(showProcessListManager).getProcessContext("foo_id"); } diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java index cbe8338ee34..033e72ae8ea 100644 --- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java +++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/sql/process/DriverProcessReporterCleaner.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ShowProcessListManag import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext; /** - * Execute process reporter cleaner for driver. + * Process reporter cleaner for driver. */ public final class DriverProcessReporterCleaner implements ProcessReporterCleaner { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java index bd8245b123c..890a7507c09 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java @@ -64,7 +64,7 @@ public final class ProcessListChangedSubscriber { if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) { return; } - Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext(); + Collection<ProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContexts(); if (!processContexts.isEmpty()) { registryCenter.getRepository().persist( ProcessNode.getProcessListInstancePath(event.getProcessId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processContexts))); diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java index c0889d52f11..f6b3203df35 100644 --- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java +++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/executor/sql/process/ProxyProcessReporterCleaner.java @@ -21,7 +21,7 @@ import org.apache.shardingsphere.infra.executor.sql.process.ProcessReporterClean import org.apache.shardingsphere.infra.executor.sql.process.ProcessContext; /** - * Execute process reporter cleaner for proxy. + * Process reporter cleaner for proxy. */ public final class ProxyProcessReporterCleaner implements ProcessReporterCleaner {