This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 37ef88f62d87d2c5220fa701f7f075bff0649a6b
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sat Mar 15 13:30:06 2025 +0000

    Switch to plain Thread for AccordExecutor, and improve testing for ASYNC mode
    
    patch by Benedict; reviewed by David Capwell for CASSANDRA-20521
---
 modules/accord                                     |   2 +-
 .../accord/AccordExecutorAbstractLockLoop.java     | 116 ++++++++------
 .../AccordExecutorAbstractSemiSyncSubmit.java      |  11 +-
 .../service/accord/AccordExecutorAsyncSubmit.java  |  18 +--
 .../accord/AccordExecutorInfiniteLoops.java        | 101 -------------
 .../service/accord/AccordExecutorLoops.java        |  87 +++++++++++
 .../accord/AccordExecutorSemiSyncSubmit.java       |  18 +--
 .../service/accord/AccordExecutorSyncSubmit.java   |  30 ++--
 .../cassandra/service/accord/AccordTask.java       |   5 +-
 .../utils/concurrent/LockWithAsyncSignal.java      |  12 +-
 .../utils/concurrent/LockWithAsyncSignalTest.java  | 167 +++++++++++++++++++++
 11 files changed, 362 insertions(+), 205 deletions(-)

diff --git a/modules/accord b/modules/accord
index 0a5446a365..1192d253f3 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 0a5446a365fe7eb1a15b6a2d0d72f9475c51bc47
+Subproject commit 1192d253f36d072b056f1d16e292bdf202018758
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
index c92dccd0f3..8ef72c95b0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java
@@ -23,23 +23,23 @@ import java.util.concurrent.locks.Lock;
 import accord.api.Agent;
 import accord.utils.QuadFunction;
 import accord.utils.QuintConsumer;
-import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.metrics.AccordCacheMetrics;
 import org.apache.cassandra.utils.concurrent.ConcurrentLinkedStack;
 
-import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static 
org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
 
 abstract class AccordExecutorAbstractLockLoop extends AccordExecutor
 {
     final ConcurrentLinkedStack<Object> submitted = new 
ConcurrentLinkedStack<>();
     boolean isHeldByExecutor;
+    boolean shutdown;
 
     AccordExecutorAbstractLockLoop(Lock lock, int executorId, 
AccordCacheMetrics metrics, ExecutorFunctionFactory loadExecutor, 
ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory 
rangeLoadExecutor, Agent agent)
     {
         super(lock, executorId, metrics, loadExecutor, saveExecutor, 
rangeLoadExecutor, agent);
     }
 
+    abstract void notifyWork();
     abstract void notifyWorkExclusive();
     abstract void awaitExclusive() throws InterruptedException;
     abstract boolean isInLoop();
@@ -129,67 +129,73 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
         ++running;
     }
 
-    Interruptible.Task task(Mode mode)
+    Runnable task(Mode mode)
     {
         return mode == RUN_WITH_LOCK ? this::runWithLock : 
this::runWithoutLock;
     }
 
-    protected void runWithLock(Interruptible.State state) throws 
InterruptedException
+    protected void runWithLock()
     {
-        lock.lockInterruptibly();
-        try
+        while (true)
         {
-            resumeExclusive();
-            enterLockExclusive();
-            while (true)
+            lock.lock();
+            try
             {
-                Task task = pollWaitingToRunExclusive();
-
-                if (task != null)
+                resumeExclusive();
+                enterLockExclusive();
+                while (true)
                 {
-                    --tasks;
-                    try
-                    {
-                        task.preRunExclusive();
-                        task.run();
-                    }
-                    catch (Throwable t)
-                    {
-                        task.fail(t);
-                    }
-                    finally
+                    Task task = pollWaitingToRunExclusive();
+
+                    if (task != null)
                     {
-                        task.cleanupExclusive();
+                        --tasks;
+                        try
+                        {
+                            task.preRunExclusive();
+                            task.run();
+                        }
+                        catch (Throwable t)
+                        {
+                            task.fail(t);
+                        }
+                        finally
+                        {
+                            task.cleanupExclusive();
+                        }
                     }
-                }
-                else
-                {
-                    if (state != NORMAL)
+                    else
                     {
+                        if (shutdown)
+                        {
+                            pauseExclusive();
+                            exitLockExclusive();
+                            notifyWorkExclusive(); // always notify on shutdown
+                            return;
+                        }
+
                         pauseExclusive();
-                        exitLockExclusive();
-                        return;
+                        awaitExclusive();
+                        resumeExclusive();
                     }
-
-                    pauseExclusive();
-                    awaitExclusive();
-                    resumeExclusive();
                 }
             }
-        }
-        catch (Throwable t)
-        {
-            pauseExclusive();
-            exitLockExclusive();
-            throw t;
-        }
-        finally
-        {
-            lock.unlock();
+            catch (Throwable t)
+            {
+                pauseExclusive();
+                exitLockExclusive();
+
+                try { agent.onUncaughtException(t); }
+                catch (Throwable t2) { }
+            }
+            finally
+            {
+                lock.unlock();
+            }
         }
     }
 
