This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe306fc5 Epoch/Topology Garbage Collection
fe306fc5 is described below

commit fe306fc5539b40d1c9d49f9afd0ca45bb74c49d3
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Mon Feb 17 18:49:40 2025 +0100

    Epoch/Topology Garbage Collection
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for 
CASSANDRA-20347.
---
 .../main/java/accord/api/ConfigurationService.java |  11 +-
 accord-core/src/main/java/accord/api/Journal.java  |   5 +-
 .../accord/impl/AbstractConfigurationService.java  |  68 ++++--
 accord-core/src/main/java/accord/local/Node.java   |  26 +-
 .../accord/local/durability/DurabilityService.java |  12 +-
 .../accord/local/durability/ShardDurability.java   |  18 ++
 .../java/accord/primitives/AbstractRanges.java     |   5 +
 .../src/main/java/accord/topology/Topologies.java  |   2 +-
 .../main/java/accord/topology/TopologyManager.java | 261 ++++++++++++++++++---
 .../accord/burn/BurnTestConfigurationService.java  |   8 +
 .../accord/coordinate/CoordinateSyncPointTest.java |   4 +-
 .../impl/AbstractConfigurationServiceTest.java     |  25 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   4 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |  17 +-
 .../java/accord/impl/basic/LoggingJournal.java     |   7 +-
 .../src/test/java/accord/impl/list/ListAgent.java  |  13 +-
 .../test/java/accord/impl/list/ListRequest.java    |  22 +-
 .../src/test/java/accord/impl/list/ListStore.java  |   5 -
 .../accord/impl/mock/MockConfigurationService.java |   6 +
 .../java/accord/topology/TopologyManagerTest.java  |  40 ++--
 .../src/test/java/accord/utils/Property.java       |   1 -
 .../src/main/java/accord/maelstrom/Cluster.java    |   3 +-
 .../java/accord/maelstrom/SimpleConfigService.java |   5 +
 23 files changed, 429 insertions(+), 139 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java 
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index b8bbcbfc..90d53468 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -142,12 +142,6 @@ public interface ConfigurationService
          */
         void onRemoteSyncComplete(Node.Id node, long epoch);
 
-        /**
-         * Called when the configuration service is meant to truncate it's 
topology data up to (but not including)
-         * the given epoch
-         */
-        void truncateTopologyUntil(long epoch);
-
         /**
          * Called when no new TxnId may be agreed with an epoch less than or 
equal to the provided one.
          * This means future epochs are now aware of all TxnId with this epoch 
or earlier that may be executed
@@ -197,4 +191,9 @@ public interface ConfigurationService
     void reportEpochClosed(Ranges ranges, long epoch);
 
     void reportEpochRetired(Ranges ranges, long epoch);
+
+    /**
+     * Called after this epoch is garbage collected / removed on the current 
node.
+     */
+    void reportEpochRemoved(long epoch);
 }
diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index ac95bda4..00e3ffb4 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -28,6 +28,7 @@ import accord.local.Command;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
+import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -47,10 +48,10 @@ public interface Journal
     // TODO (required): propagate exceptions (i.e. using OnDone instead of 
Runnable)
     void saveCommand(int store, CommandUpdate value, Runnable onFlush);
 
-    Iterator<TopologyUpdate> replayTopologies();
+    Iterator<? extends TopologyUpdate> replayTopologies();
     void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
 
-    void purge(CommandStores commandStores);
+    void purge(CommandStores commandStores, EpochSupplier minEpoch);
     void replay(CommandStores commandStores);
 
     RedundantBefore loadRedundantBefore(int store);
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index fe8046fe..3cf7df2a 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -69,11 +69,25 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return epoch;
         }
 
+        public boolean isReady()
+        {
+            return reads != null && reads.isDone();
+        }
+
         @Override
         public String toString()
         {
             return "EpochState{" + epoch + '}';
         }
+
+        @VisibleForTesting
+        public synchronized void setReadyForTesting(Topology topology)
+        {
+            this.topology = topology;
+            received.setSuccess(topology);
+            acknowledged.setSuccess(null);
+            reads = AsyncResults.success(null);
+        }
     }
 
     /**
@@ -96,6 +110,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
          */
         private volatile long lastReceived = 0;
         private volatile long lastAcknowledged = 0;
+        private volatile long lastTruncated = -1;
 
         protected abstract EpochState createEpochState(long epoch);
 
@@ -132,6 +147,11 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return epochs.size();
         }
 
