This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch ex-sp-pl
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 1999a635c36739df37c67f27222d66b9c5c040d4
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sun Sep 29 08:41:06 2024 +0100

    unused imports
---
 accord-core/src/main/java/accord/api/Agent.java    |  3 +-
 .../java/accord/impl/InMemoryCommandStore.java     |  4 --
 .../src/main/java/accord/local/Cleanup.java        |  1 -
 .../src/main/java/accord/local/CommandStore.java   |  3 +-
 .../src/main/java/accord/local/Commands.java       |  1 -
 .../main/java/accord/local/RedundantBefore.java    | 45 ++++++++++------------
 .../main/java/accord/local/SafeCommandStore.java   |  1 -
 .../main/java/accord/local/StoreParticipants.java  |  2 +-
 .../main/java/accord/local/cfk/CommandsForKey.java | 14 ++++---
 .../main/java/accord/local/cfk/PostProcess.java    |  2 -
 .../src/main/java/accord/topology/Topologies.java  |  6 ---
 .../src/main/java/accord/topology/Topology.java    |  1 -
 .../src/test/java/accord/impl/TestAgent.java       |  4 +-
 .../src/test/java/accord/impl/basic/Cluster.java   | 44 +++++++++++----------
 .../accord/impl/basic/DelayedCommandStores.java    |  6 +++
 .../src/test/java/accord/impl/basic/Journal.java   |  7 ++--
 .../src/test/java/accord/impl/list/ListAgent.java  |  2 +-
 .../src/test/java/accord/impl/list/ListStore.java  | 26 +++++++++----
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  4 +-
 .../main/java/accord/maelstrom/MaelstromAgent.java |  2 +-
 20 files changed, 90 insertions(+), 88 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
index 2f42da1d..38ff4ccd 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -64,8 +64,7 @@ public interface Agent extends UncaughtExceptionListener
 
     @Override
     void onUncaughtException(Throwable t);
-
-    void onHandledException(Throwable t);
+    void onHandledException(Throwable t, String context);
 
     /**
      * @return PreAccept timeout with implementation-defined resolution of the 
hybrid logical clock
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 5b425201..8f32380e 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.DataStore;
-import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.local.Cleanup;
 import accord.local.Command;
@@ -72,7 +71,6 @@ import accord.local.cfk.CommandsForKey;
 import accord.primitives.AbstractRanges;
 import accord.primitives.AbstractUnseekableKeys;
 import accord.primitives.Deps;
-import accord.primitives.Keys;
 import accord.primitives.PartialDeps;
 import accord.primitives.Participants;
 import accord.primitives.Range;
@@ -81,7 +79,6 @@ import accord.primitives.Routable.Domain;
 import accord.primitives.RoutableKey;
 import accord.primitives.Routables;
 import accord.primitives.Route;
-import accord.primitives.RoutingKeys;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
@@ -90,7 +87,6 @@ import accord.primitives.TxnId;
 import accord.primitives.Unseekable;
 import accord.primitives.Unseekables;
 import accord.utils.Invariants;
-import accord.utils.ReducingRangeMap;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java
index b1a4a901..888fe0ba 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -23,7 +23,6 @@ import javax.annotation.Nonnull;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status.Durability;
 import accord.primitives.TxnId;
-import accord.primitives.Unseekables;
 
 import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY;
 import static accord.local.RedundantStatus.LIVE;
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index e149f1ad..19db3e7a 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -161,7 +161,6 @@ public abstract class CommandStore implements AgentExecutor
     private DurableBefore durableBefore = DurableBefore.EMPTY;
     private MaxConflicts maxConflicts = MaxConflicts.EMPTY;
     protected RangesForEpoch rangesForEpoch;
-    protected Ranges activeRanges, leavingRanges;
 
     /**
      * safeToRead is related to RedundantBefore, but a distinct concept.
@@ -535,7 +534,7 @@ public abstract class CommandStore implements AgentExecutor
         safeStore.dataStore().snapshot(slicedRanges, 
globalSyncId).begin((success, fail) -> {
             if (fail != null)
             {
-                logger.error("Unsuccessful dataStore snapshot; unable to 
update GC markers", fail);
+                agent.onHandledException(fail, "Unsuccessful dataStore 
snapshot; unable to update GC markers");
                 return;
             }
 
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 3c453d3f..2ddd2dbf 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -79,7 +79,6 @@ import static accord.primitives.Status.PreApplied;
 import static accord.primitives.Status.PreCommitted;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Route.isFullRoute;
 import static accord.utils.Invariants.illegalState;
 
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 8da8a050..1ba6e207 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -61,7 +61,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
 
     public enum PreBootstrapOrStale { NOT_OWNED, FULLY, PARTIALLY, 
POST_BOOTSTRAP }
 
-    // TODO (expected): rationalise the various bounds we maintain; make merge 
idempotent and apply any filtering by superseding bounds on access
+    // TODO (required): rationalise the various bounds we maintain; make merge 
idempotent and apply any filtering by superseding bounds on access
     public static class Entry
     {
         // TODO (desired): we don't need to maintain this now, can migrate to 
ReducingRangeMap.foldWithBounds
@@ -130,11 +130,6 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
          */
         public final @Nullable Timestamp staleUntilAtLeast;
 
