This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c3a62d77 All pending topology notifications must be propagated to a 
newly published epoch, not only the pending notification for that epoch Also 
Fix:  - removeRedundantMissing could erroneously remove missing dependencies 
occurring after the new appliedBeforeIndex  - Invariant failure for some 
propagate calls supplying 'wrong' addRoute  - Disambiguate nextTxnId and 
nextTxnIdWithDefaultFlags  - Journal.replayTopologies to return List for 
consistency of integration and semi-integra [...]
c3a62d77 is described below

commit c3a62d77ba332f4a01220709c040af86dd3acf6a
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Sep 3 12:08:06 2025 +0100

    All pending topology notifications must be propagated to a newly published 
epoch, not only the pending notification for that epoch
    Also Fix:
     - removeRedundantMissing could erroneously remove missing dependencies 
occurring after the new appliedBeforeIndex
     - Invariant failure for some propagate calls supplying 'wrong' addRoute
     - Disambiguate nextTxnId and nextTxnIdWithDefaultFlags
     - Journal.replayTopologies to return List for consistency of integration 
and semi-integrated burn test
    
    patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20886
---
 accord-core/src/main/java/accord/api/Journal.java  |   2 +-
 .../accord/coordinate/CoordinateSyncPoint.java     |   4 +-
 .../accord/impl/AbstractConfigurationService.java  |  18 ++++
 .../src/main/java/accord/local/Bootstrap.java      |   2 +-
 accord-core/src/main/java/accord/local/Node.java   |  66 ++++--------
 .../src/main/java/accord/local/cfk/Pruning.java    |   2 +-
 .../src/main/java/accord/local/cfk/Utils.java      |  32 ++++--
 .../accord/local/durability/ShardDurability.java   |   2 +-
 .../src/main/java/accord/messages/Propagate.java   |   6 +-
 .../src/main/java/accord/topology/Topology.java    |   2 +-
 .../main/java/accord/topology/TopologyManager.java | 112 ++++++++++-----------
 .../coordinate/CoordinateTransactionTest.java      |  12 +--
 .../java/accord/coordinate/TopologyChangeTest.java |   6 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   5 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |  16 +--
 .../java/accord/impl/basic/LoggingJournal.java     |   3 +-
 .../src/test/java/accord/local/CommandsTest.java   |   2 +-
 .../java/accord/local/ImmutableCommandTest.java    |   2 +-
 .../test/java/accord/messages/ReadDataTest.java    |   2 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   3 +-
 20 files changed, 146 insertions(+), 153 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index cef95a31..cdb2144a 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -64,7 +64,7 @@ public interface Journal
     // TODO (required): propagate exceptions
     void saveCommand(int store, CommandUpdate value, Runnable onFlush);
 
-    Iterator<? extends TopologyUpdate> replayTopologies();
+    List<? extends TopologyUpdate> replayTopologies();
     void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
 
     void purge(CommandStores commandStores, EpochSupplier minEpoch);
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 94c5ad27..de84965c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -105,14 +105,14 @@ public class CoordinateSyncPoint<R> extends 
CoordinatePreAccept<R>
     public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
         Invariants.requireArgument(kind.isSyncPoint());
-        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain(), 
cardinality(keysOrRanges));
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, 
keysOrRanges.domain(), cardinality(keysOrRanges));
         return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, keysOrRanges, adapter)).beginAsResult();
     }
 
     public static <U extends Unseekable> AsyncResult<SyncPoint<U>> 
coordinate(Node node, Txn.Kind kind, FullRoute<U> route, 
SyncPointAdapter<SyncPoint<U>> adapter)
     {
         Invariants.requireArgument(kind.isSyncPoint());
-        TxnId txnId = node.nextTxnId(kind, route.domain(), cardinality(route));
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, route.domain(), 
cardinality(route));
         return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, 
txnId, route, adapter)).beginAsResult();
     }
 
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index ccae9ccb..d6c6257c 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -179,6 +180,15 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             return lastReceived == 0;
         }
 
