This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch ex-sp-pl in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 1999a635c36739df37c67f27222d66b9c5c040d4 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Sun Sep 29 08:41:06 2024 +0100 unused imports --- accord-core/src/main/java/accord/api/Agent.java | 3 +- .../java/accord/impl/InMemoryCommandStore.java | 4 -- .../src/main/java/accord/local/Cleanup.java | 1 - .../src/main/java/accord/local/CommandStore.java | 3 +- .../src/main/java/accord/local/Commands.java | 1 - .../main/java/accord/local/RedundantBefore.java | 45 ++++++++++------------ .../main/java/accord/local/SafeCommandStore.java | 1 - .../main/java/accord/local/StoreParticipants.java | 2 +- .../main/java/accord/local/cfk/CommandsForKey.java | 14 ++++--- .../main/java/accord/local/cfk/PostProcess.java | 2 - .../src/main/java/accord/topology/Topologies.java | 6 --- .../src/main/java/accord/topology/Topology.java | 1 - .../src/test/java/accord/impl/TestAgent.java | 4 +- .../src/test/java/accord/impl/basic/Cluster.java | 44 +++++++++++---------- .../accord/impl/basic/DelayedCommandStores.java | 6 +++ .../src/test/java/accord/impl/basic/Journal.java | 7 ++-- .../src/test/java/accord/impl/list/ListAgent.java | 2 +- .../src/test/java/accord/impl/list/ListStore.java | 26 +++++++++---- .../java/accord/local/cfk/CommandsForKeyTest.java | 4 +- .../main/java/accord/maelstrom/MaelstromAgent.java | 2 +- 20 files changed, 90 insertions(+), 88 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java index 2f42da1d..38ff4ccd 100644 --- a/accord-core/src/main/java/accord/api/Agent.java +++ b/accord-core/src/main/java/accord/api/Agent.java @@ -64,8 +64,7 @@ public interface Agent extends UncaughtExceptionListener @Override void onUncaughtException(Throwable t); - - void onHandledException(Throwable t); + void onHandledException(Throwable t, String context); /** * @return PreAccept timeout with implementation-defined resolution of the hybrid logical clock diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 5b425201..8f32380e 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.DataStore; -import accord.api.Key; import accord.api.ProgressLog; import accord.local.Cleanup; import accord.local.Command; @@ -72,7 +71,6 @@ import accord.local.cfk.CommandsForKey; import accord.primitives.AbstractRanges; import accord.primitives.AbstractUnseekableKeys; import accord.primitives.Deps; -import accord.primitives.Keys; import accord.primitives.PartialDeps; import accord.primitives.Participants; import accord.primitives.Range; @@ -81,7 +79,6 @@ import accord.primitives.Routable.Domain; import accord.primitives.RoutableKey; import accord.primitives.Routables; import accord.primitives.Route; -import accord.primitives.RoutingKeys; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.primitives.Timestamp; @@ -90,7 +87,6 @@ import accord.primitives.TxnId; import accord.primitives.Unseekable; import accord.primitives.Unseekables; import accord.utils.Invariants; -import accord.utils.ReducingRangeMap; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index b1a4a901..888fe0ba 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -23,7 +23,6 @@ import javax.annotation.Nonnull; import accord.primitives.SaveStatus; import accord.primitives.Status.Durability; import accord.primitives.TxnId; -import accord.primitives.Unseekables; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; import static accord.local.RedundantStatus.LIVE; diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index e149f1ad..19db3e7a 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -161,7 +161,6 @@ public abstract class CommandStore implements AgentExecutor private DurableBefore durableBefore = DurableBefore.EMPTY; private MaxConflicts maxConflicts = MaxConflicts.EMPTY; protected RangesForEpoch rangesForEpoch; - protected Ranges activeRanges, leavingRanges; /** * safeToRead is related to RedundantBefore, but a distinct concept. @@ -535,7 +534,7 @@ public abstract class CommandStore implements AgentExecutor safeStore.dataStore().snapshot(slicedRanges, globalSyncId).begin((success, fail) -> { if (fail != null) { - logger.error("Unsuccessful dataStore snapshot; unable to update GC markers", fail); + agent.onHandledException(fail, "Unsuccessful dataStore snapshot; unable to update GC markers"); return; } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 3c453d3f..2ddd2dbf 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -79,7 +79,6 @@ import static accord.primitives.Status.PreApplied; import static accord.primitives.Status.PreCommitted; import static accord.primitives.Status.Stable; import static accord.primitives.Status.Truncated; -import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Route.isFullRoute; import static accord.utils.Invariants.illegalState; diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 8da8a050..1ba6e207 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -61,7 +61,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> public enum PreBootstrapOrStale { NOT_OWNED, FULLY, PARTIALLY, POST_BOOTSTRAP } - // TODO (expected): rationalise the various bounds we maintain; make merge idempotent and apply any filtering by superseding bounds on access + // TODO (required): rationalise the various bounds we maintain; make merge idempotent and apply any filtering by superseding bounds on access public static class Entry { // TODO (desired): we don't need to maintain this now, can migrate to ReducingRangeMap.foldWithBounds @@ -130,11 +130,6 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> */ public final @Nullable Timestamp staleUntilAtLeast; - public Entry(Range range, long startOwnershipEpoch, long endOwnershipEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId shardOnlyAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) - { - this(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); - } - public Entry(Range range, long startOwnershipEpoch, long endOwnershipEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull TxnId locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId shardOnlyAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId bootstrappedAt, @Nullable Timestamp staleUntilAtLeast) { this.range = range; @@ -207,21 +202,20 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } - public Entry withShardAppliedOrInvalidatedBeforeAtLeast(TxnId newShardAppliedOrInvalidatedBefore) + public Entry withGcBeforeBeforeAtLeast(TxnId newGcBefore) { - if (newShardAppliedOrInvalidatedBefore.compareTo(locallyAppliedOrInvalidatedBefore) >= 0) - newShardAppliedOrInvalidatedBefore = locallyAppliedOrInvalidatedBefore; - - if (newShardAppliedOrInvalidatedBefore.compareTo(shardAppliedOrInvalidatedBefore) <= 0) + if (newGcBefore.compareTo(gcBefore) <= 0) return this; - return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, newShardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + TxnId locallyAppliedOrInvalidatedBefore = TxnId.nonNullOrMax(this.locallyAppliedOrInvalidatedBefore, newGcBefore); + TxnId shardAppliedOrInvalidatedBefore = TxnId.nonNullOrMax(this.shardAppliedOrInvalidatedBefore, newGcBefore); + return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, newGcBefore, bootstrappedAt, staleUntilAtLeast); } @VisibleForImplementation public Entry withEpochs(long start, long end) { - return new Entry(range, start, end, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + return new Entry(range, start, end, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } static @Nonnull RedundantStatus getAndMerge(Entry entry, @Nonnull RedundantStatus prev, TxnId txnId, Object ignore) @@ -282,8 +276,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (entry == null) return prev; - if (entry.shardAppliedOrInvalidatedBefore.compareTo(Timestamp.NONE) > 0) - prev.add(entry.range, entry.shardAppliedOrInvalidatedBefore); + if (entry.gcBefore.compareTo(Timestamp.NONE) > 0) + prev.add(entry.range, entry.gcBefore); return prev; } @@ -315,7 +309,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (entry == null || (executeAt == null ? entry.outOfBounds(txnId) : entry.outOfBounds(txnId, executeAt))) return notRedundant; - if (txnId.compareTo(entry.shardAppliedOrInvalidatedBefore) < 0) + if (txnId.compareTo(entry.gcBefore) < 0) return notRedundant.without(Ranges.of(entry.range)); return notRedundant; @@ -348,7 +342,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (locallyAppliedOrInvalidatedBefore.compareTo(txnId) > 0) { - if (shardAppliedOrInvalidatedBefore.compareTo(txnId) > 0) + if (gcBefore.compareTo(txnId) > 0) return SHARD_REDUNDANT; return LOCALLY_REDUNDANT; } @@ -368,7 +362,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> public final TxnId shardRedundantBefore() { - return shardAppliedOrInvalidatedBefore; + return gcBefore; } public final TxnId locallyRedundantBefore() @@ -395,7 +389,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> private boolean isComplete() { // TODO (required): carefully consider whether we should ALSO expect some local property to be met here - return endOwnershipEpoch <= shardAppliedOrInvalidatedBefore.epoch(); + return endOwnershipEpoch <= gcBefore.epoch(); } private boolean outOfBounds(Timestamp lb) @@ -405,12 +399,12 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> Entry withEpochs(int startEpoch, int endEpoch) { - return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + return new Entry(range, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } public Entry withRange(Range range) { - return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); } public boolean equals(Object that) @@ -430,6 +424,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> && this.locallyAppliedOrInvalidatedBefore.equals(that.locallyAppliedOrInvalidatedBefore) && this.locallyDecidedAndAppliedOrInvalidatedBefore.equals(that.locallyDecidedAndAppliedOrInvalidatedBefore) && this.shardAppliedOrInvalidatedBefore.equals(that.shardAppliedOrInvalidatedBefore) + && this.shardOnlyAppliedOrInvalidatedBefore.equals(that.shardOnlyAppliedOrInvalidatedBefore) + && this.gcBefore.equals(that.gcBefore) && this.bootstrappedAt.equals(that.bootstrappedAt) && Objects.equals(this.staleUntilAtLeast, that.staleUntilAtLeast); } @@ -502,8 +498,9 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (ranges.isEmpty()) return new RedundantBefore(); + TxnId locallyDecidedAndAppliedOrInvalidatedBefore = locallyAppliedOrInvalidatedBefore; TxnId shardAppliedOrInvalidatedBefore = TxnId.min(locallyAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore); - Entry entry = new Entry(null, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + Entry entry = new Entry(null, startEpoch, endEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); Builder builder = new Builder(ranges.get(0).endInclusive(), ranges.size() * 2); for (int i = 0 ; i < ranges.size() ; ++i) { @@ -616,7 +613,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> if (v.range.start().equals(start) && v.range.end().equals(end)) return v; - return new Entry(v.range.newRange(start, end), v.startOwnershipEpoch, v.endOwnershipEpoch, v.locallyAppliedOrInvalidatedBefore, v.shardAppliedOrInvalidatedBefore, v.shardOnlyAppliedOrInvalidatedBefore, v.gcBefore, v.bootstrappedAt, v.staleUntilAtLeast); + return new Entry(v.range.newRange(start, end), v.startOwnershipEpoch, v.endOwnershipEpoch, v.locallyAppliedOrInvalidatedBefore, v.locallyDecidedAndAppliedOrInvalidatedBefore, v.shardAppliedOrInvalidatedBefore, v.shardOnlyAppliedOrInvalidatedBefore, v.gcBefore, v.bootstrappedAt, v.staleUntilAtLeast); } @Override @@ -635,7 +632,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return new Entry(a.range.newRange( a.range.start().compareTo(b.range.start()) <= 0 ? a.range.start() : b.range.start(), a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : b.range.end() - ), a.startOwnershipEpoch, a.endOwnershipEpoch, a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast); + ), a.startOwnershipEpoch, a.endOwnershipEpoch, a.locallyDecidedAndAppliedOrInvalidatedBefore, a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast); } @Override diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 50c6410c..5c4c174a 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -46,7 +46,6 @@ import accord.primitives.Unseekable; import accord.primitives.Unseekables; import accord.utils.Invariants; -import static accord.local.Cleanup.NO; import static accord.local.KeyHistory.COMMANDS; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; import static accord.primitives.Routables.Slice.Minimal; diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java index 89825407..20e5b404 100644 --- a/accord-core/src/main/java/accord/local/StoreParticipants.java +++ b/accord-core/src/main/java/accord/local/StoreParticipants.java @@ -271,7 +271,7 @@ public class StoreParticipants return new StoreParticipants(route, empty, empty, empty); } - public static StoreParticipants all(TxnId txnId, Route<?> route) + public static StoreParticipants all(Route<?> route) { return new StoreParticipants(route, route, route, route); } diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index d4fe54c5..5aad9df6 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -957,7 +957,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm static TxnId bootstrappedAt(RedundantBefore.Entry boundsInfo) { TxnId bootstrappedAt = boundsInfo.bootstrappedAt; - if (bootstrappedAt.compareTo(boundsInfo.shardRedundantBefore()) <= 0) + if (bootstrappedAt.compareTo(boundsInfo.gcBefore) <= 0) bootstrappedAt = null; return bootstrappedAt; } @@ -969,7 +969,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm static TxnId redundantBefore(RedundantBefore.Entry boundsInfo) { - return boundsInfo.shardAppliedOrInvalidatedBefore; + // TODO (expected): this can be weakened to shardAppliedOrInvalidatedBefore + return boundsInfo.gcBefore; } public boolean isPostBootstrapAndOwned(TxnId txnId) @@ -1327,7 +1328,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm { int prunedBeforeById = Arrays.binarySearch(byId, prunedBefore); Invariants.checkState(prunedBeforeById >= 0 || prunedBefore.equals(TxnId.NONE)); - return reconstruct(key, NO_BOUNDS_INFO.withShardAppliedOrInvalidatedBeforeAtLeast(redundantBefore), + return reconstruct(key, NO_BOUNDS_INFO.withGcBeforeBeforeAtLeast(redundantBefore), byId, BTree.empty(), prunedBeforeById, unmanageds); } @@ -1627,13 +1628,14 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm */ public CommandsForKeyUpdate withRedundantBeforeAtLeast(RedundantBefore.Entry newBoundsInfo, boolean force) { - if (!force && newBoundsInfo.shardAppliedOrInvalidatedBefore.equals(boundsInfo.shardAppliedOrInvalidatedBefore) + // TODO (required): handle receiving an entry from the past, e.g. on reload (OR expunge all CFK on restart) + if (!force && newBoundsInfo.gcBefore.equals(boundsInfo.gcBefore) && newBoundsInfo.bootstrappedAt.equals(boundsInfo.bootstrappedAt) && newBoundsInfo.locallyDecidedAndAppliedOrInvalidatedBefore.equals(boundsInfo.locallyDecidedAndAppliedOrInvalidatedBefore) && newBoundsInfo.endOwnershipEpoch == boundsInfo.endOwnershipEpoch) return this; - if (newBoundsInfo.shardAppliedOrInvalidatedBefore.epoch() >= newBoundsInfo.endOwnershipEpoch) + if (newBoundsInfo.gcBefore.epoch() >= newBoundsInfo.endOwnershipEpoch) { // we should be completely finished; notify every unmanaged and return an empty CFK // we special case this to handle the case of future dependencies supplied to us by other CommandsForKey that had pruned their dependencies; @@ -1661,7 +1663,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm @VisibleForImplementation public CommandsForKey withRedundantBeforeAtLeast(TxnId newRedundantBefore) { - RedundantBefore.Entry newBoundsInfo = boundsInfo.withShardAppliedOrInvalidatedBeforeAtLeast(newRedundantBefore); + RedundantBefore.Entry newBoundsInfo = boundsInfo.withGcBeforeBeforeAtLeast(newRedundantBefore); TxnInfo[] newById = pruneById(byId, boundsInfo, newBoundsInfo); int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), newRedundantBefore); diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java index a6e944e3..80c71666 100644 --- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java +++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java @@ -46,7 +46,6 @@ import static accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATE import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY; import static accord.local.cfk.CommandsForKey.maxContiguousManagedAppliedIndex; -import static accord.local.cfk.CommandsForKey.updateAndNotifyUnmanageds; import static accord.local.cfk.Updating.updateUnmanaged; import static accord.local.cfk.Updating.updateUnmanagedAsync; import static accord.local.cfk.Utils.findApply; @@ -54,7 +53,6 @@ import static accord.local.cfk.Utils.findCommit; import static accord.local.cfk.Utils.findFirstApply; import static accord.local.cfk.Utils.removeUnmanaged; import static accord.local.cfk.Utils.selectUnmanaged; -import static accord.primitives.Timestamp.max; import static accord.primitives.TxnId.NO_TXNIDS; import static accord.utils.ArrayBuffers.cachedTxnIds; diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java index 1208077e..54a05eca 100644 --- a/accord-core/src/main/java/accord/topology/Topologies.java +++ b/accord-core/src/main/java/accord/topology/Topologies.java @@ -18,13 +18,7 @@ package accord.topology; -import java.util.AbstractCollection; import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -import com.google.common.collect.Iterators; import accord.api.TopologySorter; import accord.local.Node; diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 81a2b5ff..47e791e9 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -22,7 +22,6 @@ import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java index dc35335d..6e3ddcb4 100644 --- a/accord-core/src/test/java/accord/impl/TestAgent.java +++ b/accord-core/src/test/java/accord/impl/TestAgent.java @@ -69,7 +69,7 @@ public class TestAgent implements Agent } @Override - public void onHandledException(Throwable t) + public void onHandledException(Throwable t, String context) { throw new AssertionError("Unexpected exception", t); } @@ -108,7 +108,7 @@ public class TestAgent implements Agent } @Override - public void onHandledException(Throwable t) + public void onHandledException(Throwable t, String context) { } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index e3f9eb3f..1e568a0c 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -533,27 +533,29 @@ public class Cluster implements Scheduler Scheduled restart = sinks.recurring(() -> { Id id = random.pick(nodes); CommandStore[] stores = nodeMap.get(id).commandStores().all(); - Predicate<Pending> pred = getPendingPredicate(stores); - while (sinks.drain(pred)); - - // Journal cleanup is a rough equivalent of a node restart. - trace.debug("Triggering journal cleanup for node " + id); - CommandsForKey.disableLinearizabilityViolationsReporting(); - ListStore listStore = (ListStore) nodeMap.get(id).commandStores().dataStore(); - listStore.clear(); - listStore.restoreFromSnapshot(); - - Journal journal = journalMap.get(id); - for (CommandStore s : stores) - { - DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; - store.clearForTesting(); - journal.reconstructAll(store.loader(), store.id()); - journal.loadHistoricalTransactions(store::load, store.id()); - } - while (sinks.drain(pred)); - CommandsForKey.enableLinearizabilityViolationsReporting(); - trace.debug("Done with cleanup."); + ((DelayedCommandStore)stores[0]).unsafeRunIn(() -> { + Predicate<Pending> pred = getPendingPredicate(stores); + while (sinks.drain(pred)); + + // Journal cleanup is a rough equivalent of a node restart. + trace.debug("Triggering journal cleanup for node " + id); + CommandsForKey.disableLinearizabilityViolationsReporting(); + ListStore listStore = (ListStore) nodeMap.get(id).commandStores().dataStore(); + listStore.clear(); + listStore.restoreFromSnapshot(); + + Journal journal = journalMap.get(id); + for (CommandStore s : stores) + { + DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; + store.clearForTesting(); + journal.reconstructAll(store.loader(), store.id()); + journal.loadHistoricalTransactions(store::load, store.id()); + } + while (sinks.drain(pred)); + CommandsForKey.enableLinearizabilityViolationsReporting(); + trace.debug("Done with cleanup."); + }); }, () -> random.nextInt(1, 10), SECONDS); durabilityScheduling.forEach(CoordinateDurabilityScheduling::start); diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 0de6debc..20c6ec36 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -286,6 +286,12 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread journal.registerHistoricalTransactions(id(), deps); super.registerHistoricalTransactions(deps, safeStore); } + + @Override + public void unsafeRunIn(Runnable fn) + { + super.unsafeRunIn(fn); + } } public static class DelayedSafeStore extends InMemoryCommandStore.InMemorySafeStore diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java index 64a1e53c..019801b7 100644 --- a/accord-core/src/test/java/accord/impl/basic/Journal.java +++ b/accord-core/src/test/java/accord/impl/basic/Journal.java @@ -88,7 +88,8 @@ public class Journal Command command = reconstruct(diffs, Reconstruct.Last).get(0); if (command.status() == Truncated || command.status() == Invalidated) continue; // Already truncated - Cleanup cleanup = Cleanup.shouldCleanup(store, command, command.participants()); + StoreParticipants participants = Invariants.nonNull(command.participants()); + Cleanup cleanup = Cleanup.shouldCleanup(store, command, participants); switch (cleanup) { case NO: @@ -254,8 +255,8 @@ public class Journal attrs.partialTxn(partialTxn); if (durability != null) attrs.durability(durability); - if (participants != null) - attrs.setParticipants(participants); + if (participants != null) attrs.setParticipants(participants); + else attrs.setParticipants(StoreParticipants.empty(txnId)); // TODO (desired): we can simplify this logic if, instead of diffing, we will infer the diff from the status if (partialDeps != null && diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 08db0e3e..4d35a03f 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -109,7 +109,7 @@ public class ListAgent implements Agent } @Override - public void onHandledException(Throwable t) + public void onHandledException(Throwable t, String context) { } diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 01c11aa0..d2a071bd 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -179,12 +180,12 @@ public class ListStore implements DataStore private static final class PendingSnapshot { final long delay; - final Runnable runnable; + final Consumer<Boolean> onCompletion; - private PendingSnapshot(long delay, Runnable runnable) + private PendingSnapshot(long delay, Consumer<Boolean> onCompletion) { this.delay = delay; - this.runnable = runnable; + this.onCompletion = onCompletion; } } @@ -198,9 +199,16 @@ public class ListStore implements DataStore AsyncResult.Settable<Void> result = new AsyncResults.SettableResult<>(); long delay = Math.max(1, random.nextBiasedLong(100, 1000, 5000) - pendingDelay); pendingDelay += delay; - pendingSnapshots.add(new PendingSnapshot(delay, () -> { - this.snapshot = snapshot; - result.setSuccess(null); + pendingSnapshots.add(new PendingSnapshot(delay, success -> { + if (success) + { + this.snapshot = snapshot; + result.setSuccess(null); + } + else + { + result.setFailure(new RuntimeException("Snapshot aborted due to earlier snapshot being restored")); + } })); if (pendingSnapshots.size() == 1) @@ -216,7 +224,7 @@ public class ListStore implements DataStore return; PendingSnapshot pendingSnapshot = pendingSnapshots.pollFirst(); - pendingSnapshot.runnable.run(); + pendingSnapshot.onCompletion.accept(true); pendingDelay -= pendingSnapshot.delay; if (!pendingSnapshots.isEmpty()) scheduleRunSnapshot(); @@ -234,6 +242,9 @@ public class ListStore implements DataStore purgedAts.addAll(snapshot.purgedAts); fetchCompletes.addAll(snapshot.fetchCompletes); pendingRemoves.addAll(snapshot.pendingRemoves); + + while (!pendingSnapshots.isEmpty()) + pendingSnapshots.pollFirst().onCompletion.accept(false); } public void clear() @@ -547,6 +558,7 @@ public class ListStore implements DataStore // TODO (effeciency, correctness): remove the delayed removal logic. // This logic was added to make sure the sequence of events made sense but doesn't handle everything perfectly; I (David C) believe that this code // will suffer from the ABA problem; if a range is removed, then added back it is not likley to be handled correctly (the add will no-op as it wasn't removed, then the remove will remove it!) + if (pendingRemoves.isEmpty()) return; if (pendingRemoves.get(0) != epoch) { onRemovalDone.add(() -> performRemoval(epoch, removed, s)); diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 974e0009..d1754b6d 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -536,7 +536,7 @@ public class CommandsForKeyTest { CommonAttributes.Mutable result = new CommonAttributes.Mutable(txnId) .durability(NotDurable) - .updateParticipants(StoreParticipants.all(txnId, txnId.is(Key) ? KEY_ROUTE : RANGE_ROUTE)); + .updateParticipants(StoreParticipants.all(txnId.is(Key) ? KEY_ROUTE : RANGE_ROUTE)); if (withDefinition) result.partialTxn((txnId.domain() == Key ? KEY_TXN : RANGE_TXN).slice(RANGES, true)); @@ -992,7 +992,7 @@ public class CommandsForKeyTest } @Override - public void onHandledException(Throwable t) + public void onHandledException(Throwable t, String context) { throw new UnsupportedOperationException(); } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java index 79a1f3cd..098910da 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java @@ -80,7 +80,7 @@ public class MaelstromAgent implements Agent } @Override - public void onHandledException(Throwable t) + public void onHandledException(Throwable t, String context) { } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