+        public boolean wasTruncated(long epoch)
+        {
+            return lastTruncated > 0 && lastTruncated >= epoch;
+        }
+
         public boolean isEmpty()
         {
             return lastReceived == 0;
@@ -139,8 +159,9 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
         synchronized EpochState getOrCreate(long epoch)
         {
+            Invariants.requireArgument(!wasTruncated(epoch), "Can not 
re-create truncated epoch %d. Last truncated: %d", epoch, lastTruncated);
             Invariants.requireArgument(epoch >= 0, "Epoch must be non-negative 
but given %d", epoch);
-            Invariants.requireArgument(epoch > 0 || (lastReceived == 0 && 
epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, 
epochsf; %s", lastReceived, epochs);
+            Invariants.requireArgument(epoch > 0 || (lastReceived == 0 && 
epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, 
epochs; %s", lastReceived, epochs);
             if (epochs.isEmpty())
             {
                 EpochState state = createEpochState(epoch);
@@ -237,15 +258,31 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return getOrCreate(epoch).acknowledged;
         }
 
-        synchronized void truncateUntil(long epoch)
+        public synchronized void truncateUntil(long epoch)
         {
             Invariants.requireArgument(epoch <= maxEpoch(), "epoch %d > %d", 
epoch, maxEpoch());
             long minEpoch = minEpoch();
-            int toTrim = Ints.checkedCast(epoch - minEpoch);
-            if (toTrim <= 0)
+            int trimFrom = Ints.checkedCast(epoch - minEpoch);
+
+            final int count = epochs.size();
+            int highestReadyIdx = -1;
+            for (int i = trimFrom; i >= 0; i--)
+            {
+                if (epochs.get(i).isReady())
+                {
+                    highestReadyIdx = i;
+                    break;
+                }
+            }
+
+            // Always leave at least 1 ready epoch after truncation
+            if (highestReadyIdx <= 0)
                 return;
 
-            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+            List<EpochState> next = new 
ArrayList<>(epochs.subList(highestReadyIdx, count));
+            Invariants.require(next.get(0).reads.isDone());
+            epochs = next;
+            lastTruncated = next.get(0).epoch - 1;
         }
     }
 
@@ -294,12 +331,14 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
     @Override
     public void acknowledgeEpoch(EpochReady ready, boolean startSync)
     {
+        if (epochs.wasTruncated(ready.epoch))
+            return;
+
         ready.metadata.addCallback(() -> epochs.acknowledge(ready));
-        ready.coordinate.addCallback(() ->  
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
+        ready.coordinate.addCallback(() -> 
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
         ready.reads.addCallback(() ->  
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
     }
 
-    protected void topologyUpdatePreListenerNotify(Topology topology) {}
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
 
     public void reportTopology(Topology topology, boolean isLoad, boolean 
startSync)
@@ -332,7 +371,6 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         }
 
         epochs.receive(topology);
-        topologyUpdatePreListenerNotify(topology);
         for (Listener listener : listeners)
             listener.onTopologyUpdate(topology, isLoad, startSync);
         topologyUpdatePostListenerNotify(topology);
@@ -364,18 +402,6 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             listener.onEpochRetired(ranges, epoch);
     }
 
-    protected void truncateTopologiesPreListenerNotify(long epoch) {}
-    protected void truncateTopologiesPostListenerNotify(long epoch) {}
-
-    public void truncateTopologiesUntil(long epoch)
-    {
-        truncateTopologiesPreListenerNotify(epoch);
-        for (Listener listener : listeners)
-            listener.truncateTopologyUntil(epoch);
-        truncateTopologiesPostListenerNotify(epoch);
-        epochs.truncateUntil(epoch);
-    }
-
     // synchronized because state.reads is written
     public AsyncChain<Void> epochReady(long epoch)
     {
@@ -419,4 +445,4 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return new EpochHistory();
         }
     }
-}
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 958a23c6..446633c5 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -335,7 +335,8 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
             if (this.topology.isEmpty()) return bootstrap.get();
             return 
orderFastPathReporting(this.topology.epochReady(topology.epoch() - 1), 
bootstrap.get());
         };
-        return this.topology.onTopologyUpdate(topology, 
orderFastPathReporting);
+
+        return this.topology.onTopologyUpdate(topology, 
orderFastPathReporting, configService::reportEpochRemoved);
     }
 
     private static EpochReady orderFastPathReporting(EpochReady previous, 
EpochReady next)
@@ -367,12 +368,6 @@ public class Node implements 
ConfigurationService.Listener, NodeCommandStoreServ
         topology.onEpochSyncComplete(node, epoch);
     }
 
-    @Override
-    public void truncateTopologyUntil(long epoch)
-    {
-        topology.truncateTopologyUntil(epoch);
-    }
-
     @Override
     public void onEpochClosed(Ranges ranges, long epoch)
     {
@@ -383,6 +378,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     public void onEpochRetired(Ranges ranges, long epoch)
     {
         topology.onEpochRetired(ranges, epoch);
+        durabilityService.onEpochRetired(ranges, epoch);
     }
 
     // TODO (required): audit error handling, as the refactor to provide epoch 
timeouts appears to have broken a number of coordination
@@ -397,6 +393,12 @@ public class Node implements 
ConfigurationService.Listener, NodeCommandStoreServ
 
     public void withEpoch(long epoch, BiConsumer<Void, Throwable> callback)
     {
+        if (epoch < topology.minEpoch())
+        {
+            callback.accept(null, new 
TopologyManager.TopologyRetiredException(epoch, topology.minEpoch()));
+            return;
+        }
+
         if (topology.hasAtLeastEpoch(epoch))
         {
             callback.accept(null, null);
@@ -410,7 +412,9 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
 
     public void withEpoch(long epoch, BiConsumer<?, ? super Throwable> 
ifFailure, Runnable ifSuccess)
     {
-        if (topology.hasEpoch(epoch))
+        if (epoch < topology.minEpoch())
+            throw new TopologyManager.TopologyRetiredException(epoch, 
topology.minEpoch());
+        if (topology.hasAtLeastEpoch(epoch))
         {
             ifSuccess.run();
         }
@@ -426,6 +430,8 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
 
     public void withEpoch(long epoch, BiConsumer<?, Throwable> ifFailure, 
Function<Throwable, Throwable> onFailure, Runnable ifSuccess)
     {
+        if (epoch < topology.minEpoch())
+            throw new TopologyManager.TopologyRetiredException(epoch, 
topology.minEpoch());
         if (topology.hasEpoch(epoch))
         {
             ifSuccess.run();
@@ -443,6 +449,8 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     @Inline
     public <T> AsyncChain<T> withEpoch(long epoch, Supplier<? extends 
AsyncChain<T>> supplier)
     {
+        if (epoch < topology.minEpoch())
+            throw new TopologyManager.TopologyRetiredException(epoch, 
topology.minEpoch());
         if (topology.hasEpoch(epoch))
         {
             return supplier.get();
@@ -932,4 +940,4 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     {
         return time;
     }
-}
+}
\ No newline at end of file
diff --git 
a/accord-core/src/main/java/accord/local/durability/DurabilityService.java 
b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
index e497b7ec..4502e084 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityService.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
@@ -177,18 +177,18 @@ public class DurabilityService implements 
ConfigurationService.Listener
     {
     }
 
-    @Override
-    public void truncateTopologyUntil(long epoch)
-    {
-    }
-
     @Override
     public void onEpochClosed(Ranges ranges, long epoch)
     {
     }
 
     @Override
-    public void onEpochRetired(Ranges ranges, long epoch)
+    public void onEpochRetired(Ranges retiredRanges, long epoch)
     {
+        // No need to cancel work for ranges that are still active
+        if (!node.topology().isFullyRetired(retiredRanges))
+            return;
+
+        shards.retireRanges(retiredRanges, epoch);
     }
 }
\ No newline at end of file
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index 0bfa81c6..4c17b8c4 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -496,6 +496,24 @@ public class ShardDurability
         }
     }
 
+    synchronized void retireRanges(Ranges retiredRanges, long epoch)
+    {
+        Map<Range, ShardScheduler> prev = new HashMap<>(this.shardSchedulers);
+        this.shardSchedulers.clear();
+
+        for (Map.Entry<Range, ShardScheduler> e : prev.entrySet())
+        {
+            if (retiredRanges.contains(e.getKey()))
+            {
+                logger.info("Cancelling durability scheduling for {}, since it 
was retired in epoch {}",
+                            e.getKey(), epoch);
+                e.getValue().markDefunct();
+            }
+            else
+                this.shardSchedulers.put(e.getKey(), e.getValue());
+        }
+    }
+
     synchronized void updateTopology(Topology latestGlobal)
     {
         if (latestGlobal.epoch() <= latestEpoch)
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java 
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 6b483ca3..dc19ff20 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -127,6 +127,11 @@ public abstract class AbstractRanges implements 
Iterable<Range>, Routables<Range
         return ((int) supersetLinearMerge(this.ranges, that.ranges)) == 
that.size();
     }
 
+    public boolean contains(Range range)
+    {
+        return indexOf(range, FAST) >= 0;
+    }
+
     @Override
     public int size()
     {
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java 
b/accord-core/src/main/java/accord/topology/Topologies.java
index 3a5dc838..a727acf3 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -573,7 +573,7 @@ public interface Topologies extends TopologySorter
 
         public Builder(int initialCapacity)
         {
-            buffer = ArrayBuffers.cachedAny().get(4);
+            buffer = ArrayBuffers.cachedAny().get(initialCapacity);
         }
 
         public void add(Topology topology)
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 5e2ce80a..4c22806a 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,18 +20,24 @@ package accord.topology;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.ConfigurationService;
@@ -40,6 +46,7 @@ import 
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include;
 import accord.api.Timeouts;
 import accord.api.Timeouts.RegisteredTimeout;
 import accord.api.TopologySorter;
+import accord.api.VisibleForImplementation;
 import accord.coordinate.EpochTimeout;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.CommandStore;
@@ -65,12 +72,13 @@ import static 
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owne
 import static 
accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced;
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
-import static 
accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps;
 import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
+import static 
accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps;
 import static accord.primitives.TxnId.FastPath.Unoptimised;
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.nonNull;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.stream.Collectors.joining;
 
 /**
  * Manages topology state changes and update bookkeeping
@@ -87,6 +95,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
  */
 public class TopologyManager
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(TopologyManager.class);
     private static final FutureEpoch SUCCESS;
 
     static
@@ -110,6 +119,21 @@ public class TopologyManager
         @GuardedBy("TopologyManager.this")
         Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
 
+        private volatile boolean allRetired;
+
+        public boolean allRetired()
+        {
+            if (allRetired)
+                return true;
+
+            if (!retired.containsAll(global.ranges))
+                return false;
+
+            Invariants.require(closed.containsAll(global.ranges));
+            allRetired = true;
+            return true;
+        }
+
         EpochState(Id node, Topology global, TopologySorter sorter, Ranges 
prevRanges)
         {
             this.self = node;
@@ -117,7 +141,7 @@ public class TopologyManager
             this.local = global.forNode(node).trim();
             Invariants.requireArgument(!global().isSubset());
             this.curShardSyncComplete = new BitSet(global.shards.length);
-            if (global().size() > 0)
+            if (!global().isEmpty())
                 this.syncTracker = new QuorumTracker(new Single(sorter, 
global()));
             else
                 this.syncTracker = null;
@@ -234,8 +258,10 @@ public class TopologyManager
             Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
         }
 
-        private static final Epochs EMPTY = new Epochs(new EpochState[0]);
+        private static final Epochs EMPTY = new Epochs(new EpochState[0], 
Collections.emptyList(), Collections.emptyList(), -1);
         private final long currentEpoch;
+        private final long firstNonEmptyEpoch;
+        // Epochs are sorted in _descending_ order
         private final EpochState[] epochs;
         // nodes we've received sync complete notifications from, for epochs 
we do not yet have topologies for.
         // Pending sync notifications are indexed by epoch, with the current 
epoch as index[0], and future epochs
@@ -249,9 +275,16 @@ public class TopologyManager
         // NOTE: this is NOT copy-on-write. This is mutated in place!
         private final List<FutureEpoch> futureEpochs;
 
-        private Epochs(EpochState[] epochs, List<Notifications> pending, 
List<FutureEpoch> futureEpochs)
+        private Epochs(EpochState[] epochs, List<Notifications> pending, 
List<FutureEpoch> futureEpochs, long prevFirstNonEmptyEpoch)
         {
             this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
+            if (prevFirstNonEmptyEpoch != -1)
+                this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
+            else if (epochs.length > 0  && !epochs[0].global().isEmpty())
+                this.firstNonEmptyEpoch = currentEpoch;
+            else
+                this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch;
+
             this.pending = pending;
             this.futureEpochs = futureEpochs;
             if (!futureEpochs.isEmpty())
@@ -261,12 +294,45 @@ public class TopologyManager
                 Invariants.requireArgument(futureEpochs.get(i).epoch == 
futureEpochs.get(i - 1).epoch + 1);
             for (int i = 1; i < epochs.length; i++)
                 Invariants.requireArgument(epochs[i].epoch() == epochs[i - 
1].epoch() - 1);
-            this.epochs = epochs;
-        }
+            int truncateFrom = -1;
+            // > 0 because we do not want to be left without epochs in case 
they're all empty
+            for (int i = epochs.length - 1; i > 0; i--)
+            {
+                EpochState epochState = epochs[i];
+                if (epochState.allRetired() &&
+                    (truncateFrom == -1 || truncateFrom == i + 1))
+                {
+                    Invariants.require(epochs[i].syncComplete());
+                    truncateFrom = i;
+                }
+            }
 
-        private Epochs(EpochState[] epochs)
-        {
-            this(epochs, new ArrayList<>(), new ArrayList<>());
+            if (truncateFrom == -1)
+            {
+                this.epochs = epochs;
+            }
+            else
+            {
+                this.epochs = Arrays.copyOf(epochs, truncateFrom);
+                if (logger.isDebugEnabled())
+                {
+                    for (int i = truncateFrom; i < epochs.length; i++)
+                    {
+                        EpochState state = epochs[i];
+                        Invariants.require(epochs[i].syncComplete());
+                        logger.debug("Retired epoch {} with added/removed 
ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, 
state.removedRanges, state.global.ranges, state.closed);
+                    }
+                }
+                if (logger.isTraceEnabled())
+                {
+                    for (int i = 0; i < truncateFrom; i++)
+                    {
+                        EpochState state = epochs[i];
+                        Invariants.require(state.syncComplete());
+                        logger.trace("Leaving epoch {} with added/removed 
ranges {}/{}", state.epoch(), state.addedRanges, state.removedRanges);
+                    }
+                }
+            }
         }
 
         private FutureEpoch awaitEpoch(long epoch, TopologyManager manager)
@@ -365,30 +431,38 @@ public class TopologyManager
             {
                 i = indexOf(epoch);
             }
+
+            if (i == -1)
+            {
+                Invariants.require(epoch < minEpoch(), "Could not find epoch 
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
+                return; // notification came for an already truncated epoch
+            }
             while (epochs[i].recordClosed(ranges) && ++i < epochs.length) {}
         }
 
         /**
-         * Mark the epoch as "redundant" for the provided ranges; this means 
that all transactions that can be
+         * Mark the epoch as "retired" for the provided ranges; this means 
that all transactions that can be
          * proposed for this epoch have now been executed globally.
          */
         public void epochRetired(Ranges ranges, long epoch)
         {
             Invariants.requireArgument(epoch > 0);
-            int i;
+            int retiredIdx;
             if (epoch > currentEpoch)
             {
                 Notifications notifications = pending(epoch);
                 notifications.retired = 
notifications.retired.union(MERGE_ADJACENT, ranges);
-                i = 0; // record these ranges as complete for all earlier 
epochs as well
+                retiredIdx = 0; // record these ranges as complete for all 
earlier epochs as well
             }
             else
             {
-                i = indexOf(epoch);
-                if (i < 0)
+                retiredIdx = indexOf(epoch);
+                if (retiredIdx < 0)
                     return;
             }
-            while (epochs[i].recordRetired(ranges) && ++i < epochs.length) {}
+
+            for (int i = retiredIdx; i < epochs.length; i++)
+                epochs[i].recordRetired(ranges);
         }
 
         private Notifications pending(long epoch)
@@ -633,33 +707,39 @@ public class TopologyManager
         return new EpochsSnapshot(builder.build());
     }
 
-    public EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady> 
bootstrap)
+    public EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady> 
bootstrap, LongConsumer truncate)
     {
         FutureEpoch notifyDone;
         EpochReady ready;
+        Epochs prev;
+        Epochs next;
         synchronized (this)
         {
-            Epochs current = epochs;
-            Invariants.requireArgument(topology.epoch == current.nextEpoch() 
|| epochs == Epochs.EMPTY,
-                                       "Expected topology update %d to be %d", 
topology.epoch, current.nextEpoch());
-            EpochState[] nextEpochs = new EpochState[current.epochs.length + 
1];
-            List<Epochs.Notifications> pending = new 
ArrayList<>(current.pending);
+            prev = epochs;
+            Invariants.requireArgument(topology.epoch == prev.nextEpoch() || 
epochs == Epochs.EMPTY,
+                                       "Expected topology update %d to be %d", 
topology.epoch, prev.nextEpoch());
+            EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1];
+            List<Epochs.Notifications> pending = new ArrayList<>(prev.pending);
             Epochs.Notifications notifications = pending.isEmpty() ? new 
Epochs.Notifications() : pending.remove(0);
 
-            System.arraycopy(current.epochs, 0, nextEpochs, 1, 
current.epochs.length);
+            System.arraycopy(prev.epochs, 0, nextEpochs, 1, 
prev.epochs.length);
 
-            Ranges prevAll = current.epochs.length == 0 ? Ranges.EMPTY : 
current.epochs[0].global.ranges;
+            Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY : 
prev.epochs[0].global.ranges;
             nextEpochs[0] = new EpochState(self, topology, 
sorter.get(topology), prevAll);
             
notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
             nextEpochs[0].recordClosed(notifications.closed);
             nextEpochs[0].recordRetired(notifications.retired);
 
-            List<FutureEpoch> futureEpochs = new 
ArrayList<>(current.futureEpochs);
+            List<FutureEpoch> futureEpochs = new 
ArrayList<>(prev.futureEpochs);
             notifyDone = !futureEpochs.isEmpty() ? futureEpochs.remove(0) : 
null;
-            epochs = new Epochs(nextEpochs, pending, futureEpochs);
+            next = new Epochs(nextEpochs, pending, futureEpochs, 
prev.firstNonEmptyEpoch);
+            epochs = next;
             ready = nextEpochs[0].ready = bootstrap.get();
         }
 
+        if (next.minEpoch() != prev.minEpoch())
+            truncate.accept(epochs.minEpoch());
+        
         if (notifyDone != null)
             notifyDone.setDone();
 
@@ -709,7 +789,7 @@ public class TopologyManager
         return epochs.get(epoch).synced;
     }
 
-    public synchronized void truncateTopologyUntil(long epoch)
+    public synchronized void truncateTopologiesUntil(long epoch)
     {
         Epochs current = epochs;
         Invariants.requireArgument(current.epoch() >= epoch, "Unable to 
truncate; epoch %d is > current epoch %d", epoch , current.epoch());
@@ -722,7 +802,7 @@ public class TopologyManager
 
         EpochState[] nextEpochs = new EpochState[newLen];
         System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
-        epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs);
+        epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs, 
current.firstNonEmptyEpoch);
     }
 
     public synchronized void onEpochClosed(Ranges ranges, long epoch)
@@ -730,6 +810,28 @@ public class TopologyManager
         epochs.epochClosed(ranges, epoch);
     }
 
+    /**
+     * If ranges were added in epoch X, and are _not_ present in the current 
epoch, they
+     * are purged and durability scheduling for them should be cancelled.
+     */
+    public synchronized boolean isFullyRetired(Ranges ranges)
+    {
+        Epochs epochs = this.epochs;
+        EpochState current = epochs.get(epochs.currentEpoch);
+        if (!current.addedRanges.containsAll(ranges))
+            return false;
+
+        long minEpoch = epochs.minEpoch();
+        for (long i = minEpoch; i < epochs.currentEpoch; i++)
+        {
+            EpochState retiredIn = epochs.get(i);
+            if (retiredIn.allRetired() && 
retiredIn.addedRanges.containsAll(ranges))
+                return true;
+        }
+
+        return false;
+    }
+
     public synchronized void onEpochRetired(Ranges ranges, long epoch)
     {
         epochs.epochRetired(ranges, epoch);
@@ -760,8 +862,16 @@ public class TopologyManager
         return current().epoch;
     }
 
+    // TODO (desired): add tests for epoch GC and tracking
+    /**
+     * Exposes the {@code firstNonEmptyEpoch} value held by the current epochs state.
+     * Elsewhere in this class ({@code between}) a value of -1 is treated as
+     * "no epochs known".
+     *
+     * @return the current {@code firstNonEmptyEpoch}
+     */
+    @VisibleForImplementation
+    public long firstNonEmpty()
+    {
+        return epochs.firstNonEmptyEpoch;
+    }
+
     public long minEpoch()
     {
+        Epochs epochs = this.epochs;
         return epochs.minEpoch();
     }
 
@@ -771,6 +881,71 @@ public class TopologyManager
         return epochs.get(epoch);
     }
 
+    /**
+     * Fetch topologies between {@code minEpoch} (inclusive) and {@code maxEpoch} (inclusive).
+     *
+     * The requested lower bound is raised to the lowest epoch still held (earlier epochs
+     * might have been GC'd), and the upper bound is implicitly capped at the current epoch.
+     * If no non-empty epoch is known ({@code firstNonEmptyEpoch == -1}) the returned range
+     * carries an empty topology list.
+     *
+     * @param minEpoch lowest epoch requested (inclusive)
+     * @param maxEpoch highest epoch requested (inclusive)
+     * @return the global topologies for the resulting epoch span, in ascending epoch order,
+     *         together with the min / current / firstNonEmpty epochs of the snapshot used
+     */
+    public TopologyRange between(long minEpoch, long maxEpoch)
+    {
+        Epochs epochs = this.epochs;
+        // No epochs known to Accord
+        if (epochs.firstNonEmptyEpoch == -1)
+            return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, Collections.emptyList());
+
+        // Cannot serve anything below the lowest epoch we still hold
+        minEpoch = Math.max(minEpoch, epochs.minEpoch());
+        int diff = Math.toIntExact(epochs.currentEpoch - minEpoch + 1);
+        List<Topology> topologies = new ArrayList<>(diff);
+        // The bound must be computed from the same (clamped) base that is used to fetch the
+        // topology; using epochs.minEpoch() here would overshoot maxEpoch whenever the
+        // requested minEpoch is above the lowest retained epoch.
+        for (int i = 0; minEpoch + i <= maxEpoch && i < diff; i++)
+            topologies.add(epochs.get(minEpoch + i).global);
+
+        return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, topologies);
+    }
+
+    /**
+     * Value holder produced by {@code between}: a list of topologies plus the
+     * min, current and first-non-empty epoch values supplied at construction.
+     */
+    public static class TopologyRange
+    {
+        // Values are stored exactly as passed to the constructor; the list is not copied
+        public final long min;
+        public final long current;
+        public final long firstNonEmpty;
+        public final List<Topology> topologies;
+
+        public TopologyRange(long min, long current, long firstNonEmpty, 
List<Topology> topologies)
+        {
+            this.min = min;
+            this.current = current;
+            this.topologies = topologies;
+            this.firstNonEmpty = firstNonEmpty;
+        }
+
+        /**
+         * Feeds at most {@code count} topologies to {@code forEach}: first a freshly
+         * constructed {@code new Topology(epoch)} for each epoch from {@code minEpoch}
+         * up to the last empty epoch, then the stored topologies in list order.
+         * The {@code count} budget is shared between the two phases.
+         *
+         * @param forEach  consumer invoked once per reported topology
+         * @param minEpoch first epoch to report; 0 is replaced by {@code this.min}
+         * @param count    maximum number of topologies to report
+         */
+        public void forEach(Consumer<Topology> forEach, long minEpoch, int 
count)
+        {
+            if (minEpoch == 0) // Bootstrap
+                minEpoch = this.min;
+
+            // With no non-empty epoch known (-1), every epoch up to current is reported as empty
+            long emptyUpTo = firstNonEmpty == -1 ? current : firstNonEmpty - 1;
+            // Report empty epochs
+            for (long epoch = minEpoch; epoch <= emptyUpTo && count > 0; 
epoch++, count--)
+                forEach.accept(new Topology(epoch));
+
+            // Report known non-empty epochs
+            for (int i = 0; i < topologies.size() && count > 0; i++, count--)
+            {
+                Topology topology = topologies.get(i);
+                // Only the first stored topology is checked: it must sit at minEpoch or at firstNonEmpty
+                Invariants.require(i > 0 || topology.epoch() == minEpoch || 
firstNonEmpty == topology.epoch(),
+                                   "Min epoch: %d. Range: %s", minEpoch, this);
+                forEach.accept(topology);
+            }
+        }
+
+        /** Renders the three epoch values and the comma-joined epochs of the stored topologies. */
+        @Override
+        public String toString()
+        {
+            return String.format("TopologyRange{min=%d, current=%d, 
firstNonEmpty=%d, topologies=[%s]}",
+                                 min,
+                                 current,
+                                 firstNonEmpty,
+                                 topologies.stream().map(t -> 
Long.toString(t.epoch())).collect(joining(",")));
+        }
+    }
+
     public Topologies preciseEpochs(long epoch)
     {
         return new Single(sorter, epochs.get(epoch).global);
@@ -864,9 +1039,8 @@ public class TopologyManager
         return atLeast(select, minEpoch, maxEpoch, isSufficientFor, collector);
     }
 