-        public Entry(Range range, long startOwnershipEpoch, long 
endOwnershipEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull 
TxnId shardAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardOnlyAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
-        {
-            this(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
-        }
-
         public Entry(Range range, long startOwnershipEpoch, long 
endOwnershipEpoch, @Nonnull TxnId locallyAppliedOrInvalidatedBefore, @Nonnull 
TxnId locallyDecidedAndAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardAppliedOrInvalidatedBefore, @Nonnull TxnId 
shardOnlyAppliedOrInvalidatedBefore, @Nonnull TxnId gcBefore, @Nonnull TxnId 
bootstrappedAt, @Nullable Timestamp staleUntilAtLeast)
         {
             this.range = range;
@@ -207,21 +202,20 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         }
 
-        public Entry withShardAppliedOrInvalidatedBeforeAtLeast(TxnId 
newShardAppliedOrInvalidatedBefore)
+        public Entry withGcBeforeBeforeAtLeast(TxnId newGcBefore)
         {
-            if 
(newShardAppliedOrInvalidatedBefore.compareTo(locallyAppliedOrInvalidatedBefore)
 >= 0)
-                newShardAppliedOrInvalidatedBefore = 
locallyAppliedOrInvalidatedBefore;
-
-            if 
(newShardAppliedOrInvalidatedBefore.compareTo(shardAppliedOrInvalidatedBefore) 
<= 0)
+            if (newGcBefore.compareTo(gcBefore) <= 0)
                 return this;
 
-            return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
newShardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, 
gcBefore, bootstrappedAt, staleUntilAtLeast);
+            TxnId locallyAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMax(this.locallyAppliedOrInvalidatedBefore, newGcBefore);
+            TxnId shardAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMax(this.shardAppliedOrInvalidatedBefore, newGcBefore);
+            return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, 
newGcBefore, bootstrappedAt, staleUntilAtLeast);
         }
 
         @VisibleForImplementation
         public Entry withEpochs(long start, long end)
         {
-            return new Entry(range, start, end, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, 
staleUntilAtLeast);
+            return new Entry(range, start, end, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         }
 
         static @Nonnull RedundantStatus getAndMerge(Entry entry, @Nonnull 
RedundantStatus prev, TxnId txnId, Object ignore)
@@ -282,8 +276,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             if (entry == null)
                 return prev;
 
-            if 
(entry.shardAppliedOrInvalidatedBefore.compareTo(Timestamp.NONE) > 0)
-                prev.add(entry.range, entry.shardAppliedOrInvalidatedBefore);
+            if (entry.gcBefore.compareTo(Timestamp.NONE) > 0)
+                prev.add(entry.range, entry.gcBefore);
 
             return prev;
         }
