This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 973335bf Follow-up to CASSANDRA-21042: SyncPoints should operate 
against the last epoch containing their ranges, which should refuse other 
transactions
973335bf is described below

commit 973335bfe7e930c6646016a4a48cff084f49b660
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Nov 26 10:30:58 2025 +0000

    Follow-up to CASSANDRA-21042: SyncPoints should operate against the last 
epoch containing their ranges, which should refuse other transactions
---
 .../coordinate/AbstractCoordinatePreAccept.java    |  6 +-
 .../accord/coordinate/CoordinateMaxConflict.java   |  6 +-
 .../accord/coordinate/CoordinateSyncPoint.java     |  2 +-
 .../accord/coordinate/CoordinationAdapter.java     |  4 +-
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  2 +-
 .../src/main/java/accord/local/Bootstrap.java      |  2 +-
 accord-core/src/main/java/accord/local/Node.java   | 56 ++++++++++-------
 .../accord/local/durability/DurabilityService.java | 19 +++---
 .../accord/local/durability/ShardDurability.java   |  3 +-
 .../main/java/accord/messages/NoWaitRequest.java   |  1 +
 .../src/main/java/accord/topology/ActiveEpoch.java |  9 ++-
 .../main/java/accord/topology/ActiveEpochs.java    | 12 +++-
 .../src/main/java/accord/topology/Topology.java    | 37 ++++++++++-
 .../main/java/accord/topology/TopologyManager.java | 73 +++++++++++++++++++++-
 .../java/accord/topology/TopologyMismatch.java     | 69 ++++++++------------
 .../src/test/java/accord/burn/TopologyUpdates.java |  1 +
 .../coordinate/CoordinateTransactionTest.java      | 17 +++--
 .../java/accord/coordinate/TopologyChangeTest.java |  6 +-
 .../src/test/java/accord/impl/list/ListAgent.java  |  3 +-
 .../src/test/java/accord/local/CommandsTest.java   |  2 +-
 .../java/accord/local/ImmutableCommandTest.java    |  4 +-
 .../test/java/accord/messages/ReadDataTest.java    |  2 +-
 .../java/accord/topology/TopologyRandomizer.java   |  7 ++-
 23 files changed, 226 insertions(+), 117 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index 1bb115ee..f31ab8a0 100644
--- 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -27,12 +27,8 @@ import accord.messages.Callback;
 import accord.primitives.FullRoute;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.topology.Topology;
 import accord.topology.TopologyException;
 import accord.topology.TopologyMismatch;