+        @Nullable
+        synchronized EpochState getIfExists(long epoch)
+        {
+            if (epoch < minEpoch() || epoch > lastReceived)
+                return null;
+
+            return getOrCreate(epoch);
+        }
+
         synchronized EpochState getOrCreate(long epoch)
         {
             Invariants.requireArgument(!wasTruncated(epoch), "Can not 
re-create truncated epoch %d. Last truncated: %d", epoch, lastTruncated);
@@ -352,6 +362,14 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         return epochs.topologyFor(epoch);
     }
 
+    protected Topology getTopologyIfExists(long epoch)
+    {
+        EpochState state = epochs.getIfExists(epoch);
+        if (state != null)
+            return state.topology;
+        return null;
+    }
+
     @Override
     public abstract void fetchTopologyForEpoch(long epoch);
 
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index af38ae7f..809c80ca 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -118,7 +118,7 @@ class Bootstrap
                 return;
             }
 
-            globalSyncId = node.nextTxnId(ExclusiveSyncPoint, 
Routable.Domain.Range);
+            globalSyncId = node.nextTxnIdWithDefaultFlags(ExclusiveSyncPoint, 
Routable.Domain.Range);
             Invariants.requireArgument(epoch <= globalSyncId.epoch(), 
"Attempting to use local epoch %d which is larger than global epoch %d", epoch, 
globalSyncId.epoch());
 
             if (!node.topology().hasEpoch(globalSyncId.epoch()))
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 9ab6ee52..cd452886 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -690,63 +690,36 @@ public class Node implements 
ConfigurationService.Listener, NodeCommandStoreServ
         messageSink.reply(replyingToNode, replyContext, send);
     }
 
-    public TxnId nextTxnId(Txn.Kind rw, Domain domain)
+    public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain)
     {
-        return nextTxnId(rw, domain, Any, defaultMediumPath().bit());
+        return nextTxnIdWithFlags(rw, domain, Any, defaultMediumPath().bit());
     }
 
-    public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain)
+    public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Txn.Kind rw, Domain domain)
     {
-        return nextTxnId(min, rw, domain, Any, defaultMediumPath().bit());
+        return nextStaleTxnIdWithFlags(minEpoch, minHlc, rw, domain, Any, 
defaultMediumPath().bit());
     }
 
-    public TxnId nextStaleTxnId(long minEpoch, long minHlc, Txn.Kind rw, 
Domain domain)
+    public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain, 
Cardinality cardinality)
     {
-        return nextStaleTxnId(minEpoch, minHlc, rw, domain, Any, 
defaultMediumPath().bit());
+        return nextTxnIdWithFlags(rw, domain, cardinality, 
defaultMediumPath().bit());
     }
 
-    public TxnId nextTxnId(Txn.Kind rw, Domain domain, Cardinality cardinality)
+    public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc, 
Txn.Kind rw, Domain domain, Cardinality cardinality)
     {
-        return nextTxnId(rw, domain, cardinality, defaultMediumPath().bit());
-    }
-
-    public TxnId nextTxnId(long minHlc, Txn.Kind rw, Domain domain, 
Cardinality cardinality)
-    {
-        return newTxnId(epoch(), uniqueNow(minHlc), rw, domain, cardinality, 
defaultMediumPath().bit(), id);
-    }
-
-    public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, 
Cardinality cardinality)
-    {
-        return nextTxnId(min, rw, domain, cardinality, 
defaultMediumPath().bit());
-    }
-
-    public TxnId nextTxnId(Txn.Kind rw, Domain domain, int flags)
-    {
-        return nextTxnId(rw, domain, Any, flags);
-    }
-
-    public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, int 
flags)
-    {
-        return nextTxnId(min, rw, domain, Any, flags);
+        return newTxnId(Math.max(minEpoch, epoch()), uniqueNow(minHlc), rw, 
domain, cardinality, defaultMediumPath().bit(), id);
     }
 
     /**
      * TODO (required): Make sure we cannot re-issue the same txnid on startup
      * TODO (required): Don't use a new epoch for the TxnId at least until we 
know its definition
      */