@@ -315,7 +309,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             if (entry == null || (executeAt == null ? entry.outOfBounds(txnId) 
: entry.outOfBounds(txnId, executeAt)))
                 return notRedundant;
 
-            if (txnId.compareTo(entry.shardAppliedOrInvalidatedBefore) < 0)
+            if (txnId.compareTo(entry.gcBefore) < 0)
                 return notRedundant.without(Ranges.of(entry.range));
 
             return notRedundant;
@@ -348,7 +342,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
 
             if (locallyAppliedOrInvalidatedBefore.compareTo(txnId) > 0)
             {
-                if (shardAppliedOrInvalidatedBefore.compareTo(txnId) > 0)
+                if (gcBefore.compareTo(txnId) > 0)
                     return SHARD_REDUNDANT;
                 return LOCALLY_REDUNDANT;
             }
@@ -368,7 +362,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
 
         public final TxnId shardRedundantBefore()
         {
-            return shardAppliedOrInvalidatedBefore;
+            return gcBefore;
         }
 
         public final TxnId locallyRedundantBefore()
@@ -395,7 +389,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
         private boolean isComplete()
         {
             // TODO (required): carefully consider whether we should ALSO 
expect some local property to be met here
-            return endOwnershipEpoch <= 
shardAppliedOrInvalidatedBefore.epoch();
+            return endOwnershipEpoch <= gcBefore.epoch();
         }
 
         private boolean outOfBounds(Timestamp lb)
@@ -405,12 +399,12 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
 
         Entry withEpochs(int startEpoch, int endEpoch)
         {
-            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, 
staleUntilAtLeast);
+            return new Entry(range, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         }
 
         public Entry withRange(Range range)
         {
-            return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, 
staleUntilAtLeast);
+            return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         }
 
         public boolean equals(Object that)
@@ -430,6 +424,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
                    && 
this.locallyAppliedOrInvalidatedBefore.equals(that.locallyAppliedOrInvalidatedBefore)
                    && 
this.locallyDecidedAndAppliedOrInvalidatedBefore.equals(that.locallyDecidedAndAppliedOrInvalidatedBefore)
                    && 
this.shardAppliedOrInvalidatedBefore.equals(that.shardAppliedOrInvalidatedBefore)
+                   && 
this.shardOnlyAppliedOrInvalidatedBefore.equals(that.shardOnlyAppliedOrInvalidatedBefore)
+                   && this.gcBefore.equals(that.gcBefore)
                    && this.bootstrappedAt.equals(that.bootstrappedAt)
                    && Objects.equals(this.staleUntilAtLeast, 
that.staleUntilAtLeast);
         }
@@ -502,8 +498,9 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
         if (ranges.isEmpty())
             return new RedundantBefore();
 
+        TxnId locallyDecidedAndAppliedOrInvalidatedBefore = 
locallyAppliedOrInvalidatedBefore;
         TxnId shardAppliedOrInvalidatedBefore = 
TxnId.min(locallyAppliedOrInvalidatedBefore, 
shardOnlyAppliedOrInvalidatedBefore);
-        Entry entry = new Entry(null, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, 
shardOnlyAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, 
staleUntilAtLeast);
+        Entry entry = new Entry(null, startEpoch, endEpoch, 
locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, 
shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, gcBefore, 
bootstrappedAt, staleUntilAtLeast);
         Builder builder = new Builder(ranges.get(0).endInclusive(), 
ranges.size() * 2);
         for (int i = 0 ; i < ranges.size() ; ++i)
         {
@@ -616,7 +613,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             if (v.range.start().equals(start) && v.range.end().equals(end))
                 return v;
 
-            return new Entry(v.range.newRange(start, end), 
v.startOwnershipEpoch, v.endOwnershipEpoch, 
v.locallyAppliedOrInvalidatedBefore, v.shardAppliedOrInvalidatedBefore, 
v.shardOnlyAppliedOrInvalidatedBefore, v.gcBefore, v.bootstrappedAt, 
v.staleUntilAtLeast);
+            return new Entry(v.range.newRange(start, end), 
v.startOwnershipEpoch, v.endOwnershipEpoch, 
v.locallyAppliedOrInvalidatedBefore, 
v.locallyDecidedAndAppliedOrInvalidatedBefore, 
v.shardAppliedOrInvalidatedBefore, v.shardOnlyAppliedOrInvalidatedBefore, 
v.gcBefore, v.bootstrappedAt, v.staleUntilAtLeast);
         }
 
         @Override
