This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 37ef88f62d87d2c5220fa701f7f075bff0649a6b Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sat Mar 15 13:30:06 2025 +0000 Switch to plain Thread for AccordExecutor, and improve testing for ASYNC mode patch by Benedict; reviewed by David Capwell for CASSANDRA-20521 --- modules/accord | 2 +- .../accord/AccordExecutorAbstractLockLoop.java | 116 ++++++++------ .../AccordExecutorAbstractSemiSyncSubmit.java | 11 +- .../service/accord/AccordExecutorAsyncSubmit.java | 18 +-- .../accord/AccordExecutorInfiniteLoops.java | 101 ------------- .../service/accord/AccordExecutorLoops.java | 87 +++++++++++ .../accord/AccordExecutorSemiSyncSubmit.java | 18 +-- .../service/accord/AccordExecutorSyncSubmit.java | 30 ++-- .../cassandra/service/accord/AccordTask.java | 5 +- .../utils/concurrent/LockWithAsyncSignal.java | 12 +- .../utils/concurrent/LockWithAsyncSignalTest.java | 167 +++++++++++++++++++++ 11 files changed, 362 insertions(+), 205 deletions(-) diff --git a/modules/accord b/modules/accord index 0a5446a365..1192d253f3 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47 +Subproject commit 1192d253f36d072b056f1d16e292bdf202018758 diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java index c92dccd0f3..8ef72c95b0 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java @@ -23,23 +23,23 @@ import java.util.concurrent.locks.Lock; import accord.api.Agent; import accord.utils.QuadFunction; import accord.utils.QuintConsumer; -import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.metrics.AccordCacheMetrics; import org.apache.cassandra.utils.concurrent.ConcurrentLinkedStack; -import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK; abstract class AccordExecutorAbstractLockLoop extends AccordExecutor { final ConcurrentLinkedStack<Object> submitted = new ConcurrentLinkedStack<>(); boolean isHeldByExecutor; + boolean shutdown; AccordExecutorAbstractLockLoop(Lock lock, int executorId, AccordCacheMetrics metrics, ExecutorFunctionFactory loadExecutor, ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory rangeLoadExecutor, Agent agent) { super(lock, executorId, metrics, loadExecutor, saveExecutor, rangeLoadExecutor, agent); } + abstract void notifyWork(); abstract void notifyWorkExclusive(); abstract void awaitExclusive() throws InterruptedException; abstract boolean isInLoop(); @@ -129,67 +129,73 @@ abstract class AccordExecutorAbstractLockLoop extends AccordExecutor ++running; } - Interruptible.Task task(Mode mode) + Runnable task(Mode mode) { return mode == RUN_WITH_LOCK ? this::runWithLock : this::runWithoutLock; } - protected void runWithLock(Interruptible.State state) throws InterruptedException + protected void runWithLock() { - lock.lockInterruptibly(); - try + while (true) { - resumeExclusive(); - enterLockExclusive(); - while (true) + lock.lock(); + try { - Task task = pollWaitingToRunExclusive(); - - if (task != null) + resumeExclusive(); + enterLockExclusive(); + while (true) { - --tasks; - try - { - task.preRunExclusive(); - task.run(); - } - catch (Throwable t) - { - task.fail(t); - } - finally + Task task = pollWaitingToRunExclusive(); + + if (task != null) { - task.cleanupExclusive(); + --tasks; + try + { + task.preRunExclusive(); + task.run(); + } + catch (Throwable t) + { + task.fail(t); + } + finally + { + task.cleanupExclusive(); + } } - } - else - { - if (state != NORMAL) + else { + if (shutdown) + { + pauseExclusive(); + exitLockExclusive(); + notifyWorkExclusive(); // always notify on shutdown + return; + } + pauseExclusive(); - exitLockExclusive(); - return; + awaitExclusive(); + resumeExclusive(); } - - pauseExclusive(); - awaitExclusive(); - resumeExclusive(); } } - } - catch (Throwable t) - { - pauseExclusive(); - exitLockExclusive(); - throw t; - } - finally - { - lock.unlock(); + catch (Throwable t) + { + pauseExclusive(); + exitLockExclusive(); + + try { agent.onUncaughtException(t); } + catch (Throwable t2) { } + } + finally + { + lock.unlock(); + } } } - protected void runWithoutLock(Interruptible.State state) throws InterruptedException + protected void runWithoutLock() { Task task = null; while (true) @@ -210,9 +216,10 @@ abstract class AccordExecutorAbstractLockLoop extends AccordExecutor break; } - if (state != NORMAL) + if (shutdown) { exitLockExclusive(); + notifyWorkExclusive(); return; } @@ -233,11 +240,12 @@ abstract class AccordExecutorAbstractLockLoop extends AccordExecutor catch (Throwable t2) { t.addSuppressed(t2); } try { agent.onUncaughtException(t); } catch (Throwable t2) { /* nothing we can sensibly do after already reporting */ } + task = null; } if (isHeldByExecutor) pauseExclusive(); exitLockExclusive(); - throw t; + continue; } finally { @@ -266,4 +274,18 @@ abstract class AccordExecutorAbstractLockLoop extends AccordExecutor } } } + + @Override + public void shutdown() + { + shutdown = true; + notifyWork(); + } + + @Override + public Object shutdownNow() + { + shutdown(); + return null; + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java index a216e60ae8..42cde814cf 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java @@ -23,11 +23,8 @@ import java.util.concurrent.locks.Lock; import accord.api.Agent; import accord.utils.QuadFunction; import accord.utils.QuintConsumer; -import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.metrics.AccordCacheMetrics; -import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK; - abstract class AccordExecutorAbstractSemiSyncSubmit extends AccordExecutorAbstractLockLoop { AccordExecutorAbstractSemiSyncSubmit(Lock lock, int executorId, AccordCacheMetrics metrics, ExecutorFunctionFactory loadExecutor, ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory rangeLoadExecutor, Agent agent) @@ -35,20 +32,14 @@ abstract class AccordExecutorAbstractSemiSyncSubmit extends AccordExecutorAbstra super(lock, executorId, metrics, loadExecutor, saveExecutor, rangeLoadExecutor, agent); } - abstract void notifyWorkAsync(); abstract void awaitExclusive() throws InterruptedException; - Interruptible.Task task(Mode mode) - { - return mode == RUN_WITH_LOCK ? this::runWithLock : this::runWithoutLock; - } - <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, P1a p1a, P2 p2, P3 p3, P4 p4) { if (!lock.tryLock()) { submitted.push(async.apply(p1a, p2, p3, p4)); - notifyWorkAsync(); + notifyWork(); return; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java index c0f0c5c06a..7510e6159a 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java @@ -28,7 +28,7 @@ import org.apache.cassandra.utils.concurrent.LockWithAsyncSignal; // WARNING: experimental - needs more testing class AccordExecutorAsyncSubmit extends AccordExecutorAbstractSemiSyncSubmit { - private final AccordExecutorInfiniteLoops loops; + private final AccordExecutorLoops loops; private final LockWithAsyncSignal lock; public AccordExecutorAsyncSubmit(int executorId, Mode mode, int threads, IntFunction<String> name, AccordCacheMetrics metrics, ExecutorFunctionFactory loadExecutor, ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory rangeLoadExecutor, Agent agent) @@ -40,7 +40,7 @@ class AccordExecutorAsyncSubmit extends AccordExecutorAbstractSemiSyncSubmit { super(lock, executorId, metrics, loadExecutor, saveExecutor, rangeLoadExecutor, agent); this.lock = lock; - this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, this::task); + this.loops = new AccordExecutorLoops(mode, threads, name, this::task); } @Override @@ -58,7 +58,7 @@ class AccordExecutorAsyncSubmit extends AccordExecutorAbstractSemiSyncSubmit } @Override - void notifyWorkAsync() + void notifyWork() { lock.signal(); } @@ -75,18 +75,6 @@ class AccordExecutorAsyncSubmit extends AccordExecutorAbstractSemiSyncSubmit return lock.isOwner(Thread.currentThread()); } - @Override - public void shutdown() - { - loops.shutdown(); - } - - @Override - public Object shutdownNow() - { - return loops.shutdownNow(); - } - @Override public boolean isTerminated() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java deleted file mode 100644 index 405479bb9f..0000000000 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service.accord; - -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.IntFunction; - -import accord.utils.Invariants; -import org.agrona.collections.LongHashSet; -import org.apache.cassandra.concurrent.InfiniteLoopExecutor; -import org.apache.cassandra.concurrent.Interruptible; -import org.apache.cassandra.concurrent.Shutdownable; -import org.apache.cassandra.service.accord.AccordExecutor.Mode; - -import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; -import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON; -import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED; -import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE; -import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; - -class AccordExecutorInfiniteLoops implements Shutdownable -{ - private final Interruptible[] loops; - private final LongHashSet threadIds; - - public AccordExecutorInfiniteLoops(Mode mode, int threads, IntFunction<String> name, Function<Mode, Interruptible.Task> tasks) - { - Invariants.require(mode == RUN_WITH_LOCK ? threads == 1 : threads >= 1); - final LongHashSet threadIds = new LongHashSet(threads, 0.5f); - this.loops = new Interruptible[threads]; - for (int i = 0; i < threads; ++i) - { - loops[i] = executorFactory().infiniteLoop(name.apply(i), tasks.apply(mode), SAFE, NON_DAEMON, UNSYNCHRONIZED); - if (loops[i] instanceof InfiniteLoopExecutor) - threadIds.add(((InfiniteLoopExecutor) loops[i]).threadId()); - } - this.threadIds = threadIds; - } - - public boolean isInLoop() - { - return threadIds.contains(Thread.currentThread().getId()); - } - - @Override - public void shutdown() - { - for (Interruptible loop : loops) - loop.shutdown(); - } - - @Override - public Object shutdownNow() - { - for (Interruptible loop : loops) - loop.shutdownNow(); - return null; - } - - @Override - public boolean isTerminated() - { - for (Interruptible loop : loops) - { - if (!loop.isTerminated()) - return false; - } - return true; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - long deadline = nanoTime() + unit.toNanos(timeout); - for (Interruptible loop : loops) - { - long wait = deadline - nanoTime(); - if (!loop.awaitTermination(wait, TimeUnit.NANOSECONDS)) - return false; - } - return true; - } -} diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java new file mode 100644 index 0000000000..0e8aab8c8a --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.IntFunction; + +import accord.utils.Invariants; +import io.netty.util.collection.LongObjectHashMap; +import org.apache.cassandra.service.accord.AccordExecutor.Mode; +import org.apache.cassandra.utils.concurrent.Condition; + +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; + +class AccordExecutorLoops +{ + private final LongObjectHashMap<Thread> loops; + + private final AtomicInteger running = new AtomicInteger(); + private final Condition terminated = Condition.newOneTimeCondition(); + + public AccordExecutorLoops(Mode mode, int threads, IntFunction<String> name, Function<Mode, Runnable> loopFactory) + { + Invariants.require(mode == RUN_WITH_LOCK ? threads == 1 : threads >= 1); + running.addAndGet(threads); + loops = new LongObjectHashMap<>(threads); + for (int i = 0; i < threads; ++i) + { + Thread thread = executorFactory().startThread(name.apply(i), wrap(loopFactory.apply(mode))); + Thread conflict = loops.putIfAbsent(thread.getId(), thread); + Invariants.require(conflict == null || !conflict.isAlive(), "Allocated two threads with the same threadId!"); + } + } + + private Runnable wrap(Runnable run) + { + return () -> + { + try + { + run.run(); + } + finally + { + if (0 == running.decrementAndGet()) + terminated.signalAll(); + } + }; + } + + public boolean isInLoop() + { + Thread thread = Thread.currentThread(); + return loops.get(thread.getId()) == thread; + } + + public boolean isTerminated() + { + return terminated.isSignalled(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + long deadline = nanoTime() + unit.toNanos(timeout); + return terminated.awaitUntil(deadline); + } +} diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java index 3272d52971..0e26e38537 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java @@ -29,7 +29,7 @@ import org.apache.cassandra.metrics.AccordCacheMetrics; // WARNING: experimental - needs more testing class AccordExecutorSemiSyncSubmit extends AccordExecutorAbstractSemiSyncSubmit { - private final AccordExecutorInfiniteLoops loops; + private final AccordExecutorLoops loops; private final ReentrantLock lock; private final Condition hasWork; @@ -43,7 +43,7 @@ class AccordExecutorSemiSyncSubmit extends AccordExecutorAbstractSemiSyncSubmit super(lock, executorId, metrics, loadExecutor, saveExecutor, rangeLoadExecutor, agent); this.lock = lock; this.hasWork = lock.newCondition(); - this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, this::task); + this.loops = new AccordExecutorLoops(mode, threads, name, this::task); } @Override @@ -60,7 +60,7 @@ class AccordExecutorSemiSyncSubmit extends AccordExecutorAbstractSemiSyncSubmit } @Override - void notifyWorkAsync() + void notifyWork() { // we check running both sides of tryLock for ordering guarantees boolean hadRunning = isHeldByExecutor; @@ -89,18 +89,6 @@ class AccordExecutorSemiSyncSubmit extends AccordExecutorAbstractSemiSyncSubmit return lock.isHeldByCurrentThread(); } - @Override - public void shutdown() - { - loops.shutdown(); - } - - @Override - public Object shutdownNow() - { - return loops.shutdownNow(); - } - @Override public boolean isTerminated() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java index 2ae7d36ad5..5fb5d7895b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java +++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java @@ -31,7 +31,7 @@ import org.apache.cassandra.metrics.AccordCacheMetrics; class AccordExecutorSyncSubmit extends AccordExecutorAbstractLockLoop { - private final AccordExecutorInfiniteLoops loops; + private final AccordExecutorLoops loops; private final ReentrantLock lock; private final Condition hasWork; @@ -55,7 +55,7 @@ class AccordExecutorSyncSubmit extends AccordExecutorAbstractLockLoop super(lock, executorId, metrics, loadExecutor, saveExecutor, rangeLoadExecutor, agent); this.lock = lock; this.hasWork = lock.newCondition(); - this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, this::task); + this.loops = new AccordExecutorLoops(mode, threads, name, this::task); } @Override @@ -77,17 +77,12 @@ class AccordExecutorSyncSubmit extends AccordExecutorAbstractLockLoop } @Override - void notifyWorkExclusive() - { - hasWork.signal(); - } - - <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, P1a p1a, P2 p2, P3 p3, P4 p4) + void notifyWork() { lock.lock(); try { - submitExternalExclusive(sync, async, p1s, p1a, p2, p3, p4); + hasWork.signal(); } finally { @@ -96,15 +91,22 @@ class AccordExecutorSyncSubmit extends AccordExecutorAbstractLockLoop } @Override - public void shutdown() + void notifyWorkExclusive() { - loops.shutdown(); + hasWork.signal(); } - @Override - public Object shutdownNow() + <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, P1a p1a, P2 p2, P3 p3, P4 p4) { - return loops.shutdownNow(); + lock.lock(); + try + { + submitExternalExclusive(sync, async, p1s, p1a, p2, p3, p4); + } + finally + { + lock.unlock(); + } } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java index 38bf137f10..61d86db9bf 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTask.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,6 +62,7 @@ import org.apache.cassandra.service.accord.AccordExecutor.Task; import org.apache.cassandra.service.accord.AccordExecutor.TaskQueue; import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor; import org.apache.cassandra.service.accord.api.TokenKey; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.concurrent.Condition; @@ -190,6 +192,7 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S private State state = INITIALIZED; private final PreLoadContext preLoadContext; private final String loggingId; + private static final AtomicLong nextLoggingId = new AtomicLong(Clock.Global.currentTimeMillis()); // TODO (expected): merge all of these maps into one @Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands; @@ -221,7 +224,7 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S public AccordTask(AccordCommandStore commandStore, PreLoadContext preLoadContext) { super(commandStore); - this.loggingId = "0x" + Integer.toHexString(System.identityHashCode(this)); + this.loggingId = "0x" + Long.toHexString(nextLoggingId.incrementAndGet()); this.preLoadContext = preLoadContext; if (logger.isTraceEnabled()) diff --git a/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java b/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java index 87a06ea6ed..2f358d9385 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java +++ b/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java @@ -104,6 +104,16 @@ public class LockWithAsyncSignal implements Lock } public void await() throws InterruptedException + { + await(LockWithAsyncSignal::awaitDeferThrow); + } + + public void awaitUninterruptibly() + { + await(LockWithAsyncSignal::awaitUninterruptibly); + } + + private <T extends Throwable> void await(AwaitFunction<T> await) throws T { Thread thread = Thread.currentThread(); int restoreDepth = depth; @@ -112,7 +122,7 @@ public class LockWithAsyncSignal implements Lock depth = 0; owner = null; - awaitLock(true, thread, restoreDepth, LockWithAsyncSignal::awaitDeferThrow); + awaitLock(true, thread, restoreDepth, await); } public void unlock() diff --git a/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java new file mode 100644 index 0000000000..d9117454f8 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Random; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import org.junit.Test; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.utils.EstimatedHistogram; +import org.apache.cassandra.utils.FBUtilities; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; + +public class LockWithAsyncSignalTest +{ + @Test + public void test() + { + ExecutorPlus submitters = executorFactory().pooled("test-submitters", 16); + ExecutorPlus consumers = executorFactory().pooled("test-consumers", 16); + for (int i = 0 ; i < 10 ; ++i) + testOne(submitters, consumers, 4, 8, i); + } + + private static void testOne(ExecutorPlus submitterExecutor, ExecutorPlus consumerExecutor, int submitterCount, int consumerCount, int seconds) + { + class Waiting extends AtomicBoolean implements Comparable<Waiting> + { + final long ticket; + + Waiting(long ticket) + { + this.ticket = ticket; + } + + @Override + public int compareTo(Waiting that) + { + return Long.compare(this.ticket, that.ticket); + } + } + final LockWithAsyncSignal lock = new LockWithAsyncSignal(); + final List<Future<Object>> submitters = new ArrayList<>(); + final List<Future<Object>> consumers = new ArrayList<>(); + final AtomicBoolean submittersRunning = new AtomicBoolean(true); + final AtomicBoolean consumersRunning = new AtomicBoolean(true); + final ConcurrentSkipListMap<Waiting, Boolean> waiting = new ConcurrentSkipListMap<>(); + final AtomicLong nextTicket = new AtomicLong(); + final EstimatedHistogram latency = new EstimatedHistogram(); + for (int i = 0; i < submitterCount + consumerCount ; ++i) + { + boolean submitter = i/2 >= consumerCount || ((i & 1) == 0 && i/2 < submitterCount); + if (submitter) + { + submitters.add(submitterExecutor.submit(() -> { + final Random rnd = new Random(); + while (submittersRunning.get()) + { + LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(rnd.nextInt(100))); + long start = System.nanoTime(); + Waiting awaiting = new Waiting(nextTicket.incrementAndGet()); + waiting.put(awaiting, true); + lock.signal(); + while (!awaiting.get()); + long end = System.nanoTime(); + latency.add(NANOSECONDS.toMicros(end - start)); + } + return null; + })); + } + else + { + consumers.add(consumerExecutor.submit(() -> { + final Random rnd = new Random(); + while (true) + { + if (rnd.nextBoolean()) lock.lock(); + else if (!lock.tryLock()) continue; + + Waiting waitUntil; + try + { + AtomicBoolean awaiting; + while (null != (awaiting = pollFirst(waiting))) + awaiting.set(true); + + lock.await(); + if (null != (awaiting = pollFirst(waiting))) + awaiting.set(true); + + if (!consumersRunning.get()) + { + lock.signal(); + return null; + } + + waitUntil = peekLast(waiting); + if (!waiting.isEmpty()) + lock.signal(); + } + finally + { + lock.unlock(); + } + + if (waitUntil != null) + { + while (!waitUntil.get()); + } + } + })); + } + } + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds); + while (true) + { + long wait = deadline - System.nanoTime(); + if (wait < 0) + break; + LockSupport.parkNanos(wait); + } + submittersRunning.set(false); + lock.signal(); + FBUtilities.waitOnFutures(submitters, 2L, TimeUnit.SECONDS); + consumersRunning.set(false); + FBUtilities.waitOnFutures(consumers, 2L, TimeUnit.SECONDS); + } + + private static <T> T pollFirst(NavigableMap<T, ?> map) + { + Map.Entry<T, ?> e = map.pollFirstEntry(); + return e == null ? null : e.getKey(); + } + + private static <T> T peekLast(NavigableMap<T, ?> map) + { + Map.Entry<T, ?> e = map.lastEntry(); + return e == null ? null : e.getKey(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org