This is an automated email from the ASF dual-hosted git repository. menghaoran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new f4cbd19f197 Optimize log for inter mutex lock in cluster mode (#17807) f4cbd19f197 is described below commit f4cbd19f1970938761670d57ff0f0d9fc1766fb6 Author: gin <jacky7...@163.com> AuthorDate: Thu May 19 19:16:58 2022 +0800 Optimize log for inter mutex lock in cluster mode (#17807) --- .../lock/manager/DistributeLockManager.java | 5 ++ .../coordinator/lock/mutex/InterMutexLock.java | 72 ++++++++++++---------- .../mutex/ShardingSphereDistributeMutexLock.java | 13 ++-- 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/manager/DistributeLockManager.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/manager/DistributeLockManager.java index 94af0556a10..24b3550d22a 100644 --- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/manager/DistributeLockManager.java +++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/manager/DistributeLockManager.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.manager; import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.infra.lock.ShardingSphereLock; import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.database.ShardingSphereDistributeDatabaseLock; import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.mutex.InterReentrantMutexLock; @@ -28,6 +29,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.Time /** * Distribute lock manager. */ +@Slf4j public final class DistributeLockManager implements ShardingSphereLockManager { private InterReentrantMutexLock sequencedLock; @@ -61,12 +63,15 @@ public final class DistributeLockManager implements ShardingSphereLockManager { private synchronized boolean innerDatabaseTryLock(final String databaseName, final long timeoutMilliseconds) { Preconditions.checkNotNull(databaseName, "Try Lock write for database args database name can not be null."); if (!sequencedLock.tryLock(TimeoutMilliseconds.DEFAULT_REGISTRY)) { + log.debug("Distribute database lock acquire sequenced failed, database name: {}", databaseName); return false; } try { + log.debug("Distribute database lock acquire sequenced success, database name: {}", databaseName); return databaseLock.tryLock(databaseName, timeoutMilliseconds - TimeoutMilliseconds.DEFAULT_REGISTRY); } finally { sequencedLock.unlock(); + log.debug("Distribute database lock release sequenced success, database name: {}", databaseName); } } diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/InterMutexLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/InterMutexLock.java index 4729c8a23d3..a2641bf9b60 100644 --- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/InterMutexLock.java +++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/InterMutexLock.java @@ -65,41 +65,44 @@ public final class InterMutexLock implements MutexLock, LockAckAble { private boolean innerTryLock(final String lockName, final long timeout) { if (!synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKING)) { - log.debug("Inner try Lock failed, lockName: {} is locking", lockName); + log.debug("Inter mutex lock try Lock set lock state failed, lock name: {}, lock state: {}", lockName, synchronizedLockState.get().name()); return false; } if (!isOwner.compareAndSet(false, true)) { - log.debug("Inner try Lock set owner failed, lockName: {}", lockName); + log.debug("Inter mutex lock try Lock set lock owner failed, lock name: {}, lock is owner: {}", lockName, isOwner.get()); return false; } if (acquire(lockName, timeout)) { if (synchronizedLockState.compareAndSet(LockState.LOCKING, LockState.LOCKED)) { - log.debug("Inner try Lock locked success, lockName: {}", lockName); + log.debug("Inter mutex lock try Lock acquire lock success, lock name: {}", lockName); return true; } } reSetLock(); - log.debug("Inner try Lock locked failed, lockName: {}", lockName); + log.debug("Inter mutex lock try Lock acquire lock failed, lock name: {}", lockName); return false; } private boolean acquire(final String lockName, final long timeout) { - long consumeTime = 0; String currentInstanceId = getCurrentInstanceId(); - do { - boolean isLocked = lockService.tryLock(lockName, TimeoutMilliseconds.DEFAULT_REGISTRY); - consumeTime += TimeoutMilliseconds.DEFAULT_REGISTRY; - if (isLocked) { - lockedInstances.add(currentInstanceId); - if (isAckOK(timeout - consumeTime)) { - return true; - } else { - lockedInstances.remove(currentInstanceId); - return false; - } + long acquireStart = System.currentTimeMillis(); + boolean isLocked = lockService.tryLock(lockName, timeout); + if (isLocked) { + lockedInstances.add(currentInstanceId); + long acquireEnd = System.currentTimeMillis(); + long acquireExpend = acquireEnd - acquireStart; + log.debug("inter mutex lock acquire lock success then await for ack, lock name: {}, expend time millis {}ms", lockName, acquireExpend); + if (isAckOK(timeout - acquireExpend)) { + long ackExpend = System.currentTimeMillis() - acquireEnd; + log.debug("inter mutex lock acquire lock success and ack success, lock name: {}, expend time millis {}ms", lockName, ackExpend); + return true; + } else { + lockService.releaseLock(lockName); + lockedInstances.remove(currentInstanceId); + return false; } - } while (timeout > consumeTime); - log.debug("inner try lock acquire timeout, lockName={}", lockName); + } + log.debug("inter mutex lock acquire lock timeout. lock name: {}, timeout millis {}ms", lockName, timeout); return false; } @@ -121,7 +124,7 @@ public final class InterMutexLock implements MutexLock, LockAckAble { sleepInterval(); expend += TimeoutMilliseconds.DEFAULT_REGISTRY; } while (timeout > expend); - log.debug("is lock ack OK timeout"); + log.debug("inter mutex ack lock timeout, timeout millis {}ms", timeout); return false; } @@ -149,19 +152,20 @@ public final class InterMutexLock implements MutexLock, LockAckAble { @Override public void unlock() { - if (LockState.LOCKED == synchronizedLockState.get()) { - log.debug("release lock, lockName={}", lockName); - String currentInstanceId = getCurrentInstanceId(); + LockState lockState = synchronizedLockState.get(); + if (LockState.LOCKED == lockState) { + log.debug("inter mutex lock unlock. lock name: {}", lockName); if (isOwner.get()) { lockService.releaseLock(lockName); - lockedInstances.remove(currentInstanceId); + lockedInstances.remove(getCurrentInstanceId()); reSetLock(); + log.debug("inter mutex lock owner lock release lock success. lock name: {}", lockName); return; } lockService.removeLock(lockName); - releaseAckLock(lockName, currentInstanceId); + log.debug("inter mutex lock not owner remove lock success. lock name: {}", lockName); } - log.debug("release lock, state is not locked, ignore, lockName={}", lockName); + log.debug("inter mutex lock ignore unlock, lock name: {} lock state: {}", lockName, lockState); } @Override @@ -171,31 +175,37 @@ public final class InterMutexLock implements MutexLock, LockAckAble { @Override public void ackLock(final String ackLockName, final String lockedInstanceId) { - if (!isOwner.get() && LockState.UNLOCKED == synchronizedLockState.get()) { + LockState lockState = synchronizedLockState.get(); + boolean owner = isOwner.get(); + if (!owner && LockState.UNLOCKED == lockState) { lockService.ackLock(ackLockName, lockedInstanceId); lockedInstances.add(lockedInstanceId); synchronizedLockState.compareAndSet(LockState.UNLOCKED, LockState.LOCKED); + log.debug("inter mutex lock ack lock success, ack lock name: {}", ackLockName); } + log.debug("inter mutex lock ignore ack lock, ack lock name: {}, lock state: {}, lock owner: {}", ackLockName, lockState, owner); } @Override public void releaseAckLock(final String ackLockName, final String lockedInstanceId) { - if (!isOwner.get()) { + boolean owner = isOwner.get(); + if (!owner) { lockService.releaseAckLock(ackLockName); - } else { - isOwner.compareAndSet(true, false); + log.debug("inter mutex lock not owner release ack lock success, ack lock name: {}, locked instanceId: {}", ackLockName, lockedInstanceId); } - lockedInstances.remove(getCurrentInstanceId()); - synchronizedLockState.compareAndSet(LockState.LOCKED, LockState.UNLOCKED); + lockedInstances.remove(lockedInstanceId); + reSetLock(); } @Override public void addLockedInstance(final String lockedInstanceId) { lockedInstances.add(lockedInstanceId); + log.debug("inter mutex lock add locked instance id, id: {}", lockedInstanceId); } @Override public void removeLockedInstance(final String lockedInstanceId) { lockedInstances.remove(lockedInstanceId); + log.debug("inter mutex lock remove locked instance id, id: {}", lockedInstanceId); } } diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/ShardingSphereDistributeMutexLock.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/ShardingSphereDistributeMutexLock.java index d77ff2c47e7..60139ae02cb 100644 --- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/ShardingSphereDistributeMutexLock.java +++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/mutex/ShardingSphereDistributeMutexLock.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.mutex; import com.google.common.eventbus.Subscribe; +import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; import org.apache.shardingsphere.infra.lock.ShardingSphereLock; import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockNodeService; @@ -32,17 +33,18 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.util.Time /** * Distribute mutex lock of ShardingSphere. */ +@Slf4j public final class ShardingSphereDistributeMutexLock implements ShardingSphereLock { private final LockNodeService lockNodeService = LockNodeServiceFactory.getInstance().getLockNodeService(LockNodeType.MUTEX); - private final MutexLock sequencedLock; + private final MutexLock sequenced; private final ShardingSphereInterMutexLockHolder lockHolder; public ShardingSphereDistributeMutexLock(final ShardingSphereInterMutexLockHolder lockHolder) { this.lockHolder = lockHolder; - this.sequencedLock = lockHolder.getInterReentrantMutexLock(lockNodeService.getSequenceNodePath()); + this.sequenced = lockHolder.getInterReentrantMutexLock(lockNodeService.getSequenceNodePath()); ShardingSphereEventBus.getInstance().register(this); lockHolder.synchronizeMutexLock(lockNodeService); } @@ -58,13 +60,16 @@ public final class ShardingSphereDistributeMutexLock implements ShardingSphereLo } private boolean innerTryLock(final String lockName, final long timeoutMillis) { - if (!sequencedLock.tryLock(TimeoutMilliseconds.DEFAULT_REGISTRY)) { + if (!sequenced.tryLock(TimeoutMilliseconds.DEFAULT_REGISTRY)) { + log.debug("Distribute mutex lock acquire sequenced failed, lock name: {}", lockName); return false; } try { + log.debug("Distribute mutex lock acquire sequenced success, lock name: {}", lockName); return getInterMutexLock(lockName).tryLock(timeoutMillis); } finally { - sequencedLock.unlock(); + sequenced.unlock(); + log.debug("Distribute mutex lock release sequenced success, lock name: {}", lockName); } }