This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a5e6b52a4d7 Merge SchemaAssignedDatabaseBackendHandler into
DatabaseCommunicationEngine (#19576)
a5e6b52a4d7 is described below
commit a5e6b52a4d7ec8f449af6e304936c40b521bc79a
Author: 吴伟杰 <[email protected]>
AuthorDate: Tue Jul 26 19:38:16 2022 +0800
Merge SchemaAssignedDatabaseBackendHandler into DatabaseCommunicationEngine
(#19576)
---
.../communication/DatabaseCommunicationEngine.java | 71 +++++++++--
.../jdbc/JDBCDatabaseCommunicationEngine.java | 8 +-
.../vertx/VertxDatabaseCommunicationEngine.java | 9 +-
.../admin/mysql/MySQLSetVariableAdminExecutor.java | 5 +-
.../executor/UnicastResourceShowExecutor.java | 2 +-
.../backend/text/data/DatabaseBackendHandler.java | 10 --
.../text/data/DatabaseBackendHandlerFactory.java | 4 +-
.../impl/SchemaAssignedDatabaseBackendHandler.java | 136 ---------------------
.../data/impl/UnicastDatabaseBackendHandler.java | 11 +-
.../TransactionBackendHandlerFactory.java | 4 +-
.../text/transaction/TransactionXAHandler.java | 7 +-
.../DatabaseCommunicationEngineFactoryTest.java | 4 +-
.../jdbc/JDBCDatabaseCommunicationEngineTest.java | 4 +-
.../TextProtocolBackendHandlerFactoryTest.java | 4 +-
.../mysql/MySQLSetVariableAdminExecutorTest.java | 11 +-
.../data/DatabaseBackendHandlerFactoryTest.java | 34 +++++-
.../SchemaAssignedDatabaseBackendHandlerTest.java | 135 --------------------
.../impl/UnicastDatabaseBackendHandlerTest.java | 4 +-
.../TransactionBackendHandlerFactoryTest.java | 12 +-
.../execute/MySQLComStmtExecuteExecutor.java | 2 +-
.../fieldlist/MySQLComFieldListPacketExecutor.java | 27 ++--
.../command/MySQLCommandExecutorFactoryTest.java | 8 --
.../execute/MySQLComStmtExecuteExecutorTest.java | 2 +-
.../command/query/extended/JDBCPortal.java | 2 +-
.../command/query/extended/JDBCPortalTest.java | 4 +-
.../ReactiveMySQLComStmtExecuteExecutor.java | 4 +-
.../ReactiveMySQLComFieldListPacketExecutor.java | 4 +-
27 files changed, 163 insertions(+), 365 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2596d2ea665..ad5aded840c 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -17,15 +17,21 @@
package org.apache.shardingsphere.proxy.backend.communication;
+import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.Setter;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefreshEngine;
+import
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
@@ -33,11 +39,13 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.event.MetaDataRefreshedEvent;
+import
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtil;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.mode.manager.lock.LockJudgeEngine;
import org.apache.shardingsphere.mode.manager.lock.LockJudgeEngineFactory;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
import
org.apache.shardingsphere.proxy.backend.exception.UnsupportedUpdateOperationException;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
@@ -45,6 +53,9 @@ import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeaderBuilderEngine;
import
org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
+import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -54,13 +65,10 @@ import java.util.Optional;
/**
* Database communication engine.
- *
- * @param <T> type of execute result
*/
-@RequiredArgsConstructor
@Getter(AccessLevel.PROTECTED)
@Setter(AccessLevel.PROTECTED)
-public abstract class DatabaseCommunicationEngine<T> {
+public abstract class DatabaseCommunicationEngine implements
DatabaseBackendHandler {
private final String driverType;
@@ -81,6 +89,8 @@ public abstract class DatabaseCommunicationEngine<T> {
private final LockJudgeEngine lockJudgeEngine;
public DatabaseCommunicationEngine(final String driverType, final
ShardingSphereDatabase database, final LogicSQL logicSQL, final
BackendConnection<?> backendConnection) {
+ SQLStatementContext<?> sqlStatementContext =
logicSQL.getSqlStatementContext();
+ failedIfBackendNotReady(backendConnection.getConnectionSession(),
sqlStatementContext);
this.driverType = driverType;
this.database = database;
this.logicSQL = logicSQL;
@@ -91,14 +101,49 @@ public abstract class DatabaseCommunicationEngine<T> {
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getPlannerContexts(),
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps());
lockJudgeEngine = LockJudgeEngineFactory.getInstance();
+ if (sqlStatementContext instanceof CursorAvailable) {
+ prepareCursorStatementContext((CursorAvailable)
sqlStatementContext, backendConnection.getConnectionSession());
+ }
}
- /**
- * Execute.
- *
- * @return execute result
- */
- public abstract T execute();
+ @SneakyThrows(SQLException.class)
+ private void failedIfBackendNotReady(final ConnectionSession
connectionSession, final SQLStatementContext<?> sqlStatementContext) {
+ ShardingSphereDatabase database =
ProxyContext.getInstance().getDatabase(connectionSession.getDatabaseName());
+ boolean isSystemSchema =
SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(),
sqlStatementContext.getTablesContext().getSchemaNames(), database);
+ if (!isSystemSchema && !database.containsDataSource()) {
+ throw new
RequiredResourceMissedException(connectionSession.getDatabaseName());
+ }
+ if (!isSystemSchema && !database.isComplete()) {
+ throw new RuleNotExistedException();
+ }
+ }
+
+ private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession) {
+ if (statementContext.getCursorName().isPresent()) {
+ String cursorName =
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
+ prepareCursorStatementContext(statementContext, connectionSession,
cursorName);
+ }
+ if (statementContext instanceof CloseStatementContext &&
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
+ FetchOrderByValueGroupsHolder.remove();
+ connectionSession.getCursorDefinitions().clear();
+ }
+ }
+
+ private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession, final String
cursorName) {
+ if (statementContext instanceof CursorStatementContext) {
+ connectionSession.getCursorDefinitions().put(cursorName,
(CursorStatementContext) statementContext);
+ }
+ if (statementContext instanceof CursorDefinitionAware) {
+ CursorStatementContext cursorStatementContext =
connectionSession.getCursorDefinitions().get(cursorName);
+ Preconditions.checkArgument(null != cursorStatementContext,
"Cursor %s does not exist.", cursorName);
+ ((CursorDefinitionAware)
statementContext).setUpCursorDefinition(cursorStatementContext);
+ }
+ if (statementContext instanceof CloseStatementContext) {
+
FetchOrderByValueGroupsHolder.getOrderByValueGroups().remove(cursorName);
+
FetchOrderByValueGroupsHolder.getMinGroupRowCounts().remove(cursorName);
+ connectionSession.getCursorDefinitions().remove(cursorName);
+ }
+ }
protected void refreshMetaData(final ExecutionContext executionContext)
throws SQLException {
Optional<MetaDataRefreshedEvent> event =
metadataRefreshEngine.refresh(executionContext.getSqlStatementContext(),
executionContext.getRouteContext().getRouteUnits());
@@ -177,6 +222,7 @@ public abstract class DatabaseCommunicationEngine<T> {
* @return has more result value or not
* @throws SQLException SQL exception
*/
+ @Override
public boolean next() throws SQLException {
return null != mergedResult && mergedResult.next();
}
@@ -187,7 +233,8 @@ public abstract class DatabaseCommunicationEngine<T> {
* @return query response row
* @throws SQLException SQL exception
*/
- public QueryResponseRow getQueryResponseRow() throws SQLException {
+ @Override
+ public QueryResponseRow getRowData() throws SQLException {
List<QueryResponseCell> cells = new ArrayList<>(queryHeaders.size());
for (int columnIndex = 1; columnIndex <= queryHeaders.size();
columnIndex++) {
Object data = mergedResult.getValue(columnIndex, Object.class);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 8ef03a65a9f..a5ccb845589 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
@@ -68,7 +67,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
/**
* JDBC database communication engine.
*/
-public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunicationEngine<ResponseHeader> {
+public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunicationEngine {
private final ProxySQLExecutor proxySQLExecutor;
@@ -117,8 +116,8 @@ public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunication
* @return backend response
*/
@SuppressWarnings({"unchecked", "rawtypes"})
- @SneakyThrows(SQLException.class)
- public ResponseHeader execute() {
+ @Override
+ public ResponseHeader execute() throws SQLException {
LogicSQL logicSQL = getLogicSQL();
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
ExecutionContext executionContext =
getKernelProcessor().generateExecutionContext(
@@ -181,6 +180,7 @@ public final class JDBCDatabaseCommunicationEngine extends
DatabaseCommunication
*
* @throws SQLException SQL exception
*/
+ @Override
public void close() throws SQLException {
Collection<SQLException> result = new LinkedList<>();
result.addAll(closeResultSets());
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
index 16f2be765ac..971da7427be 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/vertx/VertxDatabaseCommunicationEngine.java
@@ -36,7 +36,7 @@ import java.util.List;
/**
* Vert.x database communication engine.
*/
-public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicationEngine<Future<ResponseHeader>> {
+public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicationEngine {
private final ReactiveProxySQLExecutor reactiveProxySQLExecutor;
@@ -52,7 +52,7 @@ public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicatio
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
- public Future<ResponseHeader> execute() {
+ public Future<ResponseHeader> executeFuture() {
try {
ShardingSphereMetaData metaData =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData();
ExecutionContext executionContext = getKernelProcessor()
@@ -82,4 +82,9 @@ public final class VertxDatabaseCommunicationEngine extends
DatabaseCommunicatio
return Future.failedFuture(ex);
}
}
+
+ @Override
+ public ResponseHeader execute() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
index f290bbc2cf6..5433d819eac 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutor.java
@@ -23,11 +23,11 @@ import
org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.text.admin.executor.DatabaseAdminExecutor;
import
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.SetStatement;
@@ -82,7 +82,8 @@ public final class MySQLSetVariableAdminExecutor implements
DatabaseAdminExecuto
SQLStatement sqlStatement =
sqlParserRule.getSQLParserEngine(DatabaseTypeFactory.getInstance("MySQL").getType()).parse(sql,
false);
SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabases(),
sqlStatement, connectionSession.getDefaultDatabaseName());
- DatabaseBackendHandler databaseBackendHandler = new
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, sql,
connectionSession);
+ DatabaseBackendHandler databaseBackendHandler =
DatabaseCommunicationEngineFactory.getInstance()
+ .newDatabaseCommunicationEngine(sqlStatementContext, sql,
connectionSession.getBackendConnection(), false);
try {
databaseBackendHandler.execute();
} finally {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
index dad3c756fb2..7a128bb2e77 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/executor/UnicastResourceShowExecutor.java
@@ -111,7 +111,7 @@ public final class UnicastResourceShowExecutor implements
DatabaseAdminQueryExec
private QueryResult createQueryResult() throws SQLException {
List<MemoryQueryResultDataRow> rows = new LinkedList<>();
while (databaseCommunicationEngine.next()) {
- List<Object> data =
databaseCommunicationEngine.getQueryResponseRow().getData();
+ List<Object> data =
databaseCommunicationEngine.getRowData().getData();
rows.add(new MemoryQueryResultDataRow(data));
}
return new RawMemoryQueryResult(getQueryResultMetaData(), rows);
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
index 65a8827e4c1..158e72aa4a7 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandler.java
@@ -17,20 +17,10 @@
package org.apache.shardingsphere.proxy.backend.text.data;
-import io.vertx.core.Future;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
/**
* Database backend handler.
*/
public interface DatabaseBackendHandler extends TextProtocolBackendHandler {
-
- /**
- * Handlers which communicate with database must implement async execution.
- *
- * @return future of response header
- */
- @Override
- Future<ResponseHeader> executeFuture();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
index c00436ec3e8..4c9f4a236c5 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactory.java
@@ -21,10 +21,10 @@ import io.vertx.core.Future;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
@@ -68,6 +68,6 @@ public final class DatabaseBackendHandlerFactory {
if (sqlStatement instanceof DALStatement || (sqlStatement instanceof
SelectStatement && null == ((SelectStatement) sqlStatement).getFrom())) {
return new UnicastDatabaseBackendHandler(sqlStatementContext, sql,
connectionSession);
}
- return new SchemaAssignedDatabaseBackendHandler(sqlStatementContext,
sql, connectionSession);
+ return
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
deleted file mode 100644
index d19956213c7..00000000000
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.text.data.impl;
-
-import com.google.common.base.Preconditions;
-import io.vertx.core.Future;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.statement.ddl.CloseStatementContext;
-import
org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
-import org.apache.shardingsphere.infra.binder.type.CursorAvailable;
-import
org.apache.shardingsphere.infra.distsql.exception.resource.RequiredResourceMissedException;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtil;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
-import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
-
-import java.sql.SQLException;
-
-/**
- * Database backend handler with assigned schema.
- */
-@RequiredArgsConstructor
-public final class SchemaAssignedDatabaseBackendHandler implements
DatabaseBackendHandler {
-
- private final DatabaseCommunicationEngineFactory
databaseCommunicationEngineFactory =
DatabaseCommunicationEngineFactory.getInstance();
-
- private final SQLStatementContext<?> sqlStatementContext;
-
- private final String sql;
-
- private final ConnectionSession connectionSession;
-
- private DatabaseCommunicationEngine<?> databaseCommunicationEngine;
-
- @Override
- public ResponseHeader execute() throws SQLException {
- prepareDatabaseCommunicationEngine();
- return (ResponseHeader) databaseCommunicationEngine.execute();
- }
-
- @Override
- public Future<ResponseHeader> executeFuture() {
- try {
- prepareDatabaseCommunicationEngine();
- return (Future<ResponseHeader>)
databaseCommunicationEngine.execute();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- return Future.failedFuture(ex);
- }
- }
-
- private void prepareDatabaseCommunicationEngine() throws
RequiredResourceMissedException {
- ShardingSphereDatabase database =
ProxyContext.getInstance().getDatabase(connectionSession.getDatabaseName());
- boolean isSystemSchema =
SystemSchemaUtil.containsSystemSchema(sqlStatementContext.getDatabaseType(),
sqlStatementContext.getTablesContext().getSchemaNames(), database);
- if (!isSystemSchema && !database.containsDataSource()) {
- throw new
RequiredResourceMissedException(connectionSession.getDatabaseName());
- }
- if (!isSystemSchema && !database.isComplete()) {
- throw new RuleNotExistedException();
- }
- if (sqlStatementContext instanceof CursorAvailable) {
- prepareCursorStatementContext((CursorAvailable)
sqlStatementContext, connectionSession);
- }
- databaseCommunicationEngine =
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
- }
-
- private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession) {
- if (statementContext.getCursorName().isPresent()) {
- String cursorName =
statementContext.getCursorName().get().getIdentifier().getValue().toLowerCase();
- prepareCursorStatementContext(statementContext, connectionSession,
cursorName);
- }
- if (statementContext instanceof CloseStatementContext &&
((CloseStatementContext) statementContext).getSqlStatement().isCloseAll()) {
- FetchOrderByValueGroupsHolder.remove();
- connectionSession.getCursorDefinitions().clear();
- }
- }
-
- private void prepareCursorStatementContext(final CursorAvailable
statementContext, final ConnectionSession connectionSession, final String
cursorName) {
- if (statementContext instanceof CursorStatementContext) {
- connectionSession.getCursorDefinitions().put(cursorName,
(CursorStatementContext) statementContext);
- }
- if (statementContext instanceof CursorDefinitionAware) {
- CursorStatementContext cursorStatementContext =
connectionSession.getCursorDefinitions().get(cursorName);
- Preconditions.checkArgument(null != cursorStatementContext,
"Cursor %s does not exist.", cursorName);
- ((CursorDefinitionAware)
statementContext).setUpCursorDefinition(cursorStatementContext);
- }
- if (statementContext instanceof CloseStatementContext) {
-
FetchOrderByValueGroupsHolder.getOrderByValueGroups().remove(cursorName);
-
FetchOrderByValueGroupsHolder.getMinGroupRowCounts().remove(cursorName);
- connectionSession.getCursorDefinitions().remove(cursorName);
- }
- }
-
- @Override
- public boolean next() throws SQLException {
- return databaseCommunicationEngine.next();
- }
-
- @Override
- public QueryResponseRow getRowData() throws SQLException {
- return databaseCommunicationEngine.getQueryResponseRow();
- }
-
- @Override
- public void close() throws SQLException {
- if (databaseCommunicationEngine instanceof
JDBCDatabaseCommunicationEngine) {
- ((JDBCDatabaseCommunicationEngine)
databaseCommunicationEngine).close();
- }
- }
-}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index d99ed210592..c6639c79eb0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
import
org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
@@ -60,7 +59,7 @@ public final class UnicastDatabaseBackendHandler implements
DatabaseBackendHandl
}
connectionSession.setCurrentDatabase(databaseName);
databaseCommunicationEngine =
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
- return ((Future<ResponseHeader>)
databaseCommunicationEngine.execute()).eventually(unused -> {
+ return databaseCommunicationEngine.executeFuture().eventually(unused
-> {
connectionSession.setCurrentDatabase(databaseName);
return Future.succeededFuture();
});
@@ -76,7 +75,7 @@ public final class UnicastDatabaseBackendHandler implements
DatabaseBackendHandl
try {
connectionSession.setCurrentDatabase(databaseName);
databaseCommunicationEngine =
databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
- return (ResponseHeader) databaseCommunicationEngine.execute();
+ return databaseCommunicationEngine.execute();
} finally {
connectionSession.setCurrentDatabase(databaseName);
}
@@ -101,13 +100,11 @@ public final class UnicastDatabaseBackendHandler
implements DatabaseBackendHandl
@Override
public QueryResponseRow getRowData() throws SQLException {
- return databaseCommunicationEngine.getQueryResponseRow();
+ return databaseCommunicationEngine.getRowData();
}
@Override
public void close() throws SQLException {
- if (databaseCommunicationEngine instanceof
JDBCDatabaseCommunicationEngine) {
- ((JDBCDatabaseCommunicationEngine)
databaseCommunicationEngine).close();
- }
+ databaseCommunicationEngine.close();
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
index 3ba2526aa5e..5f180541ab0 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactory.java
@@ -20,9 +20,9 @@ package
org.apache.shardingsphere.proxy.backend.text.transaction;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import org.apache.shardingsphere.sql.parser.sql.common.constant.OperationScope;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.BeginTransactionStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
@@ -78,6 +78,6 @@ public final class TransactionBackendHandlerFactory {
if (tclStatement instanceof XAStatement) {
return new TransactionXAHandler(sqlStatementContext, sql,
connectionSession);
}
- return new SchemaAssignedDatabaseBackendHandler(sqlStatementContext,
sql, connectionSession);
+ return
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
index 98b4deb53c9..5ac2f610c80 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -19,11 +19,12 @@ package
org.apache.shardingsphere.proxy.backend.text.transaction;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import
org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
@@ -43,12 +44,12 @@ public final class TransactionXAHandler implements
TextProtocolBackendHandler {
private final ConnectionSession connectionSession;
- private final SchemaAssignedDatabaseBackendHandler backendHandler;
+ private final JDBCDatabaseCommunicationEngine backendHandler;
public TransactionXAHandler(final SQLStatementContext<? extends
TCLStatement> sqlStatementContext, final String sql, final ConnectionSession
connectionSession) {
this.tclStatement = (XAStatement)
sqlStatementContext.getSqlStatement();
this.connectionSession = connectionSession;
- this.backendHandler = new
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, sql,
connectionSession);
+ this.backendHandler =
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
sql, connectionSession.getBackendConnection(), false);
}
@Override
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
index 398b1b43163..a727a44048d 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactoryTest.java
@@ -57,6 +57,8 @@ public final class DatabaseCommunicationEngineFactoryTest
extends ProxyContextRe
private Map<String, ShardingSphereDatabase> getDatabases() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
+ when(database.containsDataSource()).thenReturn(true);
+ when(database.isComplete()).thenReturn(true);
when(database.getResource().getDatabaseType()).thenReturn(new
H2DatabaseType());
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
@@ -70,7 +72,7 @@ public final class DatabaseCommunicationEngineFactoryTest
extends ProxyContextRe
when(backendConnection.getConnectionSession().getDatabaseName()).thenReturn("db");
SQLStatementContext<?> sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
- DatabaseCommunicationEngine<?> engine =
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
"schemaName", backendConnection, false);
+ DatabaseCommunicationEngine engine =
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
"schemaName", backendConnection, false);
assertThat(engine, instanceOf(DatabaseCommunicationEngine.class));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
index a0788131e97..3c865fa9cd5 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngineTest.java
@@ -104,6 +104,8 @@ public final class JDBCDatabaseCommunicationEngineTest
extends ProxyContextResto
private Map<String, ShardingSphereDatabase> mockDatabases() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
+ when(database.containsDataSource()).thenReturn(true);
+ when(database.isComplete()).thenReturn(true);
when(database.getResource().getDatabaseType()).thenReturn(new
H2DatabaseType());
when(database.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
@@ -135,7 +137,7 @@ public final class JDBCDatabaseCommunicationEngineTest
extends ProxyContextResto
});
Exception ex = null;
try {
- engine.getQueryResponseRow();
+ engine.getRowData();
} catch (final SQLException | IndexOutOfBoundsException e) {
ex = e;
} finally {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
index e2495f3c0ec..7b5e8177aed 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactoryTest.java
@@ -27,12 +27,12 @@ import
org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.proxy.backend.text.admin.DatabaseAdminQueryBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.admin.DatabaseAdminUpdateBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.QueryableRALBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.hint.HintRALBackendHandler;
@@ -213,7 +213,7 @@ public final class TextProtocolBackendHandlerFactoryTest
extends ProxyContextRes
when(proxyContext.getAllDatabaseNames()).thenReturn(new
HashSet<>(Collections.singletonList("db")));
when(proxyContext.getDatabase("db").containsDataSource()).thenReturn(true);
TextProtocolBackendHandler actual =
TextProtocolBackendHandlerFactory.newInstance(databaseType, sql,
connectionSession);
- assertThat(actual,
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+ assertThat(actual, instanceOf(DatabaseCommunicationEngine.class));
sql = "select * from information_schema.schemata limit 1";
actual = TextProtocolBackendHandlerFactory.newInstance(databaseType,
sql, connectionSession);
assertThat(actual, instanceOf(DatabaseAdminQueryBackendHandler.class));
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
index db7da356a9a..0d95a0e4784 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/admin/mysql/MySQLSetVariableAdminExecutorTest.java
@@ -17,13 +17,15 @@
package org.apache.shardingsphere.proxy.backend.text.admin.mysql;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
import org.apache.shardingsphere.sql.parser.api.CacheOption;
import
org.apache.shardingsphere.sql.parser.sql.common.segment.dal.VariableAssignSegment;
@@ -50,10 +52,15 @@ public final class MySQLSetVariableAdminExecutorTest
extends ProxyContextRestore
MySQLSetVariableAdminExecutor executor = new
MySQLSetVariableAdminExecutor(setStatement);
ConnectionSession connectionSession = mock(ConnectionSession.class);
when(connectionSession.getDatabaseName()).thenReturn("db");
+ JDBCBackendConnection backendConnection =
mock(JDBCBackendConnection.class);
+
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
+
when(backendConnection.getConnectionSession()).thenReturn(connectionSession);
ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(mock(ShardingSphereDatabase.class));
+
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().containsDatabase("db")).thenReturn(true);
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData())
.thenReturn(new
ShardingSphereRuleMetaData(Collections.singletonList(new SQLParserRule(new
SQLParserRuleConfiguration(false, new CacheOption(1, 1), new CacheOption(1,
1))))));
- try (MockedConstruction<SchemaAssignedDatabaseBackendHandler>
mockConstruction =
mockConstruction(SchemaAssignedDatabaseBackendHandler.class)) {
+ try (MockedConstruction<JDBCDatabaseCommunicationEngine>
mockConstruction = mockConstruction(JDBCDatabaseCommunicationEngine.class)) {
executor.execute(connectionSession);
verify(mockConstruction.constructed().get(0)).execute();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
index ff82e73d0ac..703ead41f37 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/DatabaseBackendHandlerFactoryTest.java
@@ -18,20 +18,31 @@
package org.apache.shardingsphere.proxy.backend.text.data;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import
org.apache.shardingsphere.proxy.backend.text.data.impl.UnicastDatabaseBackendHandler;
+import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.junit.Test;
+import org.mockito.MockedConstruction;
+
+import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
-public final class DatabaseBackendHandlerFactoryTest {
+public final class DatabaseBackendHandlerFactoryTest extends
ProxyContextRestorer {
@Test
public void
assertNewInstanceReturnedUnicastDatabaseBackendHandlerWithDAL() {
@@ -54,9 +65,22 @@ public final class DatabaseBackendHandlerFactoryTest {
@Test
public void
assertNewInstanceReturnedSchemaAssignedDatabaseBackendHandler() {
String sql = "SELECT 1 FROM user WHERE id = 1";
- SQLStatementContext<SQLStatement> context =
mock(SQLStatementContext.class);
+ SQLStatementContext<SQLStatement> context =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
when(context.getSqlStatement()).thenReturn(mock(SQLStatement.class));
- DatabaseBackendHandler actual =
DatabaseBackendHandlerFactory.newInstance(context, sql,
mock(ConnectionSession.class));
- assertThat(actual,
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+
when(context.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
+ ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+ when(database.isComplete()).thenReturn(true);
+ when(database.containsDataSource()).thenReturn(true);
+
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
+
when(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().containsDatabase("db")).thenReturn(true);
+ ConnectionSession connectionSession = mock(ConnectionSession.class);
+ when(connectionSession.getDatabaseName()).thenReturn("db");
+
when(connectionSession.getBackendConnection()).thenReturn(mock(JDBCBackendConnection.class));
+
when(connectionSession.getBackendConnection().getConnectionSession()).thenReturn(connectionSession);
+ try (MockedConstruction<JDBCDatabaseCommunicationEngine> unused =
mockConstruction(JDBCDatabaseCommunicationEngine.class)) {
+ DatabaseBackendHandler actual =
DatabaseBackendHandlerFactory.newInstance(context, sql, connectionSession);
+ assertThat(actual, instanceOf(DatabaseCommunicationEngine.class));
+ }
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
deleted file mode 100644
index 61e4652c921..00000000000
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandlerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.text.data.impl;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
-import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
-import
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
-import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
-import
org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.lang.reflect.Field;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class SchemaAssignedDatabaseBackendHandlerTest extends
ProxyContextRestorer {
-
- private static final String EXECUTE_SQL = "USE test";
-
- private static final String DATABASE_PATTERN = "db_%s";
-
- private SchemaAssignedDatabaseBackendHandler
schemaAssignedDatabaseBackendHandler;
-
- @Mock
- private ConnectionSession connectionSession;
-
- @Mock
- private DatabaseCommunicationEngineFactory
databaseCommunicationEngineFactory;
-
- @Mock
- private DatabaseCommunicationEngine databaseCommunicationEngine;
-
- @Before
- public void setUp() throws IllegalAccessException, NoSuchFieldException,
SQLException {
- ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
- MetaDataContexts metaDataContexts = new
MetaDataContexts(mock(MetaDataPersistService.class),
- new ShardingSphereMetaData(getDatabases(),
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new
Properties())), mock(OptimizerContext.class));
-
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
- ProxyContext.init(contextManager);
-
when(connectionSession.getDatabaseName()).thenReturn(String.format(DATABASE_PATTERN,
0));
- mockDatabaseCommunicationEngine(new
UpdateResponseHeader(mock(SQLStatement.class)));
- SQLStatementContext<?> sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
- when(sqlStatementContext.getDatabaseType()).thenReturn(new
MySQLDatabaseType());
-
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
- schemaAssignedDatabaseBackendHandler = new
SchemaAssignedDatabaseBackendHandler(sqlStatementContext, EXECUTE_SQL,
connectionSession);
- setBackendHandlerFactory(schemaAssignedDatabaseBackendHandler);
- }
-
- private Map<String, ShardingSphereDatabase> getDatabases() {
- Map<String, ShardingSphereDatabase> result = new HashMap<>(10, 1);
- for (int i = 0; i < 10; i++) {
- ShardingSphereDatabase database =
mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
- when(database.isComplete()).thenReturn(true);
- when(database.containsDataSource()).thenReturn(true);
- when(database.getResource().getDatabaseType()).thenReturn(new
H2DatabaseType());
- result.put(String.format(DATABASE_PATTERN, i), database);
- }
- return result;
- }
-
- private void mockDatabaseCommunicationEngine(final ResponseHeader
responseHeader) {
- when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
-
when(databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(any(),
anyString(), any(), eq(false))).thenReturn(databaseCommunicationEngine);
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private void setBackendHandlerFactory(final DatabaseBackendHandler
schemaDatabaseBackendHandler) {
- Field field =
schemaDatabaseBackendHandler.getClass().getDeclaredField("databaseCommunicationEngineFactory");
- field.setAccessible(true);
- field.set(schemaDatabaseBackendHandler,
databaseCommunicationEngineFactory);
- }
-
- @Test
- public void assertExecuteDatabaseBackendHandler() throws SQLException {
- ResponseHeader actual = schemaAssignedDatabaseBackendHandler.execute();
- assertThat(actual, instanceOf(UpdateResponseHeader.class));
- }
-
- @Test
- public void assertDatabaseUsingStream() throws SQLException {
- schemaAssignedDatabaseBackendHandler.execute();
- while (schemaAssignedDatabaseBackendHandler.next()) {
-
assertThat(schemaAssignedDatabaseBackendHandler.getRowData().getData().size(),
is(1));
- }
- }
-}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
index f4d661f498f..bc0ec864f0e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandlerTest.java
@@ -78,7 +78,7 @@ public final class UnicastDatabaseBackendHandlerTest extends
ProxyContextRestore
private DatabaseCommunicationEngine databaseCommunicationEngine;
@Before
- public void setUp() {
+ public void setUp() throws SQLException {
ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
MetaDataContexts metaDataContexts = new
MetaDataContexts(mock(MetaDataPersistService.class),
new ShardingSphereMetaData(getDatabases(),
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new
Properties())), mock(OptimizerContext.class));
@@ -101,7 +101,7 @@ public final class UnicastDatabaseBackendHandlerTest
extends ProxyContextRestore
return result;
}
- private void mockDatabaseCommunicationEngine(final ResponseHeader
responseHeader) {
+ private void mockDatabaseCommunicationEngine(final ResponseHeader
responseHeader) throws SQLException {
when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
when(databaseCommunicationEngineFactory.newDatabaseCommunicationEngine(any(),
anyString(), any(), eq(false))).thenReturn(databaseCommunicationEngine);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
index f6114c60fec..5b6d8891dce 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionBackendHandlerFactoryTest.java
@@ -21,12 +21,13 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
+import
org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.JDBCBackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
-import
org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
@@ -37,6 +38,7 @@ import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
+import org.mockito.MockedStatic;
import java.lang.reflect.Field;
@@ -45,6 +47,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
public final class TransactionBackendHandlerFactoryTest extends
ProxyContextRestorer {
@@ -92,7 +95,12 @@ public final class TransactionBackendHandlerFactoryTest
extends ProxyContextRest
public void assertBroadcastBackendHandlerReturnedWhenTCLStatementNotHit() {
SQLStatementContext<TCLStatement> context =
mock(SQLStatementContext.class);
when(context.getSqlStatement()).thenReturn(mock(TCLStatement.class));
- assertThat(TransactionBackendHandlerFactory.newInstance(context, null,
mock(ConnectionSession.class)),
instanceOf(SchemaAssignedDatabaseBackendHandler.class));
+ try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic =
mockStatic(DatabaseCommunicationEngineFactory.class)) {
+ DatabaseCommunicationEngineFactory mockFactory =
mock(DatabaseCommunicationEngineFactory.class);
+
mockedStatic.when(DatabaseCommunicationEngineFactory::getInstance).thenReturn(mockFactory);
+ when(mockFactory.newDatabaseCommunicationEngine(context, null,
null, false)).thenReturn(mock(DatabaseCommunicationEngine.class));
+ assertThat(TransactionBackendHandlerFactory.newInstance(context,
null, mock(ConnectionSession.class)),
instanceOf(DatabaseCommunicationEngine.class));
+ }
}
@SuppressWarnings("unchecked")
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index 2a05336d484..5beb630eb54 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -148,7 +148,7 @@ public final class MySQLComStmtExecuteExecutor implements
QueryCommandExecutor {
@Override
public MySQLPacket getQueryRowPacket() throws SQLException {
- QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getQueryResponseRow();
+ QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getRowData();
return new MySQLBinaryResultSetRowPacket(++currentSequenceId,
createBinaryRow(queryResponseRow));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index e5b0530919c..aa4eafcb756 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.fieldlist;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
@@ -44,6 +45,7 @@ import java.util.LinkedList;
/**
* COM_FIELD_LIST packet executor for MySQL.
*/
+@RequiredArgsConstructor
public final class MySQLComFieldListPacketExecutor implements CommandExecutor {
private static final String SQL = "SHOW COLUMNS FROM %s FROM %s";
@@ -52,18 +54,13 @@ public final class MySQLComFieldListPacketExecutor
implements CommandExecutor {
private final ConnectionSession connectionSession;
- private final String databaseName;
-
- private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
-
- private final int characterSet;
+ private JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
private int currentSequenceId;
- public MySQLComFieldListPacketExecutor(final MySQLComFieldListPacket
packet, final ConnectionSession connectionSession) {
- this.packet = packet;
- this.connectionSession = connectionSession;
- databaseName = connectionSession.getDefaultDatabaseName();
+ @Override
+ public Collection<DatabasePacket<?>> execute() throws SQLException {
+ String databaseName = connectionSession.getDefaultDatabaseName();
String sql = String.format(SQL, packet.getTable(), databaseName);
MetaDataContexts metaDataContexts =
ProxyContext.getInstance().getContextManager().getMetaDataContexts();
SQLParserRule sqlParserRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
@@ -72,19 +69,15 @@ public final class MySQLComFieldListPacketExecutor
implements CommandExecutor {
SQLStatementContext<?> sqlStatementContext =
SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(),
sqlStatement, databaseName);
JDBCBackendConnection backendConnection = (JDBCBackendConnection)
connectionSession.getBackendConnection();
databaseCommunicationEngine =
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(sqlStatementContext,
sql, backendConnection, false);
- characterSet =
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
- }
-
- @Override
- public Collection<DatabasePacket<?>> execute() throws SQLException {
databaseCommunicationEngine.execute();
- return createColumnDefinition41Packets();
+ return createColumnDefinition41Packets(databaseName);
}
- private Collection<DatabasePacket<?>> createColumnDefinition41Packets()
throws SQLException {
+ private Collection<DatabasePacket<?>>
createColumnDefinition41Packets(final String databaseName) throws SQLException {
Collection<DatabasePacket<?>> result = new LinkedList<>();
+ int characterSet =
connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
while (databaseCommunicationEngine.next()) {
- String columnName =
databaseCommunicationEngine.getQueryResponseRow().getCells().iterator().next().getData().toString();
+ String columnName =
databaseCommunicationEngine.getRowData().getCells().iterator().next().getData().toString();
result.add(new MySQLColumnDefinition41Packet(
++currentSequenceId, characterSet, databaseName,
packet.getTable(), packet.getTable(), columnName, columnName, 100,
MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR, 0, true));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
index 5b0a2ac9e91..77aa2189558 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactoryTest.java
@@ -39,8 +39,6 @@ import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRule
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.parser.rule.SQLParserRule;
-import
org.apache.shardingsphere.parser.rule.builder.DefaultSQLParserRuleConfigurationBuilder;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -87,11 +85,8 @@ public final class MySQLCommandExecutorFactoryTest extends
ProxyContextRestorer
@Before
public void setUp() {
- when(connectionSession.getDatabaseName()).thenReturn("logic_db");
-
when(connectionSession.getDefaultDatabaseName()).thenReturn("logic_db");
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_GENERAL_CI);
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
-
when(backendConnection.getConnectionSession()).thenReturn(connectionSession);
ShardingSphereDatabase database = mockDatabase();
Map<String, ShardingSphereDatabase> databases = new LinkedHashMap<>(1,
1);
databases.put("logic_db", database);
@@ -101,8 +96,6 @@ public final class MySQLCommandExecutorFactoryTest extends
ProxyContextRestorer
mock(OptimizerContext.class, RETURNS_DEEP_STUBS));
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
ProxyContext.init(contextManager);
-
when(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class))
- .thenReturn(new SQLParserRule(new
DefaultSQLParserRuleConfigurationBuilder().build()));
}
private ShardingSphereDatabase mockDatabase() {
@@ -126,7 +119,6 @@ public final class MySQLCommandExecutorFactoryTest extends
ProxyContextRestorer
@Test
public void assertNewInstanceWithComFieldList() throws SQLException {
MySQLComFieldListPacket packet = mock(MySQLComFieldListPacket.class);
- when(packet.getTable()).thenReturn("test");
assertThat(MySQLCommandExecutorFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST,
packet, connectionSession), instanceOf(MySQLComFieldListPacketExecutor.class));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index 2c0623e9b25..676d1ed8a05 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -168,7 +168,7 @@ public final class MySQLComStmtExecuteExecutorTest extends
ProxyContextRestorer
MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new
MySQLComStmtExecuteExecutor(packet, connectionSession);
when(databaseCommunicationEngine.execute()).thenReturn(new
QueryResponseHeader(Collections.singletonList(mock(QueryHeader.class))));
when(databaseCommunicationEngine.next()).thenReturn(true, false);
- when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER,
1))));
+ when(databaseCommunicationEngine.getRowData()).thenReturn(new
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER,
1))));
Iterator<DatabasePacket<?>> actual;
try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic =
mockStatic(DatabaseCommunicationEngineFactory.class, RETURNS_DEEP_STUBS)) {
mockedStatic.when(() ->
DatabaseCommunicationEngineFactory.getInstance().newDatabaseCommunicationEngine(any(SQLStatementContext.class),
anyString(), anyList(), eq(backendConnection)))
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
index 766ff7a75ad..d786b388b09 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortal.java
@@ -182,7 +182,7 @@ public final class JDBCPortal implements Portal<Void> {
}
private PostgreSQLPacket nextPacket() throws SQLException {
- return new PostgreSQLDataRowPacket(getData(null !=
databaseCommunicationEngine ? databaseCommunicationEngine.getQueryResponseRow()
: textProtocolBackendHandler.getRowData()));
+ return new PostgreSQLDataRowPacket(getData(null !=
databaseCommunicationEngine ? databaseCommunicationEngine.getRowData() :
textProtocolBackendHandler.getRowData()));
}
private List<Object> getData(final QueryResponseRow queryResponseRow) {
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
index a735569a6db..07aca1ee282 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/JDBCPortalTest.java
@@ -115,7 +115,7 @@ public final class JDBCPortalTest extends
ProxyContextRestorer {
when(responseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(queryHeader));
when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
when(databaseCommunicationEngine.next()).thenReturn(true, true, false);
- when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER,
0))),
+ when(databaseCommunicationEngine.getRowData()).thenReturn(new
QueryResponseRow(Collections.singletonList(new QueryResponseCell(Types.INTEGER,
0))),
new QueryResponseRow(Collections.singletonList(new
QueryResponseCell(Types.INTEGER, 1))));
portal.bind();
assertThat(portal.describe(),
instanceOf(PostgreSQLRowDescriptionPacket.class));
@@ -137,7 +137,7 @@ public final class JDBCPortalTest extends
ProxyContextRestorer {
when(responseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(queryHeader));
when(databaseCommunicationEngine.execute()).thenReturn(responseHeader);
when(databaseCommunicationEngine.next()).thenReturn(true, true);
- when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(
+ when(databaseCommunicationEngine.getRowData()).thenReturn(
new QueryResponseRow(Collections.singletonList(new
QueryResponseCell(Types.INTEGER, 0))),
new QueryResponseRow(Collections.singletonList(new
QueryResponseCell(Types.INTEGER, 1))));
setField(portal, "resultFormats",
Collections.singletonList(PostgreSQLValueFormat.BINARY));
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index 7811d7bde2e..b62f3c180cf 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -109,7 +109,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor
implements ReactiveComman
databaseCommunicationEngine =
DatabaseCommunicationEngineFactory.getInstance()
.newDatabaseCommunicationEngine(sqlStatementContext,
preparedStatement.getSql(), parameters,
connectionSession.getBackendConnection());
}
- return (null != databaseCommunicationEngine ?
databaseCommunicationEngine.execute() :
textProtocolBackendHandler.executeFuture()).compose(responseHeader -> {
+ return (null != databaseCommunicationEngine ?
databaseCommunicationEngine.executeFuture() :
textProtocolBackendHandler.executeFuture()).compose(responseHeader -> {
Collection<DatabasePacket<?>> headerPackets = responseHeader
instanceof QueryResponseHeader ? processQuery((QueryResponseHeader)
responseHeader, characterSet)
: processUpdate((UpdateResponseHeader) responseHeader);
List<DatabasePacket<?>> result = new LinkedList<>(headerPackets);
@@ -160,7 +160,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor
implements ReactiveComman
}
private MySQLPacket getQueryRowPacket() throws SQLException {
- QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getQueryResponseRow();
+ QueryResponseRow queryResponseRow =
databaseCommunicationEngine.getRowData();
return new MySQLBinaryResultSetRowPacket(++currentSequenceId,
createBinaryRow(queryResponseRow));
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
index 768aa4255e7..75d51f11cea 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/text/fieldlist/ReactiveMySQLComFieldListPacketExecutor.java
@@ -76,7 +76,7 @@ public final class ReactiveMySQLComFieldListPacketExecutor
implements ReactiveCo
@Override
public Future<Collection<DatabasePacket<?>>> executeFuture() {
- return databaseCommunicationEngine.execute().compose(unused -> {
+ return databaseCommunicationEngine.executeFuture().compose(unused -> {
try {
return
Future.succeededFuture(createColumnDefinition41Packets());
} catch (SQLException ex) {
@@ -88,7 +88,7 @@ public final class ReactiveMySQLComFieldListPacketExecutor
implements ReactiveCo
private Collection<DatabasePacket<?>> createColumnDefinition41Packets()
throws SQLException {
Collection<DatabasePacket<?>> result = new LinkedList<>();
while (databaseCommunicationEngine.next()) {
- String columnName =
databaseCommunicationEngine.getQueryResponseRow().getCells().iterator().next().getData().toString();
+ String columnName =
databaseCommunicationEngine.getRowData().getCells().iterator().next().getData().toString();
result.add(new MySQLColumnDefinition41Packet(
++currentSequenceId, characterSet, databaseName,
packet.getTable(), packet.getTable(), columnName, columnName, 100,
MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR, 0, true));
}