This is an automated email from the ASF dual-hosted git repository. totalo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 3d421ac95d5 Merge ProcessReporter and ProcessEngine (#25427) 3d421ac95d5 is described below commit 3d421ac95d5680efa8d8d1e1b5fa9ba1c7141bd6 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Tue May 2 01:23:29 2023 +0800 Merge ProcessReporter and ProcessEngine (#25427) --- .../engine/driver/jdbc/JDBCExecutorCallback.java | 2 +- .../sql/execute/engine/raw/RawExecutor.java | 4 +- .../raw/callback/RawSQLExecutorCallback.java | 6 +- .../infra/executor/sql/process/ProcessEngine.java | 55 ++++++++----- ...ExecuteIDContext.java => ProcessIDContext.java} | 29 +++---- .../executor/sql/process/ProcessReporter.java | 91 ---------------------- ...essReporterTest.java => ProcessEngineTest.java} | 30 ++++--- ...DContextTest.java => ProcessIDContextTest.java} | 36 ++++----- .../driver/executor/DriverJDBCExecutor.java | 12 +-- .../executor/FilterableTableScanExecutor.java | 4 +- .../executor/TranslatableTableScanExecutor.java | 8 +- .../connector/jdbc/executor/ProxyJDBCExecutor.java | 4 +- .../netty/FrontendChannelInboundHandler.java | 4 +- 13 files changed, 109 insertions(+), 176 deletions(-) diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java index 2113466b73e..7f3704f1d38 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java @@ -86,7 +86,7 @@ public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCEx sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread); T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType); sqlExecutionHook.finishSuccess(); - processEngine.finishExecution(); + processEngine.completeSQLUnitExecution(); return result; } catch (final SQLException ex) { if (!storageType.equals(protocolType)) { diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java index b481e08d80c..6cfdddcfafb 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java @@ -57,12 +57,12 @@ public final class RawExecutor { public List<ExecuteResult> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext, final QueryContext queryContext, final RawSQLExecutorCallback callback) throws SQLException { try { - processEngine.initializeExecution(executionGroupContext, queryContext); + processEngine.executeSQL(executionGroupContext, queryContext); // TODO Load query header for first query List<ExecuteResult> results = execute(executionGroupContext, (RawSQLExecutorCallback) null, callback); return results.isEmpty() || Objects.isNull(results.get(0)) ? Collections.singletonList(new UpdateResult(0, 0L)) : results; } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java index 01a3880d6c3..8edc78144ee 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; -import org.apache.shardingsphere.infra.executor.sql.process.ExecuteIDContext; +import org.apache.shardingsphere.infra.executor.sql.process.ProcessIDContext; import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine; import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader; @@ -47,9 +47,9 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec @Override public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws SQLException { Collection<ExecuteResult> result = callbacks.iterator().next().execute(inputs, isTrunkThread); - if (!ExecuteIDContext.isEmpty()) { + if (!ProcessIDContext.isEmpty()) { for (int i = 0; i < inputs.size(); i++) { - processEngine.finishExecution(); + processEngine.completeSQLUnitExecution(); } } return result; diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java index f745b07e19c..10a322445df 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java @@ -19,72 +19,85 @@ package org.apache.shardingsphere.infra.executor.sql.process; import org.apache.shardingsphere.infra.binder.QueryContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; +import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit; import org.apache.shardingsphere.infra.metadata.user.Grantee; +import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement; import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement; import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement; +import java.util.Collections; + /** * Process engine. */ public final class ProcessEngine { - private final ProcessReporter reporter = new ProcessReporter(); - /** - * Initialize connection. + * Connect. * * @param grantee grantee * @param databaseName database name * @return process ID */ - public String initializeConnection(final Grantee grantee, final String databaseName) { - return reporter.reportConnect(grantee, databaseName); + public String connect(final Grantee grantee, final String databaseName) { + ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee)); + ProcessContext processContext = new ProcessContext(executionGroupContext); + ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext); + return executionGroupContext.getReportContext().getProcessID(); } /** - * Finish connection. + * Disconnect. * * @param processID process ID */ - public void finishConnection(final String processID) { - reporter.remove(processID); + public void disconnect(final String processID) { + ProcessRegistry.getInstance().removeProcessContext(processID); + } /** - * Initialize execution. + * Execute SQL. * * @param executionGroupContext execution group context * @param queryContext query context */ - public void initializeExecution(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) { + public void executeSQL(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) { if (isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement())) { - ExecuteIDContext.set(executionGroupContext.getReportContext().getProcessID()); - reporter.reportExecute(queryContext, executionGroupContext); + ProcessIDContext.set(executionGroupContext.getReportContext().getProcessID()); + ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext); + ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext); } } /** - * Finish execution. + * Complete SQL unit execution. */ - public void finishExecution() { - if (ExecuteIDContext.isEmpty()) { + public void completeSQLUnitExecution() { + if (ProcessIDContext.isEmpty()) { return; } - reporter.reportComplete(ExecuteIDContext.get()); + ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get()).completeExecutionUnit(); } /** - * Clean execution. + * Complete SQL execution. */ - public void cleanExecution() { - if (ExecuteIDContext.isEmpty()) { + public void completeSQLExecution() { + if (ProcessIDContext.isEmpty()) { return; } - reporter.reset(ExecuteIDContext.get()); - ExecuteIDContext.remove(); + ProcessContext context = ProcessRegistry.getInstance().getProcessContext(ProcessIDContext.get()); + if (null == context) { + return; + } + for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) { + each.reset(context); + } + ProcessIDContext.remove(); } private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) { diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java similarity index 68% rename from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java rename to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java index e1f5bfbf18e..c444515ec9d 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContext.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContext.java @@ -21,45 +21,46 @@ import com.alibaba.ttl.TransmittableThreadLocal; import lombok.AccessLevel; import lombok.NoArgsConstructor; +// TODO should remove the class, process ID should same with connection ID /** - * Execute ID context. + * Process ID context. */ @NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class ExecuteIDContext { +public final class ProcessIDContext { - private static final TransmittableThreadLocal<String> EXECUTE_ID = new TransmittableThreadLocal<>(); + private static final TransmittableThreadLocal<String> PROCESS_ID = new TransmittableThreadLocal<>(); /** - * Judge whether execute ID is empty or not. + * Judge whether process ID is empty or not. * - * @return whether execute ID is empty or not + * @return whether process ID is empty or not */ public static boolean isEmpty() { - return null == EXECUTE_ID.get(); + return null == PROCESS_ID.get(); } /** - * Get execute ID. + * Get process ID. * - * @return execute ID + * @return process ID */ public static String get() { - return EXECUTE_ID.get(); + return PROCESS_ID.get(); } /** - * Set execute ID. + * Set process ID. * - * @param executeId execute ID + * @param executeId process ID */ public static void set(final String executeId) { - EXECUTE_ID.set(executeId); + PROCESS_ID.set(executeId); } /** - * Remove execute ID. + * Remove process ID. */ public static void remove() { - EXECUTE_ID.remove(); + PROCESS_ID.remove(); } } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java deleted file mode 100644 index bbb5e2510f9..00000000000 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.infra.executor.sql.process; - -import org.apache.shardingsphere.infra.binder.QueryContext; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; -import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit; -import org.apache.shardingsphere.infra.metadata.user.Grantee; -import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader; - -import java.util.Collections; - -/** - * Process report. - */ -public final class ProcessReporter { - - /** - * Report connect. - * - * @param grantee grantee - * @param databaseName databaseName - * @return process ID - */ - public String reportConnect(final Grantee grantee, final String databaseName) { - ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), new ExecutionGroupReportContext(databaseName, grantee)); - ProcessContext processContext = new ProcessContext(executionGroupContext); - ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext); - return executionGroupContext.getReportContext().getProcessID(); - } - - /** - * Report execute. - * - * @param queryContext query context - * @param executionGroupContext execution group context - */ - public void reportExecute(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) { - ProcessContext processContext = new ProcessContext(queryContext.getSql(), executionGroupContext); - ProcessRegistry.getInstance().putProcessContext(processContext.getId(), processContext); - } - - /** - * Report complete execution unit. - * - * @param processID process ID - */ - public void reportComplete(final String processID) { - ProcessRegistry.getInstance().getProcessContext(processID).completeExecutionUnit(); - } - - /** - * Reset report. - * - * @param processID process ID - */ - public void reset(final String processID) { - ProcessContext context = ProcessRegistry.getInstance().getProcessContext(processID); - if (null == context) { - return; - } - for (ProcessReporterCleaner each : ShardingSphereServiceLoader.getServiceInstances(ProcessReporterCleaner.class)) { - each.reset(context); - } - } - - /** - * Remove process context. - * - * @param processID process ID - */ - public void remove(final String processID) { - ProcessRegistry.getInstance().removeProcessContext(processID); - } -} diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java similarity index 69% rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java index 6a53ce5185a..2f8fdf05683 100644 --- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessReporterTest.java +++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java @@ -18,9 +18,15 @@ package org.apache.shardingsphere.infra.executor.sql.process; import org.apache.shardingsphere.infra.binder.QueryContext; +import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit; +import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment; +import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment; +import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment; +import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue; +import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +34,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import java.util.Collections; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; @@ -38,7 +45,7 @@ import static org.mockito.Mockito.when; @ExtendWith(AutoMockExtension.class) @StaticMockSettings(ProcessRegistry.class) -class ProcessReporterTest { +class ProcessEngineTest { @Mock private ProcessRegistry processRegistry; @@ -49,9 +56,9 @@ class ProcessReporterTest { } @Test - void assertReportExecute() { + void assertExecuteSQL() { ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext = mockExecutionGroupContext(); - new ProcessReporter().reportExecute(new QueryContext(null, null, null), executionGroupContext); + new ProcessEngine().executeSQL(executionGroupContext, new QueryContext(new UpdateStatementContext(getSQLStatement()), null, null)); verify(processRegistry).putProcessContext(eq(executionGroupContext.getReportContext().getProcessID()), any()); } @@ -64,16 +71,19 @@ class ProcessReporterTest { return result; } - @Test - void assertReportUnit() { - when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class)); - new ProcessReporter().reportComplete("foo_id"); - verify(processRegistry).getProcessContext("foo_id"); + private MySQLUpdateStatement getSQLStatement() { + MySQLUpdateStatement result = new MySQLUpdateStatement(); + result.setTable(new SimpleTableSegment(new TableNameSegment(0, 0, new IdentifierValue("foo_tbl")))); + result.setSetAssignment(new SetAssignmentSegment(0, 0, Collections.emptyList())); + return result; } @Test - void assertReportClean() { + void assertCompleteSQLUnitExecution() { + ProcessIDContext.set("foo_id"); when(processRegistry.getProcessContext("foo_id")).thenReturn(mock(ProcessContext.class)); - new ProcessReporter().reset("foo_id"); + new ProcessEngine().completeSQLUnitExecution(); + verify(processRegistry).getProcessContext("foo_id"); + ProcessIDContext.remove(); } } diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java similarity index 61% rename from infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java rename to infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java index d6f108b747d..a0e61c6c793 100644 --- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteIDContextTest.java +++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIDContextTest.java @@ -26,42 +26,42 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -class ExecuteIDContextTest { +class ProcessIDContextTest { @AfterEach void tearDown() { - ExecuteIDContext.remove(); + ProcessIDContext.remove(); } @Test void assertIsEmpty() { - assertTrue(ExecuteIDContext.isEmpty()); - ExecuteIDContext.set("123e4567e89b12d3a456426655440000"); - assertFalse(ExecuteIDContext.isEmpty()); + assertTrue(ProcessIDContext.isEmpty()); + ProcessIDContext.set("123e4567e89b12d3a456426655440000"); + assertFalse(ProcessIDContext.isEmpty()); } @Test void assertGet() { - assertNull(ExecuteIDContext.get()); - ExecuteIDContext.set("123e4567e89b12d3a456426655440000"); - assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000")); + assertNull(ProcessIDContext.get()); + ProcessIDContext.set("123e4567e89b12d3a456426655440000"); + assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000")); } @Test void assertSet() { - assertNull(ExecuteIDContext.get()); - ExecuteIDContext.set("123e4567e89b12d3a456426655440000"); - assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000")); - ExecuteIDContext.set("123e4567e89b12d3a456426655440001"); - assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440001")); + assertNull(ProcessIDContext.get()); + ProcessIDContext.set("123e4567e89b12d3a456426655440000"); + assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000")); + ProcessIDContext.set("123e4567e89b12d3a456426655440001"); + assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440001")); } @Test void assertRemove() { - assertNull(ExecuteIDContext.get()); - ExecuteIDContext.set("123e4567e89b12d3a456426655440000"); - assertThat(ExecuteIDContext.get(), is("123e4567e89b12d3a456426655440000")); - ExecuteIDContext.remove(); - assertNull(ExecuteIDContext.get()); + assertNull(ProcessIDContext.get()); + ProcessIDContext.set("123e4567e89b12d3a456426655440000"); + assertThat(ProcessIDContext.get(), is("123e4567e89b12d3a456426655440000")); + ProcessIDContext.remove(); + assertNull(ProcessIDContext.get()); } } diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java index 9633ac77d09..c185f9d7455 100644 --- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java +++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java @@ -72,10 +72,10 @@ public final class DriverJDBCExecutor { public List<QueryResult> executeQuery(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final QueryContext queryContext, final ExecuteQueryCallback callback) throws SQLException { try { - processEngine.initializeExecution(executionGroupContext, queryContext); + processEngine.executeSQL(executionGroupContext, queryContext); return jdbcExecutor.execute(executionGroupContext, callback); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } @@ -92,12 +92,12 @@ public final class DriverJDBCExecutor { public int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final QueryContext queryContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Integer> callback) throws SQLException { try { - processEngine.initializeExecution(executionGroupContext, queryContext); + processEngine.executeSQL(executionGroupContext, queryContext); SQLStatementContext<?> sqlStatementContext = queryContext.getSqlStatementContext(); List<Integer> results = doExecute(executionGroupContext, sqlStatementContext, routeUnits, callback); return isNeedAccumulate(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(), sqlStatementContext) ? accumulate(results) : results.get(0); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } @@ -131,11 +131,11 @@ public final class DriverJDBCExecutor { public boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final QueryContext queryContext, final Collection<RouteUnit> routeUnits, final JDBCExecutorCallback<Boolean> callback) throws SQLException { try { - processEngine.initializeExecution(executionGroupContext, queryContext); + processEngine.executeSQL(executionGroupContext, queryContext); List<Boolean> results = doExecute(executionGroupContext, queryContext.getSqlStatementContext(), routeUnits, callback); return null != results && !results.isEmpty() && null != results.get(0) && results.get(0); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java index 8b7948d8b6d..537df40f871 100644 --- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java +++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java @@ -155,7 +155,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor { ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName())); setParameters(executionGroupContext.getInputGroups()); - processEngine.initializeExecution(executionGroupContext, context.getQueryContext()); + processEngine.executeSQL(executionGroupContext, context.getQueryContext()); List<QueryResult> queryResults = execute(executionGroupContext, databaseType); // TODO need to get session context MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext()); @@ -165,7 +165,7 @@ public final class FilterableTableScanExecutor implements TableScanExecutor { } catch (final SQLException ex) { throw new SQLWrapperException(ex); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java index b30fb712a7b..461e1549b2f 100644 --- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java +++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java @@ -181,7 +181,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor { ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName())); setParameters(executionGroupContext.getInputGroups()); - processEngine.initializeExecution(executionGroupContext, context.getQueryContext()); + processEngine.executeSQL(executionGroupContext, context.getQueryContext()); List<QueryResult> queryResults = execute(executionGroupContext, databaseType); MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext()); MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext()); @@ -190,7 +190,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor { } catch (final SQLException ex) { throw new SQLWrapperException(ex); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } @@ -240,7 +240,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor { ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName())); setParameters(executionGroupContext.getInputGroups()); - processEngine.initializeExecution(executionGroupContext, context.getQueryContext()); + processEngine.executeSQL(executionGroupContext, context.getQueryContext()); List<QueryResult> queryResults = execute(executionGroupContext, databaseType); MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext()); MergedResult mergedResult = mergeEngine.merge(queryResults, queryContext.getSqlStatementContext()); @@ -249,7 +249,7 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor { } catch (final SQLException ex) { throw new SQLWrapperException(ex); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java index 64eb74db2c7..1f98a935e90 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java @@ -70,7 +70,7 @@ public final class ProxyJDBCExecutor { ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName()); DatabaseType protocolType = database.getProtocolType(); Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes(); - processEngine.initializeExecution(executionGroupContext, queryContext); + processEngine.executeSQL(executionGroupContext, queryContext); SQLStatementContext<?> context = queryContext.getSqlStatementContext(); return jdbcExecutor.execute(executionGroupContext, ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown, @@ -78,7 +78,7 @@ public final class ProxyJDBCExecutor { ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, storageTypes, context.getSqlStatement(), databaseConnector, isReturnGeneratedKeys, isExceptionThrown, false)); } finally { - processEngine.cleanExecution(); + processEngine.completeSQLExecution(); } } } diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java index 7388694be47..d046b9dcc5b 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java @@ -82,7 +82,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd if (authResult.isFinished()) { connectionSession.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname())); connectionSession.setCurrentDatabase(authResult.getDatabase()); - connectionSession.setProcessId(processEngine.initializeConnection(connectionSession.getGrantee(), connectionSession.getDatabaseName())); + connectionSession.setProcessId(processEngine.connect(connectionSession.getGrantee(), connectionSession.getDatabaseName())); } return authResult.isFinished(); // CHECKSTYLE:OFF @@ -108,7 +108,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd private void closeAllResources() { ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(connectionSession.getConnectionId()); connectionSession.getBackendConnection().closeAllResources(); - Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::finishConnection); + Optional.ofNullable(connectionSession.getProcessId()).ifPresent(processEngine::disconnect); databaseProtocolFrontendEngine.release(connectionSession); }