@@ -635,7 +632,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             return new Entry(a.range.newRange(
                 a.range.start().compareTo(b.range.start()) <= 0 ? 
a.range.start() : b.range.start(),
                 a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : 
b.range.end()
-            ), a.startOwnershipEpoch, a.endOwnershipEpoch, 
a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, 
a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, 
a.staleUntilAtLeast);
+            ), a.startOwnershipEpoch, a.endOwnershipEpoch, 
a.locallyDecidedAndAppliedOrInvalidatedBefore, 
a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, 
a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, 
a.staleUntilAtLeast);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 50c6410c..5c4c174a 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -46,7 +46,6 @@ import accord.primitives.Unseekable;
 import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 
-import static accord.local.Cleanup.NO;
 import static accord.local.KeyHistory.COMMANDS;
 import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY;
 import static accord.primitives.Routables.Slice.Minimal;
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 89825407..20e5b404 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -271,7 +271,7 @@ public class StoreParticipants
         return new StoreParticipants(route, empty, empty, empty);
     }
 
-    public static StoreParticipants all(TxnId txnId, Route<?> route)
+    public static StoreParticipants all(Route<?> route)
     {
         return new StoreParticipants(route, route, route, route);
     }
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index d4fe54c5..5aad9df6 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -957,7 +957,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
     static TxnId bootstrappedAt(RedundantBefore.Entry boundsInfo)
     {
         TxnId bootstrappedAt = boundsInfo.bootstrappedAt;
-        if (bootstrappedAt.compareTo(boundsInfo.shardRedundantBefore()) <= 0)
+        if (bootstrappedAt.compareTo(boundsInfo.gcBefore) <= 0)
             bootstrappedAt = null;
         return bootstrappedAt;
     }
@@ -969,7 +969,8 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
 
     static TxnId redundantBefore(RedundantBefore.Entry boundsInfo)
     {
-        return boundsInfo.shardAppliedOrInvalidatedBefore;
+        // TODO (expected): this can be weakened to 
shardAppliedOrInvalidatedBefore
+        return boundsInfo.gcBefore;
     }
 
     public boolean isPostBootstrapAndOwned(TxnId txnId)