-    public TxnId nextTxnId(Txn.Kind rw, Domain domain, Cardinality 
cardinality, int flags)
+    public TxnId nextTxnIdWithFlags(Txn.Kind rw, Domain domain, Cardinality 
cardinality, int flags)
     {
         return newTxnId(epoch(), uniqueNow(), rw, domain, cardinality, flags, 
id);
     }
 
-    public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, 
Cardinality cardinality, int flags)
-    {
-        long epoch = min == null ? epoch() : Math.max(min.epoch(), epoch());
-        long hlc = uniqueNow(min == null ? 0 : min.hlc());
-        return newTxnId(epoch, hlc, rw, domain, cardinality, flags, id);
-    }
-
-    public TxnId nextStaleTxnId(long minEpoch, long minHlc, Txn.Kind rw, 
Domain domain, Cardinality cardinality, int flags)
+    public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc, Txn.Kind 
rw, Domain domain, Cardinality cardinality, int flags)
     {
         long epoch = Math.max(minEpoch, epoch());
         long hlc = uniqueStale(minHlc);
@@ -763,27 +736,32 @@ public class Node implements 
ConfigurationService.Listener, NodeCommandStoreServ
     }
 
     public TxnId nextTxnId(Txn txn)
+    {
+        return nextTxnId(0, 0, txn);
+    }
+
+    public TxnId nextTxnId(long minEpoch, long minHlc, Txn txn)
     {
         Seekables<?, ?> keys = txn.keys();
         Txn.Kind kind = txn.kind();
-        return nextTxnId(keys, kind);
+        return nextTxnId(minEpoch, minHlc, keys, kind);
     }
 
-    public TxnId nextTxnId(Seekables<?, ?> keys, Txn.Kind kind)
+    public TxnId nextTxnId(@Nullable Timestamp min, Seekables<?, ?> keys, 
Txn.Kind kind)
     {
-        return nextTxnId(null, keys, kind);
+        return nextTxnId(min == null ? 0 : min.epoch(), min == null ? 0 : 
min.hlc(), keys, kind);
     }
 
-    public TxnId nextTxnId(@Nullable Timestamp min, Seekables<?, ?> keys, 
Txn.Kind kind)
+    public TxnId nextTxnId(long minEpoch, long minHlc, Seekables<?, ?> keys, 
Txn.Kind kind)
     {
         Domain domain = keys.domain();
         Cardinality cardinality = cardinality(domain, keys);
 
         if (!usePrivilegedCoordinator() || (kind != Read && kind != Write))
-            return nextTxnId(min, kind, domain, cardinality);
+            return nextTxnIdWithDefaultFlags(minEpoch, minHlc, kind, domain, 
cardinality);
 
-        long epoch = min == null ? epoch() : Math.max(min.epoch(), epoch());
-        long hlc = uniqueNow(min == null ? 0 : min.hlc());
+        long epoch = Math.max(minEpoch, epoch());
+        long hlc = uniqueNow(minHlc);
         int flags = computeBestDefaultTxnIdFlags(keys, epoch);
         TxnId txnId = new TxnId(epoch, hlc, flags, kind, domain, cardinality, 
id);
         Invariants.require((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 
0);
diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java 
b/accord-core/src/main/java/accord/local/cfk/Pruning.java
index 4c778f3a..00b553a9 100644
--- a/accord-core/src/main/java/accord/local/cfk/Pruning.java
+++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java
@@ -603,7 +603,7 @@ public class Pruning
                 TxnInfo txn = newById[i];
                 TxnId[] missing = txn.missing();
                 if (missing == NO_TXNIDS) continue;
-                missing = removeRedundantMissing(missing, newRedundantBefore, 
newById, newAppliedBeforeIndex);
+                missing = removeRedundantMissing(missing, newRedundantBefore, 
newAppliedBefore, newAppliedBeforeIndex, newById);
                 newById[i] = txn.withMissing(missing);
             }
         }
diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java 
b/accord-core/src/main/java/accord/local/cfk/Utils.java
index bc73f262..1b711f77 100644
--- a/accord-core/src/main/java/accord/local/cfk/Utils.java
+++ b/accord-core/src/main/java/accord/local/cfk/Utils.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import accord.local.RedundantBefore.QuickBounds;
 import accord.local.cfk.CommandsForKey.TxnInfo;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
@@ -361,7 +360,7 @@ class Utils
         return newMissing;
     }
 
-    static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, 
TxnInfo[] newById, int appliedBeforeIndex)
+    static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, 
TxnId appliedBefore, int appliedBeforeIndex, TxnInfo[] newById)
     {
         if (missing == NO_TXNIDS)
             return NO_TXNIDS;
@@ -373,20 +372,20 @@ class Utils
             if (j == missing.length) return NO_TXNIDS;
             missing = Arrays.copyOfRange(missing, j, missing.length);
         }
+
         if (appliedBeforeIndex < 0)
             return missing;
 
+        int maybeRemovedBefore = Arrays.binarySearch(missing, appliedBefore);
+        if (maybeRemovedBefore < 0) maybeRemovedBefore = -1 - 
maybeRemovedBefore;
+        if (maybeRemovedBefore == 0)
+            return missing;
+
         int removed = 0;
+        int i = 0;
         j = SortedArrays.binarySearch(newById, 0, appliedBeforeIndex, 
missing[0], TxnId::compareTo, FAST);
-        if (j >= 0) ++j;
-        else
-        {
-            ++removed;
-            j = -1 - j;
-        }
-        for (int i = 1 ; i < missing.length ; ++i)
+        while (true)
         {
-            j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, 
missing[i], TxnId::compareTo, FAST);
             if (j < 0)
             {
                 ++removed;
@@ -396,10 +395,21 @@ class Utils
             {
                 missing[i - removed] = missing[i];
             }
+            if (++i == maybeRemovedBefore)
+                break;
+            j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, 
missing[i], TxnId::compareTo, FAST);
         }
+
         if (removed == 0) return missing;
         else if (removed == missing.length) return NO_TXNIDS;
