This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c7ed0f05b2 Add DistributedLockHolder in cluster-mode-repository-core module (#35111)
4c7ed0f05b2 is described below

commit 4c7ed0f05b2c933edd624515562399e6a356b068
Author: Liang Zhang <zhangli...@apache.org>
AuthorDate: Sun Mar 30 22:17:31 2025 +0800

    Add DistributedLockHolder in cluster-mode-repository-core module (#35111)
    
    * Add DistributedLockHolder in cluster-mode-repository-core module
    
    * Add DistributedLockHolder in cluster-mode-repository-core module
---
 .../lock/global/GlobalLockPersistService.java      | 22 +++++++----
 .../fixture/ClusterPersistRepositoryFixture.java   |  6 +++
 .../lock/global/GlobalLockPersistServiceTest.java  | 30 ++++++++------
 .../cluster/ClusterPersistRepository.java          |  9 +++++
 .../cluster/core/lock/DistributedLockHolder.java   | 46 ++++++++++++++++++++++
 .../repository/cluster/etcd/EtcdRepository.java    |  7 ++++
 .../cluster/zookeeper/ZookeeperRepository.java     |  7 ++++
 .../fixture/ProxyPersistRepositoryFixture.java     |  6 +++
 8 files changed, 113 insertions(+), 20 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java
index 49e6bfcec15..e4ae3065c9f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java
@@ -17,22 +17,22 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.lock.global;
 
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.DefaultDistributedLock;
+import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.DistributedLockHolder;
 import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties;
-import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 
 import java.util.Properties;
 
 /**
  * Global lock persist service.
  */
+@RequiredArgsConstructor
 public final class GlobalLockPersistService {
     
-    private final DistributedLockHolder lockHolder;
-    
-    public GlobalLockPersistService(final ClusterPersistRepository repository) 
{
-        lockHolder = repository.getDistributedLockHolder().orElseGet(() -> new 
DistributedLockHolder("default", this, new DefaultLockTypedProperties(new 
Properties())));
-    }
+    private final ClusterPersistRepository repository;
     
     /**
      * Try lock.
@@ -42,7 +42,8 @@ public final class GlobalLockPersistService {
      * @return is locked or not
      */
     public boolean tryLock(final GlobalLockDefinition lockDefinition, final 
long timeoutMillis) {
-        return 
lockHolder.getDistributedLock(lockDefinition.getLockKey()).tryLock(timeoutMillis);
+        String lockKey = lockDefinition.getLockKey();
+        return DistributedLockHolder.getDistributedLock(lockKey, () -> 
getDistributedLock(lockKey)).tryLock(timeoutMillis);
     }
     
     /**
@@ -51,6 +52,11 @@ public final class GlobalLockPersistService {
      * @param lockDefinition lock definition
      */
     public void unlock(final GlobalLockDefinition lockDefinition) {
-        lockHolder.getDistributedLock(lockDefinition.getLockKey()).unlock();
+        String lockKey = lockDefinition.getLockKey();
+        DistributedLockHolder.getDistributedLock(lockKey, () -> 
getDistributedLock(lockKey)).unlock();
+    }
+    
+    private DistributedLock getDistributedLock(final String lockKey) {
+        return repository.getDistributedLock(lockKey).orElseGet(() -> new 
DefaultDistributedLock(lockKey, repository, new DefaultLockTypedProperties(new 
Properties())));
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java
index ce4eeb2f776..b2ed671279d 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties;
 
@@ -72,6 +73,11 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
         return Optional.of(new DistributedLockHolder("default", this, new 
DefaultLockTypedProperties(new Properties())));
     }
     
+    @Override
+    public Optional<DistributedLock> getDistributedLock(final String lockKey) {
+        return Optional.empty();
+    }
+    
     @Override
     public void delete(final String key) {
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java
index 338eab7dabf..a6fc14d8bfd 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.lock.global;
 
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,8 +29,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -40,29 +38,37 @@ class GlobalLockPersistServiceTest {
     @Mock
     private GlobalLock globalLock;
     
+    @Mock
+    private DistributedLock distributedLock;
+    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ClusterPersistRepository repository;
     
+    private GlobalLockPersistService globalLockPersistService;
+    
     @BeforeEach
     void setUp() {
-        DistributedLockHolder distributedLockHolder = 
mock(DistributedLockHolder.class, RETURNS_DEEP_STUBS);
-        
when(repository.getDistributedLockHolder()).thenReturn(Optional.of(distributedLockHolder));
-        when(globalLock.getName()).thenReturn("foo_lock");
+        globalLockPersistService = new GlobalLockPersistService(repository);
     }
     
-    @SuppressWarnings("OptionalGetWithoutIsPresent")
     @Test
     void assertTryLock() {
-        
when(repository.getDistributedLockHolder().get().getDistributedLock("/lock/global/locks/foo_lock").tryLock(1000L)).thenReturn(true);
+        mockLock("foo_lock", "/lock/global/locks/foo_lock");
+        when(distributedLock.tryLock(1000L)).thenReturn(true);
         GlobalLockDefinition lockDefinition = new 
GlobalLockDefinition(globalLock);
-        assertTrue(new 
GlobalLockPersistService(repository).tryLock(lockDefinition, 1000L));
+        assertTrue(globalLockPersistService.tryLock(lockDefinition, 1000L));
     }
     
-    @SuppressWarnings("OptionalGetWithoutIsPresent")
     @Test
     void assertUnlock() {
+        mockLock("bar_lock", "/lock/global/locks/bar_lock");
         GlobalLockDefinition lockDefinition = new 
GlobalLockDefinition(globalLock);
-        new GlobalLockPersistService(repository).unlock(lockDefinition);
-        
verify(repository.getDistributedLockHolder().get().getDistributedLock("/lock/global/locks/foo_lock")).unlock();
+        globalLockPersistService.unlock(lockDefinition);
+        verify(distributedLock).unlock();
+    }
+    
+    private void mockLock(final String lockName, final String lockKey) {
+        when(globalLock.getName()).thenReturn(lockName);
+        
when(repository.getDistributedLock(lockKey)).thenReturn(Optional.of(distributedLock));
     }
 }
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index 1579bf32275..bd1db5ab955 100644
--- 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.repository.cluster;
 
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import org.apache.shardingsphere.mode.spi.repository.PersistRepository;
 
@@ -61,6 +62,14 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      */
     Optional<DistributedLockHolder> getDistributedLockHolder();
     
+    /**
+     * Get distributed lock.
+     *
+     * @param lockKey lock key
+     * @return distributed lock
+     */
+    Optional<DistributedLock> getDistributedLock(String lockKey);
+    
     /**
      * Watch key or path of governance server.
      *
diff --git 
a/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java
 
b/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java
new file mode 100644
index 00000000000..9aa916444fe
--- /dev/null
+++ 
b/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.repository.cluster.core.lock;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+/**
+ * Distributed lock holder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DistributedLockHolder {
+    
+    private static final Map<String, DistributedLock> LOCKS = new 
ConcurrentHashMap<>();
+    
+    /**
+     * Get distributed lock.
+     *
+     * @param lockKey lock key
+     * @param distributedLock distributed lock if absent
+     * @return got distributed lock
+     */
+    public static DistributedLock getDistributedLock(final String lockKey, 
final Supplier<DistributedLock> distributedLock) {
+        return LOCKS.computeIfAbsent(lockKey, key -> distributedLock.get());
+    }
+}
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 20213cfdb28..09ad849800a 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -39,9 +39,11 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import 
org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdDistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.nio.charset.StandardCharsets;
@@ -137,6 +139,11 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
         return Optional.of(distributedLockHolder);
     }
     
+    @Override
+    public Optional<DistributedLock> getDistributedLock(final String lockKey) {
+        return Optional.of(new EtcdDistributedLock(lockKey, client, 
etcdProps));
+    }
+    
     private void buildParentPath(final String key) throws ExecutionException, 
InterruptedException {
         StringBuilder parentPath = new StringBuilder();
         String[] partPath = key.split(PATH_SEPARATOR);
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index e38d39ff0f1..e1f15a850f8 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -33,9 +33,11 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterRepositoryPersistException;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.exception.ZookeeperExceptionHandler;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey;
 import org.apache.zookeeper.CreateMode;
@@ -227,6 +229,11 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository {
         return Optional.of(distributedLockHolder);
     }
     
+    @Override
+    public Optional<DistributedLock> getDistributedLock(final String lockKey) {
+        return Optional.of(new ZookeeperDistributedLock(lockKey, client));
+    }
+    
     @Override
     public void delete(final String key) {
         try {
diff --git 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java
 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java
index 0fc89ded490..ac867a86597 100644
--- 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java
+++ 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
+import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 
 import java.util.Collections;
@@ -78,6 +79,11 @@ public final class ProxyPersistRepositoryFixture implements 
ClusterPersistReposi
         return Optional.of(new DistributedLockHolder("default", this, new 
DefaultLockTypedProperties(new Properties())));
     }
     
+    @Override
+    public Optional<DistributedLock> getDistributedLock(final String lockKey) {
+        return Optional.empty();
+    }
+    
     @Override
     public void delete(final String key) {
     }

Reply via email to