This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 835ccdea5508b3c30571fe366f78bb45b7a4742d Author: Alex Petrov <[email protected]> AuthorDate: Fri Oct 4 11:35:01 2024 +0200 Fix condition on where we shut down Accord; move scheduled executor shutdown to after MessagingService (MS). Wake up threads waiting on segmentPrepared after shutting down the allocator, as no new segments will ever be allocated. Shut down the flusher slightly differently: we do not signal from fsync complete, since all blocks should have been fsynced by then, but we add an invariant check to notice runaway threads. Wait for quiescence. Truncate blocking. Wait for scheduler shutdown before shutting down command store. Shut down Accord after shutting down messaging. Truncate caches before replay. --- .gitmodules | 4 +- modules/accord | 2 +- src/java/org/apache/cassandra/journal/Flusher.java | 8 +++- src/java/org/apache/cassandra/journal/Journal.java | 32 +++++++++++++- .../apache/cassandra/service/StorageService.java | 12 +++-- .../service/accord/AccordCommandStore.java | 5 +++ .../service/accord/AccordCommandStores.java | 38 ++++++++++++++++ .../cassandra/service/accord/AccordJournal.java | 51 ++++++++++------------ .../cassandra/service/accord/AccordKeyspace.java | 17 +++++--- .../cassandra/service/accord/AccordService.java | 7 ++- .../service/accord/AccordVerbHandler.java | 6 +++ .../org/apache/cassandra/utils/ExecutorUtils.java | 28 ++++++++++++ 12 files changed, 165 insertions(+), 45 deletions(-) diff --git a/.gitmodules b/.gitmodules index 1e61f63e19..616dacf610 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/belliottsmith/cassandra-accord.git - branch = followup3 + url = https://github.com/apache/cassandra-accord.git + branch = trunk diff --git a/modules/accord b/modules/accord index 17314e15d4..08ee5ce1c6 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 17314e15d45f46a1572cd10867efc0660169de13 +Subproject commit 08ee5ce1c6301201ccaf7d580a6af289ab4c5765 diff 
--git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index 709c68615b..5dfa76e894 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -489,7 +489,13 @@ final class Flusher<K, V> { WaitQueue.Signal signal = fsyncComplete.register(context, Timer.Context::stop); if (fsyncFinishedFor < flushTime) - signal.awaitUninterruptibly(); + { + signal.awaitThrowUncheckedOnInterrupt(); + + Journal.State state = journal.state.get(); + Invariants.checkState(state == Journal.State.NORMAL, + "Thread %s outlived journal, which is in %s state", Thread.currentThread(), state); + } else signal.cancel(); } diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index ba4bf503b2..5501146d8d 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -111,7 +111,10 @@ public class Journal<K, V> implements Shutdownable private final AtomicReference<Segments<K, V>> segments = new AtomicReference<>(); + final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED); + Interruptible allocator; + // TODO (required): we do not need wait queues here, we can just wait on a signal on a segment while its byte buffer is being allocated private final WaitQueue segmentPrepared = newWaitQueue(); private final WaitQueue allocatorThreadWaitQueue = newWaitQueue(); private final BooleanSupplier allocatorThreadWaitCondition = () -> (availableSegment == null); @@ -210,6 +213,8 @@ public class Journal<K, V> implements Shutdownable public void start() { + Invariants.checkState(state.compareAndSet(State.UNINITIALIZED, State.INITIALIZING), + "Unexpected journal state during initialization", state); metrics.register(flusher); deleteTmpFiles(); @@ -228,6 +233,8 @@ public class Journal<K, V> implements Shutdownable advanceSegment(null); 
flusher.start(); compactor.start(); + Invariants.checkState(state.compareAndSet(State.INITIALIZING, State.NORMAL), + "Unexpected journal state after initialization", state); } @VisibleForTesting @@ -253,16 +260,19 @@ public class Journal<K, V> implements Shutdownable @Override public boolean isTerminated() { - return false; + return state.get() == State.TERMINATED; } public void shutdown() { try { + Invariants.checkState(state.compareAndSet(State.NORMAL, State.SHUTDOWN), + "Unexpected journal state while trying to shut down", state); allocator.shutdown(); wakeAllocator(); // Wake allocator to force it into shutdown allocator.awaitTermination(1, TimeUnit.MINUTES); + segmentPrepared.signalAll(); // Wake up all threads waiting on the new segment compactor.shutdown(); compactor.awaitTermination(1, TimeUnit.MINUTES); flusher.shutdown(); @@ -270,6 +280,8 @@ public class Journal<K, V> implements Shutdownable closer.awaitTermination(1, TimeUnit.MINUTES); closeAllSegments(); metrics.deregister(); + Invariants.checkState(state.compareAndSet(State.SHUTDOWN, State.TERMINATED), + "Unexpected journal state while trying to shut down", state); } catch (InterruptedException e) { @@ -574,7 +586,14 @@ public class Journal<K, V> implements Shutdownable { WaitQueue.Signal prepared = segmentPrepared.register(metrics.waitingOnSegmentAllocation.time(), Context::stop); if (availableSegment == null && currentSegment == currentActiveSegment) - prepared.awaitUninterruptibly(); + { + prepared.awaitThrowUncheckedOnInterrupt(); + + // In case we woke up due to shutdown signal or interrupt, check mode + State state = this.state.get(); + if (state.ordinal() > State.NORMAL.ordinal()) + throw new IllegalStateException("Can not obtain allocated segment due to shutdown " + state); + } else prepared.cancel(); } @@ -1024,4 +1043,13 @@ public class Journal<K, V> implements Shutdownable segments.close(); } } + + enum State + { + UNINITIALIZED, + INITIALIZING, + NORMAL, + SHUTDOWN, + TERMINATED + } } diff 
--git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 54ffa99fe3..ee92ba9054 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -173,6 +173,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordVerbHandler; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget; import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; @@ -3961,9 +3962,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug(msg); transientMode = Optional.of(Mode.DRAINING); - if (DatabaseDescriptor.getAccordTransactionsEnabled()) - AccordService.instance().shutdownAndWait(1, MINUTES); - try { /* not clear this is reasonable time, but propagated from prior embedded behaviour */ @@ -3979,7 +3977,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (daemon != null) shutdownClientServers(); - ScheduledExecutors.optionalTasks.shutdown(); + Gossiper.instance.stop(); ActiveRepairService.instance().stop(); @@ -3989,6 +3987,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE transientMode = Optional.of(Mode.DRAINING); } + if (AccordService.isSetup()) + AccordService.instance().shutdownAndWait(1, MINUTES); + // In-progress writes originating here could generate hints to be written, // which is currently scheduled on the mutation stage. So shut down MessagingService // before mutation stage, so we can get all the hints saved before shutting down. 
@@ -4003,6 +4004,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.error("Messaging service timed out shutting down", t); } + // ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it. + ScheduledExecutors.optionalTasks.shutdown(); + if (!isFinalShutdown) { logger.debug("clearing mutation stage"); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 51bc4d85f6..91e4030d5d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -694,6 +694,11 @@ public class AccordCommandStore extends CommandStore this.threadId = threadId; } + public boolean hasTasks() + { + return delegate.getPendingTaskCount() > 0 || delegate.getActiveTaskCount() > 0; + } + CommandStoreExecutor(AccordStateCache stateCache, SequentialExecutorPlus delegate) { this.stateCache = stateCache; diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index 6d9744310f..620bad8b16 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -17,6 +17,10 @@ */ package org.apache.cassandra.service.accord; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -41,6 +45,7 @@ import org.apache.cassandra.metrics.CacheSizeMetrics; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.AccordCommandStore.CommandStoreExecutor; import org.apache.cassandra.service.accord.api.AccordRoutingKey; +import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; @@ -154,6 +159,39 @@ public class AccordCommandStores extends CommandStores implements CacheSize }; } + public void waitForQuiescense() + { + boolean hadPending; + try + { + do + { + hadPending = false; + List<Future<?>> futures = new ArrayList<>(); + for (CommandStoreExecutor executor : executors) + { + if (executor.hasTasks()) + { + futures.add(executor.submit(() -> {})); + hadPending = true; + } + } + for (Future<?> future : futures) + future.get(); + futures.clear(); + } + while (hadPending); + } + catch (ExecutionException e) + { + throw new IllegalStateException("Should have never been thrown", e); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + } + @Override public synchronized void shutdown() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 49af8042a8..38bd9f9101 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -26,7 +26,6 @@ import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -40,9 +39,10 @@ import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.RedundantBefore; -import accord.primitives.SaveStatus; +import accord.local.cfk.CommandsForKey; import accord.primitives.Deps; import accord.primitives.Ranges; +import accord.primitives.SaveStatus; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.Invariants; @@ -73,9 +73,6 @@ import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers. public class AccordJournal implements IJournal, Shutdownable { - - private final AtomicBoolean isReplay = new AtomicBoolean(false); - static { // make noise early if we forget to update our version mappings @@ -93,7 +90,7 @@ public class AccordJournal implements IJournal, Shutdownable private final Params params; Node node; - enum Status { INITIALIZED, STARTING, STARTED, TERMINATING, TERMINATED } + enum Status { INITIALIZED, STARTING, REPLAY, STARTED, TERMINATING, TERMINATED } private volatile Status status = Status.INITIALIZED; @VisibleForTesting @@ -127,10 +124,14 @@ public class AccordJournal implements IJournal, Shutdownable this.node = node; status = Status.STARTING; journal.start(); - status = Status.STARTED; return this; } + public boolean started() + { + return status == Status.STARTED; + } + public Params configuration() { return params; @@ -150,7 +151,7 @@ public class AccordJournal implements IJournal, Shutdownable @Override public void shutdown() { - Invariants.checkState(status == Status.STARTED); + Invariants.checkState(status == Status.REPLAY || status == Status.STARTED); status = Status.TERMINATING; journal.shutdown(); status = Status.TERMINATED; @@ -230,7 +231,7 @@ public class AccordJournal implements IJournal, Shutdownable @Override public void appendCommand(int store, SavedCommand.DiffWriter value, Runnable onFlush) { - if (value == null || isReplay.get()) + if (value == null || status == Status.REPLAY) { if (onFlush != null) onFlush.run(); @@ -252,7 +253,7 @@ public class AccordJournal implements IJournal, Shutdownable @Override public AsyncResult<?> persist(DurableBefore addDurableBefore, DurableBefore newDurableBefore) { - if (isReplay.get()) + if (status == Status.REPLAY) return AsyncResults.success(null); AsyncResult.Settable<Void> result = AsyncResults.settable(); @@ -306,18 +307,6 @@ public class AccordJournal implements IJournal, Shutdownable return builder; } - public 
List<SavedCommand.Builder> loadSeparateDiffs(int commandStoreId, TxnId txnId) - { - JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, commandStoreId); - List<SavedCommand.Builder> builders = new ArrayList<>(); - journalTable.readAll(key, (in, version) -> { - SavedCommand.Builder builder = new SavedCommand.Builder(txnId); - builder.deserializeNext(in, version); - builders.add(builder); - }); - return builders; - } - private <BUILDER> BUILDER readAll(JournalKey key) { BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key); @@ -367,6 +356,10 @@ public class AccordJournal implements IJournal, Shutdownable public void replay() { + logger.info("Starting journal replay."); + CommandsForKey.disableLinearizabilityViolationsReporting(); + AccordKeyspace.truncateAllCaches(); + // TODO (expected): optimize replay memory footprint class ToApply { @@ -383,8 +376,6 @@ public class AccordJournal implements IJournal, Shutdownable List<ToApply> toApply = new ArrayList<>(); try (AccordJournalTable.KeyOrderIterator<JournalKey> iter = journalTable.readAll()) { - isReplay.set(true); - JournalKey key; SavedCommand.Builder builder = new SavedCommand.Builder(); while ((key = iter.key()) != null) @@ -425,17 +416,20 @@ public class AccordJournal implements IJournal, Shutdownable for (ToApply apply : toApply) { AccordCommandStore commandStore = (AccordCommandStore) node.commandStores().forId(apply.key.commandStoreId); + logger.info("Apply {}", apply.command); commandStore.loader().apply(apply.command); } + + logger.info("Waiting for command stores to quiesce."); + ((AccordCommandStores)node.commandStores()).waitForQuiescense(); + CommandsForKey.enableLinearizabilityViolationsReporting(); + logger.info("Finished journal replay."); + status = Status.STARTED; } catch (Throwable t) { throw new RuntimeException("Can not replay journal.", t); } - finally - { - isReplay.set(false); - } } // TODO: this is here temporarily; for debugging purposes @@ -492,7 +486,6 @@ public 
class AccordJournal implements IJournal, Shutdownable t); } } - } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 0a8cb085fb..bddf73f0aa 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -37,23 +37,22 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import org.apache.cassandra.tcm.ClusterMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.RoutingKey; -import accord.local.StoreParticipants; -import accord.local.cfk.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.CommandStore; import accord.local.Node; import accord.local.RedundantBefore; +import accord.local.StoreParticipants; +import accord.local.cfk.CommandsForKey; +import accord.primitives.Ranges; +import accord.primitives.Route; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.primitives.Status.Durability; -import accord.primitives.Ranges; -import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topology; @@ -138,6 +137,7 @@ import org.apache.cassandra.service.accord.serializers.AccordRoutingKeyByteSourc import org.apache.cassandra.service.accord.serializers.CommandSerializers; import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; import org.apache.cassandra.service.accord.serializers.KeySerializers; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.Clock.Global; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.btree.BTree; @@ -747,6 +747,13 @@ public 
class AccordKeyspace return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, Topologies, EpochMetadata, Journal); } + public static void truncateAllCaches() + { + Keyspace ks = Keyspace.open(ACCORD_KEYSPACE_NAME); + for (String table : new String[]{ TimestampsForKeys.name, CommandsForKeys.name }) + ks.getColumnFamilyStore(table).truncateBlocking(); + } + private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> serializer) throws IOException { int size = (int) serializer.serializedSize(obj); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 6fdf9e8586..f132840c82 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -201,6 +201,7 @@ public class AccordService implements IAccordService, Shutdownable private final CoordinateDurabilityScheduling durabilityScheduling; private final AccordVerbHandler<? 
extends Request> requestHandler; private final LocalConfig configuration; + @GuardedBy("this") private State state = State.INIT; @@ -381,6 +382,10 @@ public class AccordService implements IAccordService, Shutdownable i.shutdownAndWait(timeout, unit); } + public boolean shouldAcceptMessages() + { + return state == State.STARTED && journal.started(); + } public static IAccordService instance() { if (!DatabaseDescriptor.getAccordTransactionsEnabled()) @@ -964,7 +969,7 @@ public class AccordService implements IAccordService, Shutdownable { if (state != State.STARTED) return; - ExecutorUtils.shutdown(shutdownableSubsystems()); + ExecutorUtils.shutdownSequentiallyAndWait(shutdownableSubsystems(), 1, TimeUnit.MINUTES); state = State.SHUTDOWN; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java index 5d8747d4a5..34c7b26bd9 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java +++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java @@ -43,6 +43,12 @@ public class AccordVerbHandler<T extends Request> implements IVerbHandler<T> @Override public void doVerb(Message<T> message) throws IOException { + if (!((AccordService)AccordService.instance()).shouldAcceptMessages()) + { + logger.debug("Dropping message {} from {}", message.verb(), message.from()); + return; + } + logger.trace("Receiving {} from {}", message.payload, message.from()); T request = message.payload; diff --git a/src/java/org/apache/cassandra/utils/ExecutorUtils.java b/src/java/org/apache/cassandra/utils/ExecutorUtils.java index 5bb841f32b..5b449e3096 100644 --- a/src/java/org/apache/cassandra/utils/ExecutorUtils.java +++ b/src/java/org/apache/cassandra/utils/ExecutorUtils.java @@ -79,6 +79,34 @@ public class ExecutorUtils } } + public static void shutdownSequentiallyAndWait(Iterable<?> executors, long timeout, TimeUnit unit) + { + long deadline = System.nanoTime() + 
unit.toNanos(timeout); + + for (Object executor : executors) + { + try + { + if (executor instanceof ExecutorService) + { + ((ExecutorService) executor).shutdown(); + ((ExecutorService) executor).awaitTermination(Math.max(0, deadline - System.nanoTime()), NANOSECONDS); + } + else if (executor instanceof Shutdownable) + { + ((Shutdownable) executor).shutdown(); + ((Shutdownable) executor).awaitTermination(Math.max(0, deadline - System.nanoTime()), NANOSECONDS); + } + else + throw new IllegalArgumentException(executor.toString()); + } + catch (Throwable t) + { + throw new IllegalStateException("Caught interrupt while shutting down " + executor); + } + } + } + public static void shutdown(ExecutorService ... executors) { shutdown(Arrays.asList(executors)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
