This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a5e6b52a4d7 Merge SchemaAssignedDatabaseBackendHandler into DatabaseCommunicationEngine (#19576)
a5e6b52a4d7 is described below

commit a5e6b52a4d7ec8f449af6e304936c40b521bc79a
Author: 吴伟杰 <[email protected]>
AuthorDate: Tue Jul 26 19:38:16 2022 +0800

    Merge SchemaAssignedDatabaseBackendHandler into DatabaseCommunicationEngine (#19576)
---
 .../communication/DatabaseCommunicationEngine.java |  71 +++++++++--
 .../jdbc/JDBCDatabaseCommunicationEngine.java      |   8 +-
 .../vertx/VertxDatabaseCommunicationEngine.java    |   9 +-
 .../admin/mysql/MySQLSetVariableAdminExecutor.java |   5 +-
 .../executor/UnicastResourceShowExecutor.java      |   2 +-
 .../backend/text/data/DatabaseBackendHandler.java  |  10 --
 .../text/data/DatabaseBackendHandlerFactory.java   |   4 +-
 .../impl/SchemaAssignedDatabaseBackendHandler.java | 136 ---------------------
 .../data/impl/UnicastDatabaseBackendHandler.java   |  11 +-
 .../TransactionBackendHandlerFactory.java          |   4 +-
 .../text/transaction/TransactionXAHandler.java     |   7 +-
 .../DatabaseCommunicationEngineFactoryTest.java    |   4 +-
 .../jdbc/JDBCDatabaseCommunicationEngineTest.java  |   4 +-
 .../TextProtocolBackendHandlerFactoryTest.java     |   4 +-
 .../mysql/MySQLSetVariableAdminExecutorTest.java   |  11 +-
 .../data/DatabaseBackendHandlerFactoryTest.java    |  34 +++++-
 .../SchemaAssignedDatabaseBackendHandlerTest.java  | 135 --------------------
 .../impl/UnicastDatabaseBackendHandlerTest.java    |   4 +-
 .../TransactionBackendHandlerFactoryTest.java      |  12 +-
 .../execute/MySQLComStmtExecuteExecutor.java       |   2 +-
 .../fieldlist/MySQLComFieldListPacketExecutor.java |  27 ++--
 .../command/MySQLCommandExecutorFactoryTest.java   |   8 --
 .../execute/MySQLComStmtExecuteExecutorTest.java   |   2 +-
 .../command/query/extended/JDBCPortal.java         |   2 +-
 .../command/query/extended/JDBCPortalTest.java     |   4 +-
 .../ReactiveMySQLComStmtExecuteExecutor.java       |   4 +-
 .../ReactiveMySQLComFieldListPacketExecutor.java   |   4 +-
 27 files changed, 163 insertions(+), 365 deletions(-)

diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2596d2ea665..ad5aded840c 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -17,15 +17,21 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
+import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.context.refresher.MetaDataRefreshEngine;
+import 
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
@@ -33,11 +39,13 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.event.MetaDataRefreshedEvent;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtil;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.mode.manager.lock.LockJudgeEngine;
 import org.apache.shardingsphere.mode.manager.lock.LockJudgeEngineFactory;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.UnsupportedUpdateOperationException;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
@@ -45,6 +53,9 @@ import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
 import 
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import 
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
+import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -54,13 +65,10 @@ import java.util.Optional;
 
 /**
  * Database communication engine.
- *
- * @param <T> type of execute result
  */
-@RequiredArgsConstructor
 @Getter(AccessLevel.PROTECTED)
 @Setter(AccessLevel.PROTECTED)
-public abstract class DatabaseCommunicationEngine<T> {
+public abstract class DatabaseCommunicationEngine implements 
DatabaseBackendHandler {
     
     private final String driverType;
     
@@ -81,6 +89,8 @@ public abstract class DatabaseCommunicationEngine<T> {
     private final LockJudgeEngine lockJudgeEngine;
     
     public DatabaseCommunicationEngine(final String driverType, final 
ShardingSphereDatabase database, final LogicSQL logicSQL, final 
BackendConnection<?> backendConnection) {
+        SQLStatementContext<?> sqlStatementContext = 
logicSQL.getSqlStatementContext();
+        failedIfBackendNotReady(backendConnection.getConnectionSession(), 
sqlStatementContext);
         this.driverType = driverType;
         this.database = database;
         this.logicSQL = logicSQL;
@@ -91,14 +101,49 @@ public abstract class DatabaseCommunicationEngine<T> {
                 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getPlannerContexts(),
                 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps());
         lockJudgeEngine = LockJudgeEngineFactory.getInstance();
+        if (sqlStatementContext instanceof CursorAvailable) {
+            prepareCursorStatementContext((CursorAvailable) 
sqlStatementContext, backendConnection.getConnectionSession());
+        }
     }
     
-    /**
-     * Execute.
-     *
-     * @return execute result
-     */
-    public abstract T execute();
+    @SneakyThrows(SQLException.class)
+    private void failedIfBackendNotReady(final ConnectionSession 
connectionSession, final SQLStatementContext<?> sqlStatementContext) {
+        ShardingSphereDatabase database = 
ProxyContext.getInstance().getDatabase(connectionSession.getDatabaseName());
+        boolean isSystemSchema = 
SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(), 
sqlStatementContext.getTablesContext().getSchemaNames(), database);
+        if (!isSystemSchema && !database.containsDataSource()) {
+            throw new 
RequiredResourceMissedException(connectionSession.getDatabaseName());
+        }
+        if (!isSystemSchema && !database.isComplete()) {
+            throw new RuleNotExistedException();
+        }
+    }
+    
+    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession) {
+        if (statementContext.getCursorName().isPresent()) {
+            String cursorName = 
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
+            prepareCursorStatementContext(statementContext, connectionSession, 
cursorName);
+        }
+        if (statementContext instanceof CloseStatementContext && 
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
+            FetchOrderByValueGroupsHolder.remove();
+            connectionSession.getCursorDefinitions().clear();
+        }
+    }
+    
+    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession, final String 
cursorName) {
+        if (statementContext instanceof CursorStatementContext) {
+            connectionSession.getCursorDefinitions().put(cursorName, 
(CursorStatementContext) statementContext);
+        }
+        if (statementContext instanceof CursorDefinitionAware) {
+            CursorStatementContext cursorStatementContext = 
connectionSession.getCursorDefinitions().get(cursorName);
+            Preconditions.checkArgument(null != cursorStatementContext, 
"Cursor %s does not exist.", cursorName);
+            ((CursorDefinitionAware) 
statementContext).setUpCursorDefinition(cursorStatementContext);
+        }
+        if (statementContext instanceof CloseStatementContext) {
+            
FetchOrderByValueGroupsHolder.getOrderByValueGroups().remove(cursorName);
+            
FetchOrderByValueGroupsHolder.getMinGroupRowCounts().remove(cursorName);
+            connectionSession.getCursorDefinitions().remove(cursorName);
+        }
+    }
     
     protected void refreshMetaData(final ExecutionContext executionContext) 
throws SQLException {
         Optional<MetaDataRefreshedEvent> event = 
metadataRefreshEngine.refresh(executionContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
@@ -177,6 +222,7 @@ public abstract class DatabaseCommunicationEngine<T> {
      * @return has more result value or not
      * @throws SQLException SQL exception
      */
+    @Override
     public boolean next() throws SQLException {
         return null != mergedResult && mergedResult.next();
     }
@@ -187,7 +233,8 @@ public abstract class DatabaseCommunicationEngine<T> {
      * @return query response row
      * @throws SQLException SQL exception
      */
-    public QueryResponseRow getQueryResponseRow() throws SQLException {
+    @Override
+    public QueryResponseRow getRowData() throws SQLException {
         List<QueryResponseCell> cells = new ArrayList<>(queryHeaders.size());
         for (int columnIndex = 1; columnIndex <= queryHeaders.size(); 
columnIndex++) {
             Object data = mergedResult.getValue(columnIndex, Object.class);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 8ef03a65a9f..a5ccb845589 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.backend.communication.jdbc;
 
-import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
@@ -68,7 +67,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 /**
  * JDBC database communication engine.
  */
-public final class JDBCDatabaseCommunicationEngine extends 
DatabaseCommunicationEngine<ResponseHeader> {
+public final class JDBCDatabaseCommunicationEngine extends 
DatabaseCommunicationEngine {
     
     private final ProxySQLExecutor proxySQLExecutor;
     
@@ -117,8 +116,8 @@ public final class JDBCDatabaseCommunicationEngine extends 
DatabaseCommunication
      * @return backend response
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
-    @SneakyThrows(SQLException.class)
-    public ResponseHeader execute() {
+    @Override
+    public ResponseHeader execute() throws SQLException {
         LogicSQL logicSQL = getLogicSQL();
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         ExecutionContext executionContext = 
getKernelProcessor().generateExecutionContext(
@@ -181,6 +180,7 @@ public final class JDBCDatabaseCommunicationEngine extends 
DatabaseCommunication
      *
      * @throws SQLException SQL exception
      */
+    @Override
     public void close() throws SQLException {
         Collection<SQLException> result = new LinkedList<>();
         result.addAll(closeResultSets());
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
index 16f2be765ac..971da7427be 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
@@ -36,7 +36,7 @@ import java.util.List;
 /**
  * Vert.x database communication engine.
  */
-public final class VertxDatabaseCommunicationEngine extends 
DatabaseCommunicationEngine<Future<ResponseHeader>> {
+public final class VertxDatabaseCommunicationEngine extends 
DatabaseCommunicationEngine {
     
     private final ReactiveProxySQLExecutor reactiveProxySQLExecutor;
     
@@ -52,7 +52,7 @@ public final class VertxDatabaseCommunicationEngine extends 
DatabaseCommunicatio
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public Future<ResponseHeader> execute() {
+    public Future<ResponseHeader> executeFuture() {
         try {
             ShardingSphereMetaData metaData = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
             ExecutionContext executionContext = getKernelProcessor()
@@ -82,4 +82,9 @@ public final class VertxDatabaseCommunicationEngine extends 
DatabaseCommunicatio
             return Future.failedFuture(ex);
         }
     }
+    
+    @Override
+    public ResponseHeader execute() throws SQLException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
index f290bbc2cf6..5433d819eac 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
@@ -23,11 +23,11 @@ import 
org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminExecutor;
 import 
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.SetStatement;
@@ -82,7 +82,8 @@ public final class MySQLSetVariableAdminExecutor implements 
DatabaseAdminExecuto
         SQLStatement sqlStatement = 
sqlParserRule.getSQLParserEngine(DatabaseTypeFactory.getInstance("MySQL").getType()).parse(sql,
 false);
         SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabases(),
                 sqlStatement, connectionSession.getDefaultDatabaseName());
-        DatabaseBackendHandler databaseBackendHandler = new 
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, sql, 
connectionSession);
+        DatabaseBackendHandler databaseBackendHandler = 
DatabaseCommunicationEngineFactory.getInstance()
+                .newDatabaseCommunicationEngine(sqlStatementContext, sql, 
connectionSession.getBackendConnection(), false);
         try {
             databaseBackendHandler.execute();
         } finally {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
index dad3c756fb2..7a128bb2e77 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
@@ -111,7 +111,7 @@ public final class UnicastResourceShowExecutor implements 
DatabaseAdminQueryExec
     private QueryResult createQueryResult() throws SQLException {
         List<MemoryQueryResultDataRow> rows = new LinkedList<>();
         while (databaseCommunicationEngine.next()) {
-            List<Object> data = 
databaseCommunicationEngine.getQueryResponseRow().getData();
+            List<Object> data = 
databaseCommunicationEngine.getRowData().getData();
             rows.add(new MemoryQueryResultDataRow(data));
         }
         return new RawMemoryQueryResult(getQueryResultMetaData(), rows);
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
index 65a8827e4c1..158e72aa4a7 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
@@ -17,20 +17,10 @@
 
 package org.apache.shardingsphere.proxy.backend.text.data;
 
-import io.vertx.core.Future;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 
 /**
  * Database backend handler.
  */
 public interface DatabaseBackendHandler extends TextProtocolBackendHandler {
-    
-    /**
-     * Handlers which communicate with database must implement async execution.
-     *
-     * @return future of response header
-     */
-    @Override
-    Future<ResponseHeader> executeFuture();
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
index c00436ec3e8..4c9f4a236c5 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
@@ -21,10 +21,10 @@ import io.vertx.core.Future;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
@@ -68,6 +68,6 @@ public final class DatabaseBackendHandlerFactory {
         if (sqlStatement instanceof DALStatement || (sqlStatement instanceof 
SelectStatement && null == ((SelectStatement) sqlStatement).getFrom())) {
             return new UnicastDatabaseBackendHandler(sqlStatementContext, sql, 
connectionSession);
         }
-        return new SchemaAssignedDatabaseBackendHandler(sqlStatementContext, 
sql, connectionSession);
+        return 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
deleted file mode 100644
index d19956213c7..00000000000
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.text.data.impl;
-
-import com.google.common.base.Preconditions;
-import io.vertx.core.Future;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import 
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
-import 
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
-import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
-import 
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtil;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import 
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import 
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
-
-import java.sql.SQLException;
-
-/**
- * Database backend handler with assigned schema.
- */
-@RequiredArgsConstructor
-public final class SchemaAssignedDatabaseBackendHandler implements 
DatabaseBackendHandler {
-    
-    private final DatabaseCommunicationEngineFactory 
databaseCommunicationEngineFactory = 
DatabaseCommunicationEngineFactory.getInstance();
-    
-    private final SQLStatementContext<?> sqlStatementContext;
-    
-    private final String sql;
-    
-    private final ConnectionSession connectionSession;
-    
-    private DatabaseCommunicationEngine<?> databaseCommunicationEngine;
-    
-    @Override
-    public ResponseHeader execute() throws SQLException {
-        prepareDatabaseCommunicationEngine();
-        return (ResponseHeader) databaseCommunicationEngine.execute();
-    }
-    
-    @Override
-    public Future<ResponseHeader> executeFuture() {
-        try {
-            prepareDatabaseCommunicationEngine();
-            return (Future<ResponseHeader>) 
databaseCommunicationEngine.execute();
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            return Future.failedFuture(ex);
-        }
-    }
-    
-    private void prepareDatabaseCommunicationEngine() throws 
RequiredResourceMissedException {
-        ShardingSphereDatabase database = 
ProxyContext.getInstance().getDatabase(connectionSession.getDatabaseName());
-        boolean isSystemSchema = 
SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(), 
sqlStatementContext.getTablesContext().getSchemaNames(), database);
-        if (!isSystemSchema && !database.containsDataSource()) {
-            throw new 
RequiredResourceMissedException(connectionSession.getDatabaseName());
-        }
-        if (!isSystemSchema && !database.isComplete()) {
-            throw new RuleNotExistedException();
-        }
-        if (sqlStatementContext instanceof CursorAvailable) {
-            prepareCursorStatementContext((CursorAvailable) 
sqlStatementContext, connectionSession);
-        }
-        databaseCommunicationEngine = 
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
-    }
-    
-    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession) {
-        if (statementContext.getCursorName().isPresent()) {
-            String cursorName = 
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
-            prepareCursorStatementContext(statementContext, connectionSession, 
cursorName);
-        }
-        if (statementContext instanceof CloseStatementContext && 
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
-            FetchOrderByValueGroupsHolder.remove();
-            connectionSession.getCursorDefinitions().clear();
-        }
-    }
-    
-    private void prepareCursorStatementContext(final CursorAvailable 
statementContext, final ConnectionSession connectionSession, final String 
cursorName) {
-        if (statementContext instanceof CursorStatementContext) {
-            connectionSession.getCursorDefinitions().put(cursorName, 
(CursorStatementContext) statementContext);
-        }
-        if (statementContext instanceof CursorDefinitionAware) {
-            CursorStatementContext cursorStatementContext = 
connectionSession.getCursorDefinitions().get(cursorName);
-            Preconditions.checkArgument(null != cursorStatementContext, 
"Cursor %s does not exist.", cursorName);
-            ((CursorDefinitionAware) 
statementContext).setUpCursorDefinition(cursorStatementContext);
-        }
-        if (statementContext instanceof CloseStatementContext) {
-            
FetchOrderByValueGroupsHolder.getOrderByValueGroups().remove(cursorName);
-            
FetchOrderByValueGroupsHolder.getMinGroupRowCounts().remove(cursorName);
-            connectionSession.getCursorDefinitions().remove(cursorName);
-        }
-    }
-    
-    @Override
-    public boolean next() throws SQLException {
-        return databaseCommunicationEngine.next();
-    }
-    
-    @Override
-    public QueryResponseRow getRowData() throws SQLException {
-        return databaseCommunicationEngine.getQueryResponseRow();
-    }
-    
-    @Override
-    public void close() throws SQLException {
-        if (databaseCommunicationEngine instanceof 
JDBCDatabaseCommunicationEngine) {
-            ((JDBCDatabaseCommunicationEngine) 
databaseCommunicationEngine).close();
-        }
-    }
-}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index d99ed210592..c6639c79eb0 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import 
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
@@ -60,7 +59,7 @@ public final class UnicastDatabaseBackendHandler implements 
DatabaseBackendHandl
         }
         connectionSession.setCurrentDatabase(databaseName);
         databaseCommunicationEngine = 
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
-        return ((Future<ResponseHeader>) 
databaseCommunicationEngine.execute()).eventually(unused -> {
+        return databaseCommunicationEngine.executeFuture().eventually(unused 
-> {
             connectionSession.setCurrentDatabase(databaseName);
             return Future.succeededFuture();
         });
@@ -76,7 +75,7 @@ public final class UnicastDatabaseBackendHandler implements 
DatabaseBackendHandl
         try {
             connectionSession.setCurrentDatabase(databaseName);
             databaseCommunicationEngine = 
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
-            return (ResponseHeader) databaseCommunicationEngine.execute();
+            return databaseCommunicationEngine.execute();
         } finally {
             connectionSession.setCurrentDatabase(databaseName);
         }
@@ -101,13 +100,11 @@ public final class UnicastDatabaseBackendHandler 
implements DatabaseBackendHandl
     
     @Override
     public QueryResponseRow getRowData() throws SQLException {
-        return databaseCommunicationEngine.getQueryResponseRow();
+        return databaseCommunicationEngine.getRowData();
     }
     
     @Override
     public void close() throws SQLException {
-        if (databaseCommunicationEngine instanceof 
JDBCDatabaseCommunicationEngine) {
-            ((JDBCDatabaseCommunicationEngine) 
databaseCommunicationEngine).close();
-        }
+        databaseCommunicationEngine.close();
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
index 3ba2526aa5e..5f180541ab0 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
@@ -20,9 +20,9 @@ package 
org.apache.shardingsphere.proxy.backend.text.transaction;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import org.apache.shardingsphere.sql.parser.sql.common.constant.OperationScope;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.BeginTransactionStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
@@ -78,6 +78,6 @@ public final class TransactionBackendHandlerFactory {
         if (tclStatement instanceof XAStatement) {
             return new TransactionXAHandler(sqlStatementContext, sql, 
connectionSession);
         }
-        return new SchemaAssignedDatabaseBackendHandler(sqlStatementContext, 
sql, connectionSession);
+        return 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
index 98b4deb53c9..5ac2f610c80 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -19,11 +19,12 @@ package 
org.apache.shardingsphere.proxy.backend.text.transaction;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import 
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
@@ -43,12 +44,12 @@ public final class TransactionXAHandler implements 
TextProtocolBackendHandler {
     
     private final ConnectionSession connectionSession;
     
-    private final SchemaAssignedDatabaseBackendHandler backendHandler;
+    private final JDBCDatabaseCommunicationEngine backendHandler;
     
     public TransactionXAHandler(final SQLStatementContext<? extends 
TCLStatement> sqlStatementContext, final String sql, final ConnectionSession 
connectionSession) {
         this.tclStatement = (XAStatement) 
sqlStatementContext.getSqlStatement();
         this.connectionSession = connectionSession;
-        this.backendHandler = new 
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, sql, 
connectionSession);
+        this.backendHandler = 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 sql, connectionSession.getBackendConnection(), false);
     }
     
     @Override
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
index 398b1b43163..a727a44048d 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
@@ -57,6 +57,8 @@ public final class DatabaseCommunicationEngineFactoryTest 
extends ProxyContextRe
     
     private Map<String, ShardingSphereDatabase> getDatabases() {
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        when(database.containsDataSource()).thenReturn(true);
+        when(database.isComplete()).thenReturn(true);
         when(database.getResource().getDatabaseType()).thenReturn(new 
H2DatabaseType());
         
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
         Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
@@ -70,7 +72,7 @@ public final class DatabaseCommunicationEngineFactoryTest 
extends ProxyContextRe
         
when(backendConnection.getConnectionSession().getDatabaseName()).thenReturn("db");
         SQLStatementContext<?> sqlStatementContext = 
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
         
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
-        DatabaseCommunicationEngine<?> engine = 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 "schemaName", backendConnection, false);
+        DatabaseCommunicationEngine engine = 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 "schemaName", backendConnection, false);
         assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
index a0788131e97..3c865fa9cd5 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
@@ -104,6 +104,8 @@ public final class JDBCDatabaseCommunicationEngineTest 
extends ProxyContextResto
     
     private Map<String, ShardingSphereDatabase> mockDatabases() {
         ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        when(database.containsDataSource()).thenReturn(true);
+        when(database.isComplete()).thenReturn(true);
         when(database.getResource().getDatabaseType()).thenReturn(new 
H2DatabaseType());
         
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
         Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
@@ -135,7 +137,7 @@ public final class JDBCDatabaseCommunicationEngineTest 
extends ProxyContextResto
         });
         Exception ex = null;
         try {
-            engine.getQueryResponseRow();
+            engine.getRowData();
         } catch (final SQLException | IndexOutOfBoundsException e) {
             ex = e;
         } finally {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
index e2495f3c0ec..7b5e8177aed 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
@@ -27,12 +27,12 @@ import 
org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.DatabaseAdminQueryBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.admin.DatabaseAdminUpdateBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.distsql.ral.QueryableRALBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.distsql.ral.hint.HintRALBackendHandler;
@@ -213,7 +213,7 @@ public final class TextProtocolBackendHandlerFactoryTest 
extends ProxyContextRes
         when(proxyContext.getAllDatabaseNames()).thenReturn(new 
HashSet<>(Collections.singletonList("db")));
         
when(proxyContext.getDatabase("db").containsDataSource()).thenReturn(true);
         TextProtocolBackendHandler actual = 
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql, 
connectionSession);
-        assertThat(actual, 
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+        assertThat(actual, instanceOf(DatabaseCommunicationEngine.class));
         sql = "select * from information_schema.schemata limit 1";
         actual = TextProtocolBackendHandlerFactory.newInstance(databaseType, 
sql, connectionSession);
         assertThat(actual, instanceOf(DatabaseAdminQueryBackendHandler.class));
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
index db7da356a9a..0d95a0e4784 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
@@ -17,13 +17,15 @@
 
 package org.apache.shardingsphere.proxy.backend.text.admin.mysql;
 
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
 import org.apache.shardingsphere.sql.parser.api.CacheOption;
 import 
org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
@@ -50,10 +52,15 @@ public final class MySQLSetVariableAdminExecutorTest 
extends ProxyContextRestore
         MySQLSetVariableAdminExecutor executor = new 
MySQLSetVariableAdminExecutor(setStatement);
         ConnectionSession connectionSession = mock(ConnectionSession.class);
         when(connectionSession.getDatabaseName()).thenReturn("db");
+        JDBCBackendConnection backendConnection = 
mock(JDBCBackendConnection.class);
+        
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
+        
when(backendConnection.getConnectionSession()).thenReturn(connectionSession);
         ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(mock(ShardingSphereDatabase.class));
+        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().containsDatabase("db")).thenReturn(true);
         
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData())
                 .thenReturn(new 
ShardingSphereRuleMetaData(Collections.singletonList(new SQLParserRule(new 
SQLParserRuleConfiguration(false, new CacheOption(1, 1), new CacheOption(1, 
1))))));
-        try (MockedConstruction<SchemaAssignedDatabaseBackendHandler> 
mockConstruction = 
mockConstruction(SchemaAssignedDatabaseBackendHandler.class)) {
+        try (MockedConstruction<JDBCDatabaseCommunicationEngine> 
mockConstruction = mockConstruction(JDBCDatabaseCommunicationEngine.class)) {
             executor.execute(connectionSession);
             verify(mockConstruction.constructed().get(0)).execute();
         }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
index ff82e73d0ac..703ead41f37 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
@@ -18,20 +18,31 @@
 package org.apache.shardingsphere.proxy.backend.text.data;
 
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import 
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
+import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.junit.Test;
+import org.mockito.MockedConstruction;
+
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
 
-public final class DatabaseBackendHandlerFactoryTest {
+public final class DatabaseBackendHandlerFactoryTest extends 
ProxyContextRestorer {
     
     @Test
     public void 
assertNewInstanceReturnedUnicastDatabaseBackendHandlerWithDAL() {
@@ -54,9 +65,22 @@ public final class DatabaseBackendHandlerFactoryTest {
     @Test
     public void 
assertNewInstanceReturnedSchemaAssignedDatabaseBackendHandler() {
         String sql = "SELECT 1 FROM user WHERE id = 1";
-        SQLStatementContext<SQLStatement> context = 
mock(SQLStatementContext.class);
+        SQLStatementContext<SQLStatement> context = 
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
         when(context.getSqlStatement()).thenReturn(mock(SQLStatement.class));
-        DatabaseBackendHandler actual = 
DatabaseBackendHandlerFactory.newInstance(context, sql, 
mock(ConnectionSession.class));
-        assertThat(actual, 
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+        
when(context.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
+        ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+        when(database.isComplete()).thenReturn(true);
+        when(database.containsDataSource()).thenReturn(true);
+        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
+        
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().containsDatabase("db")).thenReturn(true);
+        ConnectionSession connectionSession = mock(ConnectionSession.class);
+        when(connectionSession.getDatabaseName()).thenReturn("db");
+        
when(connectionSession.getBackendConnection()).thenReturn(mock(JDBCBackendConnection.class));
+        
when(connectionSession.getBackendConnection().getConnectionSession()).thenReturn(connectionSession);
+        try (MockedConstruction<JDBCDatabaseCommunicationEngine> unused = 
mockConstruction(JDBCDatabaseCommunicationEngine.class)) {
+            DatabaseBackendHandler actual = 
DatabaseBackendHandlerFactory.newInstance(context, sql, connectionSession);
+            assertThat(actual, instanceOf(DatabaseCommunicationEngine.class));
+        }
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
deleted file mode 100644
index 61e4652c921..00000000000
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.text.data.impl;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import 
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import 
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.lang.reflect.Field;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class SchemaAssignedDatabaseBackendHandlerTest extends 
ProxyContextRestorer {
-    
-    private static final String EXECUTE_SQL = "USE test";
-    
-    private static final String DATABASE_PATTERN = "db_%s";
-    
-    private SchemaAssignedDatabaseBackendHandler 
schemaAssignedDatabaseBackendHandler;
-    
-    @Mock
-    private ConnectionSession connectionSession;
-    
-    @Mock
-    private DatabaseCommunicationEngineFactory 
databaseCommunicationEngineFactory;
-    
-    @Mock
-    private DatabaseCommunicationEngine databaseCommunicationEngine;
-    
-    @Before
-    public void setUp() throws IllegalAccessException, NoSuchFieldException, 
SQLException {
-        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
-        MetaDataContexts metaDataContexts = new 
MetaDataContexts(mock(MetaDataPersistService.class),
-                new ShardingSphereMetaData(getDatabases(), 
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new 
Properties())), mock(OptimizerContext.class));
-        
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
-        ProxyContext.init(contextManager);
-        
when(connectionSession.getDatabaseName()).thenReturn(String.format(DATABASE_PATTERN,
 0));
-        mockDatabaseCommunicationEngine(new 
UpdateResponseHeader(mock(SQLStatement.class)));
-        SQLStatementContext<?> sqlStatementContext = 
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
-        when(sqlStatementContext.getDatabaseType()).thenReturn(new 
MySQLDatabaseType());
-        
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
-        schemaAssignedDatabaseBackendHandler = new 
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, EXECUTE_SQL, 
connectionSession);
-        setBackendHandlerFactory(schemaAssignedDatabaseBackendHandler);
-    }
-    
-    private Map<String, ShardingSphereDatabase> getDatabases() {
-        Map<String, ShardingSphereDatabase> result = new HashMap<>(10, 1);
-        for (int i = 0; i < 10; i++) {
-            ShardingSphereDatabase database = 
mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
-            when(database.isComplete()).thenReturn(true);
-            when(database.containsDataSource()).thenReturn(true);
-            when(database.getResource().getDatabaseType()).thenReturn(new 
H2DatabaseType());
-            result.put(String.format(DATABASE_PATTERN, i), database);
-        }
-        return result;
-    }
-    
-    private void mockDatabaseCommunicationEngine(final ResponseHeader 
responseHeader) {
-        when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
-        
when(databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(any(), 
anyString(), any(), eq(false))).thenReturn(databaseCommunicationEngine);
-    }
-    
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void setBackendHandlerFactory(final DatabaseBackendHandler 
schemaDatabaseBackendHandler) {
-        Field field = 
schemaDatabaseBackendHandler.getClass().getDeclaredField("databaseCommunicationEngineFactory");
-        field.setAccessible(true);
-        field.set(schemaDatabaseBackendHandler, 
databaseCommunicationEngineFactory);
-    }
-    
-    @Test
-    public void assertExecuteDatabaseBackendHandler() throws SQLException {
-        ResponseHeader actual = schemaAssignedDatabaseBackendHandler.execute();
-        assertThat(actual, instanceOf(UpdateResponseHeader.class));
-    }
-    
-    @Test
-    public void assertDatabaseUsingStream() throws SQLException {
-        schemaAssignedDatabaseBackendHandler.execute();
-        while (schemaAssignedDatabaseBackendHandler.next()) {
-            
assertThat(schemaAssignedDatabaseBackendHandler.getRowData().getData().size(), 
is(1));
-        }
-    }
-}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
index f4d661f498f..bc0ec864f0e 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
@@ -78,7 +78,7 @@ public final class UnicastDatabaseBackendHandlerTest extends 
ProxyContextRestore
     private DatabaseCommunicationEngine databaseCommunicationEngine;
     
     @Before
-    public void setUp() {
+    public void setUp() throws SQLException {
         ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
         MetaDataContexts metaDataContexts = new 
MetaDataContexts(mock(MetaDataPersistService.class),
                 new ShardingSphereMetaData(getDatabases(), 
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new 
Properties())), mock(OptimizerContext.class));
@@ -101,7 +101,7 @@ public final class UnicastDatabaseBackendHandlerTest 
extends ProxyContextRestore
         return result;
     }
     
-    private void mockDatabaseCommunicationEngine(final ResponseHeader 
responseHeader) {
+    private void mockDatabaseCommunicationEngine(final ResponseHeader 
responseHeader) throws SQLException {
         when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
         
when(databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(any(), 
anyString(), any(), eq(false))).thenReturn(databaseCommunicationEngine);
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
index f6114c60fec..5b6d8891dce 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
@@ -21,12 +21,13 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import 
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import 
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
 import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
@@ -37,6 +38,7 @@ import org.hamcrest.Matcher;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Answers;
+import org.mockito.MockedStatic;
 
 import java.lang.reflect.Field;
 
@@ -45,6 +47,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 public final class TransactionBackendHandlerFactoryTest extends 
ProxyContextRestorer {
@@ -92,7 +95,12 @@ public final class TransactionBackendHandlerFactoryTest 
extends ProxyContextRest
     public void assertBroadcastBackendHandlerReturnedWhenTCLStatementNotHit() {
         SQLStatementContext<TCLStatement> context = 
mock(SQLStatementContext.class);
         when(context.getSqlStatement()).thenReturn(mock(TCLStatement.class));
-        assertThat(TransactionBackendHandlerFactory.newInstance(context, null, 
mock(ConnectionSession.class)), 
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+        try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic = 
mockStatic(DatabaseCommunicationEngineFactory.class)) {
+            DatabaseCommunicationEngineFactory mockFactory = 
mock(DatabaseCommunicationEngineFactory.class);
+            
mockedStatic.when(DatabaseCommunicationEngineFactory::getInstance).thenReturn(mockFactory);
+            when(mockFactory.newDatabaseCommunicationEngine(context, null, 
null, false)).thenReturn(mock(DatabaseCommunicationEngine.class));
+            assertThat(TransactionBackendHandlerFactory.newInstance(context, 
null, mock(ConnectionSession.class)), 
instanceOf(DatabaseCommunicationEngine.class));
+        }
     }
     
     @SuppressWarnings("unchecked")
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 2a05336d484..5beb630eb54 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -148,7 +148,7 @@ public final class MySQLComStmtExecuteExecutor implements 
QueryCommandExecutor {
     
     @Override
     public MySQLPacket getQueryRowPacket() throws SQLException {
-        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getQueryResponseRow();
+        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getRowData();
         return new MySQLBinaryResultSetRowPacket(++currentSequenceId, 
createBinaryRow(queryResponseRow));
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index e5b0530919c..aa4eafcb756 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.fieldlist;
 
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
@@ -44,6 +45,7 @@ import java.util.LinkedList;
 /**
  * COM_FIELD_LIST packet executor for MySQL.
  */
+@RequiredArgsConstructor
 public final class MySQLComFieldListPacketExecutor implements CommandExecutor {
     
     private static final String SQL = "SHOW COLUMNS FROM %s FROM %s";
@@ -52,18 +54,13 @@ public final class MySQLComFieldListPacketExecutor 
implements CommandExecutor {
     
     private final ConnectionSession connectionSession;
     
-    private final String databaseName;
-    
-    private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
-    
-    private final int characterSet;
+    private JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
     
     private int currentSequenceId;
     
-    public MySQLComFieldListPacketExecutor(final MySQLComFieldListPacket 
packet, final ConnectionSession connectionSession) {
-        this.packet = packet;
-        this.connectionSession = connectionSession;
-        databaseName = connectionSession.getDefaultDatabaseName();
+    @Override
+    public Collection<DatabasePacket<?>> execute() throws SQLException {
+        String databaseName = connectionSession.getDefaultDatabaseName();
         String sql = String.format(SQL, packet.getTable(), databaseName);
         MetaDataContexts metaDataContexts = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLParserRule sqlParserRule = 
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
@@ -72,19 +69,15 @@ public final class MySQLComFieldListPacketExecutor 
implements CommandExecutor {
         SQLStatementContext<?> sqlStatementContext = 
SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(),
 sqlStatement, databaseName);
         JDBCBackendConnection backendConnection = (JDBCBackendConnection) 
connectionSession.getBackendConnection();
         databaseCommunicationEngine = 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
 sql, backendConnection, false);
-        characterSet = 
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
-    }
-    
-    @Override
-    public Collection<DatabasePacket<?>> execute() throws SQLException {
         databaseCommunicationEngine.execute();
-        return createColumnDefinition41Packets();
+        return createColumnDefinition41Packets(databaseName);
     }
     
-    private Collection<DatabasePacket<?>> createColumnDefinition41Packets() 
throws SQLException {
+    private Collection<DatabasePacket<?>> 
createColumnDefinition41Packets(final String databaseName) throws SQLException {
         Collection<DatabasePacket<?>> result = new LinkedList<>();
+        int characterSet = 
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
         while (databaseCommunicationEngine.next()) {
-            String columnName = 
databaseCommunicationEngine.getQueryResponseRow().getCells().iterator().next().getData().toString();
+            String columnName = 
databaseCommunicationEngine.getRowData().getCells().iterator().next().getData().toString();
             result.add(new MySQLColumnDefinition41Packet(
                     ++currentSequenceId, characterSet, databaseName, 
packet.getTable(), packet.getTable(), columnName, columnName, 100, 
MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR, 0, true));
         }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
index 5b0a2ac9e91..77aa2189558 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
@@ -39,8 +39,6 @@ import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRule
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
-import 
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -87,11 +85,8 @@ public final class MySQLCommandExecutorFactoryTest extends 
ProxyContextRestorer
     
     @Before
     public void setUp() {
-        when(connectionSession.getDatabaseName()).thenReturn("logic_db");
-        
when(connectionSession.getDefaultDatabaseName()).thenReturn("logic_db");
         
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_GENERAL_CI);
         
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
-        
when(backendConnection.getConnectionSession()).thenReturn(connectionSession);
         ShardingSphereDatabase database = mockDatabase();
         Map<String, ShardingSphereDatabase> databases = new LinkedHashMap<>(1, 
1);
         databases.put("logic_db", database);
@@ -101,8 +96,6 @@ public final class MySQLCommandExecutorFactoryTest extends 
ProxyContextRestorer
                 mock(OptimizerContext.class, RETURNS_DEEP_STUBS));
         
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         ProxyContext.init(contextManager);
-        
when(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class))
-                .thenReturn(new SQLParserRule(new 
DefaultSQLParserRuleConfigurationBuilder().build()));
     }
     
     private ShardingSphereDatabase mockDatabase() {
@@ -126,7 +119,6 @@ public final class MySQLCommandExecutorFactoryTest extends 
ProxyContextRestorer
     @Test
     public void assertNewInstanceWithComFieldList() throws SQLException {
         MySQLComFieldListPacket packet = mock(MySQLComFieldListPacket.class);
-        when(packet.getTable()).thenReturn("test");
         
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
 packet, connectionSession), instanceOf(MySQLComFieldListPacketExecutor.class));
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index 2c0623e9b25..676d1ed8a05 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -168,7 +168,7 @@ public final class MySQLComStmtExecuteExecutorTest extends 
ProxyContextRestorer
         MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new 
MySQLComStmtExecuteExecutor(packet, connectionSession);
         when(databaseCommunicationEngine.execute()).thenReturn(new 
QueryResponseHeader(Collections.singletonList(mock(QueryHeader.class))));
         when(databaseCommunicationEngine.next()).thenReturn(true, false);
-        when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new 
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER, 
1))));
+        when(databaseCommunicationEngine.getRowData()).thenReturn(new 
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER, 
1))));
         Iterator<DatabasePacket<?>> actual;
         try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic = 
mockStatic(DatabaseCommunicationEngineFactory.class, RETURNS_DEEP_STUBS)) {
             mockedStatic.when(() -> 
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(any(SQLStatementContext.class),
 anyString(), anyList(), eq(backendConnection)))
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
index 766ff7a75ad..d786b388b09 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
@@ -182,7 +182,7 @@ public final class JDBCPortal implements Portal<Void> {
     }
     
     private PostgreSQLPacket nextPacket() throws SQLException {
-        return new PostgreSQLDataRowPacket(getData(null != 
databaseCommunicationEngine ? databaseCommunicationEngine.getQueryResponseRow() 
: textProtocolBackendHandler.getRowData()));
+        return new PostgreSQLDataRowPacket(getData(null != 
databaseCommunicationEngine ? databaseCommunicationEngine.getRowData() : 
textProtocolBackendHandler.getRowData()));
     }
     
     private List<Object> getData(final QueryResponseRow queryResponseRow) {
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
index a735569a6db..07aca1ee282 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
@@ -115,7 +115,7 @@ public final class JDBCPortalTest extends 
ProxyContextRestorer {
         
when(responseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(queryHeader));
         when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
         when(databaseCommunicationEngine.next()).thenReturn(true, true, false);
-        when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new 
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER, 
0))),
+        when(databaseCommunicationEngine.getRowData()).thenReturn(new 
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER, 
0))),
                 new QueryResponseRow(Collections.singletonList(new 
QueryResponseCell(Types.INTEGER, 1))));
         portal.bind();
         assertThat(portal.describe(), 
instanceOf(PostgreSQLRowDescriptionPacket.class));
@@ -137,7 +137,7 @@ public final class JDBCPortalTest extends 
ProxyContextRestorer {
         
when(responseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(queryHeader));
         when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
         when(databaseCommunicationEngine.next()).thenReturn(true, true);
-        when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(
+        when(databaseCommunicationEngine.getRowData()).thenReturn(
                 new QueryResponseRow(Collections.singletonList(new 
QueryResponseCell(Types.INTEGER, 0))),
                 new QueryResponseRow(Collections.singletonList(new 
QueryResponseCell(Types.INTEGER, 1))));
         setField(portal, "resultFormats", 
Collections.singletonList(PostgreSQLValueFormat.BINARY));
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index 7811d7bde2e..b62f3c180cf 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -109,7 +109,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor 
implements ReactiveComman
             databaseCommunicationEngine = 
DatabaseCommunicationEngineFactory.getInstance()
                     .newDatabaseCommunicationEngine(sqlStatementContext, 
preparedStatement.getSql(), parameters, 
connectionSession.getBackendConnection());
         }
-        return (null != databaseCommunicationEngine ? 
databaseCommunicationEngine.execute() : 
textProtocolBackendHandler.executeFuture()).compose(responseHeader -> {
+        return (null != databaseCommunicationEngine ? 
databaseCommunicationEngine.executeFuture() : 
textProtocolBackendHandler.executeFuture()).compose(responseHeader -> {
             Collection<DatabasePacket<?>> headerPackets = responseHeader 
instanceof QueryResponseHeader ? processQuery((QueryResponseHeader) 
responseHeader, characterSet)
                     : processUpdate((UpdateResponseHeader) responseHeader);
             List<DatabasePacket<?>> result = new LinkedList<>(headerPackets);
@@ -160,7 +160,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor 
implements ReactiveComman
     }
     
     private MySQLPacket getQueryRowPacket() throws SQLException {
-        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getQueryResponseRow();
+        QueryResponseRow queryResponseRow = 
databaseCommunicationEngine.getRowData();
         return new MySQLBinaryResultSetRowPacket(++currentSequenceId, 
createBinaryRow(queryResponseRow));
     }
     
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
index 768aa4255e7..75d51f11cea 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
@@ -76,7 +76,7 @@ public final class ReactiveMySQLComFieldListPacketExecutor 
implements ReactiveCo
     
     @Override
     public Future<Collection<DatabasePacket<?>>> executeFuture() {
-        return databaseCommunicationEngine.execute().compose(unused -> {
+        return databaseCommunicationEngine.executeFuture().compose(unused -> {
             try {
                 return 
Future.succeededFuture(createColumnDefinition41Packets());
             } catch (SQLException ex) {
@@ -88,7 +88,7 @@ public final class ReactiveMySQLComFieldListPacketExecutor 
implements ReactiveCo
     private Collection<DatabasePacket<?>> createColumnDefinition41Packets() 
throws SQLException {
         Collection<DatabasePacket<?>> result = new LinkedList<>();
         while (databaseCommunicationEngine.next()) {
-            String columnName = 
databaseCommunicationEngine.getQueryResponseRow().getCells().iterator().next().getData().toString();
+            String columnName = 
databaseCommunicationEngine.getRowData().getCells().iterator().next().getData().toString();
             result.add(new MySQLColumnDefinition41Packet(
                     ++currentSequenceId, characterSet, databaseName, 
packet.getTable(), packet.getTable(), columnName, columnName, 100, 
MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR, 0, true));
         }

Reply via email to