-    protected void runWithoutLock(Interruptible.State state) throws 
InterruptedException
+    protected void runWithoutLock()
     {
         Task task = null;
         while (true)
@@ -210,9 +216,10 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                         break;
                     }
 
-                    if (state != NORMAL)
+                    if (shutdown)
                     {
                         exitLockExclusive();
+                        notifyWorkExclusive();
                         return;
                     }
 
@@ -233,11 +240,12 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
                     catch (Throwable t2) { t.addSuppressed(t2); }
                     try { agent.onUncaughtException(t); }
                     catch (Throwable t2) { /* nothing we can sensibly do after 
already reporting */ }
+                    task = null;
                 }
                 if (isHeldByExecutor)
                     pauseExclusive();
                 exitLockExclusive();
-                throw t;
+                continue;
             }
             finally
             {
@@ -266,4 +274,18 @@ abstract class AccordExecutorAbstractLockLoop extends 
AccordExecutor
             }
         }
     }
+
+    @Override
+    public void shutdown()
+    {
+        shutdown = true;
+        notifyWork();
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        shutdown();
+        return null;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java
index a216e60ae8..42cde814cf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractSemiSyncSubmit.java
@@ -23,11 +23,8 @@ import java.util.concurrent.locks.Lock;
 import accord.api.Agent;
 import accord.utils.QuadFunction;
 import accord.utils.QuintConsumer;
-import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.metrics.AccordCacheMetrics;
 
-import static 
org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
-
 abstract class AccordExecutorAbstractSemiSyncSubmit extends 
AccordExecutorAbstractLockLoop
 {
     AccordExecutorAbstractSemiSyncSubmit(Lock lock, int executorId, 
AccordCacheMetrics metrics, ExecutorFunctionFactory loadExecutor, 
ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory 
rangeLoadExecutor, Agent agent)
@@ -35,20 +32,14 @@ abstract class AccordExecutorAbstractSemiSyncSubmit extends 
AccordExecutorAbstra
         super(lock, executorId, metrics, loadExecutor, saveExecutor, 
rangeLoadExecutor, agent);
     }
 
-    abstract void notifyWorkAsync();
     abstract void awaitExclusive() throws InterruptedException;
 
-    Interruptible.Task task(Mode mode)
-    {
-        return mode == RUN_WITH_LOCK ? this::runWithLock : 
this::runWithoutLock;
-    }
-
     <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, 
P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, 
P1a p1a, P2 p2, P3 p3, P4 p4)
     {
         if (!lock.tryLock())
         {
             submitted.push(async.apply(p1a, p2, p3, p4));
-            notifyWorkAsync();
+            notifyWork();
             return;
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java
index c0f0c5c06a..7510e6159a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorAsyncSubmit.java
@@ -28,7 +28,7 @@ import 
org.apache.cassandra.utils.concurrent.LockWithAsyncSignal;
 // WARNING: experimental - needs more testing
 class AccordExecutorAsyncSubmit extends AccordExecutorAbstractSemiSyncSubmit
 {
-    private final AccordExecutorInfiniteLoops loops;
+    private final AccordExecutorLoops loops;
     private final LockWithAsyncSignal lock;
 
     public AccordExecutorAsyncSubmit(int executorId, Mode mode, int threads, 
IntFunction<String> name, AccordCacheMetrics metrics, ExecutorFunctionFactory 
loadExecutor, ExecutorFunctionFactory saveExecutor, ExecutorFunctionFactory 
rangeLoadExecutor, Agent agent)
@@ -40,7 +40,7 @@ class AccordExecutorAsyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
     {
         super(lock, executorId, metrics, loadExecutor, saveExecutor, 
rangeLoadExecutor, agent);
         this.lock = lock;
-        this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, 
this::task);
+        this.loops = new AccordExecutorLoops(mode, threads, name, this::task);
     }
 
     @Override
@@ -58,7 +58,7 @@ class AccordExecutorAsyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
     }
 
     @Override
-    void notifyWorkAsync()
+    void notifyWork()
     {
         lock.signal();
     }
@@ -75,18 +75,6 @@ class AccordExecutorAsyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
         return lock.isOwner(Thread.currentThread());
     }
 
