This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7db925dbc Add EnumerableScanImplementorTest (#37284)
1d7db925dbc is described below
commit 1d7db925dbcd5884931420f9a765cbdd0145f469
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 6 21:58:41 2025 +0800
Add EnumerableScanImplementorTest (#37284)
* Add tests for MemoryTableStatisticsBuilder and ExecutorBindContext
* Add EnumerableScanImplementorTest
* Add EnumerableScanImplementorTest
* Add EnumerableScanImplementorTest
---
.../implementor/EnumerableScanImplementor.java | 137 ++++----
.../implementor/EnumerableScanImplementorTest.java | 368 +++++++++++++++++++++
.../executor/EnumerableScanImplementorTest.java | 95 ------
3 files changed, 427 insertions(+), 173 deletions(-)
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
index fadc6b6b123..45d62e5b1d4 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementor.java
@@ -62,6 +62,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -86,11 +87,14 @@ public final class EnumerableScanImplementor implements
ScanImplementor {
@Override
public Enumerable<Object> implement(final ShardingSphereTable table, final
ScanImplementorContext scanContext) {
SQLStatementContext sqlStatementContext =
queryContext.getSqlStatementContext();
- if (containsSystemSchema(sqlStatementContext)) {
- return createMemoryEnumerable(sqlStatementContext, table);
+ DatabaseType databaseType =
sqlStatementContext.getSqlStatement().getDatabaseType();
+ Collection<String> systemSchemas = new
SystemDatabase(databaseType).getSystemSchemas();
+ if
(sqlStatementContext.getTablesContext().getSchemaNames().stream().anyMatch(systemSchemas::contains))
{
+ return createMemoryEnumerable(sqlStatementContext, databaseType,
table);
}
- QueryContext scanQueryContext =
createQueryContext(queryContext.getMetaData(), scanContext,
sqlStatementContext.getSqlStatement().getDatabaseType(),
queryContext.isUseCache());
- ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(scanQueryContext,
queryContext.getMetaData().getGlobalRuleMetaData(),
queryContext.getMetaData().getProps());
+ QueryContext scanQueryContext =
createQueryContext(queryContext.getMetaData(), scanContext, databaseType,
queryContext.isUseCache());
+ ExecutionContext executionContext = new
KernelProcessor().generateExecutionContext(
+ scanQueryContext,
queryContext.getMetaData().getGlobalRuleMetaData(),
queryContext.getMetaData().getProps());
if (executorContext.isPreview()) {
executorContext.getPreviewExecutionUnits().addAll(executionContext.getExecutionUnits());
return createEmptyEnumerable();
@@ -98,15 +102,50 @@ public final class EnumerableScanImplementor implements
ScanImplementor {
return createJDBCEnumerable(scanQueryContext,
queryContext.getMetaData().getDatabase(executorContext.getCurrentDatabaseName()),
executionContext);
}
- private boolean containsSystemSchema(final SQLStatementContext
sqlStatementContext) {
- Collection<String> usedSchemaNames =
sqlStatementContext.getTablesContext().getSchemaNames();
- Collection<String> systemSchemas = new
SystemDatabase(sqlStatementContext.getSqlStatement().getDatabaseType()).getSystemSchemas();
- for (String each : usedSchemaNames) {
- if (systemSchemas.contains(each)) {
- return true;
- }
+ private Enumerable<Object> createMemoryEnumerable(final
SQLStatementContext sqlStatementContext, final DatabaseType databaseType, final
ShardingSphereTable table) {
+ Optional<DialectDriverQuerySystemCatalogOption>
driverQuerySystemCatalogOption = new
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getDriverQuerySystemCatalogOption();
+ if (driverQuerySystemCatalogOption.isPresent() &&
driverQuerySystemCatalogOption.get().isSystemTable(table.getName())) {
+ return
createMemoryEnumerator(MemoryTableStatisticsBuilder.buildTableStatistics(table,
queryContext.getMetaData(), driverQuerySystemCatalogOption.get()), table,
databaseType);
}
- return false;
+ String databaseName =
sqlStatementContext.getTablesContext().getDatabaseName().orElse(executorContext.getCurrentDatabaseName());
+ String schemaName =
sqlStatementContext.getTablesContext().getSchemaName().orElse(executorContext.getCurrentSchemaName());
+ Optional<TableStatistics> tableStatistics =
Optional.ofNullable(executorContext.getStatistics().getDatabaseStatistics(databaseName))
+ .map(optional ->
optional.getSchemaStatistics(schemaName)).map(optional ->
optional.getTableStatistics(table.getName()));
+ return tableStatistics.map(optional ->
createMemoryEnumerator(optional, table,
databaseType)).orElseGet(this::createEmptyEnumerable);
+ }
+
+ private Enumerable<Object> createMemoryEnumerator(final TableStatistics
tableStatistics, final ShardingSphereTable table, final DatabaseType
databaseType) {
+ return new AbstractEnumerable<Object>() {
+
+ @Override
+ public Enumerator<Object> enumerator() {
+ return new MemoryDataRowEnumerator(tableStatistics.getRows(),
table.getAllColumns(), databaseType);
+ }
+ };
+ }
+
+ private QueryContext createQueryContext(final ShardingSphereMetaData
metaData, final ScanImplementorContext sqlString, final DatabaseType
databaseType, final boolean useCache) {
+ String sql = sqlString.getSql().replace(System.lineSeparator(), " ");
+ SQLStatement sqlStatement =
compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse(sql,
useCache);
+ HintValueContext hintValueContext = new HintValueContext();
+ SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData,
executorContext.getCurrentDatabaseName(), hintValueContext).bind(sqlStatement);
+ return new QueryContext(sqlStatementContext, sql,
getParameters(sqlString.getParamIndexes()), hintValueContext,
queryContext.getConnectionContext(), metaData, useCache);
+ }
+
+ private List<Object> getParameters(final int[] paramIndexes) {
+ return null == paramIndexes
+ ? Collections.emptyList()
+ : Arrays.stream(paramIndexes).mapToObj(each ->
queryContext.getParameters().get(each)).collect(Collectors.toCollection(() ->
new ArrayList<>(paramIndexes.length)));
+ }
+
+ private AbstractEnumerable<Object> createEmptyEnumerable() {
+ return new AbstractEnumerable<Object>() {
+
+ @Override
+ public Enumerator<Object> enumerator() {
+ return new EmptyDataRowEnumerator();
+ }
+ };
}
private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext
queryContext, final ShardingSphereDatabase database, final ExecutionContext
executionContext) {
@@ -130,44 +169,17 @@ public final class EnumerableScanImplementor implements
ScanImplementor {
};
}
- private ExecutionGroupContext<JDBCExecutionUnit> prepare(final
ShardingSphereDatabase database, final ExecutionContext executionContext)
throws SQLException {
- // TODO pass grantee from proxy and jdbc adapter
- return executorContext.getPrepareEngine().prepare(database.getName(),
executionContext, executorContext.getConnectionOffsets(),
executionContext.getExecutionUnits(),
- new
ExecutionGroupReportContext(executorContext.getProcessId(),
database.getName()));
- }
-
private void computeConnectionOffsets(final ExecutionContext context) {
for (ExecutionUnit each : context.getExecutionUnits()) {
- if
(executorContext.getConnectionOffsets().containsKey(each.getDataSourceName())) {
- int connectionOffset =
executorContext.getConnectionOffsets().get(each.getDataSourceName());
-
executorContext.getConnectionOffsets().put(each.getDataSourceName(),
++connectionOffset);
- } else {
-
executorContext.getConnectionOffsets().put(each.getDataSourceName(), 0);
- }
+ int connectionOffset =
executorContext.getConnectionOffsets().containsKey(each.getDataSourceName()) ?
executorContext.getConnectionOffsets().get(each.getDataSourceName()) + 1 : 0;
+
executorContext.getConnectionOffsets().put(each.getDataSourceName(),
connectionOffset);
}
}
- private Enumerable<Object> createMemoryEnumerable(final
SQLStatementContext sqlStatementContext, final ShardingSphereTable table) {
- DatabaseType databaseType =
sqlStatementContext.getSqlStatement().getDatabaseType();
- Optional<DialectDriverQuerySystemCatalogOption>
driverQuerySystemCatalogOption = new
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getDriverQuerySystemCatalogOption();
- if (driverQuerySystemCatalogOption.isPresent() &&
driverQuerySystemCatalogOption.get().isSystemTable(table.getName())) {
- return
createMemoryEnumerator(MemoryTableStatisticsBuilder.buildTableStatistics(table,
queryContext.getMetaData(), driverQuerySystemCatalogOption.get()), table,
databaseType);
- }
- String databaseName =
sqlStatementContext.getTablesContext().getDatabaseName().orElse(executorContext.getCurrentDatabaseName());
- String schemaName =
sqlStatementContext.getTablesContext().getSchemaName().orElse(executorContext.getCurrentSchemaName());
- Optional<TableStatistics> tableStatistics =
Optional.ofNullable(executorContext.getStatistics().getDatabaseStatistics(databaseName))
- .map(optional ->
optional.getSchemaStatistics(schemaName)).map(optional ->
optional.getTableStatistics(table.getName()));
- return tableStatistics.map(optional ->
createMemoryEnumerator(optional, table,
databaseType)).orElseGet(this::createEmptyEnumerable);
- }
-
- private Enumerable<Object> createMemoryEnumerator(final TableStatistics
tableStatistics, final ShardingSphereTable table, final DatabaseType
databaseType) {
- return new AbstractEnumerable<Object>() {
-
- @Override
- public Enumerator<Object> enumerator() {
- return new MemoryDataRowEnumerator(tableStatistics.getRows(),
table.getAllColumns(), databaseType);
- }
- };
+ private ExecutionGroupContext<JDBCExecutionUnit> prepare(final
ShardingSphereDatabase database, final ExecutionContext executionContext)
throws SQLException {
+ // TODO pass grantee from proxy and jdbc adapter
+ return executorContext.getPrepareEngine().prepare(database.getName(),
executionContext, executorContext.getConnectionOffsets(),
executionContext.getExecutionUnits(),
+ new
ExecutionGroupReportContext(executorContext.getProcessId(),
database.getName()));
}
private Collection<Statement> getStatements(final
Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
@@ -183,10 +195,9 @@ public final class EnumerableScanImplementor implements
ScanImplementor {
private void setParameters(final
Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
for (JDBCExecutionUnit executionUnit : each.getInputs()) {
- if (!(executionUnit.getStorageResource() instanceof
PreparedStatement)) {
- continue;
+ if (executionUnit.getStorageResource() instanceof
PreparedStatement) {
+ setParameters((PreparedStatement)
executionUnit.getStorageResource(),
executionUnit.getExecutionUnit().getSqlUnit().getParameters());
}
- setParameters((PreparedStatement)
executionUnit.getStorageResource(),
executionUnit.getExecutionUnit().getSqlUnit().getParameters());
}
}
}
@@ -197,34 +208,4 @@ public final class EnumerableScanImplementor implements
ScanImplementor {
preparedStatement.setObject(i + 1, params.get(i));
}
}
-
- private QueryContext createQueryContext(final ShardingSphereMetaData
metaData, final ScanImplementorContext sqlString, final DatabaseType
databaseType, final boolean useCache) {
- String sql = sqlString.getSql().replace(System.lineSeparator(), " ");
- SQLStatement sqlStatement =
compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse(sql,
useCache);
- List<Object> params = getParameters(sqlString.getParamIndexes());
- HintValueContext hintValueContext = new HintValueContext();
- SQLStatementContext sqlStatementContext = new SQLBindEngine(metaData,
executorContext.getCurrentDatabaseName(), hintValueContext).bind(sqlStatement);
- return new QueryContext(sqlStatementContext, sql, params,
hintValueContext, queryContext.getConnectionContext(), metaData, useCache);
- }
-
- private List<Object> getParameters(final int[] paramIndexes) {
- if (null == paramIndexes) {
- return Collections.emptyList();
- }
- List<Object> result = new ArrayList<>(paramIndexes.length);
- for (int each : paramIndexes) {
- result.add(queryContext.getParameters().get(each));
- }
- return result;
- }
-
- private AbstractEnumerable<Object> createEmptyEnumerable() {
- return new AbstractEnumerable<Object>() {
-
- @Override
- public Enumerator<Object> enumerator() {
- return new EmptyDataRowEnumerator();
- }
- };
- }
}
diff --git
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementorTest.java
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementorTest.java
new file mode 100644
index 00000000000..c51cccf8122
--- /dev/null
+++
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/implementor/EnumerableScanImplementorTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
+import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.datatype.DialectDataTypeOption;
+import
org.apache.shardingsphere.database.connector.core.metadata.database.metadata.option.table.DialectDriverQuerySystemCatalogOption;
+import
org.apache.shardingsphere.database.connector.core.metadata.database.system.SystemDatabase;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
+import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import
org.apache.shardingsphere.infra.exception.kernel.connection.SQLExecutionInterruptedException;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.Process;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
+import org.apache.shardingsphere.infra.merge.MergeEngine;
+import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import
org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
+import
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
+import
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.enumerable.enumerator.memory.MemoryTableStatisticsBuilder;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class EnumerableScanImplementorTest {
+
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
+
+ @Test
+ void assertImplementWithSystemTable() {
+ SelectStatementContext sqlStatementContext =
mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.singleton("pg_catalog"));
+ QueryContext queryContext = mock(QueryContext.class);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+ ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
+ when(queryContext.getMetaData()).thenReturn(metaData);
+
when(queryContext.getConnectionContext()).thenReturn(mock(ConnectionContext.class));
+ ShardingSphereTable table = mock(ShardingSphereTable.class);
+ when(table.getName()).thenReturn("pg_database");
+ when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("datname", Types.VARCHAR, true, false, false, false, true,
false)));
+ TableStatistics tableStatistics = mock(TableStatistics.class);
+
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new
RowStatistics(Collections.singletonList("foo_db"))));
+ DialectDriverQuerySystemCatalogOption driverOption =
mock(DialectDriverQuerySystemCatalogOption.class);
+ when(driverOption.isSystemTable("pg_database")).thenReturn(true);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class);
+
when(dialectDatabaseMetaData.getDriverQuerySystemCatalogOption()).thenReturn(Optional.of(driverOption));
+ DialectDataTypeOption dataTypeOption =
mock(DialectDataTypeOption.class);
+ when(dataTypeOption.findExtraSQLTypeClass(anyInt(),
anyBoolean())).thenReturn(Optional.empty());
+
when(dialectDatabaseMetaData.getDataTypeOption()).thenReturn(dataTypeOption);
+ try (
+ MockedConstruction<SystemDatabase> mockedSystemDatabase =
mockConstruction(SystemDatabase.class,
+ (constructed, context) ->
when(constructed.getSystemSchemas()).thenReturn(Collections.singletonList("pg_catalog")));
+ MockedConstruction<DatabaseTypeRegistry> mockedTypeRegistry =
mockConstruction(DatabaseTypeRegistry.class,
+ (constructed, context) ->
when(constructed.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData));
+ MockedStatic<MemoryTableStatisticsBuilder>
memoryBuilderMockedStatic = mockStatic(MemoryTableStatisticsBuilder.class)) {
+ memoryBuilderMockedStatic.when(() ->
MemoryTableStatisticsBuilder.buildTableStatistics(table, metaData,
driverOption)).thenReturn(tableStatistics);
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, mock(), mock()).implement(table, new
ScanImplementorContext(mock(), "SELECT datname FROM pg_database", null));
+ try (Enumerator<Object> actual = enumerable.enumerator()) {
+ assertTrue(actual.moveNext());
+ assertThat(((Object[]) actual.current())[0], is("foo_db"));
+ }
+ assertFalse(mockedSystemDatabase.constructed().isEmpty());
+ assertFalse(mockedTypeRegistry.constructed().isEmpty());
+ }
+ }
+
+ @Test
+ void assertImplementWithDriverOptionNotSystemTable() {
+ SelectStatementContext sqlStatementContext =
mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+
when(sqlStatementContext.getTablesContext().getDatabaseName()).thenReturn(Optional.of("foo_db"));
+
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.singleton("pg_catalog"));
+
when(sqlStatementContext.getTablesContext().getSchemaName()).thenReturn(Optional.of("pg_catalog"));
+ QueryContext queryContext = mock(QueryContext.class);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+
when(queryContext.getConnectionContext()).thenReturn(mock(ConnectionContext.class));
+ ShardingSphereTable table = mock(ShardingSphereTable.class);
+ when(table.getName()).thenReturn("custom_stats");
+ when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
+ ExecutorContext executorContext = mock(ExecutorContext.class);
+ when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
+ when(executorContext.getCurrentSchemaName()).thenReturn("pg_catalog");
+ ShardingSphereStatistics statistics =
mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS);
+ TableStatistics tableStatistics = mock(TableStatistics.class);
+
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new
RowStatistics(Collections.singletonList(2))));
+
when(statistics.getDatabaseStatistics("foo_db").getSchemaStatistics("pg_catalog").getTableStatistics("custom_stats")).thenReturn(tableStatistics);
+ when(executorContext.getStatistics()).thenReturn(statistics);
+ DialectDatabaseMetaData dialectDatabaseMetaData =
mock(DialectDatabaseMetaData.class);
+
when(dialectDatabaseMetaData.getDriverQuerySystemCatalogOption()).thenReturn(Optional.of(mock(DialectDriverQuerySystemCatalogOption.class)));
+ DialectDataTypeOption dataTypeOption =
mock(DialectDataTypeOption.class);
+ when(dataTypeOption.findExtraSQLTypeClass(anyInt(),
anyBoolean())).thenReturn(Optional.empty());
+
when(dialectDatabaseMetaData.getDataTypeOption()).thenReturn(dataTypeOption);
+ try (
+ MockedConstruction<SystemDatabase> mockedSystemDatabase =
mockConstruction(SystemDatabase.class,
+ (constructed, context) ->
when(constructed.getSystemSchemas()).thenReturn(Collections.singletonList("pg_catalog")));
+ MockedConstruction<DatabaseTypeRegistry> mockedTypeRegistry =
mockConstruction(DatabaseTypeRegistry.class,
+ (constructed, context) ->
when(constructed.getDialectDatabaseMetaData()).thenReturn(dialectDatabaseMetaData)))
{
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, mock(), executorContext)
+ .implement(table, new ScanImplementorContext(mock(),
"SELECT id FROM custom_stats", null));
+ try (Enumerator<Object> actual = enumerable.enumerator()) {
+ assertTrue(actual.moveNext());
+ assertThat(((Object[]) actual.current())[0], is(2));
+ }
+ assertFalse(mockedSystemDatabase.constructed().isEmpty());
+ assertFalse(mockedTypeRegistry.constructed().isEmpty());
+ }
+ }
+
+ @Test
+ void assertImplementWithDriverOptionButNonSystemTable() {
+ SelectStatementContext sqlStatementContext =
mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+
when(sqlStatementContext.getTablesContext().getDatabaseName()).thenReturn(Optional.of("foo_db"));
+
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.singleton("pg_catalog"));
+
when(sqlStatementContext.getTablesContext().getSchemaName()).thenReturn(Optional.of("pg_catalog"));
+ QueryContext queryContext = mock(QueryContext.class);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+ ShardingSphereTable table = mock(ShardingSphereTable.class);
+ when(table.getName()).thenReturn("non_system");
+ when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
+ ExecutorContext executorContext = mock(ExecutorContext.class);
+ when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
+ when(executorContext.getCurrentSchemaName()).thenReturn("pg_catalog");
+ ShardingSphereStatistics statistics =
mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS);
+ TableStatistics tableStatistics = mock(TableStatistics.class);
+
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new
RowStatistics(Collections.singletonList(1))));
+
when(statistics.getDatabaseStatistics("foo_db").getSchemaStatistics("pg_catalog").getTableStatistics("non_system")).thenReturn(tableStatistics);
+ when(executorContext.getStatistics()).thenReturn(statistics);
+ try (
+ MockedConstruction<SystemDatabase> ignored =
mockConstruction(SystemDatabase.class,
+ (constructed, context) ->
when(constructed.getSystemSchemas()).thenReturn(Collections.singletonList("pg_catalog"))))
{
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, mock(),
executorContext).implement(table, new ScanImplementorContext(mock(), "SELECT
1", null));
+ try (Enumerator<Object> actual = enumerable.enumerator()) {
+ assertTrue(actual.moveNext());
+ }
+ }
+ }
+
+ @Test
+ void assertImplementWithNonSystemSchema() {
+ SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.singletonList("custom_schema"));
+ QueryContext queryContext = mock(QueryContext.class,
RETURNS_DEEP_STUBS);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+
when(queryContext.getParameters()).thenReturn(Collections.singletonList("param_0"));
+ SQLStatement sqlStatement = mock(SQLStatement.class);
+ CompilerContext compilerContext = mock(CompilerContext.class,
RETURNS_DEEP_STUBS);
+
when(compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse("SELECT
1", false)).thenReturn(sqlStatement);
+ ExecutorContext executorContext = mock(ExecutorContext.class);
+ when(executorContext.isPreview()).thenReturn(true);
+ when(executorContext.getPreviewExecutionUnits()).thenReturn(new
LinkedList<>());
+ when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
+ ExecutionUnit executionUnit = new ExecutionUnit("ds_0", new
SQLUnit("SELECT 1", Collections.emptyList()));
+ ExecutionContext executionContext = mock(ExecutionContext.class);
+
when(executionContext.getExecutionUnits()).thenReturn(Collections.singleton(executionUnit));
+ SQLStatementContext boundStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(boundStatementContext.getTablesContext().getDatabaseNames()).thenReturn(Collections.singletonList("foo_db"));
+ when(boundStatementContext.getSqlStatement()).thenReturn(sqlStatement);
+ try (
+ MockedConstruction<SystemDatabase> mockedSystemDatabase =
mockConstruction(SystemDatabase.class,
+ (constructed, context) ->
when(constructed.getSystemSchemas()).thenReturn(Collections.singletonList("pg_catalog")));
+ MockedConstruction<SQLBindEngine> ignoredSQLBindEngine =
mockConstruction(SQLBindEngine.class,
+ (constructed, context) ->
when(constructed.bind(sqlStatement)).thenReturn(boundStatementContext));
+ MockedConstruction<KernelProcessor> ignoredKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (constructed, context) ->
when(constructed.generateExecutionContext(any(), any(),
any())).thenReturn(executionContext))) {
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, compilerContext, executorContext)
+ .implement(mock(ShardingSphereTable.class), new
ScanImplementorContext(mock(), "SELECT 1", null));
+ assertThat(executorContext.getPreviewExecutionUnits(),
is(Collections.singletonList(executionUnit)));
+ try (Enumerator<Object> actual = enumerable.enumerator()) {
+ assertFalse(actual.moveNext());
+ }
+ assertFalse(mockedSystemDatabase.constructed().isEmpty());
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"}) // needed for the generic prepareEngine mock and the JDBCExecutorCallback casts below
+ @Test
+ void assertImplementWithJDBCEnumerable() throws SQLException { // runs implement() against mocked prepare/execute/merge collaborators, then checks connection offset, parameter binding and MergeEngine construction
+ SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+ QueryContext queryContext = mock(QueryContext.class);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+ when(queryContext.isUseCache()).thenReturn(true);
+ ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+ when(database.getName()).thenReturn("foo_db");
+ when(metaData.getDatabase("foo_db")).thenReturn(database);
+
when(metaData.getGlobalRuleMetaData()).thenReturn(mock(RuleMetaData.class));
+ when(queryContext.getMetaData()).thenReturn(metaData);
+
when(queryContext.getParameters()).thenReturn(Collections.singletonList("param_0"));
+ SQLStatement sqlStatement = mock(SQLStatement.class);
+ CompilerContext compilerContext = mock(CompilerContext.class,
RETURNS_DEEP_STUBS);
+
when(compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse("SELECT
? FROM tbl", true)).thenReturn(sqlStatement); // parser stub is keyed on the SQL text that scanContext carries below
+ SQLStatementContext boundStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(boundStatementContext.getTablesContext().getDatabaseNames()).thenReturn(Collections.singletonList("foo_db"));
+ when(boundStatementContext.getSqlStatement()).thenReturn(sqlStatement);
+ ExecutorContext executorContext = mock(ExecutorContext.class);
+ Map<String, Integer> connectionOffsets = new LinkedHashMap<>(); // real map (not a mock) so its ds_0 entry can be asserted below
+
when(executorContext.getConnectionOffsets()).thenReturn(connectionOffsets);
+ when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
+ when(executorContext.getProcessId()).thenReturn("process_id");
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = mock(DriverExecutionPrepareEngine.class);
+ when(executorContext.getPrepareEngine()).thenReturn(prepareEngine);
+ JDBCExecutor jdbcExecutor = mock(JDBCExecutor.class);
+ when(executorContext.getJdbcExecutor()).thenReturn(jdbcExecutor);
+ JDBCExecutorCallback<QueryResult> queryCallback =
(JDBCExecutorCallback<QueryResult>) mock(JDBCExecutorCallback.class);
+
when(executorContext.getQueryCallback()).thenReturn((JDBCExecutorCallback)
queryCallback);
+ ExecutionUnit executionUnit = new ExecutionUnit("ds_0", new
SQLUnit("SELECT 1", Collections.emptyList()));
+ ExecutionContext executionContext = mock(ExecutionContext.class);
+
when(executionContext.getExecutionUnits()).thenReturn(Arrays.asList(executionUnit,
new ExecutionUnit("ds_0", new SQLUnit("SELECT 2", Collections.emptyList())))); // two execution units, both on data source ds_0
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ JDBCExecutionUnit jdbcExecutionUnit = new
JDBCExecutionUnit(executionUnit, ConnectionMode.MEMORY_STRICTLY, mock());
+ JDBCExecutionUnit jdbcPreparedExecutionUnit = new
JDBCExecutionUnit(new ExecutionUnit("ds_0", new SQLUnit("SELECT ?",
Collections.singletonList("bar_param"))),
+ ConnectionMode.CONNECTION_STRICTLY, preparedStatement); // unit backed by the PreparedStatement mock, carrying the single parameter "bar_param"
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = new
ExecutionGroupContext<>(
+ Collections.singleton(new
ExecutionGroup<>(Arrays.asList(jdbcExecutionUnit, jdbcPreparedExecutionUnit))),
new ExecutionGroupReportContext("process_id", "foo_db"));
+ doAnswer(invocation ->
executionGroupContext).when(prepareEngine).prepare(anyString(), any(),
anyMap(), anyCollection(), any()); // any prepare(...) call yields the group context built above
+ when(jdbcExecutor.execute(executionGroupContext,
queryCallback)).thenReturn(Collections.singletonList(mock(QueryResult.class))); // stubbed only for this exact group context and callback
+ ProcessRegistry.getInstance().add(new Process(new
ExecutionGroupContext<>(Collections.emptyList(), new
ExecutionGroupReportContext("process_id", "foo_db")))); // adds a Process for "process_id" to the real registry; removed again in the finally block
+ ScanImplementorContext scanContext = new
ScanImplementorContext(mock(DataContext.class), "SELECT ? FROM tbl", new
int[]{0});
+ ShardingSphereTable table = mock(ShardingSphereTable.class,
RETURNS_DEEP_STUBS);
+ when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
+ try ( // SQLBindEngine, KernelProcessor and MergeEngine instances constructed while this block is open are Mockito mocks
+ MockedConstruction<SQLBindEngine> ignoredSQLBindEngine =
mockConstruction(SQLBindEngine.class,
+ (constructed, context) ->
when(constructed.bind(sqlStatement)).thenReturn(boundStatementContext));
+ MockedConstruction<KernelProcessor> ignoredKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (constructed, context) ->
when(constructed.generateExecutionContext(any(), any(),
any())).thenReturn(executionContext));
+ MockedConstruction<MergeEngine> mergeEngineMockedConstruction
= mockConstruction(MergeEngine.class,
+ (constructed, context) ->
when(constructed.merge(anyList(),
any(QueryContext.class))).thenReturn(mock(MergedResult.class)))) {
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, compilerContext,
executorContext).implement(table, scanContext);
+ try (Enumerator<Object> ignored = enumerable.enumerator()) { // assertions run while the enumerator is still open
+ assertThat(connectionOffsets.get("ds_0"), is(1)); // the test never writes this map itself, so the entry comes from the code under test
+ verify(preparedStatement).setObject(1, "bar_param"); // JDBC parameter index 1 receives the prepared unit's only parameter
+
assertFalse(mergeEngineMockedConstruction.constructed().isEmpty()); // at least one MergeEngine was constructed
+ }
+ } finally {
+ ProcessRegistry.getInstance().remove("process_id"); // undo the registry entry added before the try block
+ }
+ }
+
+ @SuppressWarnings("unchecked") // needed for the generic DriverExecutionPrepareEngine mock
+ @Test
+ void assertImplementWithInterruptedProcess() throws SQLException { // expects enumerator() to throw SQLExecutionInterruptedException when the registry returns a process flagged as interrupted
+ SQLStatementContext sqlStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
+
when(sqlStatementContext.getTablesContext().getSchemaNames()).thenReturn(Collections.emptyList());
+ QueryContext queryContext = mock(QueryContext.class,
RETURNS_DEEP_STUBS);
+
when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext);
+ when(queryContext.isUseCache()).thenReturn(true);
+
when(queryContext.getParameters()).thenReturn(Collections.singletonList("param_0"));
+ CompilerContext compilerContext = mock(CompilerContext.class,
RETURNS_DEEP_STUBS);
+ SQLStatement sqlStatement = mock(SQLStatement.class);
+
when(compilerContext.getSqlParserRule().getSQLParserEngine(databaseType).parse("SELECT
? FROM tbl", true)).thenReturn(sqlStatement); // parser stub is keyed on the SQL text that scanContext carries below
+ SQLStatementContext boundStatementContext =
mock(SQLStatementContext.class, RETURNS_DEEP_STUBS);
+
when(boundStatementContext.getTablesContext().getDatabaseNames()).thenReturn(Collections.singletonList("foo_db"));
+ when(boundStatementContext.getSqlStatement()).thenReturn(sqlStatement);
+ ExecutorContext executorContext = mock(ExecutorContext.class);
+ when(executorContext.getConnectionOffsets()).thenReturn(new
HashMap<>());
+ when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
+ when(executorContext.getProcessId()).thenReturn("process_interrupted"); // same id the mocked registry is stubbed for below
+ ExecutionUnit executionUnit = new ExecutionUnit("ds_0", new
SQLUnit("SELECT ?", Collections.singletonList("bar_param")));
+ ExecutionContext executionContext = mock(ExecutionContext.class);
+
when(executionContext.getExecutionUnits()).thenReturn(Collections.singleton(executionUnit));
+ JDBCExecutionUnit jdbcExecutionUnit = new
JDBCExecutionUnit(executionUnit, ConnectionMode.CONNECTION_STRICTLY, mock());
+ ExecutionGroup<JDBCExecutionUnit> executionGroup = new
ExecutionGroup<>(Collections.singletonList(jdbcExecutionUnit));
+ ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("process_interrupted", "foo_db");
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = new
ExecutionGroupContext<>(Collections.singleton(executionGroup), reportContext);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = mock(DriverExecutionPrepareEngine.class);
+ when(prepareEngine.prepare(any(), any(), anyMap(), anyCollection(),
any())).thenReturn(executionGroupContext); // any prepare(...) call yields the group context built above
+ when(executorContext.getPrepareEngine()).thenReturn(prepareEngine);
+ ScanImplementorContext scanContext = new
ScanImplementorContext(mock(DataContext.class), "SELECT ? FROM tbl", new
int[]{0});
+ ShardingSphereTable table = mock(ShardingSphereTable.class,
RETURNS_DEEP_STUBS);
+ when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
+ ProcessRegistry processRegistry = mock(ProcessRegistry.class); // mock registry; handed out by the static getInstance() stub inside the try block
+ Process interruptedProcess = new Process(executionGroupContext);
+ interruptedProcess.setInterrupted(true);
+
when(processRegistry.get("process_interrupted")).thenReturn(interruptedProcess); // lookup by id returns the process flagged as interrupted above
+ try ( // static and construction mocks are active only while this block is open
+ MockedStatic<ProcessRegistry> mockedStatic =
mockStatic(ProcessRegistry.class);
+ MockedConstruction<SQLBindEngine> ignoredSQLBindEngine =
mockConstruction(SQLBindEngine.class,
+ (constructed, context) ->
when(constructed.bind(sqlStatement)).thenReturn(boundStatementContext));
+ MockedConstruction<KernelProcessor> ignoredKernelProcessor =
mockConstruction(KernelProcessor.class,
+ (constructed, context) ->
when(constructed.generateExecutionContext(any(), any(),
any())).thenReturn(executionContext))) {
+
mockedStatic.when(ProcessRegistry::getInstance).thenReturn(processRegistry); // ProcessRegistry.getInstance() now returns the mock registry
+ Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, compilerContext,
executorContext).implement(table, scanContext);
+ assertThrows(SQLExecutionInterruptedException.class,
enumerable::enumerator); // the exception is expected from enumerator(), not from implement()
+ }
+ }
+}
diff --git
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
deleted file mode 100644
index a310e63c019..00000000000
---
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/executor/EnumerableScanImplementorTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sqlfederation.executor.executor;
-
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.metadata.statistics.DatabaseStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.SchemaStatistics;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
-import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.sqlfederation.compiler.context.CompilerContext;
-import
org.apache.shardingsphere.sqlfederation.compiler.implementor.ScanImplementorContext;
-import
org.apache.shardingsphere.sqlfederation.executor.context.ExecutorContext;
-import
org.apache.shardingsphere.sqlfederation.executor.enumerable.implementor.EnumerableScanImplementor;
-import org.junit.jupiter.api.Test;
-
-import java.sql.Types;
-import java.util.Collections;
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isA;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class EnumerableScanImplementorTest {
-
- @Test
- void assertImplementWithStatistics() { // implement() on table "test" is expected to yield the single row held in the mocked statistics
- CompilerContext compilerContext = mock(CompilerContext.class,
RETURNS_DEEP_STUBS);
- ExecutorContext executorContext = mock(ExecutorContext.class);
- when(executorContext.getCurrentDatabaseName()).thenReturn("foo_db");
- when(executorContext.getCurrentSchemaName()).thenReturn("pg_catalog");
- ShardingSphereStatistics statistics = mockStatistics();
- when(executorContext.getStatistics()).thenReturn(statistics);
- ShardingSphereTable table = mock(ShardingSphereTable.class,
RETURNS_DEEP_STUBS);
- when(table.getName()).thenReturn("test"); // same table name mockStatistics() registers its TableStatistics under
- when(table.getAllColumns()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
- QueryContext queryContext = mock(QueryContext.class,
RETURNS_DEEP_STUBS);
- SelectStatementContext selectStatementContext =
mockSelectStatementContext();
-
when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext);
- Enumerable<Object> enumerable = new
EnumerableScanImplementor(queryContext, compilerContext,
executorContext).implement(table, mock(ScanImplementorContext.class)); // the scan context is a plain mock with no stubbing
- try (Enumerator<Object> actual = enumerable.enumerator()) {
- actual.moveNext(); // return value is not asserted
- Object row = actual.current();
- assertThat(row, isA(Object[].class));
- assertThat(((Object[]) row)[0], is(1)); // 1 is the only value of the RowStatistics built in mockStatistics()
- }
- }
-
- private ShardingSphereStatistics mockStatistics() { // builds mocked statistics: foo_db -> pg_catalog -> test, holding one row [1]
- ShardingSphereStatistics result = mock(ShardingSphereStatistics.class,
RETURNS_DEEP_STUBS);
- DatabaseStatistics databaseStatistics = mock(DatabaseStatistics.class,
RETURNS_DEEP_STUBS);
-
when(result.getDatabaseStatistics("foo_db")).thenReturn(databaseStatistics);
- SchemaStatistics schemaStatistics = mock(SchemaStatistics.class,
RETURNS_DEEP_STUBS);
-
when(databaseStatistics.getSchemaStatistics("pg_catalog")).thenReturn(schemaStatistics);
- TableStatistics tableStatistics = mock(TableStatistics.class);
-
when(tableStatistics.getRows()).thenReturn(Collections.singletonList(new
RowStatistics(Collections.singletonList(1))));
-
when(schemaStatistics.getTableStatistics("test")).thenReturn(tableStatistics);
- return result;
- }
-
- private SelectStatementContext mockSelectStatementContext() { // mocked SELECT context reporting the PostgreSQL database type, database foo_db and schema pg_catalog
- SelectStatementContext result = mock(SelectStatementContext.class,
RETURNS_DEEP_STUBS);
-
when(result.getSqlStatement().getDatabaseType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
"PostgreSQL")); // the database type is loaded through TypedSPILoader rather than mocked
-
when(result.getTablesContext().getSchemaNames()).thenReturn(Collections.singletonList("pg_catalog"));
-
when(result.getTablesContext().getDatabaseName()).thenReturn(Optional.of("foo_db"));
-
when(result.getTablesContext().getSchemaName()).thenReturn(Optional.of("pg_catalog"));
- return result;
- }
-}