sashapolo commented on code in PR #5547: URL: https://github.com/apache/ignite-3/pull/5547#discussion_r2026470233
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: Review Comment: ```suggestion * Asynchronous analogue of a read-write lock. It has the following properties: ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. 
It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> Review Comment: ```suggestion * high contention on the acquiring side; this simplifies the implementation.</p> ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). 
*/ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. + long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. 
+ satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); + } + + long newReadStamp = stampedLock.tryReadLock(); + assert newReadStamp != 0; + readLockWaitersMap.put(newReadStamp, readLockWaiter); + + for (Entry<CompletableFuture<Long>> entry : readLockWaitersMap.long2ObjectEntrySet()) { + futureCompletionExecutor.execute(() -> entry.getValue().complete(entry.getLongKey())); Review Comment: same here about `completeAsync` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. 
*/ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. + long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); Review Comment: ```suggestion writeLockWaiter.completeAsync(() -> newWriteStamp, futureCompletionExecutor); ``` ########## modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLockTest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.InjectExecutorService; +import org.apache.ignite.internal.util.CompletableFutures; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ExecutorServiceExtension.class) +class NaiveAsyncReadWriteLockTest { + private final NaiveAsyncReadWriteLock lock = new NaiveAsyncReadWriteLock(ForkJoinPool.commonPool()); + + @Test + void writeLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() { + List<CompletableFuture<?>> futures = new ArrayList<>(); + + CompletableFuture<Long> firstFuture = lock.writeLock(); + + for (int i = 0; i < 10; i++) { + 
futures.add(lock.writeLock().thenAccept(lock::unlockWrite)); + } + + assertThat(firstFuture.thenAccept(lock::unlockWrite), willCompleteSuccessfully()); + + assertThat(CompletableFutures.allOf(futures), willCompleteSuccessfully()); + } + + @Test + void readLockAttemptsArrivingWhenWriteLockedGetSatisfiedWhenUnlocked() { + List<CompletableFuture<?>> futures = new ArrayList<>(); + + CompletableFuture<Long> firstFuture = lock.writeLock(); + + for (int i = 0; i < 10; i++) { + futures.add(lock.readLock().thenAccept(lock::unlockRead)); + } + + assertThat(firstFuture.thenAccept(lock::unlockWrite), willCompleteSuccessfully()); + + assertThat(CompletableFutures.allOf(futures), willCompleteSuccessfully()); + } + + @Test + void writeLockAttemptsArrivingWhenReadLockedGetSatisfiedWhenUnlocked() { + CompletableFuture<Long> readLockFuture1 = lock.readLock(); + CompletableFuture<Long> readLockFuture2 = lock.readLock(); + + CompletableFuture<Long> writeLockFuture = lock.writeLock(); + + assertThat(readLockFuture1.thenAccept(lock::unlockRead), willCompleteSuccessfully()); + assertThat(readLockFuture2.thenAccept(lock::unlockRead), willCompleteSuccessfully()); + + assertThat(writeLockFuture, willCompleteSuccessfully()); + assertDoesNotThrow(() -> lock.unlockWrite(writeLockFuture.join())); + } + + @Test + void testConcurrency(@InjectExecutorService(threadCount = 4) ExecutorService executor) { + List<CompletableFuture<?>> futures = new CopyOnWriteArrayList<>(); + + for (int thread = 0; thread < 4; thread++) { + executor.execute(() -> { + for (int i = 0; i < 10000; i++) { + CompletableFuture<?> future; + + if (i % 2 == 0) { + future = lock.readLock().thenCompose(stamp -> { + return CompletableFuture.runAsync(() -> sleep(1)) Review Comment: Should this stuff be submitted on an explicit executor? 
This can drain the default pool otherwise. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -823,14 +846,17 @@ private CompletableFuture<Void> prepareTableResourcesAndLoadToZoneReplica( tables.put(tableId, table); // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation. - return createPartsFut.thenAccept(ignore -> startedTables.put(tableId, table)) - .whenComplete((v, th) -> { - partitionReplicaLifecycleManager.unlockZoneForRead(zoneDescriptor.id(), stamp); + return createPartsFut.thenAccept(ignore -> { + startedTables.put(tableId, table); - if (th == null) { - addTableToZone(zoneDescriptor.id(), table); - } - }); + addTableToZone(zoneDescriptor.id(), table); + }); + } + + private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, CompletableFuture<Long> readLockAcquiryFuture) { + readLockAcquiryFuture.thenAccept(stamp -> { Review Comment: This makes me think: does the lock actually need the continuation executor? Shouldn't we just use `*Async` methods for some heavy chaining operations and for smaller ones use the default behavior? Do we really need an extra executor submission? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -777,11 +777,33 @@ private CompletableFuture<Void> prepareTableResourcesAndLoadToZoneReplica( return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView); })); - long stamp = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); + // Obtain future, but don't chain on it yet because update() on VVs must be called in the same thread. The method we call + // will call update() on VVs and inside those updates it will chain on the lock acquiry future.
+ CompletableFuture<Long> lockAcquiryFuture = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); Review Comment: ```suggestion CompletableFuture<Long> lockAcquisitionFuture = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -777,11 +777,33 @@ private CompletableFuture<Void> prepareTableResourcesAndLoadToZoneReplica( return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView); })); - long stamp = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); + // Obtain future, but don't chain on it yet because update() on VVs must be called in the same thread. The method we call + // will call update() on VVs and inside those updates it will chain on the lock acquiry future. + CompletableFuture<Long> lockAcquiryFuture = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); + try { + return prepareTableResourcesAndLoadHavingZoneReadLock(lockAcquiryFuture, causalityToken, zoneDescriptor, onNodeRecovery, table) + .whenComplete((res, ex) -> { Review Comment: ```suggestion .whenComplete((res, ex) -> unlockZoneForRead(zoneDescriptor, lockAcquiryFuture)); ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class 
NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. 
+ long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. + satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); + } + + long newReadStamp = stampedLock.tryReadLock(); + assert newReadStamp != 0; + readLockWaitersMap.put(newReadStamp, readLockWaiter); + + for (Entry<CompletableFuture<Long>> entry : readLockWaitersMap.long2ObjectEntrySet()) { + futureCompletionExecutor.execute(() -> entry.getValue().complete(entry.getLongKey())); + } + } + + readLockWaiters.clear(); + } + + /** + * Attempts to acquire a read lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> readLock() { + synchronized (mutex) { + // Write lock attempts have priority over read lock attempts, so first check whether someone waits for write lock. + if (writeLockWaiters.isEmpty()) { + long stamp = stampedLock.tryReadLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + readLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks read lock previously obtained via {@link #readLock()}. + * + * @param stamp Stamp returned via read lock future. 
+ */ + public void unlockRead(long stamp) { + synchronized (mutex) { + stampedLock.unlockRead(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.peek(); + + if (writeLockWaiter != null) { + long newWriteStamp = stampedLock.tryWriteLock(); + if (newWriteStamp != 0) { + writeLockWaiters.remove(); + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); Review Comment: Same stuff about `completeAsync` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1515,18 +1517,20 @@ public void unloadTableResourcesFromZoneReplica( } private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<CompletableFuture<T>> action) { - StampedLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> new StampedLock()); + NaiveAsyncReadWriteLock lock = zonePartitionsLocks.computeIfAbsent(zoneId, id -> newZoneLock()); - long stamp = lock.writeLock(); - - try { - return action.get() - .whenComplete((v, e) -> lock.unlockWrite(stamp)); - } catch (Throwable e) { - lock.unlockWrite(stamp); + return lock.writeLock().thenCompose(stamp -> { + try { + return action.get() + .whenComplete((v, e) -> { Review Comment: ```suggestion .whenComplete((v, e) -> lock.unlockWrite(stamp)); ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class 
NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. 
+ long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. + satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); Review Comment: Why do you need a sorted map here? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. 
*/ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. + long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. 
+ satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); + } + + long newReadStamp = stampedLock.tryReadLock(); + assert newReadStamp != 0; + readLockWaitersMap.put(newReadStamp, readLockWaiter); + + for (Entry<CompletableFuture<Long>> entry : readLockWaitersMap.long2ObjectEntrySet()) { + futureCompletionExecutor.execute(() -> entry.getValue().complete(entry.getLongKey())); + } + } + + readLockWaiters.clear(); + } + + /** + * Attempts to acquire a read lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> readLock() { + synchronized (mutex) { + // Write lock attempts have priority over read lock attempts, so first check whether someone waits for write lock. + if (writeLockWaiters.isEmpty()) { + long stamp = stampedLock.tryReadLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + readLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks read lock previously obtained via {@link #readLock()}. + * + * @param stamp Stamp returned via read lock future. + */ + public void unlockRead(long stamp) { + synchronized (mutex) { + stampedLock.unlockRead(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.peek(); Review Comment: Can we use `isReadLocked` check instead? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. 
It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. 
+ * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. + long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. + satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); + } + + long newReadStamp = stampedLock.tryReadLock(); + assert newReadStamp != 0; + readLockWaitersMap.put(newReadStamp, readLockWaiter); + + for (Entry<CompletableFuture<Long>> entry : readLockWaitersMap.long2ObjectEntrySet()) { Review Comment: I'm confused, shouldn't this cycle be outside of the outer cycle? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. 
It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> Review Comment: ```suggestion * acquired immediately, but attempts to acquire write locks wait for all read locks to be released</li> ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -777,11 +777,33 @@ private CompletableFuture<Void> prepareTableResourcesAndLoadToZoneReplica( return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(table::schemaView); })); - long stamp = partitionReplicaLifecycleManager.lockZoneForRead(zoneDescriptor.id()); + // Obtain future, but don't chain on it yet because update() on VVs must be called in the same thread. The method we call + // will call update() on VVs and inside those updates it will chain on the lock acquiry future. Review Comment: ```suggestion // will call update() on VVs and inside those updates it will chain on the lock acquisition future. ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class 
NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. 
+ long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. + satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); + } + + long newReadStamp = stampedLock.tryReadLock(); + assert newReadStamp != 0; + readLockWaitersMap.put(newReadStamp, readLockWaiter); + + for (Entry<CompletableFuture<Long>> entry : readLockWaitersMap.long2ObjectEntrySet()) { + futureCompletionExecutor.execute(() -> entry.getValue().complete(entry.getLongKey())); + } + } + + readLockWaiters.clear(); + } + + /** + * Attempts to acquire a read lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> readLock() { + synchronized (mutex) { + // Write lock attempts have priority over read lock attempts, so first check whether someone waits for write lock. Review Comment: It is strange that write locks are prioritized, however, when unlocking a write lock, we drain the whole readers queue, doesn't it break this invariant? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no + * high contension on the acquiring side; this simplifies the implementation.</p> + */ +public class 
NaiveAsyncReadWriteLock { + /** Executor in which the waiting lock attempts' futures are completed. */ + private final Executor futureCompletionExecutor; + + /** Used to manage the lock state (including issuing and using stamps). */ + private final StampedLock stampedLock = new StampedLock(); + + /** Used to linearize access to waiters collections. */ + private final Object mutex = new Object(); + + /** Queue of futures waiting for write lock to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> writeLockWaiters = new ArrayDeque<>(); + + /** Queue of futures waiting for read locks to be acquired; served in the order of appearance. */ + private final Queue<CompletableFuture<Long>> readLockWaiters = new ArrayDeque<>(); + + public NaiveAsyncReadWriteLock(Executor futureCompletionExecutor) { + this.futureCompletionExecutor = futureCompletionExecutor; + } + + /** + * Attempts to acquire the write lock. + * + * @return Future completed with the stamp of the acquired lock; completed when the lock is acquired. + */ + public CompletableFuture<Long> writeLock() { + synchronized (mutex) { + long stamp = stampedLock.tryWriteLock(); + if (stamp != 0) { + return completedFuture(stamp); + } + + CompletableFuture<Long> lockFuture = new CompletableFuture<>(); + + writeLockWaiters.add(lockFuture); + + return lockFuture; + } + } + + /** + * Unlocks write lock previously obtained via {@link #writeLock()}. + * + * @param stamp Stamp returned via write lock future. + */ + public void unlockWrite(long stamp) { + synchronized (mutex) { + stampedLock.unlockWrite(stamp); + + CompletableFuture<Long> writeLockWaiter = writeLockWaiters.poll(); + + if (writeLockWaiter != null) { + // Someone is waiting for a write lock, satisfy the request. 
+ long newWriteStamp = stampedLock.tryWriteLock(); + assert newWriteStamp != 0; + + futureCompletionExecutor.execute(() -> writeLockWaiter.complete(newWriteStamp)); + } else { + // Someone might be waiting for read locks. + satisfyReadLockWaiters(); + } + } + } + + private void satisfyReadLockWaiters() { + Long2ObjectMap<CompletableFuture<Long>> readLockWaitersMap = null; + + for (CompletableFuture<Long> readLockWaiter : readLockWaiters) { + if (readLockWaitersMap == null) { + readLockWaitersMap = new Long2ObjectAVLTreeMap<>(); Review Comment: Also, why is a map needed at all? Why can't we satisfy a waiter immediately? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/NaiveAsyncReadWriteLock.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partition.replicator; + +import static java.util.concurrent.CompletableFuture.completedFuture; + +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap.Entry; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.StampedLock; + +/** + * Asynchronous analogue of read-write lock. It has the following properties: + * + * <ul> + * <li>Write lock is exclusive; if the lock is write-locked, other attempts to acquire any lock waits for the write lock to be released + * </li> + * <li>Read lock is non-exclusive: if the lock is read-locked (and there are no waiting write lock attempts), other read locks are + * acquired immediately, but attempts to acquire write locks wait for all read locks to be releasaed</li> + * <li>Write locks have priority over read locks: if the lock is read-locked, and there is a waiting write lock attempt, read lock + * attempts will queue until all write lock attempts are satisfied and released</li> + * <li>Lock holder is not bound to any thread; instead, a lock holder gets a stamp that can be used to release the lock</li> + * </ul> + * + * <p>This implementation is naive because it implies that time-to-hold the locks can be pretty long and there will be no Review Comment: `time-to-hold` - why did you write it this way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org