This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 7362f112db0 Add TransactionOption (#35180) 7362f112db0 is described below commit 7362f112db0da22eb857801dd3b5eea05eb619c5 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Sun Apr 13 18:25:27 2025 +0800 Add TransactionOption (#35180) * Add TransactionOption * Add TransactionOption --- .../metadata/database/DialectDatabaseMetaData.java | 46 +++------------------- .../database/option/TransactionOption.java | 40 +++++++++++++++++++ .../database/FirebirdDatabaseMetaData.java | 5 ++- .../metadata/database/MySQLDatabaseMetaData.java | 5 ++- .../database/OpenGaussDatabaseMetaData.java | 15 ++----- .../database/PostgreSQLDatabaseMetaData.java | 10 ++--- .../jdbc/DriverJDBCPushDownExecuteExecutor.java | 8 ++-- .../DriverJDBCPushDownExecuteUpdateExecutor.java | 8 ++-- .../pipeline/cdc/handler/CDCBackendHandler.java | 5 +-- .../proxy/backend/connector/ProxySQLExecutor.java | 9 ++--- .../backend/handler/tcl/TCLBackendHandler.java | 2 +- 11 files changed, 72 insertions(+), 81 deletions(-) diff --git a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java index a33aa501de6..335babd1ffc 100644 --- a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java +++ b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/DialectDatabaseMetaData.java @@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.database.core.metadata.database; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; import 
org.apache.shardingsphere.infra.database.core.metadata.database.option.JoinOrderOption; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; @@ -132,49 +133,12 @@ public interface DialectDatabaseMetaData extends DatabaseTypedSPI { } /** - * Is support global CSN. + * Get transaction option. * - * @return support or not - */ - default boolean isSupportGlobalCSN() { - return false; - } - - /** - * Whether DDL need implicit commit. - * - * @return need or not - */ - default boolean isDDLNeedImplicitCommit() { - return false; - } - - /** - * Whether support auto commit when nested transaction. - * - * @return support or not - */ - default boolean isSupportAutoCommitInNestedTransaction() { - return false; - } - - /** - * Whether support DDL in XA transaction. - * - * @return support or not - */ - default boolean isSupportDDLInXATransaction() { - return false; - } - - /** - * Whether support meta data refresh in transaction. - * - * @return support or not + * @return transaction option */ - // TODO Investigate the reason of some databases cannot support meta data refreshed in transaction. The method should be removed finally after metadata refresh supported for all database. 
- default boolean isSupportMetaDataRefreshInTransaction() { - return true; + default TransactionOption getTransactionOption() { + return new TransactionOption(false, false, false, false, true); } /** diff --git a/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/option/TransactionOption.java b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/option/TransactionOption.java new file mode 100644 index 00000000000..e0f388f7498 --- /dev/null +++ b/infra/database/core/src/main/java/org/apache/shardingsphere/infra/database/core/metadata/database/option/TransactionOption.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.database.core.metadata.database.option; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Transaction option. 
+ */ +@RequiredArgsConstructor +@Getter +public final class TransactionOption { + + private final boolean isSupportGlobalCSN; + + private final boolean isDDLNeedImplicitCommit; + + private final boolean isSupportAutoCommitInNestedTransaction; + + private final boolean isSupportDDLInXATransaction; + + // TODO Investigate the reason of some databases cannot support meta data refreshed in transaction. The method should be removed finally after metadata refresh supported for all database. + private final boolean isSupportMetaDataRefreshInTransaction; +} diff --git a/infra/database/type/firebird/src/main/java/org/apache/shardingsphere/infra/database/firebird/metadata/database/FirebirdDatabaseMetaData.java b/infra/database/type/firebird/src/main/java/org/apache/shardingsphere/infra/database/firebird/metadata/database/FirebirdDatabaseMetaData.java index cf86f821b7b..f43a641ecbe 100644 --- a/infra/database/type/firebird/src/main/java/org/apache/shardingsphere/infra/database/firebird/metadata/database/FirebirdDatabaseMetaData.java +++ b/infra/database/type/firebird/src/main/java/org/apache/shardingsphere/infra/database/firebird/metadata/database/FirebirdDatabaseMetaData.java @@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.database.firebird.metadata.database; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; /** * Database metadata of Firebird. 
@@ -42,8 +43,8 @@ public final class FirebirdDatabaseMetaData implements DialectDatabaseMetaData { } @Override - public boolean isDDLNeedImplicitCommit() { - return true; + public TransactionOption getTransactionOption() { + return new TransactionOption(false, true, false, false, true); } @Override diff --git a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/MySQLDatabaseMetaData.java b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/MySQLDatabaseMetaData.java index 8329a4a1b96..7c70c6d03e9 100644 --- a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/MySQLDatabaseMetaData.java +++ b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/database/MySQLDatabaseMetaData.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDa import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; import org.apache.shardingsphere.infra.database.core.metadata.database.option.JoinOrderOption; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; import java.math.BigInteger; import java.sql.Types; @@ -84,8 +85,8 @@ public final class MySQLDatabaseMetaData implements DialectDatabaseMetaData { } @Override - public boolean isSupportAutoCommitInNestedTransaction() { - return true; + public TransactionOption getTransactionOption() { + return new TransactionOption(false, false, true, false, true); } @Override diff --git a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java 
b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java index 736caf735b2..fa869c2cd78 100644 --- a/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java +++ b/infra/database/type/opengauss/src/main/java/org/apache/shardingsphere/infra/database/opengauss/metadata/database/OpenGaussDatabaseMetaData.java @@ -21,6 +21,7 @@ import com.cedarsoftware.util.CaseInsensitiveMap; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; import java.sql.Types; import java.util.Map; @@ -67,8 +68,8 @@ public final class OpenGaussDatabaseMetaData implements DialectDatabaseMetaData } @Override - public boolean isSupportGlobalCSN() { - return true; + public TransactionOption getTransactionOption() { + return new TransactionOption(true, false, false, true, false); } @Override @@ -76,16 +77,6 @@ public final class OpenGaussDatabaseMetaData implements DialectDatabaseMetaData return tableNamePattern.toLowerCase(); } - @Override - public boolean isSupportDDLInXATransaction() { - return true; - } - - @Override - public boolean isSupportMetaDataRefreshInTransaction() { - return false; - } - @Override public String getDatabaseType() { return "openGauss"; diff --git a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/PostgreSQLDatabaseMetaData.java b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/PostgreSQLDatabaseMetaData.java index 718c1b5f05b..5ca211aa756 100644 
--- a/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/PostgreSQLDatabaseMetaData.java +++ b/infra/database/type/postgresql/src/main/java/org/apache/shardingsphere/infra/database/postgresql/metadata/database/PostgreSQLDatabaseMetaData.java @@ -21,6 +21,7 @@ import com.cedarsoftware.util.CaseInsensitiveMap; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.NullsOrderType; import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; import java.sql.Types; import java.util.Map; @@ -80,13 +81,8 @@ public final class PostgreSQLDatabaseMetaData implements DialectDatabaseMetaData } @Override - public boolean isSupportDDLInXATransaction() { - return true; - } - - @Override - public boolean isSupportMetaDataRefreshInTransaction() { - return false; + public TransactionOption getTransactionOption() { + return new TransactionOption(false, false, false, true, false); } @Override diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java index a385aca4ff0..a5b937b3512 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java @@ -27,8 +27,8 @@ import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne import org.apache.shardingsphere.driver.jdbc.core.resultset.ShardingSphereResultSet; import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; -import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; @@ -132,8 +132,8 @@ public final class DriverJDBCPushDownExecuteExecutor { } private boolean isNeedImplicitCommit(final SQLStatementContext sqlStatementContext) { - DialectDatabaseMetaData dialectDatabaseMetaData = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, sqlStatementContext.getDatabaseType()); - return !connection.getAutoCommit() && sqlStatementContext.getSqlStatement() instanceof DDLStatement && dialectDatabaseMetaData.isDDLNeedImplicitCommit(); + TransactionOption transactionOption = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDialectDatabaseMetaData().getTransactionOption(); + return !connection.getAutoCommit() && sqlStatementContext.getSqlStatement() instanceof DDLStatement && transactionOption.isDDLNeedImplicitCommit(); } /** diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java index edf93f86a6c..169c9b262e1 100644 --- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java @@ -26,8 +26,8 @@ import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConne import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; -import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext; @@ -139,8 +139,8 @@ public final class DriverJDBCPushDownExecuteUpdateExecutor { } private boolean isNeedImplicitCommit(final SQLStatementContext sqlStatementContext) { - DialectDatabaseMetaData dialectDatabaseMetaData = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, sqlStatementContext.getDatabaseType()); - return !connection.getAutoCommit() && sqlStatementContext.getSqlStatement() instanceof DDLStatement && dialectDatabaseMetaData.isDDLNeedImplicitCommit(); + TransactionOption transactionOption = new DatabaseTypeRegistry(sqlStatementContext.getDatabaseType()).getDialectDatabaseMetaData().getTransactionOption(); + return !connection.getAutoCommit() && sqlStatementContext.getSqlStatement() instanceof DDLStatement && transactionOption.isDDLNeedImplicitCommit(); } 
private boolean isNeedAccumulate(final Collection<ShardingSphereRule> rules, final SQLStatementContext sqlStatementContext) { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index bbdace4ab03..b2067cf02bb 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -33,8 +33,8 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper; -import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException; import org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException; +import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException; import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody; @@ -53,7 +53,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; @@ -120,7 +119,7 @@ public final class CDCBackendHandler { Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames); ShardingSpherePreconditions.checkNotEmpty(actualDataNodesMap, () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames))); // TODO Add globalCSNSupported to isolate it with decodeWithTx flag, they're different. And also update CDCJobPreparer needSorting flag. - boolean decodeWithTx = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, database.getProtocolType()).isSupportGlobalCSN(); + boolean decodeWithTx = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData().getTransactionOption().isSupportGlobalCSN(); StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx); String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties()); connectionContext.setJobId(jobId); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java index 393c23e1958..96e490801d5 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java @@ -22,8 +22,7 @@ import lombok.Getter; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import 
org.apache.shardingsphere.infra.binder.context.type.TableAvailable; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; -import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; +import org.apache.shardingsphere.infra.database.core.metadata.database.option.TransactionOption; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; @@ -138,13 +137,13 @@ public final class ProxySQLExecutor { } private boolean isSupportDDLInTransaction(final DDLStatement sqlStatement) { - DialectDatabaseMetaData dialectDatabaseMetaData = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, sqlStatement.getDatabaseType()); + TransactionOption transactionOption = new DatabaseTypeRegistry(sqlStatement.getDatabaseType()).getDialectDatabaseMetaData().getTransactionOption(); boolean isDDLWithoutMetaDataChanged = isDDLWithoutMetaDataChanged(sqlStatement); if (isInXATransaction()) { - return dialectDatabaseMetaData.isSupportDDLInXATransaction() && (isDDLWithoutMetaDataChanged || dialectDatabaseMetaData.isSupportMetaDataRefreshInTransaction()); + return transactionOption.isSupportDDLInXATransaction() && (isDDLWithoutMetaDataChanged || transactionOption.isSupportMetaDataRefreshInTransaction()); } if (isInLocalTransaction()) { - return dialectDatabaseMetaData.isSupportMetaDataRefreshInTransaction() || isDDLWithoutMetaDataChanged; + return transactionOption.isSupportMetaDataRefreshInTransaction() || isDDLWithoutMetaDataChanged; } return true; } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/tcl/TCLBackendHandler.java 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/tcl/TCLBackendHandler.java index 12e7c922ee4..fe50e07baca 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/tcl/TCLBackendHandler.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/tcl/TCLBackendHandler.java @@ -99,7 +99,7 @@ public final class TCLBackendHandler implements ProxyBackendHandler { private void handleBegin() throws SQLException { if (connectionSession.getTransactionStatus().isInTransaction()) { - if (dialectDatabaseMetaData.isSupportAutoCommitInNestedTransaction()) { + if (dialectDatabaseMetaData.getTransactionOption().isSupportAutoCommitInNestedTransaction()) { backendTransactionManager.commit(); } else if (dialectDatabaseMetaData.getDefaultSchema().isPresent()) { throw new InTransactionException();