This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d0f057351f0 Refactor GlobalClockTransactionHook (#35040)
d0f057351f0 is described below

commit d0f057351f0b35091a42ed427b91a9be36dfcd14
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Thu Mar 20 18:02:51 2025 +0800

    Refactor GlobalClockTransactionHook (#35040)
    
    * Refactor GlobalClockTransactionHook
    
    * Refactor GlobalClockTransactionHook
---
 .../executor/GlobalClockTransactionHook.java       | 21 ++++++------
 .../executor/GlobalClockTransactionHookTest.java   | 11 +------
 .../type/tso/provider/redis/RedisTSOProvider.java  |  2 ++
 .../transaction/BackendTransactionManager.java     | 37 +++++++++++-----------
 4 files changed, 33 insertions(+), 38 deletions(-)

diff --git 
a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java
 
b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java
index c6093c1d03e..01c6562a080 100644
--- 
a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java
+++ 
b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHook.java
@@ -84,25 +84,26 @@ public final class GlobalClockTransactionHook implements 
TransactionHook<GlobalC
         if (!rule.getConfiguration().isEnabled()) {
             return;
         }
-        if (lockContext.tryLock(lockDefinition, 200L)) {
-            Optional<GlobalClockTransactionExecutor> 
globalClockTransactionExecutor = 
DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, 
databaseType);
-            if (!globalClockTransactionExecutor.isPresent()) {
-                return;
-            }
-            Optional<GlobalClockProvider> globalClockProvider = 
rule.getGlobalClockProvider();
-            Preconditions.checkState(globalClockProvider.isPresent());
-            
globalClockTransactionExecutor.get().sendCommitTimestamp(connections, 
globalClockProvider.get().getCurrentTimestamp());
+        // FIXME if acquiring the lock times out, no TSO is assigned but the commit 
still continues; the solution is to use a redis lock in the impl instead of the reg center's 
lock. #35041
+        if (!lockContext.tryLock(lockDefinition, 200L)) {
+            return;
         }
+        Optional<GlobalClockTransactionExecutor> 
globalClockTransactionExecutor = 
DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, 
databaseType);
+        
Preconditions.checkArgument(globalClockTransactionExecutor.isPresent());
+        Optional<GlobalClockProvider> globalClockProvider = 
rule.getGlobalClockProvider();
+        Preconditions.checkState(globalClockProvider.isPresent());
+        globalClockTransactionExecutor.get().sendCommitTimestamp(connections, 
globalClockProvider.get().getCurrentTimestamp());
     }
     
     @Override
     public void afterCommit(final GlobalClockRule rule, final DatabaseType 
databaseType, final Collection<Connection> connections, final 
TransactionConnectionContext transactionContext,
                             final LockContext lockContext) {
-        Optional<GlobalClockProvider> globalClockProvider = 
rule.getGlobalClockProvider();
-        if (!globalClockProvider.isPresent()) {
+        if (!rule.getConfiguration().isEnabled()) {
             return;
         }
         try {
+            Optional<GlobalClockProvider> globalClockProvider = 
rule.getGlobalClockProvider();
+            Preconditions.checkState(globalClockProvider.isPresent());
             globalClockProvider.get().getNextTimestamp();
         } finally {
             lockContext.unlock(lockDefinition);
diff --git 
a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java
 
b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java
index 6e092cfd2bf..0557bfe2208 100644
--- 
a/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java
+++ 
b/kernel/global-clock/core/src/test/java/org/apache/shardingsphere/globalclock/executor/GlobalClockTransactionHookTest.java
@@ -160,16 +160,6 @@ class GlobalClockTransactionHookTest {
         verify(globalClockTransactionExecutor, 
times(0)).sendCommitTimestamp(any(), anyLong());
     }
     
-    @Test
-    void assertBeforeCommitWhenGlobalClockTransactionExecutorAbsent() throws 
SQLException {
-        when(rule.getConfiguration().isEnabled()).thenReturn(true);
-        LockContext lockContext = mock(LockContext.class);
-        when(lockContext.tryLock(any(), anyLong())).thenReturn(true);
-        
when(DatabaseTypedSPILoader.findService(GlobalClockTransactionExecutor.class, 
databaseType)).thenReturn(Optional.empty());
-        transactionHook.beforeCommit(rule, databaseType, 
Collections.emptyList(), transactionContext, lockContext);
-        verify(globalClockTransactionExecutor, 
times(0)).sendCommitTimestamp(any(), anyLong());
-    }
-    
     @Test
     void assertBeforeCommit() throws SQLException {
         when(rule.getConfiguration().isEnabled()).thenReturn(true);
@@ -190,6 +180,7 @@ class GlobalClockTransactionHookTest {
     
     @Test
     void assertAfterCommitWhenGlobalClockProviderPresent() {
+        when(rule.getConfiguration().isEnabled()).thenReturn(true);
         
when(rule.getGlobalClockProvider()).thenReturn(Optional.of(globalClockProvider));
         transactionHook.afterCommit(rule, databaseType, 
Collections.emptyList(), transactionContext, mock(LockContext.class));
         verify(globalClockProvider).getNextTimestamp();
diff --git 
a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java
 
b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java
index 71fe5cd3224..747d38ecbba 100644
--- 
a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java
+++ 
b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/redis/RedisTSOProvider.java
@@ -80,6 +80,7 @@ public final class RedisTSOProvider implements TSOProvider {
     @Override
     public long getCurrentTimestamp() {
         try (Jedis jedis = jedisPool.getResource()) {
+            // TODO use a redis lock instead of the reg center's lock; acquire the lock here 
#35041
             return Long.parseLong(jedis.get(CSN_KEY));
         }
     }
@@ -88,6 +89,7 @@ public final class RedisTSOProvider implements TSOProvider {
     public long getNextTimestamp() {
         try (Jedis jedis = jedisPool.getResource()) {
             return jedis.incr(CSN_KEY);
+            // TODO use a redis lock instead of the reg center's lock; release the lock 
here #35041
         }
     }
     
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
index 3108d904ddb..a93c4cceaa5 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
@@ -98,26 +98,27 @@ public final class BackendTransactionManager implements 
TransactionManager {
     public void commit() throws SQLException {
         DatabaseType databaseType = 
ProxyContext.getInstance().getDatabaseType();
         LockContext lockContext = 
ProxyContext.getInstance().getContextManager().getLockContext();
-        for (Entry<ShardingSphereRule, TransactionHook> entry : 
transactionHooks.entrySet()) {
-            entry.getValue().beforeCommit(entry.getKey(), databaseType, 
connection.getCachedConnections().values(), getTransactionContext(), 
lockContext);
+        if 
(!connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
+            return;
         }
-        if 
(connection.getConnectionSession().getTransactionStatus().isInTransaction()) {
-            try {
-                if (TransactionType.LOCAL == 
TransactionUtils.getTransactionType(getTransactionContext()) || null == 
distributedTransactionManager) {
-                    localTransactionManager.commit();
-                } else {
-                    
distributedTransactionManager.commit(getTransactionContext().isExceptionOccur());
-                }
-            } finally {
-                for (Entry<ShardingSphereRule, TransactionHook> entry : 
transactionHooks.entrySet()) {
-                    entry.getValue().afterCommit(entry.getKey(), databaseType, 
connection.getCachedConnections().values(), getTransactionContext(), 
lockContext);
-                }
-                for (Connection each : 
connection.getCachedConnections().values()) {
-                    
ConnectionSavepointManager.getInstance().transactionFinished(each);
-                }
-                
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
-                
connection.getConnectionSession().getConnectionContext().close();
+        try {
+            for (Entry<ShardingSphereRule, TransactionHook> entry : 
transactionHooks.entrySet()) {
+                entry.getValue().beforeCommit(entry.getKey(), databaseType, 
connection.getCachedConnections().values(), getTransactionContext(), 
lockContext);
+            }
+            if (TransactionType.LOCAL == 
TransactionUtils.getTransactionType(getTransactionContext()) || null == 
distributedTransactionManager) {
+                localTransactionManager.commit();
+            } else {
+                
distributedTransactionManager.commit(getTransactionContext().isExceptionOccur());
+            }
+        } finally {
+            for (Entry<ShardingSphereRule, TransactionHook> entry : 
transactionHooks.entrySet()) {
+                entry.getValue().afterCommit(entry.getKey(), databaseType, 
connection.getCachedConnections().values(), getTransactionContext(), 
lockContext);
+            }
+            for (Connection each : connection.getCachedConnections().values()) 
{
+                
ConnectionSavepointManager.getInstance().transactionFinished(each);
             }
+            
connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
+            connection.getConnectionSession().getConnectionContext().close();
         }
     }
     

Reply via email to