@@ -1327,7 +1328,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
     {
         int prunedBeforeById = Arrays.binarySearch(byId, prunedBefore);
         Invariants.checkState(prunedBeforeById >= 0 || 
prunedBefore.equals(TxnId.NONE));
-        return reconstruct(key, 
NO_BOUNDS_INFO.withShardAppliedOrInvalidatedBeforeAtLeast(redundantBefore),
+        return reconstruct(key, 
NO_BOUNDS_INFO.withGcBeforeBeforeAtLeast(redundantBefore),
                            byId, BTree.empty(), prunedBeforeById, unmanageds);
     }
 
@@ -1627,13 +1628,14 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
      */
     public CommandsForKeyUpdate 
withRedundantBeforeAtLeast(RedundantBefore.Entry newBoundsInfo, boolean force)
     {
-        if (!force && 
newBoundsInfo.shardAppliedOrInvalidatedBefore.equals(boundsInfo.shardAppliedOrInvalidatedBefore)
+        // TODO (required): handle receiving an entry from the past, e.g. on 
reload (OR expunge all CFK on restart)
+        if (!force && newBoundsInfo.gcBefore.equals(boundsInfo.gcBefore)
             && newBoundsInfo.bootstrappedAt.equals(boundsInfo.bootstrappedAt)
             && 
newBoundsInfo.locallyDecidedAndAppliedOrInvalidatedBefore.equals(boundsInfo.locallyDecidedAndAppliedOrInvalidatedBefore)
             && newBoundsInfo.endOwnershipEpoch == boundsInfo.endOwnershipEpoch)
             return this;
 
-        if (newBoundsInfo.shardAppliedOrInvalidatedBefore.epoch() >= 
newBoundsInfo.endOwnershipEpoch)
+        if (newBoundsInfo.gcBefore.epoch() >= newBoundsInfo.endOwnershipEpoch)
         {
             // we should be completely finished; notify every unmanaged and 
return an empty CFK
             // we special case this to handle the case of future dependencies 
supplied to us by other CommandsForKey that had pruned their dependencies;
@@ -1661,7 +1663,7 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm
     @VisibleForImplementation
     public CommandsForKey withRedundantBeforeAtLeast(TxnId newRedundantBefore)
     {
-        RedundantBefore.Entry newBoundsInfo = 
boundsInfo.withShardAppliedOrInvalidatedBeforeAtLeast(newRedundantBefore);
+        RedundantBefore.Entry newBoundsInfo = 
boundsInfo.withGcBeforeBeforeAtLeast(newRedundantBefore);
 
         TxnInfo[] newById = pruneById(byId, boundsInfo, newBoundsInfo);
         int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), 
newRedundantBefore);
diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
index a6e944e3..80c71666 100644
--- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
+++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
@@ -46,7 +46,6 @@ import static accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATE
 import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
 import static accord.local.cfk.CommandsForKey.maxContiguousManagedAppliedIndex;
-import static accord.local.cfk.CommandsForKey.updateAndNotifyUnmanageds;
 import static accord.local.cfk.Updating.updateUnmanaged;
 import static accord.local.cfk.Updating.updateUnmanagedAsync;
 import static accord.local.cfk.Utils.findApply;
@@ -54,7 +53,6 @@ import static accord.local.cfk.Utils.findCommit;
 import static accord.local.cfk.Utils.findFirstApply;
 import static accord.local.cfk.Utils.removeUnmanaged;
 import static accord.local.cfk.Utils.selectUnmanaged;
-import static accord.primitives.Timestamp.max;
 import static accord.primitives.TxnId.NO_TXNIDS;
 import static accord.utils.ArrayBuffers.cachedTxnIds;
 
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 1208077e..54a05eca 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -18,13 +18,7 @@
 
 package accord.topology;
 
-import java.util.AbstractCollection;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import com.google.common.collect.Iterators;
 
 import accord.api.TopologySorter;
 import accord.local.Node;
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index 81a2b5ff..47e791e9 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -22,7 +22,6 @@ import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
index dc35335d..6e3ddcb4 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -69,7 +69,7 @@ public class TestAgent implements Agent
         }
 
         @Override
-        public void onHandledException(Throwable t)
+        public void onHandledException(Throwable t, String context)
         {
             throw new AssertionError("Unexpected exception", t);
         }
@@ -108,7 +108,7 @@ public class TestAgent implements Agent
     }
 
     @Override
-    public void onHandledException(Throwable t)
+    public void onHandledException(Throwable t, String context)
     {
     }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index e3f9eb3f..1e568a0c 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -533,27 +533,29 @@ public class Cluster implements Scheduler
             Scheduled restart = sinks.recurring(() -> {
                 Id id = random.pick(nodes);
                 CommandStore[] stores = nodeMap.get(id).commandStores().all();
-                Predicate<Pending> pred = getPendingPredicate(stores);
-                while (sinks.drain(pred));
-
-                // Journal cleanup is a rough equivalent of a node restart.
-                trace.debug("Triggering journal cleanup for node " + id);
-                CommandsForKey.disableLinearizabilityViolationsReporting();
-                ListStore listStore = (ListStore) 
nodeMap.get(id).commandStores().dataStore();
-                listStore.clear();
-                listStore.restoreFromSnapshot();
-
-                Journal journal = journalMap.get(id);
-                for (CommandStore s : stores)
-                {
-                    DelayedCommandStores.DelayedCommandStore store = 
(DelayedCommandStores.DelayedCommandStore) s;
-                    store.clearForTesting();
-                    journal.reconstructAll(store.loader(), store.id());
-                    journal.loadHistoricalTransactions(store::load, 
store.id());
-                }
-                while (sinks.drain(pred));
-                CommandsForKey.enableLinearizabilityViolationsReporting();
-                trace.debug("Done with cleanup.");
+                ((DelayedCommandStore)stores[0]).unsafeRunIn(() -> {
+                    Predicate<Pending> pred = getPendingPredicate(stores);
+                    while (sinks.drain(pred));
+
+                    // Journal cleanup is a rough equivalent of a node restart.
+                    trace.debug("Triggering journal cleanup for node " + id);
+                    CommandsForKey.disableLinearizabilityViolationsReporting();
+                    ListStore listStore = (ListStore) 
nodeMap.get(id).commandStores().dataStore();
+                    listStore.clear();
+                    listStore.restoreFromSnapshot();
+
+                    Journal journal = journalMap.get(id);
+                    for (CommandStore s : stores)
+                    {
+                        DelayedCommandStores.DelayedCommandStore store = 
(DelayedCommandStores.DelayedCommandStore) s;
+                        store.clearForTesting();
+                        journal.reconstructAll(store.loader(), store.id());
+                        journal.loadHistoricalTransactions(store::load, 
store.id());
+                    }
+                    while (sinks.drain(pred));
+                    CommandsForKey.enableLinearizabilityViolationsReporting();
+                    trace.debug("Done with cleanup.");
+                });
             }, () -> random.nextInt(1, 10), SECONDS);
 
             
durabilityScheduling.forEach(CoordinateDurabilityScheduling::start);
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 0de6debc..20c6ec36 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -286,6 +286,12 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
             journal.registerHistoricalTransactions(id(), deps);
             super.registerHistoricalTransactions(deps, safeStore);
         }
