This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e2583bbdbf3f13c476bc32baf06deccf9d12fe58 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Aug 26 12:51:29 2024 +0200 Switch to infinite loop executor instead of a while-loop thread. Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-19864 --- .../cassandra/service/accord/AccordJournal.java | 142 ++++++++++++--------- 1 file changed, 83 insertions(+), 59 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 956473cfd8..cf4fab6e16 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -52,6 +52,8 @@ import accord.primitives.TxnId; import accord.utils.Invariants; import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.LongArrayList; +import org.apache.cassandra.concurrent.InfiniteLoopExecutor; +import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.ManyToOneConcurrentLinkedQueue; import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.config.DatabaseDescriptor; @@ -105,6 +107,9 @@ import static accord.messages.MessageType.SET_SHARD_DURABLE_REQ; import static accord.messages.MessageType.STABLE_FAST_PATH_REQ; import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE; +import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import static org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_APPLY_MAXIMAL_REQ; import static org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_APPLY_MINIMAL_REQ; import static org.apache.cassandra.service.accord.AccordMessageSink.AccordMessageType.INTEROP_COMMIT_MAXIMAL_REQ; @@ -165,7 +170,7 @@ public class AccordJournal implements IJournal, Shutdownable { Invariants.checkState(status == Status.STARTED); status = Status.TERMINATING; - delayedRequestProcessor.runOnce(); + delayedRequestProcessor.shutdown(); journal.shutdown(); status = Status.TERMINATED; } @@ -625,12 +630,18 @@ public class AccordJournal implements IJournal, Shutdownable * Handling topology changes / epoch shift */ - private final class DelayedRequestProcessor extends Thread + private class DelayedRequestProcessor implements Interruptible.Task { private final ManyToOneConcurrentLinkedQueue<RequestContext> delayedRequests = new ManyToOneConcurrentLinkedQueue<>(); private final LongArrayList waitForEpochs = new LongArrayList(); private final Long2ObjectHashMap<List<RequestContext>> byEpoch = new Long2ObjectHashMap<>(); private final AtomicReference<Condition> signal = new AtomicReference<>(Condition.newOneTimeCondition()); + private volatile Interruptible executor; + + public void start() + { + executor = executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", this::run, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.SYNCHRONIZED); + } private void delay(RequestContext requestContext) { @@ -643,81 +654,94 @@ public class AccordJournal implements IJournal, Shutdownable signal.get().signal(); } - public void run() + @Override + public void run(Interruptible.State state) { - while (!Thread.currentThread().isInterrupted() && isRunnable(status)) + if (state != NORMAL || Thread.currentThread().isInterrupted() || !isRunnable(status)) + return; + + try { - try + Condition signal = Condition.newOneTimeCondition(); + this.signal.set(signal); + // First, poll delayed requests, put them into by epoch + while (!delayedRequests.isEmpty()) { - Condition signal = Condition.newOneTimeCondition(); - this.signal.set(signal); - // First, poll delayed requests, put them into by epoch - while (!delayedRequests.isEmpty()) + RequestContext context = delayedRequests.poll(); + long waitForEpoch = context.waitForEpoch; + + List<RequestContext> l = byEpoch.computeIfAbsent(waitForEpoch, (ignore) -> new ArrayList<>()); + if (l.isEmpty()) + waitForEpochs.pushLong(waitForEpoch); + l.add(context); + BiConsumer<Void, Throwable> withEpochCallback = new BiConsumer<>() { - RequestContext context = delayedRequests.poll(); - long waitForEpoch = context.waitForEpoch; - - List<RequestContext> l = byEpoch.computeIfAbsent(waitForEpoch, (ignore) -> new ArrayList<>()); - if (l.isEmpty()) - waitForEpochs.pushLong(waitForEpoch); - l.add(context); - BiConsumer<Void, Throwable> withEpochCallback = new BiConsumer<>() + @Override + public void accept(Void unused, Throwable withEpochFailure) { - @Override - public void accept(Void unused, Throwable withEpochFailure) + if (withEpochFailure != null) { - if (withEpochFailure != null) + // Nothing to do but keep waiting + if (withEpochFailure instanceof Timeout) { - // Nothing to do but keep waiting - if (withEpochFailure instanceof Timeout) - { - node.withEpoch(waitForEpoch, this); - return; - } - else - throw new RuntimeException(withEpochFailure); + node.withEpoch(waitForEpoch, this); + return; } - runOnce(); + else + throw new RuntimeException(withEpochFailure); } - }; - node.withEpoch(waitForEpoch, withEpochCallback); - } + runOnce(); + } + }; + node.withEpoch(waitForEpoch, withEpochCallback); + } - // Next, process all delayed epochs - for (int i = 0; i < waitForEpochs.size(); i++) + // Next, process all delayed epochs + for (int i = 0; i < waitForEpochs.size(); i++) + { + long epoch = waitForEpochs.getLong(i); + if (node.topology().hasEpoch(epoch)) { - long epoch = waitForEpochs.getLong(i); - if (node.topology().hasEpoch(epoch)) + List<RequestContext> requests = byEpoch.remove(epoch); + assert requests != null : String.format("%s %s (%d)", byEpoch, waitForEpochs, epoch); + for (RequestContext request : requests) { - List<RequestContext> requests = byEpoch.remove(epoch); - assert requests != null : String.format("%s %s (%d)", byEpoch, waitForEpochs, epoch); - for (RequestContext request : requests) + try { - try - { - request.process(node, endpointMapper); - } - catch (Throwable t) - { - logger.error(String.format("Caught an exception while processing a delayed request %s", request), t); - } + request.process(node, endpointMapper); + } + catch (Throwable t) + { + logger.error("Caught an exception while processing a delayed request {}", request, t); } } } + } - waitForEpochs.removeIfLong(epoch -> !byEpoch.containsKey(epoch)); + waitForEpochs.removeIfLong(epoch -> !byEpoch.containsKey(epoch)); - signal.await(); - } - catch (InterruptedException e) - { - logger.info("Delayed request processor thread interrupted. Shutting down."); - return; - } - catch (Throwable t) - { - logger.error("Caught an exception in delayed processor", t); - } + signal.await(); + } + catch (InterruptedException e) + { + logger.info("Delayed request processor thread interrupted. Shutting down."); + } + catch (Throwable t) + { + logger.error("Caught an exception in delayed processor", t); + } + } + + private void shutdown() + { + executor.shutdown(); + try + { + executor.awaitTermination(1, TimeUnit.MINUTES); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org