-        else return Arrays.copyOf(missing, missing.length - removed);
+        else if (i == missing.length) return Arrays.copyOf(missing, 
missing.length - removed);
+        else
+        {
+            TxnId[] newMissing = new TxnId[missing.length - removed];
+            System.arraycopy(missing, 0, newMissing, 0, i - removed);
+            System.arraycopy(missing, i, newMissing, i - removed, 
missing.length - i);
+            return newMissing;
+        }
     }
 
     static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing)
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index b808d46e..a4604f61 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -369,7 +369,7 @@ public class ShardDurability
                 }
             }
             minHlc = Math.max(minHlc, node.agent().minStaleHlc(node, 
activeRequest != null));
-            TxnId staleId = node.nextStaleTxnId(minEpoch, minHlc, kind, 
Domain.Range);
+            TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch, 
minHlc, kind, Domain.Range);
             if (activeRequest != null) logger.info("Initiating RX requested by 
{} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges, 
staleId, active);
             else logger.debug("Initiating RX for durability of {} with TxnId 
{}.", ranges, staleId);
 
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index 6a7b18c4..8d510f62 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -319,7 +319,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
             case Invalidated:
                 if (tracing != null)
                     tracing.trace(safeStore.commandStore(), "Invalidating");
-                Commands.commitInvalidate(safeStore, safeCommand, route);
+                Commands.commitInvalidate(safeStore, safeCommand, 
participants.route());
                 break;
 
             case Applied:
@@ -328,13 +328,13 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
                     tracing.trace(safeStore.commandStore(), "Applying");
                 Invariants.require(committedExecuteAt != null);
                 // we must use the remote executeAt, as it might have a 
uniqueHlc we aren't aware of at commit
-                confirm(Commands.apply(safeStore, safeCommand, participants, 
Ballot.ZERO, txnId, route, committedExecuteAt, stableDeps, partialTxn, writes, 
result));
+                confirm(Commands.apply(safeStore, safeCommand, participants, 
Ballot.ZERO, txnId, participants.route(), committedExecuteAt, stableDeps, 
partialTxn, writes, result));
                 break;
 
             case Stable:
                 if (tracing != null)
                     tracing.trace(safeStore.commandStore(), "Committing as 
stable");
-                confirm(Commands.commit(safeStore, safeCommand, participants, 
Stable, acceptedOrCommitted, txnId, route, partialTxn, executeAtIfKnown, 
stableDeps, null));
+                confirm(Commands.commit(safeStore, safeCommand, participants, 
Stable, acceptedOrCommitted, txnId, participants.route(), partialTxn, 
executeAtIfKnown, stableDeps, null));
                 break;
 
             case Committed:
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index e426cd58..b89f23ee 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -276,7 +276,7 @@ public class Topology
     {
         NodeInfo info = nodeLookup.get(node.id);
         if (info == null)
-            return Topology.EMPTY;
+            return Topology.EMPTY.withEpoch(epoch);
 
         SortedArrayList<Id> nodeIds = new SortedArrayList<>(new Id[] { node });
         return new Topology(global(), epoch, shards, ranges, removedIds, 
staleIds, nodeIds, nodeLookup, info.ranges, info.supersetIndexes);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index f44825af..989ea706 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -140,7 +140,7 @@ public class TopologyManager
             this.self = node;
             this.global = Invariants.requireArgument(global, 
!global.isSubset());
             this.local = global.forNode(node).trim();
-            Invariants.requireArgument(!global().isSubset());
+            Invariants.requireArgument(local.epoch == global.epoch);
             this.curShardSyncComplete = new BitSet(global.shards.length);
             if (!global().isEmpty())
                 this.syncTracker = new QuorumTracker(new Single(sorter, 
global()));
@@ -383,32 +383,21 @@ public class TopologyManager
         public void syncComplete(Id node, long epoch)
         {
             Invariants.requireArgument(epoch > 0);
+            int i = indexOf(epoch);
             if (epoch > currentEpoch)
             {
                 pending(epoch).syncComplete.add(node);
+                if (epochs.length == 0)
+                    return;
+                i = 0;
             }
-            else
+            else if (i < 0)
             {
-                int i = indexOf(epoch);
-                if (i < 0)
-                    return;
-
-                EpochState.NodeSyncStatus status = 
epochs[i].recordSyncComplete(node);
-                switch (status)
-                {
-                    case Complete:
-                        i++;
-                        for (; i < epochs.length && 
epochs[i].recordSyncCompleteFromFuture(); i++) {}
-                        break;
-                    case Untracked:
-                        // don't have access to TopologyManager.this.node to 
check if the nodes match... this state should not happen unless it is the same 
node
-                    case NoUpdate:
-                    case ShardUpdate:
-                        break;
-                    default:
-                        throw new UnsupportedOperationException("Unknown 
status " + status);
-                }
+                Invariants.require(epoch < minEpoch(), "Could not find epoch 
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
+                return;
             }
+
+            recordSyncComplete(epochs, i, node);
         }
 
         /**
@@ -417,23 +406,16 @@ public class TopologyManager
          */
         public Ranges epochClosed(Ranges ranges, long epoch)
         {
-            if (epochs.length == 0)
-                return ranges;
-
             Invariants.requireArgument(epoch > 0);
-            int i;
+            int i = indexOf(epoch);
             if (epoch > currentEpoch)
             {
-                Notifications notifications = pending(epoch);
-                notifications.closed = 
notifications.closed.union(MERGE_ADJACENT, ranges);
+                pending(epoch).closed.union(MERGE_ADJACENT, ranges);
+                if (epochs.length == 0)
+                    return ranges;
                 i = 0;
             }
-            else
-            {
-                i = indexOf(epoch);
-            }
-
-            if (i == -1)
+            else if (i < 0)
             {
                 Invariants.require(epoch < minEpoch(), "Could not find epoch 
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
                 return Ranges.EMPTY; // notification came for an already 
truncated epoch
@@ -451,25 +433,20 @@ public class TopologyManager
          */
         public Ranges epochRetired(Ranges ranges, long epoch)
         {
-            if (epochs.length == 0)
-                return ranges;
-
-            Invariants.requireArgument(epoch > 0);
-            int retiredIdx;
+            int i = indexOf(epoch);
             if (epoch > currentEpoch)
             {
-                Notifications notifications = pending(epoch);
-                notifications.retired = 
notifications.retired.union(MERGE_ADJACENT, ranges);
-                retiredIdx = 0; // record these ranges as complete for all 
earlier epochs as well
+                pending(epoch).retired.union(MERGE_ADJACENT, ranges);
+                if (epochs.length == 0)
+                    return ranges;
+                i = 0;
             }
-            else
+            else if (i < 0)
             {
-                retiredIdx = indexOf(epoch);
-                if (retiredIdx < 0)
-                    return Ranges.EMPTY;
+                Invariants.require(epoch < minEpoch(), "Could not find epoch 
%d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch);
+                return Ranges.EMPTY; // notification came for an already 
truncated epoch
             }
 
-            int i = retiredIdx;
             Ranges report = ranges = epochs[i++].recordRetired(ranges);
             while (!ranges.isEmpty() && i < epochs.length)
                 ranges = epochs[i++].recordRetired(ranges);
@@ -505,6 +482,25 @@ public class TopologyManager
         }
     }
 
+    private static void recordSyncComplete(EpochState[] epochs, int i, Id node)
+    {
+        EpochState.NodeSyncStatus status = epochs[i].recordSyncComplete(node);
+        switch (status)
+        {
+            case Complete:
+                i++;
+                for (; i < epochs.length && 
epochs[i].recordSyncCompleteFromFuture(); i++) {}
+                break;
+            case Untracked:
+                // don't have access to TopologyManager.this.node to check if 
the nodes match... this state should not happen unless it is the same node
+            case NoUpdate:
+            case ShardUpdate:
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown status " + 
status);
+        }
+    }
+
     static class WaitingForEpoch extends AsyncResults.SettableResult<Void>
     {
         final long deadlineMicros;
@@ -729,17 +725,23 @@ public class TopologyManager
             prev = epochs;
             Invariants.requireArgument(topology.epoch == prev.nextEpoch() || 
epochs.epochs.length == 0,
                                        "Expected topology update %d to be %d", 
topology.epoch, prev.nextEpoch());
-            EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1];
-            List<Epochs.Notifications> pending = new ArrayList<>(prev.pending);
-            Epochs.Notifications notifications = pending.isEmpty() ? new 
Epochs.Notifications() : pending.remove(0);
-
-            System.arraycopy(prev.epochs, 0, nextEpochs, 1, 
prev.epochs.length);
 
             Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY : 
prev.epochs[0].global.ranges;
+            List<Epochs.Notifications> pending = prev.pending.size() <= 1 ? 
new ArrayList<>() : new ArrayList<>(prev.pending.subList(1, 
prev.pending.size()));
+
+            EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1];
+            System.arraycopy(prev.epochs, 0, nextEpochs, 1, 
prev.epochs.length);
             nextEpochs[0] = new EpochState(self, topology, 
sorter.get(topology), prevAll);
-            
notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
-            nextEpochs[0].recordClosed(notifications.closed);
-            nextEpochs[0].recordRetired(notifications.retired);
+            if (!prev.pending.isEmpty())
+            {
+                // TODO (expected): we should invoke the same code as we do 
when receiving normally, to prevent divergence
+                prev.pending.get(0).syncComplete.forEach(id -> 
recordSyncComplete(nextEpochs, 0, id));
+                for (Epochs.Notifications notifications : prev.pending)
+                {
+                    nextEpochs[0].recordClosed(notifications.closed);
+                    nextEpochs[0].recordRetired(notifications.retired);
+                }
+            }
 
             List<FutureEpoch> futureEpochs = new 
ArrayList<>(prev.futureEpochs);
             notifyDone = !futureEpochs.isEmpty() ? futureEpochs.remove(0) : 
null;
@@ -939,8 +941,6 @@ public class TopologyManager
             for (int i = 0; i < topologies.size() && count > 0; i++, count--)
             {
                 Topology topology = topologies.get(i);
-                Invariants.require(i > 0 || topology.epoch() == minEpoch || 
firstNonEmpty == topology.epoch(),
-                                   "Min epoch: %d. Range: %s", minEpoch, this);
                 forEach.accept(topology);
             }
         }
diff --git 
a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java 
b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index 9658b4bb..d9a52477 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -63,7 +63,7 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnId(Write, Key);
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
             Keys keys = keys(10);
             Txn txn = writeTxn(keys);
             FullKeyRoute route = keys.toRoute(keys.get(0).toUnseekable());
@@ -80,7 +80,7 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnId(Read, Range);
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(Read, Range);
             Ranges keys = ranges(range(1, 2));
             Txn txn = writeTxn(keys);
             FullRangeRoute route = 
keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
@@ -97,8 +97,8 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId oldId1 = node.nextTxnId(Write, Key);
-            TxnId oldId2 = node.nextTxnId(Write, Key);
+            TxnId oldId1 = node.nextTxnIdWithDefaultFlags(Write, Key);
+            TxnId oldId2 = node.nextTxnIdWithDefaultFlags(Write, Key);
 
             getUninterruptibly(CoordinateSyncPoint.exclusive(node, 
ranges(range(0, 1))));
             try
@@ -139,7 +139,7 @@ public class CoordinateTransactionTest
 
     private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
     {
-        TxnId txnId = node.nextTxnId(Write, Key);
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
         txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, 0, Write, Key, 
txnId.node);
         Txn txn = writeTxn(keys);
         Result result = 
getUninterruptibly(CoordinateTransaction.coordinate(node, 
node.computeRoute(txnId, txn.keys()), txnId, txn));
@@ -203,7 +203,7 @@ public class CoordinateTransactionTest
             Node node = cluster.get(1);
             assertNotNull(node);
 
-            TxnId txnId = node.nextTxnId(Write, Key);
+            TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
             Keys oneKey = keys(10);
             Keys twoKeys = keys(10, 20);
             Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), 
MockStore.QUERY, MockStore.update(twoKeys));
diff --git 
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java 
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 8adb26a3..502d48c0 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -69,7 +69,7 @@ public class TopologyChangeTest
                                               .build())
         {
             Node node1 = cluster.get(1);
-            TxnId txnId1 = node1.nextTxnId(Write, Key);
+            TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(Write, Key);
             Txn txn1 = writeTxn(keys);
             getUninterruptibly(node1.coordinate(txnId1, txn1));
             
getUninterruptibly(node1.commandStores().forEach(PreLoadContext.contextFor(txnId1,
 "Test"), keys.toParticipants(), 1, 1, safeStore -> {
@@ -91,7 +91,7 @@ public class TopologyChangeTest
             });
 
             Node node4 = cluster.get(4);
-            TxnId txnId2 = node4.nextTxnId(Write, Key);
+            TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(Write, Key);
             Txn txn2 = writeTxn(keys);
             getUninterruptibly(node4.coordinate(txnId2, txn2));
 
@@ -194,7 +194,7 @@ public class TopologyChangeTest
             });
 
             Node node4 = cluster.get(4);
-            TxnId epoch2txnId = node4.nextTxnId(Write, Key);
+            TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(Write, Key);
             Assertions.assertEquals(2, epoch2txnId.epoch());
 
             cluster.configServices(1, 2, 3, 4, 5).forEach(configService -> 
configService.reportTopology(topology3));
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 09906ee0..ea65c488 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -788,11 +788,10 @@ public class Cluster
 
                 // Replay journal
                 Journal journal = journalMap.get(id);
-                Iterator<? extends Journal.TopologyUpdate> iter = 
journal.replayTopologies();
+                List<? extends Journal.TopologyUpdate> list = 
journal.replayTopologies();
                 Journal.TopologyUpdate lastUpdate = null;
-                while (iter.hasNext())
+                for (Journal.TopologyUpdate update : list)
                 {
-                    Journal.TopologyUpdate update = iter.next();
                     Invariants.require(lastUpdate == null || 
update.global.epoch() > lastUpdate.global.epoch());
                     lastUpdate = update;
                 }
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index a926e5cb..3b9b6945 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -233,21 +233,9 @@ public class InMemoryJournal implements Journal
     }
 
     @Override
