denis-chudov commented on code in PR #4540: URL: https://github.com/apache/ignite-3/pull/4540#discussion_r1813045997
########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java: ########## @@ -328,10 +357,331 @@ private LockState adjustLockState(LockState state, LockState v) { } } + private void track(UUID txId, Releasable val) { + txMap.compute(txId, (k, v) -> { + if (v == null) { + v = new ConcurrentLinkedQueue<>(); + } + + v.add(val); + + return v; + }); + } + + /** + * Create lock exception with given parameters. + * + * @param locker Locker. + * @param holder Lock holder. + * @return Lock exception. + */ + private static LockException lockException(UUID locker, UUID holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); + } + /** - * A lock state. + * Create lock exception when lock holder is believed to be missing. + * + * @param locker Locker. + * @param holder Lock holder. + * @return Lock exception. + */ + private static LockException abandonedLockException(UUID locker, UUID holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire an abandoned lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); + } + + /** + * Common interface for releasing transaction locks. */ - public class LockState { + interface Releasable { + /** + * Tries to release a lock. + * + * @param txId Tx id. + * @return {@code True} if lock state requires cleanup after release. + */ + boolean tryRelease(UUID txId); + + /** + * Gets associated lock key. + * + * @return Lock key. + */ + LockKey key(); + + /** + * Returns the lock which is held by given tx. + * + * @param txId Tx id. + * @return The lock or null if no lock exist. + */ + @Nullable Lock lock(UUID txId); + + /** + * Returns lock type. + * + * @return The type. + */ + boolean coarse(); + } + + /** + * Coarse lock. + */ + public class CoarseLockState implements Releasable { + private final StripedCompositeReadWriteLock stripedLock = new StripedCompositeReadWriteLock(CONCURRENCY); + + private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new ConcurrentHashMap<>(); + private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> slockWaiters = new HashMap<>(); + private final ConcurrentHashMap<UUID, Lock> slockOwners = new ConcurrentHashMap<>(); + + private final LockKey lockKey; + + CoarseLockState(LockKey lockKey) { + this.lockKey = lockKey; + } + + @Override + public boolean tryRelease(UUID txId) { + Lock lock = lock(txId); + + release(lock); + + return false; + } + + @Override + public LockKey key() { + return lockKey; + } + + @Override + public Lock lock(UUID txId) { + Lock lock = ixlockOwners.get(txId); + + if (lock != null) { + return lock; + } + + int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); + + stripedLock.readLock(idx).lock(); + + try { + return slockOwners.get(txId); + } finally { + stripedLock.readLock(idx).unlock(); + } + } + + @Override + public boolean coarse() { + return true; + } + + /** + * Acquires a lock. + * + * @param txId Tx id. + * @param lockKey Lock key. + * @param lockMode Lock mode. + * @return The future. + */ + public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, LockMode lockMode) { + switch (lockMode) { + case S: + stripedLock.writeLock().lock(); + + try { + if (!ixlockOwners.isEmpty()) { + if (ixlockOwners.containsKey(txId)) { + // IX-locks can't be modified under the striped lock. + if (ixlockOwners.size() == 1) { + // Safe to upgrade. + track(txId, this); // Double track. + Lock lock = new Lock(lockKey, lockMode, txId); + slockOwners.putIfAbsent(txId, lock); + return completedFuture(lock); + } else { + // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. + for (Lock lock : ixlockOwners.values()) { + if (!lock.txId().equals(txId)) { + return failedFuture(lockException(txId, lock.txId())); + } + } + } + + assert false : "Should not reach here"; + } + + UUID holderTx = ixlockOwners.keySet().iterator().next(); + CompletableFuture<Void> res = fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx)); + if (res.isCompletedExceptionally()) { + return failedFuture(abandonedLockException(txId, holderTx)); + } + + track(txId, this); + + CompletableFuture<Lock> fut = new CompletableFuture<>(); + IgniteBiTuple<Lock, CompletableFuture<Lock>> prev = slockWaiters.putIfAbsent(txId, + new IgniteBiTuple<>(new Lock(lockKey, lockMode, txId), fut)); + return prev == null ? fut : prev.get2(); + } else { + Lock lock = new Lock(lockKey, lockMode, txId); + Lock prev = slockOwners.putIfAbsent(txId, lock); + + if (prev == null) { + track(txId, this); // Do not track on reenter. + } + + return completedFuture(lock); + } + } finally { + stripedLock.writeLock().unlock(); + } + + case IX: + int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); + + while (true) { + // This will prevent IX to block S, giving deadlock avoidance. + boolean locked = stripedLock.readLock(idx).tryLock(); + + // Striped lock is held for short time. Can spin wait here. + if (!locked) { + Thread.onSpinWait(); + } else { + break; + } + } + + try { + if (!slockOwners.isEmpty()) { + if (slockOwners.containsKey(txId)) { + // S-locks can't be modified under the striped lock. + if (slockOwners.size() == 1) { + // Safe to upgrade. + track(txId, this); // Double track. + Lock lock = new Lock(lockKey, lockMode, txId); + ixlockOwners.putIfAbsent(txId, lock); + return completedFuture(lock); + } else { + // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. + for (Lock lock : slockOwners.values()) { + if (!lock.txId().equals(txId)) { + return failedFuture(lockException(txId, lock.txId())); + } + } + } + + assert false : "Should not reach here"; + } + + UUID holderTx = slockOwners.keySet().iterator().next(); + CompletableFuture<Void> eventResult = fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx)); + if (eventResult.isCompletedExceptionally()) { + return failedFuture(abandonedLockException(txId, holderTx)); + } else { + return failedFuture(lockException(txId, holderTx)); // IX locks never wait. + } + } else { + Lock lock = new Lock(lockKey, lockMode, txId); + Lock prev = ixlockOwners.putIfAbsent(txId, lock); // Avoid overwrite existing lock. + + if (prev == null) { + track(txId, this); // Do not track on reenter. + } + + return completedFuture(lock); + } + } finally { + stripedLock.readLock(idx).unlock(); + } + + default: + assert false : "Unsupported coarse lock mode: " + lockMode; + } + + return null; // Should not be here. + } + + /** + * Releases the lock. Should be called from {@link #releaseAll(UUID)}. + * + * @param lock The lock. + */ + public void release(@Nullable Lock lock) { + if (lock == null) { + return; + } + + switch (lock.lockMode()) { + case S: + stripedLock.writeLock().lock(); + + try { + Lock removed = slockOwners.remove(lock.txId()); + + assert removed != null : "Attempt to release not acquired lock: " + lock.txId(); + } finally { + stripedLock.writeLock().unlock(); + } + + break; + case IX: + int idx = Math.floorMod(spread(lock.txId().hashCode()), CONCURRENCY); + + Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> wakeups; + + stripedLock.readLock(idx).lock(); + + try { + var removed = ixlockOwners.remove(lock.txId()); + + assert removed != null : "Attempt to release not acquired lock: " + lock.txId(); + + if (slockWaiters.isEmpty()) { + return; // Nothing to do. + } + + if (!ixlockOwners.isEmpty()) { + assert slockOwners.isEmpty() || slockOwners.containsKey(lock.txId()); + + return; // Nothing to do. + } + + // No race here because no new locks can be acquired after releaseAll due to 2-phase locking protocol. + + // Promote waiters to owners. + wakeups = new HashMap<>(slockWaiters); + + slockWaiters.clear(); + + for (IgniteBiTuple<Lock, CompletableFuture<Lock>> value : wakeups.values()) { + slockOwners.put(value.getKey().txId(), value.getKey()); + } + } finally { + stripedLock.readLock(idx).unlock(); + } + + for (Entry<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> entry : wakeups.entrySet()) { + entry.getValue().get2().complete(entry.getValue().get1()); + } + + break; + default: + assert false : "Unsupported coarse unlock mode: " + lock.lockMode(); Review Comment: Agree, I should have checked more carefully. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org