This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e2583bbdbf3f13c476bc32baf06deccf9d12fe58
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Mon Aug 26 12:51:29 2024 +0200

    Switch to infinite loop executor instead of a while-loop thread.
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-19864
---
 .../cassandra/service/accord/AccordJournal.java    | 142 ++++++++++++---------
 1 file changed, 83 insertions(+), 59 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 956473cfd8..cf4fab6e16 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -52,6 +52,8 @@ import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.LongArrayList;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
+import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue;
 import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -105,6 +107,9 @@ import static 
accord.messages.MessageType.SET_SHARD_DURABLE_REQ;
 import static accord.messages.MessageType.STABLE_FAST_PATH_REQ;
 import static accord.messages.MessageType.STABLE_MAXIMAL_REQ;
 import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ;
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
+import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static 
org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_APPLY_MAXIMAL_REQ;
 import static 
org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_APPLY_MINIMAL_REQ;
 import static 
org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_COMMIT_MAXIMAL_REQ;
@@ -165,7 +170,7 @@ public class AccordJournal implements IJournal, Shutdownable
     {
         Invariants.checkState(status == Status.STARTED);
         status = Status.TERMINATING;
-        delayedRequestProcessor.runOnce();
+        delayedRequestProcessor.shutdown();
         journal.shutdown();
         status = Status.TERMINATED;
     }
@@ -625,12 +630,18 @@ public class AccordJournal implements IJournal, 
Shutdownable
      * Handling topology changes / epoch shift
      */
 
-    private final class DelayedRequestProcessor extends Thread
+    private class DelayedRequestProcessor implements Interruptible.Task
     {
         private final ManyToOneConcurrentLinkedQueue<RequestContext> 
delayedRequests = new ManyToOneConcurrentLinkedQueue<>();
         private final LongArrayList waitForEpochs = new LongArrayList();
         private final Long2ObjectHashMap<List<RequestContext>> byEpoch = new 
Long2ObjectHashMap<>();
         private final AtomicReference<Condition> signal = new 
AtomicReference<>(Condition.newOneTimeCondition());
+        private volatile Interruptible executor;
+
+        public void start()
+        {
+             executor = 
executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", 
this::run, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, 
InfiniteLoopExecutor.Interrupts.SYNCHRONIZED);
+        }
 
         private void delay(RequestContext requestContext)
         {
@@ -643,81 +654,94 @@ public class AccordJournal implements IJournal, 
Shutdownable
             signal.get().signal();
         }
 
-        public void run()
+        @Override
+        public void run(Interruptible.State state)
         {
-            while (!Thread.currentThread().isInterrupted() && 
isRunnable(status))
+            if (state != NORMAL || Thread.currentThread().isInterrupted() || 
!isRunnable(status))
+                return;
+
+            try
             {
-                try
+                Condition signal = Condition.newOneTimeCondition();
+                this.signal.set(signal);
+                // First, poll delayed requests, put them into by epoch
+                while (!delayedRequests.isEmpty())
                 {
-                    Condition signal = Condition.newOneTimeCondition();
-                    this.signal.set(signal);
-                    // First, poll delayed requests, put them into by epoch
-                    while (!delayedRequests.isEmpty())
+                    RequestContext context = delayedRequests.poll();
+                    long waitForEpoch = context.waitForEpoch;
+
+                    List<RequestContext> l = 
byEpoch.computeIfAbsent(waitForEpoch, (ignore) -> new ArrayList<>());
+                    if (l.isEmpty())
+                        waitForEpochs.pushLong(waitForEpoch);
+                    l.add(context);
+                    BiConsumer<Void, Throwable> withEpochCallback = new 
BiConsumer<>()
                     {
-                        RequestContext context = delayedRequests.poll();
-                        long waitForEpoch = context.waitForEpoch;
-
-                        List<RequestContext> l = 
byEpoch.computeIfAbsent(waitForEpoch, (ignore) -> new ArrayList<>());
-                        if (l.isEmpty())
-                            waitForEpochs.pushLong(waitForEpoch);
-                        l.add(context);
-                        BiConsumer<Void, Throwable> withEpochCallback = new 
BiConsumer<>()
+                        @Override
+                        public void accept(Void unused, Throwable 
withEpochFailure)
                         {
-                            @Override
-                            public void accept(Void unused, Throwable 
withEpochFailure)
+                            if (withEpochFailure != null)
                             {
-                                if (withEpochFailure != null)
+                                // Nothing to do but keep waiting
+                                if (withEpochFailure instanceof Timeout)
                                 {
-                                    // Nothing to do but keep waiting
-                                    if (withEpochFailure instanceof Timeout)
-                                    {
-                                        node.withEpoch(waitForEpoch, this);
-                                        return;
-                                    }
-                                    else
-                                        throw new 
RuntimeException(withEpochFailure);
+                                    node.withEpoch(waitForEpoch, this);
+                                    return;
                                 }
-                                runOnce();
+                                else
+                                    throw new 
RuntimeException(withEpochFailure);
                             }
-                        };
-                        node.withEpoch(waitForEpoch, withEpochCallback);
-                    }
+                            runOnce();
+                        }
+                    };
+                    node.withEpoch(waitForEpoch, withEpochCallback);
+                }
 
-                    // Next, process all delayed epochs
-                    for (int i = 0; i < waitForEpochs.size(); i++)
+                // Next, process all delayed epochs
+                for (int i = 0; i < waitForEpochs.size(); i++)
+                {
+                    long epoch = waitForEpochs.getLong(i);
+                    if (node.topology().hasEpoch(epoch))
                     {
-                        long epoch = waitForEpochs.getLong(i);
-                        if (node.topology().hasEpoch(epoch))
+                        List<RequestContext> requests = byEpoch.remove(epoch);
+                        assert requests != null : String.format("%s %s (%d)", 
byEpoch, waitForEpochs, epoch);
+                        for (RequestContext request : requests)
                         {
-                            List<RequestContext> requests = 
byEpoch.remove(epoch);
-                            assert requests != null : String.format("%s %s 
(%d)", byEpoch, waitForEpochs, epoch);
-                            for (RequestContext request : requests)
+                            try
                             {
-                                try
-                                {
-                                    request.process(node, endpointMapper);
-                                }
-                                catch (Throwable t)
-                                {
-                                    logger.error(String.format("Caught an 
exception while processing a delayed request %s", request), t);
-                                }
+                                request.process(node, endpointMapper);
+                            }
+                            catch (Throwable t)
+                            {
+                                logger.error("Caught an exception while 
processing a delayed request {}", request, t);
                             }
                         }
                     }
+                }
 
-                    waitForEpochs.removeIfLong(epoch -> 
!byEpoch.containsKey(epoch));
+                waitForEpochs.removeIfLong(epoch -> 
!byEpoch.containsKey(epoch));
 
-                    signal.await();
-                }
-                catch (InterruptedException e)
-                {
-                    logger.info("Delayed request processor thread interrupted. 
Shutting down.");
-                    return;
-                }
-                catch (Throwable t)
-                {
-                    logger.error("Caught an exception in delayed processor", 
t);
-                }
+                signal.await();
+            }
+            catch (InterruptedException e)
+            {
+                logger.info("Delayed request processor thread interrupted. 
Shutting down.");
+            }
+            catch (Throwable t)
+            {
+                logger.error("Caught an exception in delayed processor", t);
+            }
+        }
+
+        private void shutdown()
+        {
+            executor.shutdown();
+            try
+            {
+                executor.awaitTermination(1, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to