-import accord.topology.TopologyRetiredException;
-
-import static accord.topology.TopologyMismatch.TopologyMatch.LATEST;
 
 /**
  * Abstract parent class for implementing preaccept-like operations where we 
may need to fetch additional replies
@@ -64,7 +60,7 @@ abstract class AbstractCoordinatePreAccept<Result, Reply 
extends accord.messages
         TopologyMismatch mismatch;
         try
         {
-            mismatch = TopologyMismatch.checkForMismatch(latestEpoch, scope, 
node.topology().active(), LATEST);
+            mismatch = TopologyMismatch.checkForMismatch(latestEpoch, scope, 
node.topology().active(), txnId.kind());
         }
         catch (TopologyException t)
         {
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
index 2f0f589c..a4a42cd1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
@@ -33,6 +33,7 @@ import accord.messages.GetMaxConflict.GetMaxConflictOk;
 import accord.primitives.FullRoute;
 import accord.primitives.Routables;
 import accord.primitives.Timestamp;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.ActiveEpochs;
 import accord.topology.Topologies;
@@ -43,7 +44,6 @@ import accord.utils.async.Cancellable;
 
 import static accord.coordinate.tracking.RequestStatus.Failed;
 import static accord.coordinate.tracking.RequestStatus.Success;
-import static accord.topology.TopologyMismatch.TopologyMatch.ANY;
 
 /**
  * Calculate the maximum TxnId that could have been agreed before this 
operation started
@@ -82,8 +82,8 @@ public class CoordinateMaxConflict extends 
AbstractCoordinatePreAccept<Timestamp
         try
         {
             ActiveEpochs active = node.topology().active();
-            long epoch = active.epoch();
-            FullRoute<?> route = node.computeRoute(epoch, keysOrRanges, 
active, ANY);
+            long epoch = active.maxEpoch(Long.MIN_VALUE, keysOrRanges);
+            FullRoute<?> route = node.computeRoute(epoch, keysOrRanges, 
active, Txn.Kind.ExclusiveSyncPoint);
             Topologies topologies = active.withUnsyncedEpochs(route, epoch, 
epoch);
             coordinate = new CoordinateMaxConflict(node, 
node.someSequentialExecutor(), topologies, route, epoch, callback);
         }
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 88b2f2aa..2c8ae89c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -96,7 +96,7 @@ public class CoordinateSyncPoint<R> extends 
CoordinatePreAccept<R>
     public static AsyncChain<SyncPoint> coordinate(Node node, Txn.Kind kind, 
Ranges ranges, SyncPointAdapter<SyncPoint> adapter)
     {
         Invariants.requireArgument(kind.isSyncPoint());
-        TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, ranges.domain(), 
cardinality(ranges));
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(ranges, kind, 
ranges.domain(), cardinality(ranges));
         return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, ranges, adapter));
     }
 
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java 
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 7d8e8510..bf169935 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -462,7 +462,7 @@ public interface CoordinationAdapter<R>
                 if (txnId.is(Routable.Domain.Range))
                 {
                     MinimalSyncPoint syncPoint = new MinimalSyncPoint(txnId, 
executeAt, (RangeRoute) route);
-                    node.topology().onEpochClosed(syncPoint.route.toRanges(), 
syncPoint.syncId.epoch() - 1);
+                    node.topology().onEpochClosed(syncPoint.route.toRanges(), 
syncPoint.syncId);
                     node.durability().report(new DurabilityResult(syncPoint, 
DurabilityLevel.NONE, null));
                 }
             }
@@ -482,7 +482,7 @@ public interface CoordinationAdapter<R>
                 }
                 if (txnId.is(Routable.Domain.Range))
                 {
-                    node.topology().onEpochClosed(syncPoint.route.toRanges(), 
syncPoint.syncId.epoch() - 1);
+                    node.topology().onEpochClosed(syncPoint.route.toRanges(), 
syncPoint.syncId);
                     node.durability().report(new DurabilityResult(syncPoint, 
DurabilityLevel.NONE, null));
                 }
             }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 986f9bc8..7cac839e 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -240,7 +240,7 @@ public class ExecuteSyncPoint extends 
AbstractCoordination<Route<Range>, Durabil
             DurabilityResult result = current();
             if (result.min.remote == SyncRemote.All)
             {
-                node.topology().onEpochRetired(scope.toRanges(), 
syncPoint.syncId.epoch() - 1);
+                node.topology().onEpochRetired(scope.toRanges(), 
syncPoint.syncId);
                 node.send(tracker.topologies(), new SetShardDurable(syncPoint, 
Universal));
             }
             else if (result.min.remote == SyncRemote.Quorum)
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index 56661dbd..27aba94c 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -118,7 +118,7 @@ class Bootstrap
 
         TxnId start(SafeCommandStore safeStore)
         {
-            globalSyncId = node.nextTxnIdWithDefaultFlags(epoch, 0, 
ExclusiveSyncPoint, Domain.Range, Cardinality.Any);
+            globalSyncId = node.nextTxnIdWithDefaultFlags(epoch, 0, valid, 
ExclusiveSyncPoint, Domain.Range, Cardinality.Any);
             Invariants.require(epoch <= globalSyncId.epoch(), "Attempting to 
use local epoch %d which is larger than global epoch %d", epoch, 
globalSyncId.epoch());
 
             if (valid.isEmpty())
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 36cd6710..0ff73086 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -82,7 +82,6 @@ import accord.primitives.TxnId;
 import accord.primitives.TxnId.Cardinality;
 import accord.topology.TopologyException;
 import accord.topology.TopologyManager;
-import accord.topology.TopologyMismatch.TopologyMatch;
 import accord.topology.TopologyRetiredException;
 import accord.utils.Invariants;
 import accord.utils.PersistentField;
@@ -567,47 +566,56 @@ public class Node implements NodeCommandStoreService
         messageSink.reply(replyingToNode, replyContext, send);
     }
 
-    public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain)
+    public TxnId nextTxnIdWithDefaultFlags(Seekables<?, ?> keys, Txn.Kind 
kind, Domain domain)
     {
-        return nextTxnIdWithFlags(rw, domain, Any, defaultMediumPath().bit());
+        return nextTxnIdWithFlags(keys, kind, domain, Any, 
defaultMediumPath().bit());
     }
 
-    public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Txn.Kind rw, Domain domain)
+    public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Seekables<?, ?> keys, Txn.Kind kind, Domain domain)
     {
-        return nextStaleTxnIdWithFlags(minEpoch, minHlc, rw, domain, Any, 
defaultMediumPath().bit());
+        return nextStaleTxnIdWithFlags(minEpoch, minHlc, keys, kind, domain, 
Any, defaultMediumPath().bit());
     }
 
-    public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain, 
Cardinality cardinality)
+    public TxnId nextTxnIdWithDefaultFlags(Seekables<?, ?> keys, Txn.Kind 
kind, Domain domain, Cardinality cardinality)
     {
-        return nextTxnIdWithFlags(rw, domain, cardinality, 
defaultMediumPath().bit());
+        return nextTxnIdWithFlags(keys, kind, domain, cardinality, 
defaultMediumPath().bit());
     }
 
-    public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Txn.Kind rw, Domain domain, Cardinality cardinality)
+    private long epoch(long minEpoch, Seekables<?, ?> keys, Txn.Kind kind)
     {
-        return newTxnId(Math.max(minEpoch, epoch()), uniqueNow(minHlc), rw, 
domain, cardinality, defaultMediumPath().bit(), id);
+        if (!kind.isSyncPoint())
+            return Math.max(minEpoch, epoch());
+
+        return topology.active().maxEpoch(minEpoch, keys);
+    }
+
+    public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Seekables<?, ?> keys, Txn.Kind kind, Domain domain, Cardinality cardinality)
+    {
+        long epoch = epoch(minEpoch, keys, kind);
+        return newTxnId(epoch, uniqueNow(minHlc), kind, domain, cardinality, 
defaultMediumPath().bit(), id);
     }
 
     /**
      * TODO (required): Make sure we cannot re-issue the same txnid on startup
      * TODO (required): Don't use new epoch for TxnId until a quorum is ready 
to coordinate it
      */