-
     private <C, K extends Routables<?>, T> T atLeast(K select, long minEpoch, 
long maxEpoch, Function<EpochState, Ranges> isSufficientFor,
-                                                     Collectors<C, K, T> 
collectors)
+                                                     Collectors<C, K, T> 
collectors) throws IllegalArgumentException
     {
         Invariants.requireArgument(minEpoch <= maxEpoch);
         Epochs snapshot = epochs;
@@ -898,8 +1072,11 @@ public class TopologyManager
 
         if (i == snapshot.epochs.length)
         {
-            if (!select.isEmpty())
-                throw new IllegalArgumentException("Ranges " + select + " 
could not be found");
+            // Epochs earlier than minEpoch might have been GC'd, so we can 
not collect
+            // matching ranges for them. However, if ranges were still present 
in the min epoch,
+            // we have reported them.
+            if (!select.isEmpty() && 
!select.without(snapshot.get(minEpoch).global.ranges).isEmpty())
+                throw Invariants.illegalArgument("Ranges %s could not be 
found", select);
             return collectors.multi(collector);
         }
 
@@ -922,11 +1099,15 @@ public class TopologyManager
             collector = collectors.update(collector, next, select, false);
             prev = next;
         } while (i < snapshot.epochs.length);
-        // needd to remove sufficent / added else remaining may not be empty 
when the final matches are the last epoch
+        // need to remove sufficient / added else remaining may not be empty 
when the final matches are the last epoch
         remaining = remaining.without(isSufficientFor.apply(prev));
         remaining = remaining.without(prev.addedRanges);
 
-        if (!remaining.isEmpty()) throw new IllegalArgumentException("Ranges " 
+ remaining + " could not be found");
+        // Epochs earlier than minEpoch might have been GC'd, so we cannot collect
+        // matching ranges for them. However, if ranges were still present in the min epoch,
+        // we have reported them.
+        // Throw explicitly, matching the equivalent check earlier in this method, so the
+        // failure does not depend on the helper raising the exception itself.
+        if (!remaining.isEmpty() && !select.without(snapshot.get(minEpoch).global.ranges).isEmpty())
+            throw Invariants.illegalArgument("Ranges %s could not be found", remaining);
 
         return collectors.multi(collector);
     }
@@ -1038,8 +1219,8 @@ public class TopologyManager
     public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long 
maxEpoch, SelectNodeOwnership selectNodeOwnership, SelectFunction 
selectFunction)
     {
         Epochs snapshot = epochs;
-
         EpochState maxState = snapshot.get(maxEpoch);
+
         Invariants.require(maxState != null, "Unable to find epoch %d; known 
epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
         if (minEpoch == maxEpoch)
             return new Single(sorter, 
selectFunction.apply(snapshot.get(minEpoch).global, select, 
selectNodeOwnership));
@@ -1091,6 +1272,8 @@ public class TopologyManager
 
     public Topology localForEpoch(long epoch)
     {
+        if (epoch < minEpoch())
+            throw new TopologyRetiredException(epoch, minEpoch());
         EpochState epochState = epochs.get(epoch);
         if (epochState == null)
             throw illegalState("Unknown epoch " + epoch);
@@ -1099,6 +1282,8 @@ public class TopologyManager
 
     public Ranges localRangesForEpoch(long epoch)
     {
+        if (epoch < minEpoch())
+            throw new TopologyRetiredException(epoch, minEpoch());
         return epochs.get(epoch).local().rangesForNode(self);
     }
 
@@ -1299,4 +1484,12 @@ public class TopologyManager
         T one(EpochState epoch, K select, boolean permitMissing);
         T multi(C collector);
     }
+
+    /**
+     * Unchecked exception raised by {@code localForEpoch} and {@code localRangesForEpoch}
+     * when the requested epoch is lower than {@code minEpoch()}.
+     */
+    public static class TopologyRetiredException extends RuntimeException
+    {
+        /**
+         * @param epoch    the epoch that was requested
+         * @param minEpoch the minimum epoch reported alongside it in the message
+         */
+        public TopologyRetiredException(long epoch, long minEpoch)
+        {
+            super(String.format("Topology %s retired. Min topology %d", epoch, 
minEpoch));
+        }
+    }
 }
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 3495c0f9..ce6f18e1 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -139,7 +139,10 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
         public void onSuccess(Node.Id from, FetchTopologyReply reply)
         {
             if (reply.topology != null)
+            {
                 reportTopology(reply.topology);
+                pendingEpochs.remove(reply.topology.epoch());
+            }
             else
                 sendNext();
         }
@@ -224,4 +227,9 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
         if (topology != null)
             topologyUpdates.epochRetired(lookup.apply(localId), 
topology.nodes(), ranges, epoch);
     }