+
+        @Override
+        public void unsafeRunIn(Runnable fn)
+        {
+            super.unsafeRunIn(fn);
+        }
     }
 
     public static class DelayedSafeStore extends 
InMemoryCommandStore.InMemorySafeStore
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 64a1e53c..019801b7 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -88,7 +88,8 @@ public class Journal
                 Command command = reconstruct(diffs, Reconstruct.Last).get(0);
                 if (command.status() == Truncated || command.status() == 
Invalidated)
                     continue; // Already truncated
-                Cleanup cleanup = Cleanup.shouldCleanup(store, command, 
command.participants());
+                StoreParticipants participants = 
Invariants.nonNull(command.participants());
+                Cleanup cleanup = Cleanup.shouldCleanup(store, command, 
participants);
                 switch (cleanup)
                 {
                     case NO:
@@ -254,8 +255,8 @@ public class Journal
                 attrs.partialTxn(partialTxn);
             if (durability != null)
                 attrs.durability(durability);
-            if (participants != null)
-                attrs.setParticipants(participants);
+            if (participants != null) attrs.setParticipants(participants);
+            else attrs.setParticipants(StoreParticipants.empty(txnId));
 
             // TODO (desired): we can simplify this logic if, instead of 
diffing, we will infer the diff from the status
             if (partialDeps != null &&
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 08db0e3e..4d35a03f 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -109,7 +109,7 @@ public class ListAgent implements Agent
     }
 
     @Override
-    public void onHandledException(Throwable t)
+    public void onHandledException(Throwable t, String context)
     {
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 01c11aa0..d2a071bd 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -179,12 +180,12 @@ public class ListStore implements DataStore
     private static final class PendingSnapshot
     {
         final long delay;
-        final Runnable runnable;
+        final Consumer<Boolean> onCompletion;
 
-        private PendingSnapshot(long delay, Runnable runnable)
+        private PendingSnapshot(long delay, Consumer<Boolean> onCompletion)
         {
             this.delay = delay;
-            this.runnable = runnable;
+            this.onCompletion = onCompletion;
         }
     }
 
@@ -198,9 +199,16 @@ public class ListStore implements DataStore
         AsyncResult.Settable<Void> result = new AsyncResults.SettableResult<>();
         long delay = Math.max(1, random.nextBiasedLong(100, 1000, 5000) - pendingDelay);
         pendingDelay += delay;
-        pendingSnapshots.add(new PendingSnapshot(delay, () -> {
-            this.snapshot = snapshot;
-            result.setSuccess(null);
+        pendingSnapshots.add(new PendingSnapshot(delay, success -> {
+            if (success)
+            {
+                this.snapshot = snapshot;
+                result.setSuccess(null);
+            }
+            else
+            {
+                result.setFailure(new RuntimeException("Snapshot aborted due to earlier snapshot being restored"));
+            }
         }));
 
         if (pendingSnapshots.size() == 1)
@@ -216,7 +224,7 @@ public class ListStore implements DataStore
                 return;
 
             PendingSnapshot pendingSnapshot = pendingSnapshots.pollFirst();
-            pendingSnapshot.runnable.run();
+            pendingSnapshot.onCompletion.accept(true);
             pendingDelay -= pendingSnapshot.delay;
             if (!pendingSnapshots.isEmpty())
                 scheduleRunSnapshot();
@@ -234,6 +242,9 @@ public class ListStore implements DataStore
         purgedAts.addAll(snapshot.purgedAts);
         fetchCompletes.addAll(snapshot.fetchCompletes);
         pendingRemoves.addAll(snapshot.pendingRemoves);
+
+        while (!pendingSnapshots.isEmpty())
+            pendingSnapshots.pollFirst().onCompletion.accept(false);
     }
 
     public void clear()
@@ -547,6 +558,7 @@ public class ListStore implements DataStore
         // TODO (effeciency, correctness): remove the delayed removal logic.
         // This logic was added to make sure the sequence of events made sense but doesn't handle everything perfectly; I (David C) believe that this code
         // will suffer from the ABA problem; if a range is removed, then added back it is not likley to be handled correctly (the add will no-op as it wasn't removed, then the remove will remove it!)
+        if (pendingRemoves.isEmpty()) return;
         if (pendingRemoves.get(0) != epoch)
         {
             onRemovalDone.add(() -> performRemoval(epoch, removed, s));
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 974e0009..d1754b6d 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -536,7 +536,7 @@ public class CommandsForKeyTest
         {
             CommonAttributes.Mutable result = new CommonAttributes.Mutable(txnId)
                    .durability(NotDurable)
-                   .updateParticipants(StoreParticipants.all(txnId, txnId.is(Key) ? KEY_ROUTE : RANGE_ROUTE));
+                   .updateParticipants(StoreParticipants.all(txnId.is(Key) ? KEY_ROUTE : RANGE_ROUTE));
 
             if (withDefinition)
                 result.partialTxn((txnId.domain() == Key ? KEY_TXN : RANGE_TXN).slice(RANGES, true));
@@ -992,7 +992,7 @@ public class CommandsForKeyTest
         }
 
         @Override
-        public void onHandledException(Throwable t)
+        public void onHandledException(Throwable t, String context)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 79a1f3cd..098910da 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -80,7 +80,7 @@ public class MaelstromAgent implements Agent
     }
 
     @Override
-    public void onHandledException(Throwable t)
+    public void onHandledException(Throwable t, String context)
     {
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to