This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 22529241907 Refactor lock position on BackendTransactionManager (#35046) 22529241907 is described below commit 22529241907d62a51629db904bcf0641ba308a6b Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Thu Mar 20 18:31:55 2025 +0800 Refactor lock position on BackendTransactionManager (#35046) * Refactor lock position on BackendTransactionManager * Refactor lock position on BackendTransactionManager --- .../executor/GlobalClockTransactionHook.java | 23 ++++++++-------------- .../executor/GlobalClockTransactionHookTest.java | 9 --------- .../transaction/spi/TransactionHook.java | 7 +++++++ .../transaction/BackendTransactionManager.java | 12 +++++++++++ 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java index 01c6562a080..4d563fda00d 100644 --- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java +++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java @@ -24,9 +24,7 @@ import org.apache.shardingsphere.globalclock.rule.constant.GlobalClockOrder; import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.lock.LockContext; -import org.apache.shardingsphere.infra.lock.LockDefinition; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; -import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; import org.apache.shardingsphere.sql.parser.statement.core.enums.TransactionIsolationLevel; import org.apache.shardingsphere.transaction.spi.TransactionHook; @@ -40,8 +38,6 @@ import java.util.Optional; */ public final class GlobalClockTransactionHook implements TransactionHook<GlobalClockRule> { - private final LockDefinition lockDefinition = new GlobalLockDefinition(new GlobalClockLock()); - @Override public void beforeBegin(final GlobalClockRule rule, final DatabaseType databaseType, final TransactionConnectionContext transactionContext) { } @@ -84,10 +80,6 @@ public final class GlobalClockTransactionHook implements TransactionHook<GlobalC if (!rule.getConfiguration().isEnabled()) { return; } - // FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041 - if (!lockContext.tryLock(lockDefinition, 200L)) { - return; - } Optional<GlobalClockTransactionExecutor> globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); Preconditions.checkArgument(globalClockTransactionExecutor.isPresent()); Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); @@ -101,13 +93,14 @@ public final class GlobalClockTransactionHook implements TransactionHook<GlobalC if (!rule.getConfiguration().isEnabled()) { return; } - try { - Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); - Preconditions.checkState(globalClockProvider.isPresent()); - globalClockProvider.get().getNextTimestamp(); - } finally { - lockContext.unlock(lockDefinition); - } + Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); + Preconditions.checkState(globalClockProvider.isPresent()); + globalClockProvider.get().getNextTimestamp(); + } + + @Override + public boolean isNeedLockWhenCommit() { + return true; } @Override diff --git a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java index 0557bfe2208..f3b9ee69d4f 100644 --- a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java +++ b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java @@ -152,21 +152,12 @@ class GlobalClockTransactionHookTest { verify(lockContext, times(0)).tryLock(any(), anyLong()); } - @Test - void assertBeforeCommitWhenTryLockFailed() throws SQLException { - when(rule.getConfiguration().isEnabled()).thenReturn(true); - LockContext lockContext = mock(LockContext.class); - transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); - verify(globalClockTransactionExecutor, times(0)).sendCommitTimestamp(any(), anyLong()); - } - @Test void assertBeforeCommit() throws SQLException { when(rule.getConfiguration().isEnabled()).thenReturn(true); when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); when(globalClockProvider.getCurrentTimestamp()).thenReturn(10L); LockContext lockContext = mock(LockContext.class); - when(lockContext.tryLock(any(), anyLong())).thenReturn(true); when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.of(globalClockTransactionExecutor)); transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); verify(globalClockTransactionExecutor).sendCommitTimestamp(Collections.emptyList(), 10L); diff --git a/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/TransactionHook.java b/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/TransactionHook.java index a02673b8d27..1eeee79a3c7 100644 --- a/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/TransactionHook.java +++ b/kernel/transaction/api/src/main/java/org/apache/shardingsphere/transaction/spi/TransactionHook.java @@ -103,6 +103,13 @@ public interface TransactionHook<T extends ShardingSphereRule> extends OrderedSP */ void afterCommit(T rule, DatabaseType databaseType, Collection<Connection> connections, TransactionConnectionContext transactionContext, LockContext lockContext) throws SQLException; + /** + * Whether to need lock when transaction committed. + * + * @return need lock or not + */ + boolean isNeedLockWhenCommit(); + /** * Process before rolling back the transaction. * diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index a93c4cceaa5..b83446e15e8 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -17,11 +17,14 @@ package org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction; +import org.apache.shardingsphere.globalclock.executor.GlobalClockLock; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.lock.LockContext; +import org.apache.shardingsphere.infra.lock.LockDefinition; import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext; import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; +import org.apache.shardingsphere.mode.lock.global.GlobalLockDefinition; import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager; import org.apache.shardingsphere.proxy.backend.connector.TransactionManager; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; @@ -101,7 +104,13 @@ public final class BackendTransactionManager implements TransactionManager { if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) { return; } + boolean isNeedLock = transactionHooks.values().stream().anyMatch(TransactionHook::isNeedLockWhenCommit); + LockDefinition lockDefinition = new GlobalLockDefinition(new GlobalClockLock()); try { + // FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041 + if (isNeedLock && !lockContext.tryLock(lockDefinition, 200L)) { + return; + } for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); } @@ -114,6 +123,9 @@ public final class BackendTransactionManager implements TransactionManager { for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); } + if (isNeedLock) { + lockContext.unlock(lockDefinition); + } for (Connection each : connection.getCachedConnections().values()) { ConnectionSavepointManager.getInstance().transactionFinished(each); }