-    @Override
-    public void shutdown()
-    {
-        loops.shutdown();
-    }
-
-    @Override
-    public Object shutdownNow()
-    {
-        return loops.shutdownNow();
-    }
-
     @Override
     public boolean isTerminated()
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java
deleted file mode 100644
index 405479bb9f..0000000000
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorInfiniteLoops.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-
-import accord.utils.Invariants;
-import org.agrona.collections.LongHashSet;
-import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
-import org.apache.cassandra.concurrent.Interruptible;
-import org.apache.cassandra.concurrent.Shutdownable;
-import org.apache.cassandra.service.accord.AccordExecutor.Mode;
-
-import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
-import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
-import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
-import static 
org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-
-class AccordExecutorInfiniteLoops implements Shutdownable
-{
-    private final Interruptible[] loops;
-    private final LongHashSet threadIds;
-
-    public AccordExecutorInfiniteLoops(Mode mode, int threads, 
IntFunction<String> name, Function<Mode, Interruptible.Task> tasks)
-    {
-        Invariants.require(mode == RUN_WITH_LOCK ? threads == 1 : threads >= 
1);
-        final LongHashSet threadIds = new LongHashSet(threads, 0.5f);
-        this.loops = new Interruptible[threads];
-        for (int i = 0; i < threads; ++i)
-        {
-            loops[i] = executorFactory().infiniteLoop(name.apply(i), 
tasks.apply(mode), SAFE, NON_DAEMON, UNSYNCHRONIZED);
-            if (loops[i] instanceof InfiniteLoopExecutor)
-                threadIds.add(((InfiniteLoopExecutor) loops[i]).threadId());
-        }
-        this.threadIds = threadIds;
-    }
-
-    public boolean isInLoop()
-    {
-        return threadIds.contains(Thread.currentThread().getId());
-    }
-
-    @Override
-    public void shutdown()
-    {
-        for (Interruptible loop : loops)
-            loop.shutdown();
-    }
-
-    @Override
-    public Object shutdownNow()
-    {
-        for (Interruptible loop : loops)
-            loop.shutdownNow();
-        return null;
-    }
-
-    @Override
-    public boolean isTerminated()
-    {
-        for (Interruptible loop : loops)
-        {
-            if (!loop.isTerminated())
-                return false;
-        }
-        return true;
-    }
-
-    @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
-    {
-        long deadline = nanoTime() + unit.toNanos(timeout);
-        for (Interruptible loop : loops)
-        {
-            long wait = deadline - nanoTime();
-            if (!loop.awaitTermination(wait, TimeUnit.NANOSECONDS))
-                return false;
-        }
-        return true;
-    }
-}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java
new file mode 100644
index 0000000000..0e8aab8c8a
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorLoops.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.utils.Invariants;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.cassandra.service.accord.AccordExecutor.Mode;
+import org.apache.cassandra.utils.concurrent.Condition;
+
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static 
org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+class AccordExecutorLoops
+{
+    private final LongObjectHashMap<Thread> loops;
+
+    private final AtomicInteger running = new AtomicInteger();
+    private final Condition terminated = Condition.newOneTimeCondition();
+
+    public AccordExecutorLoops(Mode mode, int threads, IntFunction<String> 
name, Function<Mode, Runnable> loopFactory)
+    {
+        Invariants.require(mode == RUN_WITH_LOCK ? threads == 1 : threads >= 
1);
+        running.addAndGet(threads);
+        loops = new LongObjectHashMap<>(threads);
+        for (int i = 0; i < threads; ++i)
+        {
+            Thread thread = executorFactory().startThread(name.apply(i), 
wrap(loopFactory.apply(mode)));
+            Thread conflict = loops.putIfAbsent(thread.getId(), thread);
+            Invariants.require(conflict == null || !conflict.isAlive(), 
"Allocated two threads with the same threadId!");
+        }
+    }
+
+    private Runnable wrap(Runnable run)
+    {
+        return () ->
+        {
+            try
+            {
+                run.run();
+            }
+            finally
+            {
+                if (0 == running.decrementAndGet())
+                    terminated.signalAll();
+            }
+        };
+    }
+
+    public boolean isInLoop()
+    {
+        Thread thread = Thread.currentThread();
+        return loops.get(thread.getId()) == thread;
+    }
+
+    public boolean isTerminated()
+    {
+        return terminated.isSignalled();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException
+    {
+        long deadline = nanoTime() + unit.toNanos(timeout);
+        return terminated.awaitUntil(deadline);
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java
index 3272d52971..0e26e38537 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSemiSyncSubmit.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.metrics.AccordCacheMetrics;
 // WARNING: experimental - needs more testing
 class AccordExecutorSemiSyncSubmit extends AccordExecutorAbstractSemiSyncSubmit
 {
-    private final AccordExecutorInfiniteLoops loops;
+    private final AccordExecutorLoops loops;
     private final ReentrantLock lock;
     private final Condition hasWork;
 
@@ -43,7 +43,7 @@ class AccordExecutorSemiSyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
         super(lock, executorId, metrics, loadExecutor, saveExecutor, 
rangeLoadExecutor, agent);
         this.lock = lock;
         this.hasWork = lock.newCondition();
-        this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, 
this::task);
+        this.loops = new AccordExecutorLoops(mode, threads, name, this::task);
     }
 
     @Override
@@ -60,7 +60,7 @@ class AccordExecutorSemiSyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
     }
 
     @Override
-    void notifyWorkAsync()
+    void notifyWork()
     {
         // we check running both sides of tryLock for ordering guarantees
         boolean hadRunning = isHeldByExecutor;
@@ -89,18 +89,6 @@ class AccordExecutorSemiSyncSubmit extends 
AccordExecutorAbstractSemiSyncSubmit
         return lock.isHeldByCurrentThread();
     }
 
-    @Override
-    public void shutdown()
-    {
-        loops.shutdown();
-    }
-
-    @Override
-    public Object shutdownNow()
-    {
-        return loops.shutdownNow();
-    }
-
     @Override
     public boolean isTerminated()
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java b/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java
index 2ae7d36ad5..5fb5d7895b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutorSyncSubmit.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.metrics.AccordCacheMetrics;
 
 class AccordExecutorSyncSubmit extends AccordExecutorAbstractLockLoop
 {
-    private final AccordExecutorInfiniteLoops loops;
+    private final AccordExecutorLoops loops;
     private final ReentrantLock lock;
     private final Condition hasWork;
 
@@ -55,7 +55,7 @@ class AccordExecutorSyncSubmit extends 
AccordExecutorAbstractLockLoop
         super(lock, executorId, metrics, loadExecutor, saveExecutor, 
rangeLoadExecutor, agent);
         this.lock = lock;
         this.hasWork = lock.newCondition();
-        this.loops = new AccordExecutorInfiniteLoops(mode, threads, name, 
this::task);
+        this.loops = new AccordExecutorLoops(mode, threads, name, this::task);
     }
 
     @Override
@@ -77,17 +77,12 @@ class AccordExecutorSyncSubmit extends 
AccordExecutorAbstractLockLoop
     }
 
     @Override