-    public Iterator<TopologyUpdate> replayTopologies()
+    public List<TopologyUpdate> replayTopologies()
     {
-        return new Iterator<>()
-        {
-            int current = 0;
-            public boolean hasNext()
-            {
-                return current < topologyUpdates.size();
-            }
-
-            public TopologyUpdate next()
-            {
-                return topologyUpdates.get(current++);
-            }
-        };
+        return new ArrayList<>(topologyUpdates);
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 9ba39729..db098689 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 
 import accord.api.Journal;
@@ -107,7 +108,7 @@ public class LoggingJournal implements Journal
     }
 
     @Override
-    public Iterator<? extends TopologyUpdate> replayTopologies()
+    public List<? extends TopologyUpdate> replayTopologies()
     {
         log("REPLAY TOPOLOGIES\n");
         return delegate.replayTopologies();
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
index eab240a7..ff29ee44 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -96,7 +96,7 @@ class CommandsTest
                     Keys keys = 
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs));
                     Txn txn = listWriteTxn(from, keys);
 
-                    TxnId txnId = node.nextTxnId(Write, Routable.Domain.Key);
+                    TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, 
Routable.Domain.Key);
 
                     for (Node n : nodeMap.values())
                         ((TestableConfigurationService) 
n.configService()).reportTopology(updatedTopology);
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index a1ddd093..69332710 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -158,7 +158,7 @@ public class ImmutableCommandTest
         CommandStoreSupport support = new CommandStoreSupport();
         Node node = createNode(ID1, support);
         CommandStore commands = node.unsafeByIndex(0);
