This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new d0f057351f0 Refactor GlobalClockTransactionHook (#35040) d0f057351f0 is described below commit d0f057351f0b35091a42ed427b91a9be36dfcd14 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Thu Mar 20 18:02:51 2025 +0800 Refactor GlobalClockTransactionHook (#35040) * Refactor GlobalClockTransactionHook * Refactor GlobalClockTransactionHook --- .../executor/GlobalClockTransactionHook.java | 21 ++++++------ .../executor/GlobalClockTransactionHookTest.java | 11 +------ .../type/tso/provider/redis/RedisTSOProvider.java | 2 ++ .../transaction/BackendTransactionManager.java | 37 +++++++++++----------- 4 files changed, 33 insertions(+), 38 deletions(-) diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java index c6093c1d03e..01c6562a080 100644 --- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java +++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java @@ -84,25 +84,26 @@ public final class GlobalClockTransactionHook implements TransactionHook<GlobalC if (!rule.getConfiguration().isEnabled()) { return; } - if (lockContext.tryLock(lockDefinition, 200L)) { - Optional<GlobalClockTransactionExecutor> globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); - if (!globalClockTransactionExecutor.isPresent()) { - return; - } - Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); - Preconditions.checkState(globalClockProvider.isPresent()); - globalClockTransactionExecutor.get().sendCommitTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); + // FIXME if timeout when lock required, TSO not assigned, but commit will continue, solution is use redis lock in impl to instead of reg center's lock. #35041 + if (!lockContext.tryLock(lockDefinition, 200L)) { + return; } + Optional<GlobalClockTransactionExecutor> globalClockTransactionExecutor = DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType); + Preconditions.checkArgument(globalClockTransactionExecutor.isPresent()); + Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); + Preconditions.checkState(globalClockProvider.isPresent()); + globalClockTransactionExecutor.get().sendCommitTimestamp(connections, globalClockProvider.get().getCurrentTimestamp()); } @Override public void afterCommit(final GlobalClockRule rule, final DatabaseType databaseType, final Collection<Connection> connections, final TransactionConnectionContext transactionContext, final LockContext lockContext) { - Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); - if (!globalClockProvider.isPresent()) { + if (!rule.getConfiguration().isEnabled()) { return; } try { + Optional<GlobalClockProvider> globalClockProvider = rule.getGlobalClockProvider(); + Preconditions.checkState(globalClockProvider.isPresent()); globalClockProvider.get().getNextTimestamp(); } finally { lockContext.unlock(lockDefinition); diff --git a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java index 6e092cfd2bf..0557bfe2208 100644 --- a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java +++ b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java @@ -160,16 +160,6 @@ class GlobalClockTransactionHookTest { verify(globalClockTransactionExecutor, times(0)).sendCommitTimestamp(any(), anyLong()); } - @Test - void assertBeforeCommitWhenGlobalClockTransactionExecutorAbsent() throws SQLException { - when(rule.getConfiguration().isEnabled()).thenReturn(true); - LockContext lockContext = mock(LockContext.class); - when(lockContext.tryLock(any(), anyLong())).thenReturn(true); - when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, databaseType)).thenReturn(Optional.empty()); - transactionHook.beforeCommit(rule, databaseType, Collections.emptyList(), transactionContext, lockContext); - verify(globalClockTransactionExecutor, times(0)).sendCommitTimestamp(any(), anyLong()); - } - @Test void assertBeforeCommit() throws SQLException { when(rule.getConfiguration().isEnabled()).thenReturn(true); @@ -190,6 +180,7 @@ class GlobalClockTransactionHookTest { @Test void assertAfterCommitWhenGlobalClockProviderPresent() { + when(rule.getConfiguration().isEnabled()).thenReturn(true); when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider)); transactionHook.afterCommit(rule, databaseType, Collections.emptyList(), transactionContext, mock(LockContext.class)); verify(globalClockProvider).getNextTimestamp(); diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java index 71fe5cd3224..747d38ecbba 100644 --- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java +++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java @@ -80,6 +80,7 @@ public final class RedisTSOProvider implements TSOProvider { @Override public long getCurrentTimestamp() { try (Jedis jedis = jedisPool.getResource()) { + // TODO use redis lock to instead of reg center's lock. lock here #35041 return Long.parseLong(jedis.get(CSN_KEY)); } } @@ -88,6 +89,7 @@ public final class RedisTSOProvider implements TSOProvider { public long getNextTimestamp() { try (Jedis jedis = jedisPool.getResource()) { return jedis.incr(CSN_KEY); + // TODO use redis lock to instead of reg center's lock. unlock here #35041 } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java index 3108d904ddb..a93c4cceaa5 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java @@ -98,26 +98,27 @@ public final class BackendTransactionManager implements TransactionManager { public void commit() throws SQLException { DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType(); LockContext lockContext = ProxyContext.getInstance().getContextManager().getLockContext(); - for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { - entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); + if (!connection.getConnectionSession().getTransactionStatus().isInTransaction()) { + return; } - if (connection.getConnectionSession().getTransactionStatus().isInTransaction()) { - try { - if (TransactionType.LOCAL == TransactionUtils.getTransactionType(getTransactionContext()) || null == distributedTransactionManager) { - localTransactionManager.commit(); - } else { - distributedTransactionManager.commit(getTransactionContext().isExceptionOccur()); - } - } finally { - for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { - entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); - } - for (Connection each : connection.getCachedConnections().values()) { - ConnectionSavepointManager.getInstance().transactionFinished(each); - } - connection.getConnectionSession().getTransactionStatus().setInTransaction(false); - connection.getConnectionSession().getConnectionContext().close(); + try { + for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { + entry.getValue().beforeCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); + } + if (TransactionType.LOCAL == TransactionUtils.getTransactionType(getTransactionContext()) || null == distributedTransactionManager) { + localTransactionManager.commit(); + } else { + distributedTransactionManager.commit(getTransactionContext().isExceptionOccur()); + } + } finally { + for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) { + entry.getValue().afterCommit(entry.getKey(), databaseType, connection.getCachedConnections().values(), getTransactionContext(), lockContext); + } + for (Connection each : connection.getCachedConnections().values()) { + ConnectionSavepointManager.getInstance().transactionFinished(each); } + connection.getConnectionSession().getTransactionStatus().setInTransaction(false); + connection.getConnectionSession().getConnectionContext().close(); } }