+
+    @Override
+    public void reportEpochRemoved(long epoch)
+    {
+    }
 }
diff --git 
a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
index c7f727d2..50578cd5 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java
@@ -70,7 +70,7 @@ class CoordinateSyncPointTest
                                    Utils.shard(removed, new 
SortedArrayList<>(new Node.Id[] { N2 })));
 
         Node n1 = Utils.createNode(N1, t1, happyPathMessaging(), new 
MockCluster.Clock(0), new TestAgent.RethrowAgent());
-        n1.topology().onTopologyUpdate(t2, () -> null);
+        n1.topology().onTopologyUpdate(t2, () -> null, e -> {});
         for (Node.Id node : ALL)
             n1.topology().onEpochSyncComplete(node, t1.epoch());
 
@@ -86,7 +86,7 @@ class CoordinateSyncPointTest
                                    Utils.shard(IntKey.range(0, 10), ALL));
 
         Node n1 = Utils.createNode(N1, t1, happyPathMessaging(), new 
MockCluster.Clock(0), new TestAgent.RethrowAgent());
-        n1.topology().onTopologyUpdate(t2, () -> null);
+        n1.topology().onTopologyUpdate(t2, () -> null, e -> {});
         for (Node.Id node : ALL)
             n1.topology().onEpochSyncComplete(node, t1.epoch());
 
