denis-chudov commented on code in PR #4540:
URL: https://github.com/apache/ignite-3/pull/4540#discussion_r1808276224


##########
modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics. Compared to {@link 
ReentrantReadWriteLock} it has slightly improved performance of
+ * {@link ReadWriteLock#readLock()} operations at the cost of {@link 
ReadWriteLock#writeLock()} operations and memory consumption. It also
+ * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {
+    /** Index generator. */
+    private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+    /** Index. */
+    private static final ThreadLocal<Integer> IDX = ThreadLocal.withInitial(() 
-> IDX_GEN.incrementAndGet());
+
+    /** Locks. */
+    private final ReentrantReadWriteLock[] locks;
+
+    /** Composite write lock. */
+    private final WriteLock writeLock;
+
+    /**
+     * Creates a new instance with given concurrency level.
+     *
+     * @param concurrencyLvl Number of internal read locks.
+     */
+    public StripedCompositeReadWriteLock(int concurrencyLvl) {
+        locks = new ReadLock[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++) {
+            locks[i] = new ReadLock();
+        }
+
+        writeLock = new WriteLock();
+    }
+
+    /**
+     * Gets current index.
+     *
+     * @return Index of current thread stripe.
+     */
+    private int curIdx() {
+        int idx = IDX.get();
+
+        return idx % locks.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock readLock() {
+        return locks[curIdx()].readLock();
+    }
+
+    /**
+     * Get a lock by stripe.
+     *
+     * @param idx Stripe index.
+     * @return The lock.
+     */
+    public Lock readLock(int idx) {
+        return locks[idx].readLock();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Queries if the write lock is held by the current thread.
+     *
+     * @return {@code true} if the current thread holds the write lock and 
{@code false} otherwise
+     */
+    public boolean isWriteLockedByCurrentThread() {
+        return locks[locks.length - 1].isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Queries the number of reentrant read holds on this lock by the current 
thread.  A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     *
+     * @return the number of holds on the read lock by the current thread, or 
zero if the read lock is not held by the current thread
+     */
+    public int getReadHoldCount() {
+        return locks[curIdx()].getReadHoldCount();
+    }
+
+    /**
+     * Read lock.
+     */
+    @SuppressWarnings("unused")
+    private static class ReadLock extends ReentrantReadWriteLock {
+        private long p0;
+
+        private long p1;
+
+        private long p2;
+
+        private long p3;
+
+        private long p4;
+
+        private long p5;
+
+        private long p6;
+
+        private long p7;

Review Comment:
   what is it for?)



##########
modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics. Compared to {@link 
ReentrantReadWriteLock} it has slightly improved performance of
+ * {@link ReadWriteLock#readLock()} operations at the cost of {@link 
ReadWriteLock#writeLock()} operations and memory consumption. It also
+ * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {

Review Comment:
   You added a new concurrent primitive without any tests, could you pls add 
some?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java:
##########
@@ -92,18 +113,30 @@ public void upsert() {
     public static void main(String[] args) throws RunnerException {
         Options opt = new OptionsBuilder()
                 .include(".*" + UpsertKvBenchmark.class.getSimpleName() + ".*")
+                // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
+                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc")

Review Comment:
   is it intended?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -328,10 +357,331 @@ private LockState adjustLockState(LockState state, 
LockState v) {
         }
     }
 
+    private void track(UUID txId, Releasable val) {
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentLinkedQueue<>();
+            }
+
+            v.add(val);
+
+            return v;
+        });
+    }
+
+    /**
+     * Create lock exception with given parameters.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException lockException(UUID locker, UUID holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire a lock due to a possible deadlock [locker=" 
+ locker + ", holder=" + holder + ']');
+    }
+
     /**
-     * A lock state.
+     * Create lock exception when lock holder is believed to be missing.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException abandonedLockException(UUID locker, UUID 
holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
+    }
+
+    /**
+     * Common interface for releasing transaction locks.
      */
-    public class LockState {
+    interface Releasable {
+        /**
+         * Tries to release a lock.
+         *
+         * @param txId Tx id.
+         * @return {@code True} if lock state requires cleanup after release.
+         */
+        boolean tryRelease(UUID txId);
+
+        /**
+         * Gets associated lock key.
+         *
+         * @return Lock key.
+         */
+        LockKey key();
+
+        /**
+         * Returns the lock which is held by given tx.
+         *
+         * @param txId Tx id.
+         * @return The lock or null if no lock exist.
+         */
+        @Nullable Lock lock(UUID txId);
+
+        /**
+         * Returns lock type.
+         *
+         * @return The type.
+         */
+        boolean coarse();
+    }
+
+    /**
+     * Coarse lock.
+     */
+    public class CoarseLockState implements Releasable {
+        private final StripedCompositeReadWriteLock stripedLock = new 
StripedCompositeReadWriteLock(CONCURRENCY);
+
+        private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new 
ConcurrentHashMap<>();
+        private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
slockWaiters = new HashMap<>();
+        private final ConcurrentHashMap<UUID, Lock> slockOwners = new 
ConcurrentHashMap<>();
+
+        private final LockKey lockKey;
+
+        CoarseLockState(LockKey lockKey) {
+            this.lockKey = lockKey;
+        }
+
+        @Override
+        public boolean tryRelease(UUID txId) {
+            Lock lock = lock(txId);
+
+            release(lock);
+
+            return false;
+        }
+
+        @Override
+        public LockKey key() {
+            return lockKey;
+        }
+
+        @Override
+        public Lock lock(UUID txId) {
+            Lock lock = ixlockOwners.get(txId);
+
+            if (lock != null) {
+                return lock;
+            }
+
+            int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+            stripedLock.readLock(idx).lock();
+
+            try {
+                return slockOwners.get(txId);
+            } finally {
+                stripedLock.readLock(idx).unlock();
+            }
+        }
+
+        @Override
+        public boolean coarse() {
+            return true;
+        }
+
+        /**
+         * Acquires a lock.
+         *
+         * @param txId Tx id.
+         * @param lockKey Lock key.
+         * @param lockMode Lock mode.
+         * @return The future.
+         */
+        public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
+            switch (lockMode) {
+                case S:
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        if (!ixlockOwners.isEmpty()) {
+                            if (ixlockOwners.containsKey(txId)) {
+                                // IX-locks can't be modified under the 
striped lock.
+                                if (ixlockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    slockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : ixlockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
ixlockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> res = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (res.isCompletedExceptionally()) {
+                                return 
failedFuture(abandonedLockException(txId, holderTx));
+                            }
+
+                            track(txId, this);
+
+                            CompletableFuture<Lock> fut = new 
CompletableFuture<>();
+                            IgniteBiTuple<Lock, CompletableFuture<Lock>> prev 
= slockWaiters.putIfAbsent(txId,
+                                    new IgniteBiTuple<>(new Lock(lockKey, 
lockMode, txId), fut));
+                            return prev == null ? fut : prev.get2();
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = slockOwners.putIfAbsent(txId, lock);
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                case IX:
+                    int idx = Math.floorMod(spread(txId.hashCode()), 
CONCURRENCY);
+
+                    while (true) {
+                        // This will prevent IX to block S, giving deadlock 
avoidance.
+                        boolean locked = stripedLock.readLock(idx).tryLock();
+
+                        // Striped lock is held for short time. Can spin wait 
here.
+                        if (!locked) {
+                            Thread.onSpinWait();
+                        } else {
+                            break;
+                        }
+                    }
+
+                    try {
+                        if (!slockOwners.isEmpty()) {
+                            if (slockOwners.containsKey(txId)) {
+                                // S-locks can't be modified under the striped 
lock.
+                                if (slockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    ixlockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : slockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
slockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> eventResult = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (eventResult.isCompletedExceptionally()) {

Review Comment:
   pls add todo here: `// TODO: 
https://issues.apache.org/jira/browse/IGNITE-21153`



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -328,10 +357,331 @@ private LockState adjustLockState(LockState state, 
LockState v) {
         }
     }
 
+    private void track(UUID txId, Releasable val) {
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentLinkedQueue<>();
+            }
+
+            v.add(val);
+
+            return v;
+        });
+    }
+
+    /**
+     * Create lock exception with given parameters.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException lockException(UUID locker, UUID holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire a lock due to a possible deadlock [locker=" 
+ locker + ", holder=" + holder + ']');
+    }
+
     /**
-     * A lock state.
+     * Create lock exception when lock holder is believed to be missing.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException abandonedLockException(UUID locker, UUID 
holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
+    }
+
+    /**
+     * Common interface for releasing transaction locks.
      */
-    public class LockState {
+    interface Releasable {
+        /**
+         * Tries to release a lock.
+         *
+         * @param txId Tx id.
+         * @return {@code True} if lock state requires cleanup after release.
+         */
+        boolean tryRelease(UUID txId);
+
+        /**
+         * Gets associated lock key.
+         *
+         * @return Lock key.
+         */
+        LockKey key();
+
+        /**
+         * Returns the lock which is held by given tx.
+         *
+         * @param txId Tx id.
+         * @return The lock or null if no lock exist.
+         */
+        @Nullable Lock lock(UUID txId);
+
+        /**
+         * Returns lock type.
+         *
+         * @return The type.
+         */
+        boolean coarse();
+    }
+
+    /**
+     * Coarse lock.
+     */
+    public class CoarseLockState implements Releasable {
+        private final StripedCompositeReadWriteLock stripedLock = new 
StripedCompositeReadWriteLock(CONCURRENCY);
+
+        private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new 
ConcurrentHashMap<>();
+        private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
slockWaiters = new HashMap<>();
+        private final ConcurrentHashMap<UUID, Lock> slockOwners = new 
ConcurrentHashMap<>();
+
+        private final LockKey lockKey;
+
+        CoarseLockState(LockKey lockKey) {
+            this.lockKey = lockKey;
+        }
+
+        @Override
+        public boolean tryRelease(UUID txId) {
+            Lock lock = lock(txId);
+
+            release(lock);
+
+            return false;
+        }
+
+        @Override
+        public LockKey key() {
+            return lockKey;
+        }
+
+        @Override
+        public Lock lock(UUID txId) {
+            Lock lock = ixlockOwners.get(txId);
+
+            if (lock != null) {
+                return lock;
+            }
+
+            int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+            stripedLock.readLock(idx).lock();
+
+            try {
+                return slockOwners.get(txId);
+            } finally {
+                stripedLock.readLock(idx).unlock();
+            }
+        }
+
+        @Override
+        public boolean coarse() {
+            return true;
+        }
+
+        /**
+         * Acquires a lock.
+         *
+         * @param txId Tx id.
+         * @param lockKey Lock key.
+         * @param lockMode Lock mode.
+         * @return The future.
+         */
+        public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
+            switch (lockMode) {
+                case S:
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        if (!ixlockOwners.isEmpty()) {
+                            if (ixlockOwners.containsKey(txId)) {
+                                // IX-locks can't be modified under the 
striped lock.
+                                if (ixlockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    slockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : ixlockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
ixlockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> res = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (res.isCompletedExceptionally()) {
+                                return 
failedFuture(abandonedLockException(txId, holderTx));
+                            }
+
+                            track(txId, this);
+
+                            CompletableFuture<Lock> fut = new 
CompletableFuture<>();
+                            IgniteBiTuple<Lock, CompletableFuture<Lock>> prev 
= slockWaiters.putIfAbsent(txId,
+                                    new IgniteBiTuple<>(new Lock(lockKey, 
lockMode, txId), fut));
+                            return prev == null ? fut : prev.get2();
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = slockOwners.putIfAbsent(txId, lock);
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                case IX:
+                    int idx = Math.floorMod(spread(txId.hashCode()), 
CONCURRENCY);
+
+                    while (true) {
+                        // This will prevent IX to block S, giving deadlock 
avoidance.
+                        boolean locked = stripedLock.readLock(idx).tryLock();
+
+                        // Striped lock is held for short time. Can spin wait 
here.
+                        if (!locked) {
+                            Thread.onSpinWait();
+                        } else {
+                            break;
+                        }
+                    }
+
+                    try {
+                        if (!slockOwners.isEmpty()) {
+                            if (slockOwners.containsKey(txId)) {
+                                // S-locks can't be modified under the striped 
lock.
+                                if (slockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    ixlockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : slockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
slockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> eventResult = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (eventResult.isCompletedExceptionally()) {
+                                return 
failedFuture(abandonedLockException(txId, holderTx));
+                            } else {
+                                return failedFuture(lockException(txId, 
holderTx)); // IX locks never wait.
+                            }
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = ixlockOwners.putIfAbsent(txId, lock); 
// Avoid overwrite existing lock.
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.readLock(idx).unlock();
+                    }
+
+                default:
+                    assert false : "Unsupported coarse lock mode: " + lockMode;
+            }
+
+            return null; // Should not be here.

Review Comment:
   let's add assertion here



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -328,10 +357,331 @@ private LockState adjustLockState(LockState state, 
LockState v) {
         }
     }
 
+    private void track(UUID txId, Releasable val) {
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentLinkedQueue<>();
+            }
+
+            v.add(val);
+
+            return v;
+        });
+    }
+
+    /**
+     * Create lock exception with given parameters.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException lockException(UUID locker, UUID holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire a lock due to a possible deadlock [locker=" 
+ locker + ", holder=" + holder + ']');
+    }
+
     /**
-     * A lock state.
+     * Create lock exception when lock holder is believed to be missing.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException abandonedLockException(UUID locker, UUID 
holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
+    }
+
+    /**
+     * Common interface for releasing transaction locks.
      */
-    public class LockState {
+    interface Releasable {
+        /**
+         * Tries to release a lock.
+         *
+         * @param txId Tx id.
+         * @return {@code True} if lock state requires cleanup after release.
+         */
+        boolean tryRelease(UUID txId);
+
+        /**
+         * Gets associated lock key.
+         *
+         * @return Lock key.
+         */
+        LockKey key();
+
+        /**
+         * Returns the lock which is held by given tx.
+         *
+         * @param txId Tx id.
+         * @return The lock or null if no lock exist.
+         */
+        @Nullable Lock lock(UUID txId);
+
+        /**
+         * Returns lock type.
+         *
+         * @return The type.
+         */
+        boolean coarse();
+    }
+
+    /**
+     * Coarse lock.
+     */
+    public class CoarseLockState implements Releasable {
+        private final StripedCompositeReadWriteLock stripedLock = new 
StripedCompositeReadWriteLock(CONCURRENCY);
+
+        private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new 
ConcurrentHashMap<>();
+        private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
slockWaiters = new HashMap<>();
+        private final ConcurrentHashMap<UUID, Lock> slockOwners = new 
ConcurrentHashMap<>();
+
+        private final LockKey lockKey;
+
+        CoarseLockState(LockKey lockKey) {
+            this.lockKey = lockKey;
+        }
+
+        @Override
+        public boolean tryRelease(UUID txId) {
+            Lock lock = lock(txId);
+
+            release(lock);
+
+            return false;
+        }
+
+        @Override
+        public LockKey key() {
+            return lockKey;
+        }
+
+        @Override
+        public Lock lock(UUID txId) {
+            Lock lock = ixlockOwners.get(txId);
+
+            if (lock != null) {
+                return lock;
+            }
+
+            int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+            stripedLock.readLock(idx).lock();
+
+            try {
+                return slockOwners.get(txId);
+            } finally {
+                stripedLock.readLock(idx).unlock();
+            }
+        }
+
+        @Override
+        public boolean coarse() {
+            return true;
+        }
+
+        /**
+         * Acquires a lock.
+         *
+         * @param txId Tx id.
+         * @param lockKey Lock key.
+         * @param lockMode Lock mode.
+         * @return The future.
+         */
+        public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
+            switch (lockMode) {
+                case S:
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        if (!ixlockOwners.isEmpty()) {
+                            if (ixlockOwners.containsKey(txId)) {
+                                // IX-locks can't be modified under the 
striped lock.
+                                if (ixlockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    slockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : ixlockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
ixlockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> res = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (res.isCompletedExceptionally()) {
+                                return 
failedFuture(abandonedLockException(txId, holderTx));
+                            }
+
+                            track(txId, this);
+
+                            CompletableFuture<Lock> fut = new 
CompletableFuture<>();
+                            IgniteBiTuple<Lock, CompletableFuture<Lock>> prev 
= slockWaiters.putIfAbsent(txId,
+                                    new IgniteBiTuple<>(new Lock(lockKey, 
lockMode, txId), fut));
+                            return prev == null ? fut : prev.get2();
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = slockOwners.putIfAbsent(txId, lock);
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                case IX:
+                    int idx = Math.floorMod(spread(txId.hashCode()), 
CONCURRENCY);
+
+                    while (true) {
+                        // This will prevent IX to block S, giving deadlock 
avoidance.
+                        boolean locked = stripedLock.readLock(idx).tryLock();
+
+                        // Striped lock is held for short time. Can spin wait 
here.
+                        if (!locked) {
+                            Thread.onSpinWait();
+                        } else {
+                            break;
+                        }
+                    }
+
+                    try {
+                        if (!slockOwners.isEmpty()) {
+                            if (slockOwners.containsKey(txId)) {
+                                // S-locks can't be modified under the striped 
lock.
+                                if (slockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    ixlockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : slockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return 
failedFuture(lockException(txId, lock.txId()));
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            UUID holderTx = 
slockOwners.keySet().iterator().next();
+                            CompletableFuture<Void> eventResult = 
fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, holderTx));
+                            if (eventResult.isCompletedExceptionally()) {
+                                return 
failedFuture(abandonedLockException(txId, holderTx));
+                            } else {
+                                return failedFuture(lockException(txId, 
holderTx)); // IX locks never wait.
+                            }
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = ixlockOwners.putIfAbsent(txId, lock); 
// Avoid overwrite existing lock.
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.readLock(idx).unlock();
+                    }
+
+                default:
+                    assert false : "Unsupported coarse lock mode: " + lockMode;
+            }
+
+            return null; // Should not be here.
+        }
+
+        /**
+         * Releases the lock. Should be called from {@link #releaseAll(UUID)}.
+         *
+         * @param lock The lock.
+         */
+        public void release(@Nullable Lock lock) {
+            if (lock == null) {
+                return;
+            }
+
+            switch (lock.lockMode()) {
+                case S:
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        Lock removed = slockOwners.remove(lock.txId());
+
+                        assert removed != null : "Attempt to release not 
acquired lock: " + lock.txId();
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                    break;
+                case IX:
+                    int idx = Math.floorMod(spread(lock.txId().hashCode()), 
CONCURRENCY);
+
+                    Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
wakeups;
+
+                    stripedLock.readLock(idx).lock();
+
+                    try {
+                        var removed = ixlockOwners.remove(lock.txId());
+
+                        assert removed != null : "Attempt to release not 
acquired lock: " + lock.txId();
+
+                        if (slockWaiters.isEmpty()) {
+                            return; // Nothing to do.
+                        }
+
+                        if (!ixlockOwners.isEmpty()) {
+                            assert slockOwners.isEmpty() || 
slockOwners.containsKey(lock.txId());
+
+                            return; // Nothing to do.
+                        }
+
+                        // No race here because no new locks can be acquired 
after releaseAll due to 2-phase locking protocol.
+
+                        // Promote waiters to owners.
+                        wakeups = new HashMap<>(slockWaiters);
+
+                        slockWaiters.clear();
+
+                        for (IgniteBiTuple<Lock, CompletableFuture<Lock>> 
value : wakeups.values()) {
+                            slockOwners.put(value.getKey().txId(), 
value.getKey());
+                        }
+                    } finally {
+                        stripedLock.readLock(idx).unlock();
+                    }
+
+                    for (Entry<UUID, IgniteBiTuple<Lock, 
CompletableFuture<Lock>>> entry : wakeups.entrySet()) {
+                        
entry.getValue().get2().complete(entry.getValue().get1());
+                    }
+
+                    break;
+                default:
+                    assert false : "Unsupported coarse unlock mode: " + 
lock.lockMode();

Review Comment:
   seems SIX lock mode is also possible here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to