-        TxnId txnId = node.nextTxnId(Write, Key);
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
         ((MockCluster.Clock)node.time()).increment(10);
         Keys keys = Keys.of(KEY);
         Txn txn = writeTxn(keys);
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java 
b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index a0ec3fa3..dbc462b5 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -97,7 +97,7 @@ class ReadDataTest
         MessageSink sink = Mockito.mock(MessageSink.class);
         Node node = createNode(ID1, TOPOLOGY, sink, new 
MockCluster.Clock(100));
 
-        TxnId txnId = node.nextTxnId(Txn.Kind.Write, Key);
+        TxnId txnId = node.nextTxnIdWithDefaultFlags(Txn.Kind.Write, Key);
         Keys keys = Keys.of(IntKey.key(1), IntKey.key(43));
 
         AsyncResults.SettableResult<Data> readResult = new 
AsyncResults.SettableResult<>();
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 582e4b94..99a6a9cd 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -394,7 +393,7 @@ public class Cluster implements Scheduler
         @Override public Command.Minimal loadMinimal(int commandStoreId, TxnId 
txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw 
new IllegalStateException("Not impelemented"); }
         @Override public Command.MinimalWithDeps loadMinimalWithDeps(int 
store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore 
durableBefore) { throw new IllegalStateException("Not impelemented"); }
         @Override public void saveCommand(int store, CommandUpdate value, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
-        @Override public Iterator<TopologyUpdate> replayTopologies() { throw 
new IllegalStateException("Not impelemented"); }
+        @Override public List<TopologyUpdate> replayTopologies() { throw new 
IllegalStateException("Not impelemented"); }
         @Override public void saveTopology(TopologyUpdate topologyUpdate, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
         @Override public void purge(CommandStores commandStores, EpochSupplier 
minEpoch)  { throw new IllegalStateException("Not impelemented"); }
         @Override public void replay(CommandStores commandStores)  { throw new 
IllegalStateException("Not impelemented"); }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to