diff --git 
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
index 6402d071..a26c2181 100644
--- 
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+++ 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -78,13 +78,6 @@ public class AbstractConfigurationServiceTest
                 throw new AssertionError(String.format("Recieved multiple 
syncs for epoch %s from %s", epoch, node));
         }
 
-        @Override
-        public void truncateTopologyUntil(long epoch)
-        {
-            if (!truncates.add(epoch))
-                throw new AssertionError(String.format("Recieved multiple 
truncates for epoch", epoch));
-        }
-
         @Override
         public void onEpochClosed(Ranges ranges, long epoch)
         {
@@ -170,6 +163,12 @@ public class AbstractConfigurationServiceTest
         public void reportEpochRetired(Ranges ranges, long epoch)
         {
         }
+
+        @Override
+        public void reportEpochRemoved(long epoch)
+        {
+
+        }
     }
 
     private static final Id ID1 = new Id(1);
@@ -313,12 +312,12 @@ public class AbstractConfigurationServiceTest
         EpochHistory history = new EpochHistory();
         Assertions.assertEquals(0, history.size());
 
-        history.getOrCreate(1);
-        history.getOrCreate(2);
-        history.getOrCreate(3);
-        history.getOrCreate(4);
-        history.getOrCreate(5);
-        history.getOrCreate(6);
+        history.getOrCreate(1).setReadyForTesting(new Topology(1));
+        history.getOrCreate(2).setReadyForTesting(new Topology(2));
+        history.getOrCreate(3).setReadyForTesting(new Topology(3));
+        history.getOrCreate(4).setReadyForTesting(new Topology(4));
+        history.getOrCreate(5).setReadyForTesting(new Topology(5));
+        history.getOrCreate(6).setReadyForTesting(new Topology(6));
 
         assertHistoryEpochs(history, 1, 2, 3, 4, 5, 6);
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 7f71da11..752acfe5 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -760,7 +760,7 @@ public class Cluster
 
                 // Replay journal
                 Journal journal = journalMap.get(id);
-                Iterator<Journal.TopologyUpdate> iter = 
journal.replayTopologies();
+                Iterator<? extends Journal.TopologyUpdate> iter = 
journal.replayTopologies();
                 Journal.TopologyUpdate lastUpdate = null;
                 while (iter.hasNext())
                 {
@@ -859,7 +859,7 @@ public class Cluster
             CommandStores stores = nodeMap.get(node.id()).commandStores();
             // run on node scheduler so doesn't run during replay
             scheduled = node.scheduler().selfRecurring(() -> {
-                journal.purge(stores);
+                journal.purge(stores, node.topology()::minEpoch);
                 schedule(clusterScheduler, rs, nodes, nodeMap, journalMap);
             }, 0, SECONDS);
         }
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 855c2f83..1f54dbfa 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -46,6 +46,7 @@ import accord.local.Node;
 import accord.local.RedundantBefore;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
+import accord.primitives.EpochSupplier;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
@@ -105,6 +106,7 @@ public class InMemoryJournal implements Journal
     {
         this.id = id;
         this.agent = agent;
+
     }
 
     @Override
@@ -217,14 +219,14 @@ public class InMemoryJournal implements Journal
 
     public void truncateTopologiesForTesting(long minEpoch)
     {
-        List<TopologyUpdate> next = new ArrayList<>();
-        for (int i = 0; i < topologyUpdates.size(); i++)
+        Iterator<TopologyUpdate> iter = topologyUpdates.iterator();
+        while (iter.hasNext())
         {
-            TopologyUpdate update = topologyUpdates.get(i);
-            if (update.global.epoch() >= minEpoch)
-                next.add(update);
+            TopologyUpdate current = iter.next();
+            if (current.global.epoch() >= minEpoch)
+                break;
+            iter.remove();
         }
-        topologyUpdates.retainAll(next);
     }
 
     @Override
@@ -294,8 +296,9 @@ public class InMemoryJournal implements Journal
     }
 
     @Override
-    public void purge(CommandStores commandStores)
+    public void purge(CommandStores commandStores, EpochSupplier minEpoch)
     {
+        truncateTopologiesForTesting(minEpoch.epoch());
         for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
         {
             int commandStoreId = e.getKey();
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 564fe00b..1469bc47 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -32,6 +32,7 @@ import accord.local.Command;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
+import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -92,7 +93,7 @@ public class LoggingJournal implements Journal
     }
 
     @Override
-    public Iterator<TopologyUpdate> replayTopologies()
+    public Iterator<? extends TopologyUpdate> replayTopologies()
     {
         log("REPLAY TOPOLOGIES\n");
         return delegate.replayTopologies();
@@ -108,10 +109,10 @@ public class LoggingJournal implements Journal
     }
 
     @Override
-    public void purge(CommandStores commandStores)
+    public void purge(CommandStores commandStores, EpochSupplier minEpoch)
     {
         log("PURGE\n");
-        delegate.purge(commandStores);
+        delegate.purge(commandStores, minEpoch);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index a66b30a8..0a2083ff 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -18,6 +18,9 @@
 
 package accord.impl.list;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
@@ -46,6 +49,7 @@ import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.topology.TopologyManager;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
@@ -122,12 +126,17 @@ public class ListAgent implements Agent
         onStale.accept(staleSince, ranges);
     }
 
+    private static final Set<Class<?>> expectedExceptions = new 
HashSet<>(Arrays.asList(SimulatedFault.class, 
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, 
TopologyManager.TopologyRetiredException.class));
     @Override
     public void onUncaughtException(Throwable t)
     {
+        if (expectedExceptions.contains(t.getClass()))
+            return;
+
         // TODO (required): why are we now seeing SnapshotAborted? Nothing 
inherently wrong with it, but should find out what has changed.
-        if (!(t instanceof CoordinationFailed) && !(t instanceof 
SimulatedFault) && !(t instanceof ExecuteSyncPoint.SyncPointErased)
-            && !(t instanceof CancellationException) && !(t.getCause() 
instanceof CancellationException) && !(t instanceof ListStore.SnapshotAborted))
+        if (!(t instanceof CoordinationFailed)
+            && !(t.getCause() instanceof CancellationException)
+            && !(t instanceof ListStore.SnapshotAborted))
             onFailure.accept(t);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java 
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 18f7b07b..ddc06379 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -49,6 +49,7 @@ import accord.messages.Request;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.topology.TopologyManager;
 
 import javax.annotation.Nullable;
 
@@ -95,8 +96,15 @@ public class ListRequest implements Request
 
         static void checkOnResult(Node node, TxnId txnId, RoutingKey homeKey, 
BiConsumer<Outcome, Throwable> callback)
         {
-            CheckOnResult result = new CheckOnResult(node, txnId, homeKey, 
callback);
-            result.start();
+            try
+            {
+                CheckOnResult result = new CheckOnResult(node, txnId, homeKey, 
callback);
+                result.start();
+            }
+            catch (Throwable t)
+            {
+                callback.accept(null, t);
+            }
         }
 
         @Override
@@ -104,7 +112,7 @@ public class ListRequest implements Request
         {
             ++count;
             // this method is called for each reply, so if we see a reply 
where the status is not known, it may be known on others;
-            // once all status are merged, then onDone will apply aditional 
logic to make sure things are safe.
+            // once all status are merged, then onDone will apply additional 
logic to make sure things are safe.
             if (ok.maxKnowledgeSaveStatus == SaveStatus.Uninitialised)
                 return Action.ApproveIfQuorum;
             return ok.maxKnowledgeSaveStatus.hasBeen(PreApplied) ? 
Action.Approve : Action.Reject;
@@ -205,6 +213,14 @@ public class ListRequest implements Request
                 node.withEpoch(txnId.epoch(), (success, fail) -> 
checkOnResult(hk, txnId, attempt + 1, t));
                 return;
             }
+
+            if (txnId.epoch() < node.topology().minEpoch())
+            {
+                node.reply(client, replyContext, ListResult.failure(client, 
((Packet)replyContext).requestId, txnId), null);
+                node.agent().onUncaughtException(new 
TopologyManager.TopologyRetiredException(txnId.epoch(), 
node.topology().minEpoch()));
+                return;
+            }
+
             if (homeKey == null)
                 homeKey = node.computeRoute(txnId, txn.keys()).homeKey();
             RoutingKey finalHomeKey = homeKey;
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 848f4292..ac1f3496 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -596,11 +596,6 @@ public class ListStore implements DataStore, 
ConfigurationService.Listener
     {
     }
 
-    @Override
-    public void truncateTopologyUntil(long epoch)
-    {
-    }
-
     @Override
     public void onEpochClosed(Ranges ranges, long epoch)
     {
diff --git 
a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java 
b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
index 0ed285d6..b90d5cda 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
@@ -104,6 +104,12 @@ public class MockConfigurationService implements 
TestableConfigurationService
     {
     }
 
+    @Override
+    public void reportEpochRemoved(long epoch)
+    {
+
+    }
+
     @Override
     public synchronized void reportTopology(Topology topology)
     {
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index 221a91ac..3752a953 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -85,8 +85,8 @@ public class TopologyManagerTest
         int[] unmoved = { 1, 3, 5 };
         int[] moved = { 2, 4 };
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
-        service.onTopologyUpdate(t1, () -> null);
-        service.onTopologyUpdate(t2, () -> null);
+        service.onTopologyUpdate(t1, () -> null, e -> {});
+        service.onTopologyUpdate(t2, () -> null, e -> {});
 
         for (Unseekables<?> select : 
Arrays.asList(Ranges.ofSortedAndDeoverlapped(range(10, 20)), 
Ranges.ofSortedAndDeoverlapped(range(110, 120))))
         {
@@ -113,8 +113,8 @@ public class TopologyManagerTest
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
-        service.onTopologyUpdate(topology1, () -> null);
-        service.onTopologyUpdate(topology2, () -> null);
+        service.onTopologyUpdate(topology1, () -> null, e -> {});
+        service.onTopologyUpdate(topology2, () -> null, e -> {});
 
         Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
@@ -136,8 +136,8 @@ public class TopologyManagerTest
                                       shard(range(200, 300), idList(4, 5, 6), 
idSet(5, 6)));
 
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
-        service.onTopologyUpdate(topology1, () -> null);
-        service.onTopologyUpdate(topology2, () -> null);
+        service.onTopologyUpdate(topology1, () -> null, e -> {});
+        service.onTopologyUpdate(topology2, () -> null, e -> {});
 
         return service;
     }
@@ -164,9 +164,9 @@ public class TopologyManagerTest
         Shard[] shards = { shard(range(0, 100), idList(1, 2, 3), idSet(1, 2, 
3)),
                            shard(range(100, 200), idList(3, 4, 5), idSet(3, 4, 
5)) };
 
-        service.onTopologyUpdate(topology(1, shards), () -> null);
-        service.onTopologyUpdate(topology(2, shards), () -> null);
-        service.onTopologyUpdate(topology(3, shards), () -> null);
+        service.onTopologyUpdate(topology(1, shards), () -> null, e -> {});
+        service.onTopologyUpdate(topology(2, shards), () -> null, e -> {});
+        service.onTopologyUpdate(topology(3, shards), () -> null, e -> {});
 
         for (int i = 1; i <= 5; i++)
             service.onEpochSyncComplete(id(i), service.epoch());
@@ -186,20 +186,18 @@ public class TopologyManagerTest
         Range range = range(100, 200);
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
-//        Topology topology3 = topology(3, shard(range, idList(1, 2, 3), 
idSet(3, 4)));
 
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
-        service.onTopologyUpdate(topology1, () -> null);
+        service.onTopologyUpdate(topology1, () -> null, e -> {});
 
         // sync epoch 2
         service.onEpochSyncComplete(id(2), 2);
         service.onEpochSyncComplete(id(3), 2);
 
         // learn of epoch 2
-        service.onTopologyUpdate(topology2, () -> null);
+        service.onTopologyUpdate(topology2, () -> null, e -> {});
         Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
         Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
-//        Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete());
     }
 
     @Test
@@ -213,9 +211,9 @@ public class TopologyManagerTest
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
-        service.onTopologyUpdate(topology1, () -> null);
-        service.onTopologyUpdate(topology2, () -> null);
-        service.onTopologyUpdate(topology3, () -> null);
+        service.onTopologyUpdate(topology1, () -> null, e -> {});
+        service.onTopologyUpdate(topology2, () -> null, e -> {});
+        service.onTopologyUpdate(topology3, () -> null, e -> {});
         Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
 
         RoutingKeys keys = keys(150).toParticipants();
@@ -241,8 +239,8 @@ public class TopologyManagerTest
                                       shard(range(200, 300), idList(4, 5, 6), 
idSet(5, 6)));
 
         TopologyManager service = testTopologyManager(SUPPLIER, ID);
-        service.onTopologyUpdate(topology5, () -> null);
-        service.onTopologyUpdate(topology6, () -> null);
+        service.onTopologyUpdate(topology5, () -> null, e -> {});
+        service.onTopologyUpdate(topology6, () -> null, e -> {});
 
         Assertions.assertSame(topology6, 
service.getEpochStateUnsafe(6).global());
         Assertions.assertSame(topology5, 
service.getEpochStateUnsafe(5).global());
@@ -260,7 +258,7 @@ public class TopologyManagerTest
 
     private static void addAndMarkSynced(TopologyManager service, Topology 
topology)
     {
-        service.onTopologyUpdate(topology, () -> null);
+        service.onTopologyUpdate(topology, () -> null, e -> {});
         markTopologySynced(service, topology.epoch());
     }
 
@@ -279,7 +277,7 @@ public class TopologyManagerTest
         Assertions.assertTrue(service.hasEpoch(3));
         Assertions.assertTrue(service.hasEpoch(4));
 
-        service.truncateTopologyUntil(3);
+        service.truncateTopologiesUntil(3);
         Assertions.assertFalse(service.hasEpoch(1));
         Assertions.assertFalse(service.hasEpoch(2));
         Assertions.assertTrue(service.hasEpoch(3));
@@ -550,7 +548,7 @@ public class TopologyManagerTest
                 case OnTopologyUpdate:
                     Topology t = next.next();
                     preTopologyUpdate(id, t);
-                    tm.onTopologyUpdate(t, () -> null);
+                    tm.onTopologyUpdate(t, () -> null, e -> {});
                     pendingSyncComplete.put(t.epoch, new HashSet<>(t.nodes()));
                     postTopologyUpdate(id, t);
                     break;
diff --git a/accord-core/src/test/java/accord/utils/Property.java 
b/accord-core/src/test/java/accord/utils/Property.java
index 4a7c2703..bef889e3 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -495,7 +495,6 @@ public class Property
                 }
                 catch (Throwable t)
                 {
-
                     throw new PropertyError(statefulPropertyError(this, t, 
state, maybeRewriteHistory(history, historyTiming)), t);
                 }
                 if (pure)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index f26cc7dc..01161363 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -67,6 +67,7 @@ import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
+import accord.primitives.EpochSupplier;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -392,7 +393,7 @@ public class Cluster implements Scheduler
         @Override public void saveCommand(int store, CommandUpdate value, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
         @Override public Iterator<TopologyUpdate> replayTopologies() { throw 
new IllegalStateException("Not impelemented"); }
         @Override public void saveTopology(TopologyUpdate topologyUpdate, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
-        @Override public void purge(CommandStores commandStores)  { throw new 
IllegalStateException("Not impelemented"); }
+        @Override public void purge(CommandStores commandStores, EpochSupplier 
minEpoch)  { throw new IllegalStateException("Not impelemented"); }
         @Override public void replay(CommandStores commandStores)  { throw new 
IllegalStateException("Not impelemented"); }
         @Override public RedundantBefore loadRedundantBefore(int store) { 
throw new IllegalStateException("Not impelemented"); }
         @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int 
store) { throw new IllegalStateException("Not impelemented"); }
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
index 6371c0cf..e4e36644 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
@@ -69,5 +69,10 @@ public class SimpleConfigService implements 
ConfigurationService
     public void reportEpochRetired(Ranges ranges, long epoch)
     {
     }
+
+    @Override
+    public void reportEpochRemoved(long epoch)
+    {
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to