-    public TxnId nextTxnIdWithFlags(Txn.Kind rw, Domain domain, Cardinality 
cardinality, int flags)
+    public TxnId nextTxnIdWithFlags(Seekables<?, ?> keys, Txn.Kind kind, 
Domain domain, Cardinality cardinality, int flags)
     {
-        return newTxnId(epoch(), uniqueNow(), rw, domain, cardinality, flags, 
id);
+        return newTxnId(epoch(Long.MIN_VALUE, keys, kind), uniqueNow(), kind, 
domain, cardinality, flags, id);
     }
 
-    public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc, Txn.Kind 
rw, Domain domain, Cardinality cardinality, int flags)
+    public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc, 
Seekables<?, ?> keys, Txn.Kind kind, Domain domain, Cardinality cardinality, 
int flags)
     {
-        long epoch = Math.max(minEpoch, epoch());
+        long epoch = epoch(minEpoch, keys, kind);
         long hlc = uniqueStale(minHlc);
-        return newTxnId(epoch, hlc, rw, domain, cardinality, flags, id);
+        return newTxnId(epoch, hlc, kind, domain, cardinality, flags, id);
     }
 
-    private static TxnId newTxnId(long epoch, long now, Txn.Kind rw, Domain 
domain, Cardinality cardinality, int flags, Node.Id node)
+    private static TxnId newTxnId(long epoch, long now, Txn.Kind kind, Domain 
domain, Cardinality cardinality, int flags, Node.Id node)
     {
-        Invariants.require(domain == Key || rw != Write, "Range writes not 
supported without forwarding uniqueHlc information to WaitingOn for direct 
dependencies");
-        Invariants.require(domain == Range || !rw.isSyncPoint, "Key 
ExclusiveSyncPoint not supported without improvements to CommandsForKey for 
managing execution");
-        TxnId txnId = new TxnId(epoch, now, flags, rw, domain, cardinality, 
node);
+        Invariants.require(domain == Key || kind != Write, "Range writes not 
supported without forwarding uniqueHlc information to WaitingOn for direct 
dependencies");
+        Invariants.require(domain == Range || !kind.isSyncPoint, "Key 
ExclusiveSyncPoint not supported without improvements to CommandsForKey for 
managing execution");
+        TxnId txnId = new TxnId(epoch, now, flags, kind, domain, cardinality, 
node);
         Invariants.require((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 
0);
         return txnId;
     }
@@ -635,9 +643,9 @@ public class Node implements NodeCommandStoreService
         Cardinality cardinality = cardinality(domain, keys);
 
         if (!usePrivilegedCoordinator() || (kind != Read && kind != Write))
-            return nextTxnIdWithDefaultFlags(minEpoch, minHlc, kind, domain, 
cardinality);
+            return nextTxnIdWithDefaultFlags(minEpoch, minHlc, keys, kind, 
domain, cardinality);
 
-        long epoch = Math.max(minEpoch, epoch());
+        long epoch = epoch(minEpoch, keys, kind);
         long hlc = uniqueNow(minHlc);
         int flags = computeBestDefaultTxnIdFlags(keys, epoch);
         TxnId txnId = new TxnId(epoch, hlc, flags, kind, domain, cardinality, 
id);
@@ -662,7 +670,7 @@ public class Node implements NodeCommandStoreService
         Txn.Kind kind = txn.kind();
         Domain domain = keys.domain();
 
-        long epoch = epoch();
+        long epoch = epoch(Long.MIN_VALUE, keys, kind);
         long now = uniqueNow();
         fastPath = ensurePermitted(fastPath);
         if (fastPath != Unoptimised && (!epochs.hasEpoch(epoch) || 
!epochs.supportsPrivilegedFastPath(keys, epoch)))
@@ -699,17 +707,17 @@ public class Node implements NodeCommandStoreService
 
     public FullRoute<?> computeRoute(TxnId txnId, Routables<?> keysOrRanges) 
throws TopologyException
     {
-        return computeRoute(txnId.epoch(), keysOrRanges, topology.active(), 
txnId.isSyncPoint() ? TopologyMatch.ANY : TopologyMatch.LATEST);
+        return computeRoute(txnId.epoch(), keysOrRanges, topology.active(), 
txnId.kind());
     }
 
-    public FullRoute<?> computeRoute(long epoch, Routables<?> keysOrRanges, 
ActiveEpochs active, TopologyMatch match) throws TopologyException
+    public FullRoute<?> computeRoute(long epoch, Routables<?> keysOrRanges, 
ActiveEpochs active, Txn.Kind kind) throws TopologyException
     {
         Invariants.requireArgument(!keysOrRanges.isEmpty(), "Attempted to 
compute a route from empty keys or ranges");
 
         RoutingKey homeKey = selectHomeKey(active.get(epoch), keysOrRanges);
         FullRoute<?> route = keysOrRanges.toRoute(homeKey);
 
-        TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(epoch, 
keysOrRanges, active, match);
+        TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(epoch, 
keysOrRanges, active, kind);
         if (mismatch != null)
             throw mismatch;
 
diff --git 
a/accord-core/src/main/java/accord/local/durability/DurabilityService.java 
b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
index 18adccee..eba7b53a 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityService.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
@@ -40,6 +40,7 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.ActiveEpoch;
 import accord.topology.ActiveEpochs;
+import accord.topology.Shard;
 import accord.topology.Topology;
 import accord.topology.TopologyException;
 import accord.utils.Invariants;
@@ -239,12 +240,11 @@ public class DurabilityService implements TopologyListener
             next.reportSuccess();
     }
 
-
     @Override
-    public void onReceived(Topology topology)
+    public void onActive(ActiveEpoch epoch)
     {
-        shards.updateTopology(topology);
-        global.updateTopology(topology);
+        shards.updateTopology(epoch.global());
+        global.updateTopology(epoch.global());
     }
 
     @Override
@@ -261,10 +261,13 @@ public class DurabilityService implements TopologyListener
             e = epochs.getKnown(epochs.minEpoch());
         }
 
+        Ranges retiredAndRemoved = e.global().foldl(retiredRanges, (shard, rs, 
i) -> {
+            if (shard.is(Shard.Flag.PENDING_REMOVAL))
+                return rs.with(Ranges.of(shard.range));
+            return rs;
+        }, Ranges.EMPTY);
         // if the ranges are retired and have been removed in the epoch in 
which they're retired, then we can retire the associated scheduler(s)
-        retiredRanges = retiredRanges.without(e.global().ranges());
-        if (retiredRanges.isEmpty())
-            return;
-        shards.retireRanges(retiredRanges, epoch);
+        if (!retiredAndRemoved.isEmpty())
+            shards.retireRanges(retiredAndRemoved, epoch);
     }
 }
\ No newline at end of file
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index ca269279..fb287db4 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -38,7 +38,6 @@ import accord.coordinate.CoordinationFailed;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.ShardDistributor;
-import accord.primitives.FullRoute;
 import accord.primitives.SyncPoint;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
@@ -369,7 +368,7 @@ public class ShardDurability
                 }
             }
             minHlc = Math.max(minHlc, node.agent().minStaleHlc(node, 
activeRequest != null));
-            TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch, 
minHlc, kind, Domain.Range);
+            TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch, 
minHlc, ranges, kind, Domain.Range);
             if (activeRequest != null) logger.info("Initiating RX requested by 
{} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges, 
staleId, active);
             else logger.debug("Initiating RX for durability of {} with TxnId 
{}.", ranges, staleId);
 
diff --git a/accord-core/src/main/java/accord/messages/NoWaitRequest.java 
b/accord-core/src/main/java/accord/messages/NoWaitRequest.java
index fa58359e..ea6a252d 100644
--- a/accord-core/src/main/java/accord/messages/NoWaitRequest.java
+++ b/accord-core/src/main/java/accord/messages/NoWaitRequest.java
@@ -124,6 +124,7 @@ public abstract class NoWaitRequest<P extends 
Participants<?>, R extends Reply>
 
     protected void acceptInternal(R reply, Throwable failure)
     {
+        Invariants.require(reply != null || failure != null, "No reply 
produced for %s", this);
         if (failure != null || reply.isFinal())
         {
             Invariants.require(!hasSentFinalReply);
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpoch.java 
b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
index 6beadf49..89ef2b73 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
@@ -30,6 +30,7 @@ import accord.utils.SimpleBitSet;
 
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
+import static accord.primitives.Routables.Slice.Minimal;
 
 public final class ActiveEpoch
 {
@@ -165,7 +166,8 @@ public final class ActiveEpoch
         ranges = ranges.without(closed);
         if (ranges.isEmpty())
             return ranges;
-        closed = closed.union(MERGE_ADJACENT, ranges);
+        Ranges add = ranges.slice(global.ranges, Minimal);
+        closed = closed.union(MERGE_ADJACENT, add);
         Invariants.require(closed.mergeTouching() == closed);
         return ranges.without(addedRanges);
     }
@@ -177,8 +179,9 @@ public final class ActiveEpoch
         if (ranges.isEmpty())
             return ranges;
         quorumReady = quorumReady.union(MERGE_ADJACENT, ranges);
-        closed = closed.union(MERGE_ADJACENT, ranges);
-        retired = retired.union(MERGE_ADJACENT, ranges);
+        Ranges add = ranges.slice(global.ranges, Minimal);
+        closed = closed.union(MERGE_ADJACENT, add);
+        retired = retired.union(MERGE_ADJACENT, add);
         Invariants.require(closed.mergeTouching() == closed);
         Invariants.require(retired.mergeTouching() == retired);
         return ranges.without(addedRanges);
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java 
b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
index c5883a10..a0e16752 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
@@ -132,6 +132,16 @@ public final class ActiveEpochs implements 
Iterable<ActiveEpoch>
         return currentEpoch;
     }
 
+    public long maxEpoch(long minEpoch, Routables<?> keys)
+    {
+        long epoch = Math.max(epoch(), minEpoch);
+        while (!getKnown(epoch).global().ranges().containsAll(keys))
+        {
+            if (--epoch < minEpoch())
+                throw new IllegalArgumentException(keys + " not found in any 
active epoch");
+        }
+        return epoch;
+    }
     public Topology current()
     {
         return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
@@ -606,7 +616,7 @@ public final class ActiveEpochs implements 
Iterable<ActiveEpoch>
         ActiveEpoch e = ifExists(epoch);
         if (e == null)
             return null;
-        return e.global().forKey(key);
+        return e.global().forKeyIfKnown(key);
     }
 
     public Shard forEpoch(RoutableKey key, long epoch) throws TopologyException
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index c79fb762..4f8e4103 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -348,12 +348,20 @@ public class Topology
     // TODO (low priority, efficiency): optimised HomeKey concept containing 
the Key, Shard and Topology to avoid lookups when topology hasn't changed
     public Shard forKey(RoutableKey key)
     {
-        int i = subsetOfRanges.indexOf(key);
+        int i = indexForKey(key);
         if (i < 0)
             throw illegalArgument("Range not found for " + key);
         return shards[supersetIndexes[i]];
     }
 
+    public Shard forKeyIfKnown(RoutableKey key)
+    {
+        int i = indexForKey(key);
+        if (i < 0)
+            return null;
+        return shards[supersetIndexes[i]];
+    }
+
     public int indexForKey(RoutableKey key)
     {
         return subsetOfRanges.indexOf(key);
@@ -497,6 +505,7 @@ public class Topology
                     if (abi < 0)
                         break;
 
+                    ai = (int)abi;
                     bi = (int)(abi >>> 32);
                     if (count == newSubset.length)
                         newSubset = cachedInts.resize(newSubset, count, count 
* 2);
@@ -547,6 +556,32 @@ public class Topology
         return accumulator;
     }
 
+    public <P, T> T foldlWithDefault(Routables<?> select, 
IndexedTriFunction<Shard, P, T, T> function, Shard ifNull, P param, T 
accumulator)
+    {
+        Routables<?> as = select;
+        Ranges bs = subsetOfRanges;
+        int ai = 0, amax = 0, bi = 0;
+
+        while (true)
+        {
+            long abi = as.findNextIntersection(ai, bs, bi);
+            if (abi < 0)
+                break;
+
+            int nextai = (int)(abi);
+            bi = (int)(abi >>> 32);
+
+            if (nextai > amax + 1)
+                accumulator = function.apply(null, param, accumulator, -1 - 
bi);
+            accumulator = function.apply(shards[supersetIndexes[bi]], param, 
accumulator, bi);
+            amax = nextai + 1;
+            ai = nextai;
+            ++bi;
+        }
+
+        return accumulator;
+    }
+
     public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
     {
         NodeInfo info = nodeLookup.get(on.id);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 1b4a1e89..2e56d077 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -41,6 +41,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.TimeService;
 import accord.primitives.Ranges;
+import accord.primitives.TxnId;
 import accord.topology.Topologies.SelectNodeOwnership;
 import accord.topology.TopologyCollector.BestFastPath;
 import accord.topology.TopologyCollector.Simple;
@@ -116,11 +117,37 @@ public class TopologyManager
     }
 
     public void onEpochClosed(Ranges ranges, long epoch)
+    {
+        onEpochClosed(ranges, epoch, null);
+    }
+
+    public void onEpochClosed(Ranges ranges, TxnId txnId)
+    {
+        onEpochClosed(ranges, txnId.epoch(), txnId);
+    }
+
+    private void onEpochClosed(Ranges ranges, long epoch, @Nullable TxnId 
txnId)
     {
         Topology topology;
         synchronized (this)
         {
-            topology = active.maybeGlobalForEpoch(epoch);
+            if (txnId == null)
+            {
+                topology = active.maybeGlobalForEpoch(epoch);
+            }
+            else
+            {
+                ActiveEpoch e = active.ifExists(epoch);
+                if (e != null)
+                {
+                    ranges = ranges.without(e.addedRanges);
+                    if (ranges.isEmpty())
+                        return;
+                }
+
+                --epoch;
+                topology = active.maybeGlobalForEpoch(epoch);
+            }
             if (epoch > active.currentEpoch)
                 ranges = pending.closed(ranges, epoch);
             ranges = active.closed(ranges, epoch);
@@ -134,10 +161,41 @@ public class TopologyManager
 
     public void onEpochRetired(Ranges ranges, long epoch)
     {
-        Topology topology;
+        onEpochRetired(ranges, epoch, null);
+    }
+
+    public void onEpochRetired(Ranges ranges, TxnId txnId)
+    {
+        onEpochRetired(ranges, txnId.epoch(), txnId);
+    }
+
+    private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId 
txnId)
+    {
+        Topology topology = null;
         synchronized (this)
         {
-            topology = active.maybeGlobalForEpoch(epoch);
+            if (txnId == null)
+            {
+                topology = active.maybeGlobalForEpoch(epoch);
+            }
+            else
+            {
+                ActiveEpoch e = active.ifExists(epoch);
+                if (e != null)
+                {
+                    ranges = ranges.without(e.addedRanges);
+                    if (ranges.isEmpty())
+                        return;
+
+                    topology = active.maybeGlobalForEpoch(epoch);
+                }
+
+                if (topology == null || notPendingRemoval(ranges, topology))
+                {
+                    --epoch;
+                    topology = active.maybeGlobalForEpoch(epoch);
+                }
+            }
             if (epoch > active.currentEpoch)
                 ranges = pending.retired(ranges, epoch);
             ranges = active.retired(ranges, epoch);
@@ -149,6 +207,15 @@ public class TopologyManager
         }
     }
 
+    private boolean notPendingRemoval(Ranges ranges, Topology topology)
+    {
+        return topology.foldlWithDefault(ranges, (shard, p, v, i) -> {
+            if (shard == null || !shard.is(Shard.Flag.PENDING_REMOVAL))
+                return Boolean.TRUE;
+            return v;
+        }, null, null, Boolean.FALSE);
+    }
+
     public synchronized void truncateTopologiesUntil(long epoch)
     {
         ActiveEpochs current = active;
diff --git a/accord-core/src/main/java/accord/topology/TopologyMismatch.java 
b/accord-core/src/main/java/accord/topology/TopologyMismatch.java
index a299a82c..b3e0f989 100644
--- a/accord-core/src/main/java/accord/topology/TopologyMismatch.java
+++ b/accord-core/src/main/java/accord/topology/TopologyMismatch.java
@@ -20,28 +20,12 @@ package accord.topology;
 
 import javax.annotation.Nullable;
 
-import accord.primitives.Ranges;
 import accord.primitives.Routables;
+import accord.primitives.Txn;
 import accord.utils.UnhandledEnum;
 
 public final class TopologyMismatch extends TopologyException
 {
-    public enum TopologyMatch
-    {
-        /**
-         * All participating topologies are expected to contain all 
participating keys.
-         * This is for user transactions, which must operate at all times on 
topologies
-         * consistent with the operation.
-         */
-        LATEST,
-
-        /**
-         * All participating keys are expected to be contained in SOME 
participating epoch.
-         * This is used for sync points which may be run after some range has 
been removed.
-         */
-        ANY
-    }
-
     private TopologyMismatch(String message)
     {
         super(message);
@@ -58,37 +42,34 @@ public final class TopologyMismatch extends 
TopologyException
         return new TopologyMismatch(getMessage(), this);
     }
 
+    private enum Mismatch { NOT_KNOWN, PENDING_REMOVAL }
+
     @Nullable
-    public static TopologyMismatch checkForMismatch(long epoch, Routables<?> 
keysOrRanges, ActiveEpochs active, TopologyMatch match) throws TopologyException
+    public static TopologyMismatch checkForMismatch(long epoch, Routables<?> 
keysOrRanges, ActiveEpochs active, Txn.Kind kind) throws TopologyException
     {
-        switch (match)
-        {
-            default: throw new UnhandledEnum(match);
-            case ANY:
-            {
-                long e = Math.min(active.currentEpoch, epoch);
-                while (e >= active.minEpoch())
-                {
-                    Ranges rs = active.getKnown(e).global.ranges();
-                    if (rs.containsAll(keysOrRanges))
-                        return null;
+        Topology topology = active.globalForEpoch(epoch);
+        Mismatch result = topology.foldlWithDefault(keysOrRanges, (shard, k, 
v, i) -> {
+            if (shard == null)
+                return Mismatch.NOT_KNOWN;
+            if (shard.is(Shard.Flag.PENDING_REMOVAL) && !k.isSyncPoint())
+                return Mismatch.PENDING_REMOVAL;
+            return v;
+        }, null, kind, null);
 
-                    keysOrRanges = keysOrRanges.without(rs);
-                    --e;
-                }
+        if (result == null)
+            return null;
 
-                String message = String.format("Txn attempted to access keys 
or ranges that are not known in any epoch (%s)", keysOrRanges);
-                return new TopologyMismatch(message);
-            }
-            case LATEST:
-            {
-                Topology topology = active.globalForEpoch(epoch);
-                if (topology.ranges().containsAll(keysOrRanges))
-                    return null;
-
-                String message = String.format("Txn attempted to access keys 
or ranges that are not known in the epoch %d (%s)", topology.epoch(), 
keysOrRanges);
-                return new TopologyMismatch(message);
-            }
+        String message;
+        switch (result)
+        {
+            default: throw new UnhandledEnum(result);
+            case PENDING_REMOVAL:
+                message = String.format("Txn attempted to access keys or 
ranges that are being removed in epoch %d (%s)", topology.epoch(), 
keysOrRanges);
+                break;
+            case NOT_KNOWN:
+                message = String.format("Txn attempted to access keys or 
ranges that are not known in the epoch %d (%s)", topology.epoch(), 
keysOrRanges);
+                break;
         }
+        return new TopologyMismatch(message);
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java 
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 98ac4da1..50501d9b 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -21,6 +21,7 @@ package accord.burn;
 import accord.api.AsyncExecutor;
 import accord.local.Node;
 import accord.primitives.Ranges;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.MessageTask;
diff --git 
a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index d27d65e2..e5c836e8 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -30,8 +30,6 @@ import accord.impl.mock.MockStore;
 import accord.local.Node;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRangeRoute;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Txn;
@@ -63,8 +61,8 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
             Keys keys = keys(10);
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
             Txn txn = writeTxn(keys);
             Result result = 
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
             assertEquals(MockStore.RESULT, result);
@@ -79,8 +77,8 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnIdWithDefaultFlags(Read, Range);
             Ranges keys = ranges(range(1, 2));
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Read, Range);
             Txn txn = writeTxn(keys);
             Result result = 
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
             assertEquals(MockStore.RESULT, result);
@@ -95,10 +93,11 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId oldId1 = node.nextTxnIdWithDefaultFlags(Write, Key);
-            TxnId oldId2 = node.nextTxnIdWithDefaultFlags(Write, Key);
+            Ranges syncRanges = ranges(range(0, 1));
+            TxnId oldId1 = node.nextTxnIdWithDefaultFlags(syncRanges, Write, 
Key);
+            TxnId oldId2 = node.nextTxnIdWithDefaultFlags(syncRanges, Write, 
Key);
 
-            getUninterruptibly(CoordinateSyncPoint.exclusive(node, 
ranges(range(0, 1))));
+            getUninterruptibly(CoordinateSyncPoint.exclusive(node, 
syncRanges));
             try
             {
                 Keys keys = keys(1);
@@ -135,7 +134,7 @@ public class CoordinateTransactionTest
 
     private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
     {
-        TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
         txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, 0, Write, Key, 
txnId.node);
         Txn txn = writeTxn(keys);
         Result result = 
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
@@ -199,9 +198,9 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
             Keys oneKey = keys(10);
             Keys twoKeys = keys(10, 20);
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(twoKeys, Write, Key);
             Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), 
MockStore.QUERY, MockStore.update(twoKeys));
             Result result = 
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
             assertEquals(MockStore.RESULT, result);
diff --git 
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java 
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 98c27375..0acc3997 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -69,7 +69,7 @@ public class TopologyChangeTest
                                               .build())
         {
             Node node1 = cluster.get(1);
-            TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(Write, Key);
+            TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(keys, Write, Key);
             Txn txn1 = writeTxn(keys);
             getUninterruptibly(node1.coordinate(txnId1, txn1));
 
@@ -93,7 +93,7 @@ public class TopologyChangeTest
             });
 
             Node node4 = cluster.get(4);
-            TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(Write, Key);
+            TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(keys, Write, Key);
             Txn txn2 = writeTxn(keys);
             getUninterruptibly(node4.coordinate(txnId2, txn2));
 
@@ -202,7 +202,7 @@ public class TopologyChangeTest
             });
 
             Node node4 = cluster.get(4);
-            TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(Write, Key);
+            TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(keys, Write, 
Key);
             Assertions.assertEquals(2, epoch2txnId.epoch());
 
             cluster.nodes(1, 2, 3, 4, 5).forEach(node -> 
node.topology().reportTopology(topology3));
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 266900a0..c5a54bb2 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -63,6 +63,7 @@ import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.topology.TopologyMismatch;
 import accord.topology.TopologyRetiredException;
 import accord.utils.Invariants;
 import accord.utils.RandomSource;
@@ -167,7 +168,7 @@ public class ListAgent implements InMemoryAgent, 
CoordinatorEventListener, Owner
         ownershipEventListener.onFailedBootstrap(attempt, phase, ranges, 
retry, fail, failure);
     }
 
-    private static final Set<Class<?>> expectedExceptions = new 
HashSet<>(Arrays.asList(SimulatedFault.class, 
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, 
TopologyRetiredException.class, Snapshotter.SnapshotAborted.class, 
TimeoutException.class, LogUnavailableException.class));
+    private static final Set<Class<?>> expectedExceptions = new 
HashSet<>(Arrays.asList(SimulatedFault.class, 
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, 
TopologyRetiredException.class, TopologyMismatch.class, 
Snapshotter.SnapshotAborted.class, TimeoutException.class, 
LogUnavailableException.class));
     @Override
     public void onException(Throwable t)
     {
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
index 0c912297..492b43de 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -95,7 +95,7 @@ class CommandsTest
                     Keys keys = 
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs));
                     Txn txn = listWriteTxn(from, keys);
 
-                    TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, 
Routable.Domain.Key);
+                    TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, 
Routable.Domain.Key);
 
                     for (Node n : nodeMap.values())
                         node.topology().reportTopology(updatedTopology);
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index dc41d3e9..037325fb 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -157,9 +157,9 @@ public class ImmutableCommandTest
         CommandStoreSupport support = new CommandStoreSupport();
         Node node = createNode(ID1, support);
         CommandStore commands = node.unsafeByIndex(0);
