This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 4c7ed0f05b2 Add DistributedLockHolder in cluster-mode-repository-core module (#35111) 4c7ed0f05b2 is described below commit 4c7ed0f05b2c933edd624515562399e6a356b068 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Sun Mar 30 22:17:31 2025 +0800 Add DistributedLockHolder in cluster-mode-repository-core module (#35111) * Add DistributedLockHolder in cluster-mode-repository-core module * Add DistributedLockHolder in cluster-mode-repository-core module --- .../lock/global/GlobalLockPersistService.java | 22 +++++++---- .../fixture/ClusterPersistRepositoryFixture.java | 6 +++ .../lock/global/GlobalLockPersistServiceTest.java | 30 ++++++++------ .../cluster/ClusterPersistRepository.java | 9 +++++ .../cluster/core/lock/DistributedLockHolder.java | 46 ++++++++++++++++++++++ .../repository/cluster/etcd/EtcdRepository.java | 7 ++++ .../cluster/zookeeper/ZookeeperRepository.java | 7 ++++ .../fixture/ProxyPersistRepositoryFixture.java | 6 +++ 8 files changed, 113 insertions(+), 20 deletions(-) diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java index 49e6bfcec15..e4ae3065c9f 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistService.java @@ -17,22 +17,22 @@ package org.apache.shardingsphere.mode.manager.cluster.lock.global; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.repository.cluster.core.lock.DefaultDistributedLock; +import 
org.apache.shardingsphere.mode.repository.cluster.core.lock.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties; -import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import java.util.Properties; /** * Global lock persist service. */ +@RequiredArgsConstructor public final class GlobalLockPersistService { - private final DistributedLockHolder lockHolder; - - public GlobalLockPersistService(final ClusterPersistRepository repository) { - lockHolder = repository.getDistributedLockHolder().orElseGet(() -> new DistributedLockHolder("default", this, new DefaultLockTypedProperties(new Properties()))); - } + private final ClusterPersistRepository repository; /** * Try lock. @@ -42,7 +42,8 @@ public final class GlobalLockPersistService { * @return is locked or not */ public boolean tryLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { - return lockHolder.getDistributedLock(lockDefinition.getLockKey()).tryLock(timeoutMillis); + String lockKey = lockDefinition.getLockKey(); + return DistributedLockHolder.getDistributedLock(lockKey, () -> getDistributedLock(lockKey)).tryLock(timeoutMillis); } /** @@ -51,6 +52,11 @@ public final class GlobalLockPersistService { * @param lockDefinition lock definition */ public void unlock(final GlobalLockDefinition lockDefinition) { - lockHolder.getDistributedLock(lockDefinition.getLockKey()).unlock(); + String lockKey = lockDefinition.getLockKey(); + DistributedLockHolder.getDistributedLock(lockKey, () -> getDistributedLock(lockKey)).unlock(); + } + + private DistributedLock getDistributedLock(final String lockKey) { + return repository.getDistributedLock(lockKey).orElseGet(() -> new DefaultDistributedLock(lockKey, repository, new DefaultLockTypedProperties(new Properties()))); } } diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java index ce4eeb2f776..b2ed671279d 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties; @@ -72,6 +73,11 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo return Optional.of(new DistributedLockHolder("default", this, new DefaultLockTypedProperties(new Properties()))); } + @Override + public Optional<DistributedLock> getDistributedLock(final String lockKey) { + return Optional.empty(); + } + @Override public void delete(final String key) { } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java index 338eab7dabf..a6fc14d8bfd 100644 --- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/lock/global/GlobalLockPersistServiceTest.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.lock.global; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; -import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -29,8 +29,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,29 +38,37 @@ class GlobalLockPersistServiceTest { @Mock private GlobalLock globalLock; + @Mock + private DistributedLock distributedLock; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ClusterPersistRepository repository; + private GlobalLockPersistService globalLockPersistService; + @BeforeEach void setUp() { - DistributedLockHolder distributedLockHolder = mock(DistributedLockHolder.class, RETURNS_DEEP_STUBS); - when(repository.getDistributedLockHolder()).thenReturn(Optional.of(distributedLockHolder)); - when(globalLock.getName()).thenReturn("foo_lock"); + globalLockPersistService = new GlobalLockPersistService(repository); } - @SuppressWarnings("OptionalGetWithoutIsPresent") @Test void assertTryLock() { - when(repository.getDistributedLockHolder().get().getDistributedLock("/lock/global/locks/foo_lock").tryLock(1000L)).thenReturn(true); + mockLock("foo_lock", "/lock/global/locks/foo_lock"); + 
when(distributedLock.tryLock(1000L)).thenReturn(true); GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock); - assertTrue(new GlobalLockPersistService(repository).tryLock(lockDefinition, 1000L)); + assertTrue(globalLockPersistService.tryLock(lockDefinition, 1000L)); } - @SuppressWarnings("OptionalGetWithoutIsPresent") @Test void assertUnlock() { + mockLock("bar_lock", "/lock/global/locks/bar_lock"); GlobalLockDefinition lockDefinition = new GlobalLockDefinition(globalLock); - new GlobalLockPersistService(repository).unlock(lockDefinition); - verify(repository.getDistributedLockHolder().get().getDistributedLock("/lock/global/locks/foo_lock")).unlock(); + globalLockPersistService.unlock(lockDefinition); + verify(distributedLock).unlock(); + } + + private void mockLock(final String lockName, final String lockKey) { + when(globalLock.getName()).thenReturn(lockName); + when(repository.getDistributedLock(lockKey)).thenReturn(Optional.of(distributedLock)); } } diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java index 1579bf32275..bd1db5ab955 100644 --- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java +++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.mode.repository.cluster; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import 
org.apache.shardingsphere.mode.spi.repository.PersistRepository; @@ -61,6 +62,14 @@ public interface ClusterPersistRepository extends PersistRepository { */ Optional<DistributedLockHolder> getDistributedLockHolder(); + /** + * Get distributed lock. + * + * @param lockKey lock key + * @return distributed lock + */ + Optional<DistributedLock> getDistributedLock(String lockKey); + /** * Watch key or path of governance server. * diff --git a/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java b/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java new file mode 100644 index 00000000000..9aa916444fe --- /dev/null +++ b/mode/type/cluster/repository/core/src/main/java/org/apache/shardingsphere/mode/repository/cluster/core/lock/DistributedLockHolder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.shardingsphere.mode.repository.cluster.core.lock; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Distributed lock holder. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class DistributedLockHolder { + + private static final Map<String, DistributedLock> LOCKS = new ConcurrentHashMap<>(); + + /** + * Get distributed lock. + * + * @param lockKey lock key + * @param distributedLock distributed lock if absent + * @return got distributed lock + */ + public static DistributedLock getDistributedLock(final String lockKey, final Supplier<DistributedLock> distributedLock) { + return LOCKS.computeIfAbsent(lockKey, key -> distributedLock.get()); + } +} diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 20213cfdb28..09ad849800a 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -39,9 +39,11 @@ import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; +import org.apache.shardingsphere.mode.repository.cluster.etcd.lock.EtcdDistributedLock; import 
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties; import org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import java.nio.charset.StandardCharsets; @@ -137,6 +139,11 @@ public final class EtcdRepository implements ClusterPersistRepository { return Optional.of(distributedLockHolder); } + @Override + public Optional<DistributedLock> getDistributedLock(final String lockKey) { + return Optional.of(new EtcdDistributedLock(lockKey, client, etcdProps)); + } + private void buildParentPath(final String key) throws ExecutionException, InterruptedException { StringBuilder parentPath = new StringBuilder(); String[] partPath = key.split(PATH_SEPARATOR); diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index e38d39ff0f1..e1f15a850f8 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -33,9 +33,11 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterRepositoryPersistException; import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.exception.ZookeeperExceptionHandler; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionReconnectListener; +import org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperPropertyKey; import org.apache.zookeeper.CreateMode; @@ -227,6 +229,11 @@ public final class ZookeeperRepository implements ClusterPersistRepository { return Optional.of(distributedLockHolder); } + @Override + public Optional<DistributedLock> getDistributedLock(final String lockKey) { + return Optional.of(new ZookeeperDistributedLock(lockKey, client)); + } + @Override public void delete(final String key) { try { diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java index 0fc89ded490..ac867a86597 100644 --- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java +++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ProxyPersistRepositoryFixture.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; import org.apache.shardingsphere.mode.repository.cluster.core.lock.props.DefaultLockTypedProperties; import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import java.util.Collections; @@ -78,6 +79,11 @@ public final class ProxyPersistRepositoryFixture implements ClusterPersistReposi return Optional.of(new DistributedLockHolder("default", this, new DefaultLockTypedProperties(new Properties()))); } + @Override + public Optional<DistributedLock> getDistributedLock(final String lockKey) { + return Optional.empty(); + } + @Override public void delete(final String key) { }