This is an automated email from the ASF dual-hosted git repository. jiangmaolin pushed a commit to branch dev-5.5.1 in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
commit 0aa118a1e4985ba72bbbbf660ee49e93344c3584 Author: Haoran Meng <[email protected]> AuthorDate: Wed Nov 6 20:15:04 2024 +0800 Add read write lock for MetaDataContextsFactory --- .../shardingsphere/infra/lock/GlobalLockNames.java | 3 ++ .../shardingsphere/infra/lock/LockContext.java | 48 ++++++++++++++++++ .../mode/lock/SphereExLockContext.java | 20 ++++++++ .../mode/lock/GlobalLockContext.java | 25 ++++++++++ .../mode/lock/LockPersistService.java | 48 ++++++++++++++++++ .../mode/metadata/MetaDataContextsFactory.java | 42 ++++++++++++++-- .../mode/metadata/MetaDataContextsFactoryTest.java | 11 ++++ .../cluster/lock/GlobalLockPersistService.java | 25 ++++++++++ .../fixture/DistributedLockCreatorFixture.java} | 23 ++++----- .../cluster/fixture/DistributedLockFixture.java} | 41 +++++++++------ .../fixture/ClusterPersistRepositoryFixture.java | 4 +- ...ory.cluster.lock.creator.DistributedLockCreator | 18 +++++++ .../fixture/DistributedLockCreatorFixture.java} | 23 ++++----- .../fixture/DistributedLockFixture.java} | 41 +++++++++------ .../EnterpriseClusterPersistRepositoryFixture.java | 2 +- ...ory.cluster.lock.creator.DistributedLockCreator | 18 +++++++ .../repository/cluster/lock/DistributedLock.java | 46 +++++++++++++++++ .../zookeeper/lock/ZookeeperDistributedLock.java | 58 ++++++++++++++++++++++ 18 files changed, 436 insertions(+), 60 deletions(-) diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java index e9f2bab1664..adc4d411e17 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/GlobalLockNames.java @@ -36,6 +36,9 @@ public enum GlobalLockNames { @SphereEx SYNCING_DDL("syncing_ddl_%s"), + @SphereEx + STARTUP_READ_WRITE_LOCK("startup_read_write_lock"), + @SphereEx UPDATE_METADATA("update_metadata"); diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java index 450bc7c94a0..8b4faaf18e2 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/lock/LockContext.java @@ -53,4 +53,52 @@ public interface LockContext<T extends LockDefinition> { default boolean isLocked(LockDefinition lockDefinition) { throw new UnsupportedOperationException("LockContext.isLocked()"); } + + /** + * Try read lock. + * + * @param lockDefinition lock definition + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryReadLock(T lockDefinition, long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Try write lock. + * + * @param lockDefinition lock definition + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryWriteLock(T lockDefinition, long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Unlock read. + * + * @param lockDefinition lock definition + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockRead(T lockDefinition) { + throw new UnsupportedOperationException(); + } + + /** + * Unlock write. + * + * @param lockDefinition lock definition + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockWrite(T lockDefinition) { + throw new UnsupportedOperationException(); + } } diff --git a/mode/core/src/main/java/com/sphereex/dbplusengine/mode/lock/SphereExLockContext.java b/mode/core/src/main/java/com/sphereex/dbplusengine/mode/lock/SphereExLockContext.java index 61460b969f7..e6560e9d696 100644 --- a/mode/core/src/main/java/com/sphereex/dbplusengine/mode/lock/SphereExLockContext.java +++ b/mode/core/src/main/java/com/sphereex/dbplusengine/mode/lock/SphereExLockContext.java @@ -68,4 +68,24 @@ public final class SphereExLockContext implements LockContext { ShardingSpherePreconditions.checkState(lockDefinition instanceof ResourceLockDefinition, () -> new UnsupportedOperationException("LockContext.isLocked()")); return resourceLockManager.isLocked((ResourceLockDefinition) lockDefinition); } + + @Override + public boolean tryReadLock(final LockDefinition lockDefinition, final long timeoutMillis) { + return globalLockPersistService.tryReadLock((GlobalLockDefinition) lockDefinition, timeoutMillis); + } + + @Override + public boolean tryWriteLock(final LockDefinition lockDefinition, final long timeoutMillis) { + return globalLockPersistService.tryWriteLock((GlobalLockDefinition) lockDefinition, timeoutMillis); + } + + @Override + public void unlockRead(final LockDefinition lockDefinition) { + globalLockPersistService.unlockRead((GlobalLockDefinition) lockDefinition); + } + + @Override + public void unlockWrite(final LockDefinition lockDefinition) { + globalLockPersistService.unlockWrite((GlobalLockDefinition) lockDefinition); + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java index 59d07d24d6d..76fafe3b10e 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.lock; +import com.sphereex.dbplusengine.SphereEx; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.lock.LockContext; @@ -37,4 +38,28 @@ public final class GlobalLockContext implements LockContext<GlobalLockDefinition public void unlock(final GlobalLockDefinition lockDefinition) { globalLockPersistService.unlock(lockDefinition); } + + @SphereEx + @Override + public boolean tryReadLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { + return globalLockPersistService.tryReadLock(lockDefinition, timeoutMillis); + } + + @SphereEx + @Override + public boolean tryWriteLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { + return globalLockPersistService.tryWriteLock(lockDefinition, timeoutMillis); + } + + @SphereEx + @Override + public void unlockRead(final GlobalLockDefinition lockDefinition) { + globalLockPersistService.unlockRead(lockDefinition); + } + + @SphereEx + @Override + public void unlockWrite(final GlobalLockDefinition lockDefinition) { + globalLockPersistService.unlockWrite(lockDefinition); + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/LockPersistService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/LockPersistService.java index ee730411dfc..1d56ea03d3d 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/LockPersistService.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/LockPersistService.java @@ -55,4 +55,52 @@ public interface LockPersistService<T extends LockDefinition> { default Collection<T> loadLocks() { return Collections.emptyList(); } + + /** + * Try read lock. + * + * @param lockDefinition lock definition + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryReadLock(T lockDefinition, long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Try write lock. + * + * @param lockDefinition lock definition + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryWriteLock(T lockDefinition, long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Unlock read. + * + * @param lockDefinition lock definition + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockRead(T lockDefinition) { + throw new UnsupportedOperationException(); + } + + /** + * Unlock write. + * + * @param lockDefinition lock definition + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockWrite(T lockDefinition) { + throw new UnsupportedOperationException(); + } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java index c2930ed3a2b..d57b4f87dbe 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java @@ -22,6 +22,7 @@ import com.google.common.base.Splitter; import com.sphereex.dbplusengine.SphereEx; import com.sphereex.dbplusengine.SphereEx.Type; import com.sphereex.dbplusengine.authority.rule.AuthorityRuleConfigurationPersistDecorator; +import com.sphereex.dbplusengine.infra.exception.lock.LockWaitTimeoutException; import com.sphereex.dbplusengine.infra.state.datasource.DataSourceStateManager; import com.sphereex.dbplusengine.mode.metadata.transaction.TransactionConfigurationManager; import com.sphereex.dbplusengine.transaction.spi.TransactionConfigurationGenerator; @@ -44,6 +45,8 @@ import org.apache.shardingsphere.infra.datasource.pool.config.DataSourceConfigur import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; +import org.apache.shardingsphere.infra.lock.GlobalLockNames; +import org.apache.shardingsphere.infra.lock.GlobalLockNames; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData; @@ -64,6 +67,8 @@ import org.apache.shardingsphere.infra.state.datasource.qualified.QualifiedDataS import org.apache.shardingsphere.metadata.factory.ExternalMetaDataFactory; import org.apache.shardingsphere.metadata.factory.InternalMetaDataFactory; import org.apache.shardingsphere.metadata.persist.MetaDataPersistService; +import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; +import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.mode.metadata.decorator.RuleConfigurationPersistDecorateEngine; import org.apache.shardingsphere.mode.metadata.manager.SwitchingResource; @@ -117,9 +122,11 @@ public final class MetaDataContextsFactory { @SphereEx public static MetaDataContexts create(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param, final ComputeNodeInstanceContext computeNodeInstanceContext, @SphereEx final Map<String, QualifiedDataSourceState> statusMap) throws SQLException { - return persistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames().isEmpty() + // SPEX CHANGED: BEGIN + return isCreateByLocal(persistService, computeNodeInstanceContext) ? createByLocal(persistService, param, computeNodeInstanceContext, statusMap) : createByRepository(persistService, param, computeNodeInstanceContext, statusMap); + // SPEX CHANGED: END } /** @@ -133,6 +140,23 @@ public final class MetaDataContextsFactory { return new MetaDataContexts(metaData, initStatistics(persistService, metaData)); } + @SphereEx + private static boolean isCreateByLocal(final MetaDataPersistService persistService, final ComputeNodeInstanceContext instanceContext) { + boolean isCreateByLocal; + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockNames.STARTUP_READ_WRITE_LOCK.getLockName()); + if (instanceContext.getModeConfiguration().isCluster() && !instanceContext.getLockContext().tryReadLock(lockDefinition, 60000L)) { + throw new LockWaitTimeoutException(GlobalLockNames.STARTUP_READ_WRITE_LOCK, 60000L); + } + try { + isCreateByLocal = persistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames().isEmpty(); + } finally { + if (instanceContext.getModeConfiguration().isCluster()) { + instanceContext.getLockContext().unlockRead(lockDefinition); + } + } + return isCreateByLocal; + } + private static MetaDataContexts createByLocal(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param, final ComputeNodeInstanceContext computeNodeInstanceContext, @SphereEx final Map<String, QualifiedDataSourceState> statusMap) throws SQLException { Map<String, DatabaseConfiguration> effectiveDatabaseConfigs = param.getDatabaseConfigs(); @@ -156,9 +180,19 @@ public final class MetaDataContextsFactory { // SPEX ADDED: END MetaDataContexts result = newMetaDataContexts(persistService, param, globalRuleConfigs, databases, props); // SPEX CHANGED: BEGIN - persistDatabaseConfigurations(result, param, persistService, computeNodeInstanceContext, getPasswordEncryptProperties()); + GlobalLockDefinition lockDefinition = new GlobalLockDefinition(GlobalLockNames.STARTUP_READ_WRITE_LOCK.getLockName()); + if (computeNodeInstanceContext.getModeConfiguration().isCluster() && !computeNodeInstanceContext.getLockContext().tryWriteLock(lockDefinition, 60000L)) { + throw new LockWaitTimeoutException(GlobalLockNames.STARTUP_READ_WRITE_LOCK, 60000L); + } + try { + persistDatabaseConfigurations(result, param, persistService, computeNodeInstanceContext, getPasswordEncryptProperties()); + persistMetaData(result, persistService); + } finally { + if (computeNodeInstanceContext.getModeConfiguration().isCluster()) { + computeNodeInstanceContext.getLockContext().unlockWrite(lockDefinition); + } + } // SPEX CHANGED: END - persistMetaData(result, persistService); return result; } @@ -168,7 +202,7 @@ public final class MetaDataContextsFactory { result.put(AuthorityRuleConfigurationPersistDecorator.ENCRYPT_UNIQUE_VALUE_KEY, new ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)).getUniqueKey()); return result; } - + private static MetaDataContexts createByRepository(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param, final ComputeNodeInstanceContext computeNodeInstanceContext, @SphereEx final Map<String, QualifiedDataSourceState> statusMap) { @SphereEx diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactoryTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactoryTest.java index d3a27ef3ac2..1ebdd17fdec 100644 --- a/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactoryTest.java +++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactoryTest.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.metadata.persist.service.config.database.Databa import org.apache.shardingsphere.metadata.persist.service.config.global.GlobalRulePersistService; import org.apache.shardingsphere.metadata.persist.service.config.global.PropertiesPersistService; import org.apache.shardingsphere.metadata.persist.service.metadata.DatabaseMetaDataPersistFacade; +import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.test.fixture.infra.rule.MockedRule; import org.apache.shardingsphere.test.fixture.infra.rule.MockedRuleConfiguration; @@ -57,6 +58,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyCollection; @@ -114,6 +116,10 @@ class MetaDataContextsFactoryTest { when(computeNodeInstanceContext.getInstance().getMetaData()).thenReturn(mock(JDBCInstanceMetaData.class)); // SPEX ADDED: BEGIN when(metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames()).thenReturn(Collections.emptyList()); + // SPEX ADDED: BEGIN + when(computeNodeInstanceContext.getLockContext().tryReadLock(any(GlobalLockDefinition.class), eq(60000L))).thenReturn(true); + when(computeNodeInstanceContext.getLockContext().tryWriteLock(any(GlobalLockDefinition.class), eq(60000L))).thenReturn(true); + // SPEX ADDED: END // SPEX ADDED: END try (MetaDataContexts actual = MetaDataContextsFactory.create(metaDataPersistService, createContextManagerBuilderParameter(), computeNodeInstanceContext)) { // SPEX CHANGED: BEGIN @@ -127,6 +133,11 @@ class MetaDataContextsFactoryTest { @Test void assertCreateWithProxyInstanceMetaData() throws SQLException { + // SPEX ADDED: BEGIN + ComputeNodeInstanceContext instanceContext = mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS); + when(instanceContext.getLockContext().tryReadLock(any(GlobalLockDefinition.class), eq(60000L))).thenReturn(true); + when(instanceContext.getLockContext().tryWriteLock(any(GlobalLockDefinition.class), eq(60000L))).thenReturn(true); + // SPEX ADDED: END when(databaseMetaDataPersistFacade.getDatabase().loadAllDatabaseNames()).thenReturn(Collections.singletonList("foo_db")); when(metaDataPersistService.getDatabaseMetaDataFacade()).thenReturn(databaseMetaDataPersistFacade); try (MetaDataContexts actual = MetaDataContextsFactory.create(metaDataPersistService, createContextManagerBuilderParameter(), mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS))) { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java index b60670f4b58..2a36100c6d3 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.manager.cluster.lock; +import com.sphereex.dbplusengine.SphereEx; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; import org.apache.shardingsphere.mode.lock.LockPersistService; @@ -39,4 +40,28 @@ public final class GlobalLockPersistService implements LockPersistService<Global public void unlock(final GlobalLockDefinition lockDefinition) { repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).unlock(); } + + @SphereEx + @Override + public boolean tryReadLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { + return repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).tryReadLock(timeoutMillis); + } + + @SphereEx + @Override + public boolean tryWriteLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { + return repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).tryWriteLock(timeoutMillis); + } + + @SphereEx + @Override + public void unlockRead(final GlobalLockDefinition lockDefinition) { + repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).unlockRead(); + } + + @SphereEx + @Override + public void unlockWrite(final GlobalLockDefinition lockDefinition) { + repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).unlockWrite(); + } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java b/mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockCreatorFixture.java similarity index 53% copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java copy to mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockCreatorFixture.java index b60670f4b58..0f2c29cde7e 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java +++ b/mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockCreatorFixture.java @@ -15,28 +15,25 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.lock; +package com.sphereex.dbplusengine.mode.manager.cluster.fixture; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.LockPersistService; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; +import org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator; +import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; /** - * Global lock persist service. + * Distributed lock creator fixture. */ -@RequiredArgsConstructor -public final class GlobalLockPersistService implements LockPersistService<GlobalLockDefinition> { - - private final ClusterPersistRepository repository; +public final class DistributedLockCreatorFixture implements DistributedLockCreator<ClusterPersistRepository, DefaultLockTypedProperties> { @Override - public boolean tryLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { - return repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).tryLock(timeoutMillis); + public DistributedLock create(final String lockKey, final ClusterPersistRepository client, final DefaultLockTypedProperties props) { + return new DistributedLockFixture(); } @Override - public void unlock(final GlobalLockDefinition lockDefinition) { - repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).unlock(); + public String getType() { + return "FIXTURE"; } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java b/mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockFixture.java similarity index 55% copy from mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java copy to mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockFixture.java index 59d07d24d6d..6bba08ed7b7 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java +++ b/mode/type/cluster/core/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/fixture/DistributedLockFixture.java @@ -15,26 +15,39 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.lock; +package com.sphereex.dbplusengine.mode.manager.cluster.fixture; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.lock.LockContext; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; /** - * Global lock context. + * Distributed lock fixture. */ -@RequiredArgsConstructor -public final class GlobalLockContext implements LockContext<GlobalLockDefinition> { - - private final LockPersistService<GlobalLockDefinition> globalLockPersistService; - +public final class DistributedLockFixture implements DistributedLock { + + @Override + public boolean tryLock(final long timeoutMillis) { + return true; + } + + @Override + public void unlock() { + } + @Override - public boolean tryLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { - return globalLockPersistService.tryLock(lockDefinition, timeoutMillis); + public boolean tryReadLock(final long timeoutMillis) { + return true; } - + + @Override + public boolean tryWriteLock(final long timeoutMillis) { + return true; + } + + @Override + public void unlockRead() { + } + @Override - public void unlock(final GlobalLockDefinition lockDefinition) { - globalLockPersistService.unlock(lockDefinition); + public void unlockWrite() { } } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java index 83508fb9eea..e18857a8cc6 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/fixture/ClusterPersistRepositoryFixture.java @@ -64,7 +64,9 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo @Override public DistributedLockHolder getDistributedLockHolder() { - return new DistributedLockHolder("default", this, new DefaultLockTypedProperties(new Properties())); + // SPEX CHANGED: END + return new DistributedLockHolder("FIXTURE", this, new DefaultLockTypedProperties(new Properties())); + // SPEX CHANGED: END } @Override diff --git a/mode/type/cluster/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator b/mode/type/cluster/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator new file mode 100644 index 00000000000..dca1aa2496b --- /dev/null +++ b/mode/type/cluster/core/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.sphereex.dbplusengine.mode.manager.cluster.fixture.DistributedLockCreatorFixture diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockCreatorFixture.java similarity index 53% copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java copy to mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockCreatorFixture.java index b60670f4b58..a12985d1dbd 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/lock/GlobalLockPersistService.java +++ b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockCreatorFixture.java @@ -15,28 +15,25 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.lock; +package com.sphereex.dbplusengine.mode.manager.cluster.enterprise.fixture; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.mode.lock.GlobalLockDefinition; -import org.apache.shardingsphere.mode.lock.LockPersistService; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; +import org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator; +import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; /** - * Global lock persist service. + * Distributed lock creator fixture. */ -@RequiredArgsConstructor -public final class GlobalLockPersistService implements LockPersistService<GlobalLockDefinition> { - - private final ClusterPersistRepository repository; +public final class DistributedLockCreatorFixture implements DistributedLockCreator<ClusterPersistRepository, DefaultLockTypedProperties> { @Override - public boolean tryLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { - return repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).tryLock(timeoutMillis); + public DistributedLock create(final String lockKey, final ClusterPersistRepository client, final DefaultLockTypedProperties props) { + return new DistributedLockFixture(); } @Override - public void unlock(final GlobalLockDefinition lockDefinition) { - repository.getDistributedLockHolder().getDistributedLock(lockDefinition.getLockKey()).unlock(); + public String getType() { + return "FIXTURE"; } } diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockFixture.java similarity index 55% copy from mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java copy to mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockFixture.java index 59d07d24d6d..8e3d8903f34 100644 --- a/mode/core/src/main/java/org/apache/shardingsphere/mode/lock/GlobalLockContext.java +++ b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/DistributedLockFixture.java @@ -15,26 +15,39 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.lock; +package com.sphereex.dbplusengine.mode.manager.cluster.enterprise.fixture; -import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.lock.LockContext; +import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; /** - * Global lock context. + * Distributed lock fixture. */ -@RequiredArgsConstructor -public final class GlobalLockContext implements LockContext<GlobalLockDefinition> { - - private final LockPersistService<GlobalLockDefinition> globalLockPersistService; - +public final class DistributedLockFixture implements DistributedLock { + + @Override + public boolean tryLock(final long timeoutMillis) { + return true; + } + + @Override + public void unlock() { + } + @Override - public boolean tryLock(final GlobalLockDefinition lockDefinition, final long timeoutMillis) { - return globalLockPersistService.tryLock(lockDefinition, timeoutMillis); + public boolean tryReadLock(final long timeoutMillis) { + return true; } - + + @Override + public boolean tryWriteLock(final long timeoutMillis) { + return true; + } + + @Override + public void unlockRead() { + } + @Override - public void unlock(final GlobalLockDefinition lockDefinition) { - globalLockPersistService.unlock(lockDefinition); + public void unlockWrite() { } } diff --git a/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/EnterpriseClusterPersistRepositoryFixture.java b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/EnterpriseClusterPersistRepositoryFixture.java index 9414accd597..6e5e4c9402c 100644 --- a/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/EnterpriseClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/enterprise/src/test/java/com/sphereex/dbplusengine/mode/manager/cluster/enterprise/fixture/EnterpriseClusterPersistRepositoryFixture.java @@ -64,7 +64,7 @@ public final class EnterpriseClusterPersistRepositoryFixture implements ClusterP @Override public DistributedLockHolder getDistributedLockHolder() { - return new DistributedLockHolder("default", this, new DefaultLockTypedProperties(new Properties())); + return new DistributedLockHolder("FIXTURE", this, new DefaultLockTypedProperties(new Properties())); } @Override diff --git a/mode/type/cluster/enterprise/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator b/mode/type/cluster/enterprise/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator new file mode 100644 index 00000000000..27e23764961 --- /dev/null +++ b/mode/type/cluster/enterprise/src/test/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.lock.creator.DistributedLockCreator @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +com.sphereex.dbplusengine.mode.manager.cluster.enterprise.fixture.DistributedLockCreatorFixture diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/lock/DistributedLock.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/lock/DistributedLock.java index e0a4ab3a528..e97767c97ac 100644 --- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/lock/DistributedLock.java +++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/lock/DistributedLock.java @@ -17,6 +17,8 @@ package org.apache.shardingsphere.mode.repository.cluster.lock; +import com.sphereex.dbplusengine.SphereEx; + /** * Distributed lock. */ @@ -34,4 +36,48 @@ public interface DistributedLock { * Unlock. */ void unlock(); + + /** + * Try read lock. + * + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryReadLock(long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Try write lock. + * + * @param timeoutMillis timeout millis + * @return is locked or not + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default boolean tryWriteLock(long timeoutMillis) { + throw new UnsupportedOperationException(); + } + + /** + * Unlock read. + * + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockRead() { + throw new UnsupportedOperationException(); + } + + /** + * Unlock write. + * + * @throws UnsupportedOperationException unsupported operation exception + */ + @SphereEx + default void unlockWrite() { + throw new UnsupportedOperationException(); + } } diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/ZookeeperDistributedLock.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/ZookeeperDistributedLock.java index 8cc65005cc7..a2b03284243 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/ZookeeperDistributedLock.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/lock/ZookeeperDistributedLock.java @@ -17,9 +17,11 @@ package org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock; +import com.sphereex.dbplusengine.SphereEx; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; import org.apache.shardingsphere.mode.repository.cluster.lock.DistributedLock; import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler; @@ -32,8 +34,14 @@ public final class ZookeeperDistributedLock implements DistributedLock { private final InterProcessLock lock; + @SphereEx + private final InterProcessReadWriteLock readWriteLock; + public ZookeeperDistributedLock(final String lockKey, final CuratorFramework client) { lock = new InterProcessMutex(client, lockKey); + // SPEX ADDED: BEGIN + readWriteLock = new InterProcessReadWriteLock(client, lockKey); + // SPEX ADDED: END } @Override @@ -58,4 +66,54 @@ public final class ZookeeperDistributedLock implements DistributedLock { ZookeeperExceptionHandler.handleException(ex); } } + + @SphereEx + @Override + public boolean tryReadLock(final long timeoutMillis) { + try { + return readWriteLock.readLock().acquire(timeoutMillis, TimeUnit.MILLISECONDS); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + ZookeeperExceptionHandler.handleException(ex); + } + return false; + } + + @SphereEx + @Override + public boolean tryWriteLock(final long timeoutMillis) { + try { + return readWriteLock.writeLock().acquire(timeoutMillis, TimeUnit.MILLISECONDS); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + ZookeeperExceptionHandler.handleException(ex); + } + return false; + } + + @SphereEx + @Override + public void unlockRead() { + try { + readWriteLock.readLock().release(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + ZookeeperExceptionHandler.handleException(ex); + } + } + + @SphereEx + @Override + public void unlockWrite() { + try { + readWriteLock.writeLock().release(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + ZookeeperExceptionHandler.handleException(ex); + } + } }