-        TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
-        ((MockCluster.Clock)node.time()).increment(10);
         Keys keys = Keys.of(KEY);
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
+        ((MockCluster.Clock)node.time()).increment(10);
         Txn txn = writeTxn(keys);
 
         {
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java 
b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index 89533fd4..535ab41f 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -97,8 +97,8 @@ class ReadDataTest
         MessageSink sink = Mockito.mock(MessageSink.class);
         Node node = createNode(ID1, TOPOLOGY, sink, new 
MockCluster.Clock(100));
 
-        TxnId txnId = node.nextTxnIdWithDefaultFlags(Txn.Kind.Write, Key);
         Keys keys = Keys.of(IntKey.key(1), IntKey.key(43));
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Txn.Kind.Write, 
Key);
 
         AsyncResults.SettableResult<Data> readResult = new 
AsyncResults.SettableResult<>();
 
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java 
b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 1a3c6d42..1031ab78 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -103,6 +103,8 @@ public class TopologyRandomizer
     private final Map<Id, Map<Long, Ranges>> bootstrapping = new HashMap<>();
     private final Map<Id, Integer> bootstrappingGeneration = new HashMap<>();
     private final ConcurrentLinkedQueue<Integer> newPrefixes = new 
ConcurrentLinkedQueue<>();
+    // TODO (required): remove this restriction, we should be able to 
replicate previously owned ranges just fine
+    private final Map<Id, Ranges> previouslyReplicated = new HashMap<>();
     private final TopologyUpdates topologyUpdates;
     private final Listener listener;
 
@@ -112,6 +114,8 @@ public class TopologyRandomizer
         this.topologyUpdates = topologyUpdates;
         this.epochs.add(Topology.EMPTY);
         this.epochs.add(initialTopology);
+        for (Id node : initialTopology.nodes())
+            previouslyReplicated.put(node, 
initialTopology.rangesForNode(node));
         this.nodeLookup = nodeLookup;
         this.listener = listener;
         for (int prefix : prefixes)
@@ -466,7 +470,8 @@ public class TopologyRandomizer
             state.shards = newShards;
             Shard[] testShards = type.apply(state, random);
             Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range));
-            if (!everyShardHasQuorumOverlaps(oldShards, testShards))
+            if (!everyShardHasQuorumOverlaps(oldShards, testShards)
+                || reassignsRanges(current, testShards, previouslyReplicated))
             {
                 ++rejectedMutations;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to