-    void notifyWorkExclusive()
-    {
-        hasWork.signal();
-    }
-
-    <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, 
P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, 
P1a p1a, P2 p2, P3 p3, P4 p4)
+    void notifyWork()
     {
         lock.lock();
         try
         {
-            submitExternalExclusive(sync, async, p1s, p1a, p2, p3, p4);
+            hasWork.signal();
         }
         finally
         {
@@ -96,15 +91,22 @@ class AccordExecutorSyncSubmit extends 
AccordExecutorAbstractLockLoop
     }
 
     @Override
-    public void shutdown()
+    void notifyWorkExclusive()
     {
-        loops.shutdown();
+        hasWork.signal();
     }
 
-    @Override
-    public Object shutdownNow()
+    <P1s, P1a, P2, P3, P4> void submitExternal(QuintConsumer<AccordExecutor, 
P1s, P2, P3, P4> sync, QuadFunction<P1a, P2, P3, P4, Object> async, P1s p1s, 
P1a p1a, P2 p2, P3 p3, P4 p4)
     {
-        return loops.shutdownNow();
+        lock.lock();
+        try
+        {
+            submitExternalExclusive(sync, async, p1s, p1a, p2, p3, p4);
+        }
+        finally
+        {
+            lock.unlock();
+        }
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 38bf137f10..61d86db9bf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -61,6 +62,7 @@ import 
org.apache.cassandra.service.accord.AccordExecutor.Task;
 import org.apache.cassandra.service.accord.AccordExecutor.TaskQueue;
 import 
org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
 import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.Condition;
 
@@ -190,6 +192,7 @@ public abstract class AccordTask<R> extends Task implements 
Runnable, Function<S
     private State state = INITIALIZED;
     private final PreLoadContext preLoadContext;
     private final String loggingId;
+    private static final AtomicLong nextLoggingId = new 
AtomicLong(Clock.Global.currentTimeMillis());
 
     // TODO (expected): merge all of these maps into one
     @Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands;
@@ -221,7 +224,7 @@ public abstract class AccordTask<R> extends Task implements 
Runnable, Function<S
     public AccordTask(AccordCommandStore commandStore, PreLoadContext 
preLoadContext)
     {
         super(commandStore);
-        this.loggingId = "0x" + 
Integer.toHexString(System.identityHashCode(this));
+        this.loggingId = "0x" + 
Long.toHexString(nextLoggingId.incrementAndGet());
         this.preLoadContext = preLoadContext;
 
         if (logger.isTraceEnabled())
diff --git a/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java b/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java
index 87a06ea6ed..2f358d9385 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/LockWithAsyncSignal.java
@@ -104,6 +104,16 @@ public class LockWithAsyncSignal implements Lock
     }
 
     public void await() throws InterruptedException
+    {
+        await(LockWithAsyncSignal::awaitDeferThrow);
+    }
+
+    public void awaitUninterruptibly()
+    {
+        await(LockWithAsyncSignal::awaitUninterruptibly);
+    }
+
+    private <T extends Throwable> void await(AwaitFunction<T> await) throws T
     {
         Thread thread = Thread.currentThread();
         int restoreDepth = depth;
@@ -112,7 +122,7 @@ public class LockWithAsyncSignal implements Lock
         depth = 0;
         owner = null;
 
-        awaitLock(true, thread, restoreDepth, 
LockWithAsyncSignal::awaitDeferThrow);
+        awaitLock(true, thread, restoreDepth, await);
     }
 
     public void unlock()
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java
new file mode 100644
index 0000000000..d9117454f8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/LockWithAsyncSignalTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+public class LockWithAsyncSignalTest
+{
+    @Test
+    public void test()
+    {
+        ExecutorPlus submitters = executorFactory().pooled("test-submitters", 
16);
+        ExecutorPlus consumers = executorFactory().pooled("test-consumers", 
16);
+        for (int i = 0 ; i < 10 ; ++i)
+            testOne(submitters, consumers, 4, 8, i);
+    }
+
+    private static void testOne(ExecutorPlus submitterExecutor, ExecutorPlus 
consumerExecutor, int submitterCount, int consumerCount, int seconds)
+    {
+        class Waiting extends AtomicBoolean implements Comparable<Waiting>
+        {
+            final long ticket;
+
+            Waiting(long ticket)
+            {
+                this.ticket = ticket;
+            }
+
+            @Override
+            public int compareTo(Waiting that)
+            {
+                return Long.compare(this.ticket, that.ticket);
+            }
+        }
+        final LockWithAsyncSignal lock = new LockWithAsyncSignal();
+        final List<Future<Object>> submitters = new ArrayList<>();
+        final List<Future<Object>> consumers = new ArrayList<>();
+        final AtomicBoolean submittersRunning = new AtomicBoolean(true);
+        final AtomicBoolean consumersRunning = new AtomicBoolean(true);
+        final ConcurrentSkipListMap<Waiting, Boolean> waiting = new 
ConcurrentSkipListMap<>();
+        final AtomicLong nextTicket = new AtomicLong();
+        final EstimatedHistogram latency = new EstimatedHistogram();
+        for (int i = 0; i < submitterCount + consumerCount ; ++i)
+        {
+            boolean submitter = i/2 >= consumerCount || ((i & 1) == 0 && i/2 < 
submitterCount);
+            if (submitter)
+            {
+                submitters.add(submitterExecutor.submit(() -> {
+                    final Random rnd = new Random();
+                    while (submittersRunning.get())
+                    {
+                        
LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(rnd.nextInt(100)));
+                        long start = System.nanoTime();
+                        Waiting awaiting = new 
Waiting(nextTicket.incrementAndGet());
+                        waiting.put(awaiting, true);
+                        lock.signal();
+                        while (!awaiting.get());
+                        long end = System.nanoTime();
+                        latency.add(NANOSECONDS.toMicros(end - start));
+                    }
+                    return null;
+                }));
+            }
+            else
+            {
+                consumers.add(consumerExecutor.submit(() -> {
+                    final Random rnd = new Random();
+                    while (true)
+                    {
+                        if (rnd.nextBoolean()) lock.lock();
+                        else if (!lock.tryLock()) continue;
+
+                        Waiting waitUntil;
+                        try
+                        {
+                            AtomicBoolean awaiting;
+                            while (null != (awaiting = pollFirst(waiting)))
+                                awaiting.set(true);
+
+                            lock.await();
+                            if (null != (awaiting = pollFirst(waiting)))
+                                awaiting.set(true);
+
+                            if (!consumersRunning.get())
+                            {
+                                lock.signal();
+                                return null;
+                            }
+
+                            waitUntil = peekLast(waiting);
+                            if (!waiting.isEmpty())
+                                lock.signal();
+                        }
+                        finally
+                        {
+                            lock.unlock();
+                        }
+
+                        if (waitUntil != null)
+                        {
+                            while (!waitUntil.get());
+                        }
+                    }
+                }));
+            }
+        }
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds);
+        while (true)
+        {
+            long wait = deadline - System.nanoTime();
+            if (wait < 0)
+                break;
+            LockSupport.parkNanos(wait);
+        }
+        submittersRunning.set(false);
+        lock.signal();
+        FBUtilities.waitOnFutures(submitters, 2L, TimeUnit.SECONDS);
+        consumersRunning.set(false);
+        FBUtilities.waitOnFutures(consumers, 2L, TimeUnit.SECONDS);
+    }
+
+    private static <T> T pollFirst(NavigableMap<T, ?> map)
+    {
+        Map.Entry<T, ?> e = map.pollFirstEntry();
+        return e == null ? null : e.getKey();
+    }
+
+    private static <T> T peekLast(NavigableMap<T, ?> map)
+    {
+        Map.Entry<T, ?> e = map.lastEntry();
+        return e == null ? null : e.getKey();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to