This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef0031b5027 Refactor RouteSQLRewriteEngine (#37418)
ef0031b5027 is described below
commit ef0031b5027b42c50d8a3357bfdedfec63b8e8f6
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 18 01:01:32 2025 +0800
Refactor RouteSQLRewriteEngine (#37418)
* Refactor RouteSQLRewriteEngine
* Refactor RouteSQLRewriteEngine
* Refactor RouteSQLRewriteEngine
* Refactor RouteSQLRewriteEngine
* Refactor RouteSQLRewriteEngine
---
RELEASE-NOTES.md | 1 +
.../connection/kernel/KernelProcessorTest.java | 2 ++
.../rewrite/engine/RouteSQLRewriteEngine.java | 42 +++++++++++-----------
.../infra/rewrite/SQLRewriteEntryTest.java | 1 +
.../query/MySQLComQueryPacketExecutorTest.java | 7 ++--
...ySQLMultiStatementsProxyBackendHandlerTest.java | 3 ++
...egatedBatchedStatementsCommandExecutorTest.java | 3 ++
.../PostgreSQLBatchedStatementsExecutorTest.java | 5 ++-
.../PostgreSQLComDescribeExecutorTest.java | 5 +--
9 files changed, 42 insertions(+), 27 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 85d5496bd60..95ba9477d4e 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -44,6 +44,7 @@
1. SQL Binder: Add ALTER TABLE metadata check -
[#35877](https://github.com/apache/shardingsphere/pull/35877)
1. SQL Router: Add SELECT with UNION ALL routing to multi data sources check -
[#35037](https://github.com/apache/shardingsphere/pull/35037)
1. SQL Router: Improve support for executing tableless SQL with single data
source - [#35659](https://github.com/apache/shardingsphere/pull/35659)
+1. SQL Router: Add `max-union-size-per-datasource` property to batch UNION ALL
rewrite per data source and keep parallel execution -
[#37405](https://github.com/apache/shardingsphere/pull/37405)
1. DistSQL: Add job sharding nodes info to the query results of `SHOW
MIGRATION LIST` - [#35053](https://github.com/apache/shardingsphere/pull/35053)
1. DistSQL: Add DistSQL for query storage units which used in single rule -
[#35131](https://github.com/apache/shardingsphere/pull/35131)
1. Proxy: Implement write bool binary data type for PostgreSQL protocol -
[#35831](https://github.com/apache/shardingsphere/pull/35831)
diff --git
a/infra/context/src/test/java/org/apache/shardingsphere/infra/connection/kernel/KernelProcessorTest.java
b/infra/context/src/test/java/org/apache/shardingsphere/infra/connection/kernel/KernelProcessorTest.java
index 5fc5d68328b..18537b2210c 100644
---
a/infra/context/src/test/java/org/apache/shardingsphere/infra/connection/kernel/KernelProcessorTest.java
+++
b/infra/context/src/test/java/org/apache/shardingsphere/infra/connection/kernel/KernelProcessorTest.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -70,6 +71,7 @@ class KernelProcessorTest {
ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db",
databaseType, resourceMetaData, new RuleMetaData(mockRules()),
Collections.emptyList());
when(metaData.containsDatabase("foo_db")).thenReturn(true);
when(metaData.getDatabase("foo_db")).thenReturn(database);
+ when(metaData.getProps()).thenReturn(new ConfigurationProperties(new
Properties()));
QueryContext queryContext = new QueryContext(sqlStatementContext,
"SELECT * FROM tbl", Collections.emptyList(), new HintValueContext(),
connectionContext, metaData);
ConfigurationProperties props = new
ConfigurationProperties(PropertiesBuilder.build(new
Property(ConfigurationPropertyKey.SQL_SHOW.getKey(), Boolean.TRUE.toString())));
ExecutionContext actual = new
KernelProcessor().generateExecutionContext(queryContext, new
RuleMetaData(mockRules()), props);
diff --git
a/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java
b/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java
index 0b6226497fc..9862e753361 100644
---
a/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java
+++
b/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java
@@ -74,9 +74,7 @@ public final class RouteSQLRewriteEngine {
* @return SQL rewrite result
*/
public RouteSQLRewriteResult rewrite(final SQLRewriteContext
sqlRewriteContext, final RouteContext routeContext, final QueryContext
queryContext) {
- int maxUnionSizePerDataSource =
Optional.ofNullable(queryContext.getMetaData().getProps())
- .map(props ->
props.<Integer>getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE))
-
.orElse(Integer.parseInt(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getDefaultValue()));
+ int maxUnionSizePerDataSource =
queryContext.getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE);
return new RouteSQLRewriteResult(translate(queryContext,
createSQLRewriteUnits(sqlRewriteContext, routeContext,
maxUnionSizePerDataSource)));
}
@@ -95,25 +93,6 @@ public final class RouteSQLRewriteEngine {
return result;
}
- private void createAggregatedRewriteUnits(final SQLRewriteContext
sqlRewriteContext, final RouteContext routeContext,
- final List<RouteUnit>
routeUnits, final int maxUnionSizePerDataSource, final Map<RouteUnit,
SQLRewriteUnit> result) {
- if (routeUnits.size() <= maxUnionSizePerDataSource) {
-
result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())),
createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
- } else {
- for (List<RouteUnit> batch : partitionRouteUnits(routeUnits,
maxUnionSizePerDataSource)) {
-
result.put(batch.get(ThreadLocalRandom.current().nextInt(batch.size())),
createSQLRewriteUnit(sqlRewriteContext, routeContext, batch));
- }
- }
- }
-
- private List<List<RouteUnit>> partitionRouteUnits(final List<RouteUnit>
routeUnits, final int batchSize) {
- List<List<RouteUnit>> result = new ArrayList<>();
- for (int i = 0; i < routeUnits.size(); i += batchSize) {
- result.add(routeUnits.subList(i, Math.min(i + batchSize,
routeUnits.size())));
- }
- return result;
- }
-
private Map<String, List<RouteUnit>> aggregateRouteUnitGroups(final
Collection<RouteUnit> routeUnits) {
Map<String, List<RouteUnit>> result = new
LinkedHashMap<>(routeUnits.size(), 1F);
for (RouteUnit each : routeUnits) {
@@ -135,6 +114,25 @@ public final class RouteSQLRewriteEngine {
return result;
}
+ private void createAggregatedRewriteUnits(final SQLRewriteContext
sqlRewriteContext, final RouteContext routeContext,
+ final List<RouteUnit>
routeUnits, final int maxUnionSizePerDataSource, final Map<RouteUnit,
SQLRewriteUnit> sqlRewriteUnits) {
+ if (routeUnits.size() <= maxUnionSizePerDataSource) {
+
sqlRewriteUnits.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())),
createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
+ } else {
+ for (List<RouteUnit> batch : partitionRouteUnits(routeUnits,
maxUnionSizePerDataSource)) {
+
sqlRewriteUnits.put(batch.get(ThreadLocalRandom.current().nextInt(batch.size())),
createSQLRewriteUnit(sqlRewriteContext, routeContext, batch));
+ }
+ }
+ }
+
+ private List<List<RouteUnit>> partitionRouteUnits(final List<RouteUnit>
routeUnits, final int batchSize) {
+ List<List<RouteUnit>> result = new ArrayList<>();
+ for (int i = 0; i < routeUnits.size(); i += batchSize) {
+ result.add(routeUnits.subList(i, Math.min(i + batchSize,
routeUnits.size())));
+ }
+ return result;
+ }
+
private SQLRewriteUnit createSQLRewriteUnit(final SQLRewriteContext
sqlRewriteContext, final RouteContext routeContext, final Collection<RouteUnit>
routeUnits) {
Collection<String> sql = new LinkedList<>();
List<Object> params = new LinkedList<>();
diff --git
a/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
b/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
index d39b68f4754..5cd0c85a61b 100644
---
a/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
+++
b/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntryTest.java
@@ -82,6 +82,7 @@ class SQLRewriteEntryTest {
when(sqlStatementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
when(result.getSqlStatementContext()).thenReturn(sqlStatementContext);
when(result.getHintValueContext()).thenReturn(new HintValueContext());
+ when(result.getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
return result;
}
diff --git
a/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
b/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
index a4e23a9c707..d4e834b29b8 100644
---
a/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
+++
b/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutorTest.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.database.protocol.mysql.packet.command.query.te
import
org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import
org.apache.shardingsphere.database.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.database.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
@@ -65,6 +66,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -164,8 +166,9 @@ class MySQLComQueryPacketExecutorTest {
RuleMetaData globalRuleMetaData = new RuleMetaData(
Arrays.asList(new SQLParserRule(new
DefaultSQLParserRuleConfigurationBuilder().build()), new SQLTranslatorRule(new
DefaultSQLTranslatorRuleConfigurationBuilder().build())));
when(result.getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
-
when(result.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)).thenReturn(1);
-
when(result.getMetaData().getProps().<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)).thenReturn(false);
+ Properties props = new Properties();
+
props.setProperty(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), "1");
+ when(result.getMetaData().getProps()).thenReturn(new
ConfigurationProperties(props));
when(result.getMetaData().getDatabase("foo_db")).thenReturn(createDatabase());
when(result.getMetaData().containsDatabase("foo_db")).thenReturn(true);
return result;
diff --git
a/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsProxyBackendHandlerTest.java
b/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsProxyBackendHandlerTest.java
index fbc0beb40f6..37f187bda99 100644
---
a/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsProxyBackendHandlerTest.java
+++
b/proxy/frontend/dialect/mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsProxyBackendHandlerTest.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.proxy.frontend.mysql.command.query.text.query;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -54,6 +55,7 @@ import java.sql.Types;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -191,6 +193,7 @@ class MySQLMultiStatementsProxyBackendHandlerTest {
when(result.getMetaDataContexts().getMetaData().containsDatabase("foo_db")).thenReturn(true);
when(result.getMetaDataContexts().getMetaData().getDatabase("foo_db").containsSchema("foo_db")).thenReturn(true);
when(result.getMetaDataContexts().getMetaData().getDatabase("foo_db").getSchema("foo_db").containsTable("t")).thenReturn(true);
+
when(result.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
return result;
}
}
diff --git
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
index d5a86190b9d..55584203c17 100644
---
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
+++
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
@@ -29,6 +29,7 @@ import
org.apache.shardingsphere.database.protocol.postgresql.packet.command.que
import
org.apache.shardingsphere.database.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -64,6 +65,7 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -166,6 +168,7 @@ class
PostgreSQLAggregatedBatchedStatementsCommandExecutorTest {
when(database.getSchema("public").containsTable("t_order")).thenReturn(true);
when(result.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
when(result.getMetaDataContexts().getMetaData().containsDatabase("foo_db")).thenReturn(true);
+
when(result.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
when(database.getSchema("public").getTable("t_order").getAllColumns())
.thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.VARCHAR, false, false, false, true, false,
false)));
return result;
diff --git
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
index a6be2248d34..34cdd04dcb1 100644
---
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
+++
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.database.protocol.postgresql.packet.command.query.extended.PostgreSQLColumnType;
import
org.apache.shardingsphere.database.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLTypeUnspecifiedSQLParameter;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
@@ -62,6 +63,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -143,11 +145,12 @@ class PostgreSQLBatchedStatementsExecutorTest {
when(result.getMetaDataContexts().getMetaData().getDatabase("db")).thenReturn(database);
RuleMetaData globalRuleMetaData = new
RuleMetaData(Collections.singleton(new SQLTranslatorRule(new
DefaultSQLTranslatorRuleConfigurationBuilder().build())));
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
+
when(result.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
return result;
}
private ConnectionSession mockConnectionSession() {
- ConnectionSession result = mock(ConnectionSession.class);
+ ConnectionSession result = mock(ConnectionSession.class,
RETURNS_DEEP_STUBS);
when(result.getCurrentDatabaseName()).thenReturn("db");
when(result.getUsedDatabaseName()).thenReturn("db");
when(result.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager);
diff --git
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
index 50f04dea154..71d5b317f3c 100644
---
a/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
+++
b/proxy/frontend/dialect/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/describe/PostgreSQLComDescribeExecutorTest.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.database.protocol.postgresql.payload.PostgreSQL
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
import
org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.hint.HintValueContext;
@@ -79,6 +79,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -379,7 +380,7 @@ class PostgreSQLComDescribeExecutorTest {
private ContextManager mockContextManager() {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
-
when(result.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.SQL_SHOW)).thenReturn(false);
+
when(result.getMetaDataContexts().getMetaData().getProps()).thenReturn(new
ConfigurationProperties(new Properties()));
when(connectionSession.getUsedDatabaseName()).thenReturn(DATABASE_NAME);
when(connectionSession.getCurrentDatabaseName()).thenReturn(DATABASE_NAME);
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new
ServerPreparedStatementRegistry());