This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new c3a62d77 All pending topology notifications must be propagated to a newly published epoch, not only the pending notification for that epoch Also Fix: - removeRedundantMissing could erroneously remove missing dependencies occurring after the new appliedBeforeIndex - Invariant failure for some propagate calls supplying 'wrong' addRoute - Disambiguate nextTxnId and nextTxnIdWithDefaultFlags - Journal.replayTopologies to return List for consistency of integration and semi-integra [...] c3a62d77 is described below commit c3a62d77ba332f4a01220709c040af86dd3acf6a Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Sep 3 12:08:06 2025 +0100 All pending topology notifications must be propagated to a newly published epoch, not only the pending notification for that epoch Also Fix: - removeRedundantMissing could erroneously remove missing dependencies occurring after the new appliedBeforeIndex - Invariant failure for some propagate calls supplying 'wrong' addRoute - Disambiguate nextTxnId and nextTxnIdWithDefaultFlags - Journal.replayTopologies to return List for consistency of integration and semi-integrated burn test patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20886 --- accord-core/src/main/java/accord/api/Journal.java | 2 +- .../accord/coordinate/CoordinateSyncPoint.java | 4 +- .../accord/impl/AbstractConfigurationService.java | 18 ++++ .../src/main/java/accord/local/Bootstrap.java | 2 +- accord-core/src/main/java/accord/local/Node.java | 66 ++++-------- .../src/main/java/accord/local/cfk/Pruning.java | 2 +- .../src/main/java/accord/local/cfk/Utils.java | 32 ++++-- .../accord/local/durability/ShardDurability.java | 2 +- .../src/main/java/accord/messages/Propagate.java | 6 +- .../src/main/java/accord/topology/Topology.java | 2 +- .../main/java/accord/topology/TopologyManager.java | 112 ++++++++++----------- .../coordinate/CoordinateTransactionTest.java | 12 +-- .../java/accord/coordinate/TopologyChangeTest.java | 6 +- .../src/test/java/accord/impl/basic/Cluster.java | 5 +- .../java/accord/impl/basic/InMemoryJournal.java | 16 +-- .../java/accord/impl/basic/LoggingJournal.java | 3 +- .../src/test/java/accord/local/CommandsTest.java | 2 +- .../java/accord/local/ImmutableCommandTest.java | 2 +- .../test/java/accord/messages/ReadDataTest.java | 2 +- .../src/main/java/accord/maelstrom/Cluster.java | 3 +- 20 files changed, 146 insertions(+), 153 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index cef95a31..cdb2144a 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -64,7 +64,7 @@ public interface Journal // TODO (required): propagate exceptions void saveCommand(int store, CommandUpdate value, Runnable onFlush); - Iterator<? extends TopologyUpdate> replayTopologies(); + List<? extends TopologyUpdate> replayTopologies(); void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush); void purge(CommandStores commandStores, EpochSupplier minEpoch); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java index 94c5ad27..de84965c 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java @@ -105,14 +105,14 @@ public class CoordinateSyncPoint<R> extends CoordinatePreAccept<R> public static <U extends Unseekable> AsyncResult<SyncPoint<U>> coordinate(Node node, Txn.Kind kind, Unseekables<U> keysOrRanges, SyncPointAdapter<SyncPoint<U>> adapter) { Invariants.requireArgument(kind.isSyncPoint()); - TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain(), cardinality(keysOrRanges)); + TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, keysOrRanges.domain(), cardinality(keysOrRanges)); return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, txnId, keysOrRanges, adapter)).beginAsResult(); } public static <U extends Unseekable> AsyncResult<SyncPoint<U>> coordinate(Node node, Txn.Kind kind, FullRoute<U> route, SyncPointAdapter<SyncPoint<U>> adapter) { Invariants.requireArgument(kind.isSyncPoint()); - TxnId txnId = node.nextTxnId(kind, route.domain(), cardinality(route)); + TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, route.domain(), cardinality(route)); return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node, txnId, route, adapter)).beginAsResult(); } diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index ccae9ccb..d6c6257c 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; @@ -179,6 +180,15 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return lastReceived == 0; } + @Nullable + synchronized EpochState getIfExists(long epoch) + { + if (epoch < minEpoch() || epoch > lastReceived) + return null; + + return getOrCreate(epoch); + } + synchronized EpochState getOrCreate(long epoch) { Invariants.requireArgument(!wasTruncated(epoch), "Can not re-create truncated epoch %d. Last truncated: %d", epoch, lastTruncated); @@ -352,6 +362,14 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return epochs.topologyFor(epoch); } + protected Topology getTopologyIfExists(long epoch) + { + EpochState state = epochs.getIfExists(epoch); + if (state != null) + return state.topology; + return null; + } + @Override public abstract void fetchTopologyForEpoch(long epoch); diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index af38ae7f..809c80ca 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -118,7 +118,7 @@ class Bootstrap return; } - globalSyncId = node.nextTxnId(ExclusiveSyncPoint, Routable.Domain.Range); + globalSyncId = node.nextTxnIdWithDefaultFlags(ExclusiveSyncPoint, Routable.Domain.Range); Invariants.requireArgument(epoch <= globalSyncId.epoch(), "Attempting to use local epoch %d which is larger than global epoch %d", epoch, globalSyncId.epoch()); if (!node.topology().hasEpoch(globalSyncId.epoch())) diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 9ab6ee52..cd452886 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -690,63 +690,36 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ messageSink.reply(replyingToNode, replyContext, send); } - public TxnId nextTxnId(Txn.Kind rw, Domain domain) + public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain) { - return nextTxnId(rw, domain, Any, defaultMediumPath().bit()); + return nextTxnIdWithFlags(rw, domain, Any, defaultMediumPath().bit()); } - public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain) + public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc, Txn.Kind rw, Domain domain) { - return nextTxnId(min, rw, domain, Any, defaultMediumPath().bit()); + return nextStaleTxnIdWithFlags(minEpoch, minHlc, rw, domain, Any, defaultMediumPath().bit()); } - public TxnId nextStaleTxnId(long minEpoch, long minHlc, Txn.Kind rw, Domain domain) + public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain, Cardinality cardinality) { - return nextStaleTxnId(minEpoch, minHlc, rw, domain, Any, defaultMediumPath().bit()); + return nextTxnIdWithFlags(rw, domain, cardinality, defaultMediumPath().bit()); } - public TxnId nextTxnId(Txn.Kind rw, Domain domain, Cardinality cardinality) + public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc, Txn.Kind rw, Domain domain, Cardinality cardinality) { - return nextTxnId(rw, domain, cardinality, defaultMediumPath().bit()); - } - - public TxnId nextTxnId(long minHlc, Txn.Kind rw, Domain domain, Cardinality cardinality) - { - return newTxnId(epoch(), uniqueNow(minHlc), rw, domain, cardinality, defaultMediumPath().bit(), id); - } - - public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, Cardinality cardinality) - { - return nextTxnId(min, rw, domain, cardinality, defaultMediumPath().bit()); - } - - public TxnId nextTxnId(Txn.Kind rw, Domain domain, int flags) - { - return nextTxnId(rw, domain, Any, flags); - } - - public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, int flags) - { - return nextTxnId(min, rw, domain, Any, flags); + return newTxnId(Math.max(minEpoch, epoch()), uniqueNow(minHlc), rw, domain, cardinality, defaultMediumPath().bit(), id); } /** * TODO (required): Make sure we cannot re-issue the same txnid on startup * TODO (required): Don't use a new epoch for the TxnId at least until we know its definition */ - public TxnId nextTxnId(Txn.Kind rw, Domain domain, Cardinality cardinality, int flags) + public TxnId nextTxnIdWithFlags(Txn.Kind rw, Domain domain, Cardinality cardinality, int flags) { return newTxnId(epoch(), uniqueNow(), rw, domain, cardinality, flags, id); } - public TxnId nextTxnId(Timestamp min, Txn.Kind rw, Domain domain, Cardinality cardinality, int flags) - { - long epoch = min == null ? epoch() : Math.max(min.epoch(), epoch()); - long hlc = uniqueNow(min == null ? 0 : min.hlc()); - return newTxnId(epoch, hlc, rw, domain, cardinality, flags, id); - } - - public TxnId nextStaleTxnId(long minEpoch, long minHlc, Txn.Kind rw, Domain domain, Cardinality cardinality, int flags) + public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc, Txn.Kind rw, Domain domain, Cardinality cardinality, int flags) { long epoch = Math.max(minEpoch, epoch()); long hlc = uniqueStale(minHlc); @@ -763,27 +736,32 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ } public TxnId nextTxnId(Txn txn) + { + return nextTxnId(0, 0, txn); + } + + public TxnId nextTxnId(long minEpoch, long minHlc, Txn txn) { Seekables<?, ?> keys = txn.keys(); Txn.Kind kind = txn.kind(); - return nextTxnId(keys, kind); + return nextTxnId(minEpoch, minHlc, keys, kind); } - public TxnId nextTxnId(Seekables<?, ?> keys, Txn.Kind kind) + public TxnId nextTxnId(@Nullable Timestamp min, Seekables<?, ?> keys, Txn.Kind kind) { - return nextTxnId(null, keys, kind); + return nextTxnId(min == null ? 0 : min.epoch(), min == null ? 0 : min.hlc(), keys, kind); } - public TxnId nextTxnId(@Nullable Timestamp min, Seekables<?, ?> keys, Txn.Kind kind) + public TxnId nextTxnId(long minEpoch, long minHlc, Seekables<?, ?> keys, Txn.Kind kind) { Domain domain = keys.domain(); Cardinality cardinality = cardinality(domain, keys); if (!usePrivilegedCoordinator() || (kind != Read && kind != Write)) - return nextTxnId(min, kind, domain, cardinality); + return nextTxnIdWithDefaultFlags(minEpoch, minHlc, kind, domain, cardinality); - long epoch = min == null ? epoch() : Math.max(min.epoch(), epoch()); - long hlc = uniqueNow(min == null ? 0 : min.hlc()); + long epoch = Math.max(minEpoch, epoch()); + long hlc = uniqueNow(minHlc); int flags = computeBestDefaultTxnIdFlags(keys, epoch); TxnId txnId = new TxnId(epoch, hlc, flags, kind, domain, cardinality, id); Invariants.require((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) == 0); diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java b/accord-core/src/main/java/accord/local/cfk/Pruning.java index 4c778f3a..00b553a9 100644 --- a/accord-core/src/main/java/accord/local/cfk/Pruning.java +++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java @@ -603,7 +603,7 @@ public class Pruning TxnInfo txn = newById[i]; TxnId[] missing = txn.missing(); if (missing == NO_TXNIDS) continue; - missing = removeRedundantMissing(missing, newRedundantBefore, newById, newAppliedBeforeIndex); + missing = removeRedundantMissing(missing, newRedundantBefore, newAppliedBefore, newAppliedBeforeIndex, newById); newById[i] = txn.withMissing(missing); } } diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java b/accord-core/src/main/java/accord/local/cfk/Utils.java index bc73f262..1b711f77 100644 --- a/accord-core/src/main/java/accord/local/cfk/Utils.java +++ b/accord-core/src/main/java/accord/local/cfk/Utils.java @@ -22,7 +22,6 @@ import java.util.Arrays; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import accord.local.RedundantBefore.QuickBounds; import accord.local.cfk.CommandsForKey.TxnInfo; import accord.primitives.Timestamp; import accord.primitives.Txn; @@ -361,7 +360,7 @@ class Utils return newMissing; } - static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, TxnInfo[] newById, int appliedBeforeIndex) + static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, TxnId appliedBefore, int appliedBeforeIndex, TxnInfo[] newById) { if (missing == NO_TXNIDS) return NO_TXNIDS; @@ -373,20 +372,20 @@ class Utils if (j == missing.length) return NO_TXNIDS; missing = Arrays.copyOfRange(missing, j, missing.length); } + if (appliedBeforeIndex < 0) return missing; + int maybeRemovedBefore = Arrays.binarySearch(missing, appliedBefore); + if (maybeRemovedBefore < 0) maybeRemovedBefore = -1 - maybeRemovedBefore; + if (maybeRemovedBefore == 0) + return missing; + int removed = 0; + int i = 0; j = SortedArrays.binarySearch(newById, 0, appliedBeforeIndex, missing[0], TxnId::compareTo, FAST); - if (j >= 0) ++j; - else - { - ++removed; - j = -1 - j; - } - for (int i = 1 ; i < missing.length ; ++i) + while (true) { - j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, missing[i], TxnId::compareTo, FAST); if (j < 0) { ++removed; @@ -396,10 +395,21 @@ class Utils { missing[i - removed] = missing[i]; } + if (++i == maybeRemovedBefore) + break; + j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, missing[i], TxnId::compareTo, FAST); } + if (removed == 0) return missing; else if (removed == missing.length) return NO_TXNIDS; - else return Arrays.copyOf(missing, missing.length - removed); + else if (i == missing.length) return Arrays.copyOf(missing, missing.length - removed); + else + { + TxnId[] newMissing = new TxnId[missing.length - removed]; + System.arraycopy(missing, 0, newMissing, 0, i - removed); + System.arraycopy(missing, i, newMissing, i - removed, missing.length - i); + return newMissing; + } } static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing) diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java index b808d46e..a4604f61 100644 --- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java +++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java @@ -369,7 +369,7 @@ public class ShardDurability } } minHlc = Math.max(minHlc, node.agent().minStaleHlc(node, activeRequest != null)); - TxnId staleId = node.nextStaleTxnId(minEpoch, minHlc, kind, Domain.Range); + TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch, minHlc, kind, Domain.Range); if (activeRequest != null) logger.info("Initiating RX requested by {} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges, staleId, active); else logger.debug("Initiating RX for durability of {} with TxnId {}.", ranges, staleId); diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java index 6a7b18c4..8d510f62 100644 --- a/accord-core/src/main/java/accord/messages/Propagate.java +++ b/accord-core/src/main/java/accord/messages/Propagate.java @@ -319,7 +319,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt case Invalidated: if (tracing != null) tracing.trace(safeStore.commandStore(), "Invalidating"); - Commands.commitInvalidate(safeStore, safeCommand, route); + Commands.commitInvalidate(safeStore, safeCommand, participants.route()); break; case Applied: @@ -328,13 +328,13 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt tracing.trace(safeStore.commandStore(), "Applying"); Invariants.require(committedExecuteAt != null); // we must use the remote executeAt, as it might have a uniqueHlc we aren't aware of at commit - confirm(Commands.apply(safeStore, safeCommand, participants, Ballot.ZERO, txnId, route, committedExecuteAt, stableDeps, partialTxn, writes, result)); + confirm(Commands.apply(safeStore, safeCommand, participants, Ballot.ZERO, txnId, participants.route(), committedExecuteAt, stableDeps, partialTxn, writes, result)); break; case Stable: if (tracing != null) tracing.trace(safeStore.commandStore(), "Committing as stable"); - confirm(Commands.commit(safeStore, safeCommand, participants, Stable, acceptedOrCommitted, txnId, route, partialTxn, executeAtIfKnown, stableDeps, null)); + confirm(Commands.commit(safeStore, safeCommand, participants, Stable, acceptedOrCommitted, txnId, participants.route(), partialTxn, executeAtIfKnown, stableDeps, null)); break; case Committed: diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index e426cd58..b89f23ee 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -276,7 +276,7 @@ public class Topology { NodeInfo info = nodeLookup.get(node.id); if (info == null) - return Topology.EMPTY; + return Topology.EMPTY.withEpoch(epoch); SortedArrayList<Id> nodeIds = new SortedArrayList<>(new Id[] { node }); return new Topology(global(), epoch, shards, ranges, removedIds, staleIds, nodeIds, nodeLookup, info.ranges, info.supersetIndexes); diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index f44825af..989ea706 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -140,7 +140,7 @@ public class TopologyManager this.self = node; this.global = Invariants.requireArgument(global, !global.isSubset()); this.local = global.forNode(node).trim(); - Invariants.requireArgument(!global().isSubset()); + Invariants.requireArgument(local.epoch == global.epoch); this.curShardSyncComplete = new BitSet(global.shards.length); if (!global().isEmpty()) this.syncTracker = new QuorumTracker(new Single(sorter, global())); @@ -383,32 +383,21 @@ public class TopologyManager public void syncComplete(Id node, long epoch) { Invariants.requireArgument(epoch > 0); + int i = indexOf(epoch); if (epoch > currentEpoch) { pending(epoch).syncComplete.add(node); + if (epochs.length == 0) + return; + i = 0; } - else + else if (i < 0) { - int i = indexOf(epoch); - if (i < 0) - return; - - EpochState.NodeSyncStatus status = epochs[i].recordSyncComplete(node); - switch (status) - { - case Complete: - i++; - for (; i < epochs.length && epochs[i].recordSyncCompleteFromFuture(); i++) {} - break; - case Untracked: - // don't have access to TopologyManager.this.node to check if the nodes match... this state should not happen unless it is the same node - case NoUpdate: - case ShardUpdate: - break; - default: - throw new UnsupportedOperationException("Unknown status " + status); - } + Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch); + return; } + + recordSyncComplete(epochs, i, node); } /** @@ -417,23 +406,16 @@ public class TopologyManager */ public Ranges epochClosed(Ranges ranges, long epoch) { - if (epochs.length == 0) - return ranges; - Invariants.requireArgument(epoch > 0); - int i; + int i = indexOf(epoch); if (epoch > currentEpoch) { - Notifications notifications = pending(epoch); - notifications.closed = notifications.closed.union(MERGE_ADJACENT, ranges); + pending(epoch).closed.union(MERGE_ADJACENT, ranges); + if (epochs.length == 0) + return ranges; i = 0; } - else - { - i = indexOf(epoch); - } - - if (i == -1) + else if (i < 0) { Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch); return Ranges.EMPTY; // notification came for an already truncated epoch @@ -451,25 +433,20 @@ public class TopologyManager */ public Ranges epochRetired(Ranges ranges, long epoch) { - if (epochs.length == 0) - return ranges; - - Invariants.requireArgument(epoch > 0); - int retiredIdx; + int i = indexOf(epoch); if (epoch > currentEpoch) { - Notifications notifications = pending(epoch); - notifications.retired = notifications.retired.union(MERGE_ADJACENT, ranges); - retiredIdx = 0; // record these ranges as complete for all earlier epochs as well + pending(epoch).retired.union(MERGE_ADJACENT, ranges); + if (epochs.length == 0) + return ranges; + i = 0; } - else + else if (i < 0) { - retiredIdx = indexOf(epoch); - if (retiredIdx < 0) - return Ranges.EMPTY; + Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch); + return Ranges.EMPTY; // notification came for an already truncated epoch } - int i = retiredIdx; Ranges report = ranges = epochs[i++].recordRetired(ranges); while (!ranges.isEmpty() && i < epochs.length) ranges = epochs[i++].recordRetired(ranges); @@ -505,6 +482,25 @@ public class TopologyManager } } + private static void recordSyncComplete(EpochState[] epochs, int i, Id node) + { + EpochState.NodeSyncStatus status = epochs[i].recordSyncComplete(node); + switch (status) + { + case Complete: + i++; + for (; i < epochs.length && epochs[i].recordSyncCompleteFromFuture(); i++) {} + break; + case Untracked: + // don't have access to TopologyManager.this.node to check if the nodes match... this state should not happen unless it is the same node + case NoUpdate: + case ShardUpdate: + break; + default: + throw new UnsupportedOperationException("Unknown status " + status); + } + } + static class WaitingForEpoch extends AsyncResults.SettableResult<Void> { final long deadlineMicros; @@ -729,17 +725,23 @@ public class TopologyManager prev = epochs; Invariants.requireArgument(topology.epoch == prev.nextEpoch() || epochs.epochs.length == 0, "Expected topology update %d to be %d", topology.epoch, prev.nextEpoch()); - EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1]; - List<Epochs.Notifications> pending = new ArrayList<>(prev.pending); - Epochs.Notifications notifications = pending.isEmpty() ? new Epochs.Notifications() : pending.remove(0); - - System.arraycopy(prev.epochs, 0, nextEpochs, 1, prev.epochs.length); Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY : prev.epochs[0].global.ranges; + List<Epochs.Notifications> pending = prev.pending.size() <= 1 ? new ArrayList<>() : new ArrayList<>(prev.pending.subList(1, prev.pending.size())); + + EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1]; + System.arraycopy(prev.epochs, 0, nextEpochs, 1, prev.epochs.length); nextEpochs[0] = new EpochState(self, topology, sorter.get(topology), prevAll); - notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete); - nextEpochs[0].recordClosed(notifications.closed); - nextEpochs[0].recordRetired(notifications.retired); + if (!prev.pending.isEmpty()) + { + // TODO (expected): we should invoke the same code as we do when receiving normally, to prevent divergence + prev.pending.get(0).syncComplete.forEach(id -> recordSyncComplete(nextEpochs, 0, id)); + for (Epochs.Notifications notifications : prev.pending) + { + nextEpochs[0].recordClosed(notifications.closed); + nextEpochs[0].recordRetired(notifications.retired); + } + } List<FutureEpoch> futureEpochs = new ArrayList<>(prev.futureEpochs); notifyDone = !futureEpochs.isEmpty() ? futureEpochs.remove(0) : null; @@ -939,8 +941,6 @@ public class TopologyManager for (int i = 0; i < topologies.size() && count > 0; i++, count--) { Topology topology = topologies.get(i); - Invariants.require(i > 0 || topology.epoch() == minEpoch || firstNonEmpty == topology.epoch(), - "Min epoch: %d. Range: %s", minEpoch, this); forEach.accept(topology); } } diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java index 9658b4bb..d9a52477 100644 --- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java +++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java @@ -63,7 +63,7 @@ public class CoordinateTransactionTest Node node = cluster.get(1); assertNotNull(node); - TxnId txnId = node.nextTxnId(Write, Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key); Keys keys = keys(10); Txn txn = writeTxn(keys); FullKeyRoute route = keys.toRoute(keys.get(0).toUnseekable()); @@ -80,7 +80,7 @@ public class CoordinateTransactionTest Node node = cluster.get(1); assertNotNull(node); - TxnId txnId = node.nextTxnId(Read, Range); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Read, Range); Ranges keys = ranges(range(1, 2)); Txn txn = writeTxn(keys); FullRangeRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null)); @@ -97,8 +97,8 @@ public class CoordinateTransactionTest Node node = cluster.get(1); assertNotNull(node); - TxnId oldId1 = node.nextTxnId(Write, Key); - TxnId oldId2 = node.nextTxnId(Write, Key); + TxnId oldId1 = node.nextTxnIdWithDefaultFlags(Write, Key); + TxnId oldId2 = node.nextTxnIdWithDefaultFlags(Write, Key); getUninterruptibly(CoordinateSyncPoint.exclusive(node, ranges(range(0, 1)))); try @@ -139,7 +139,7 @@ public class CoordinateTransactionTest private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable { - TxnId txnId = node.nextTxnId(Write, Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key); txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, 0, Write, Key, txnId.node); Txn txn = writeTxn(keys); Result result = getUninterruptibly(CoordinateTransaction.coordinate(node, node.computeRoute(txnId, txn.keys()), txnId, txn)); @@ -203,7 +203,7 @@ public class CoordinateTransactionTest Node node = cluster.get(1); assertNotNull(node); - TxnId txnId = node.nextTxnId(Write, Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key); Keys oneKey = keys(10); Keys twoKeys = keys(10, 20); Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(twoKeys)); diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java index 8adb26a3..502d48c0 100644 --- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java +++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java @@ -69,7 +69,7 @@ public class TopologyChangeTest .build()) { Node node1 = cluster.get(1); - TxnId txnId1 = node1.nextTxnId(Write, Key); + TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(Write, Key); Txn txn1 = writeTxn(keys); getUninterruptibly(node1.coordinate(txnId1, txn1)); getUninterruptibly(node1.commandStores().forEach(PreLoadContext.contextFor(txnId1, "Test"), keys.toParticipants(), 1, 1, safeStore -> { @@ -91,7 +91,7 @@ public class TopologyChangeTest }); Node node4 = cluster.get(4); - TxnId txnId2 = node4.nextTxnId(Write, Key); + TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(Write, Key); Txn txn2 = writeTxn(keys); getUninterruptibly(node4.coordinate(txnId2, txn2)); @@ -194,7 +194,7 @@ public class TopologyChangeTest }); Node node4 = cluster.get(4); - TxnId epoch2txnId = node4.nextTxnId(Write, Key); + TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(Write, Key); Assertions.assertEquals(2, epoch2txnId.epoch()); cluster.configServices(1, 2, 3, 4, 5).forEach(configService -> configService.reportTopology(topology3)); diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 09906ee0..ea65c488 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -788,11 +788,10 @@ public class Cluster // Replay journal Journal journal = journalMap.get(id); - Iterator<? extends Journal.TopologyUpdate> iter = journal.replayTopologies(); + List<? extends Journal.TopologyUpdate> list = journal.replayTopologies(); Journal.TopologyUpdate lastUpdate = null; - while (iter.hasNext()) + for (Journal.TopologyUpdate update : list) { - Journal.TopologyUpdate update = iter.next(); Invariants.require(lastUpdate == null || update.global.epoch() > lastUpdate.global.epoch()); lastUpdate = update; } diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index a926e5cb..3b9b6945 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -233,21 +233,9 @@ public class InMemoryJournal implements Journal } @Override - public Iterator<TopologyUpdate> replayTopologies() + public List<TopologyUpdate> replayTopologies() { - return new Iterator<>() - { - int current = 0; - public boolean hasNext() - { - return current < topologyUpdates.size(); - } - - public TopologyUpdate next() - { - return topologyUpdates.get(current++); - } - }; + return new ArrayList<>(topologyUpdates); } @Override diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index 9ba39729..db098689 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -25,6 +25,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Iterator; +import java.util.List; import java.util.NavigableMap; import accord.api.Journal; @@ -107,7 +108,7 @@ public class LoggingJournal implements Journal } @Override - public Iterator<? extends TopologyUpdate> replayTopologies() + public List<? extends TopologyUpdate> replayTopologies() { log("REPLAY TOPOLOGIES\n"); return delegate.replayTopologies(); diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java b/accord-core/src/test/java/accord/local/CommandsTest.java index eab240a7..ff29ee44 100644 --- a/accord-core/src/test/java/accord/local/CommandsTest.java +++ b/accord-core/src/test/java/accord/local/CommandsTest.java @@ -96,7 +96,7 @@ class CommandsTest Keys keys = Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs)); Txn txn = listWriteTxn(from, keys); - TxnId txnId = node.nextTxnId(Write, Routable.Domain.Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Routable.Domain.Key); for (Node n : nodeMap.values()) ((TestableConfigurationService) n.configService()).reportTopology(updatedTopology); diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index a1ddd093..69332710 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -158,7 +158,7 @@ public class ImmutableCommandTest CommandStoreSupport support = new CommandStoreSupport(); Node node = createNode(ID1, support); CommandStore commands = node.unsafeByIndex(0); - TxnId txnId = node.nextTxnId(Write, Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key); ((MockCluster.Clock)node.time()).increment(10); Keys keys = Keys.of(KEY); Txn txn = writeTxn(keys); diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java index a0ec3fa3..dbc462b5 100644 --- a/accord-core/src/test/java/accord/messages/ReadDataTest.java +++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java @@ -97,7 +97,7 @@ class ReadDataTest MessageSink sink = Mockito.mock(MessageSink.class); Node node = createNode(ID1, TOPOLOGY, sink, new MockCluster.Clock(100)); - TxnId txnId = node.nextTxnId(Txn.Kind.Write, Key); + TxnId txnId = node.nextTxnIdWithDefaultFlags(Txn.Kind.Write, Key); Keys keys = Keys.of(IntKey.key(1), IntKey.key(43)); AsyncResults.SettableResult<Data> readResult = new AsyncResults.SettableResult<>(); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 582e4b94..99a6a9cd 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -394,7 +393,7 @@ public class Cluster implements Scheduler @Override public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public Command.MinimalWithDeps loadMinimalWithDeps(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } - @Override public Iterator<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } + @Override public List<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } @Override public void purge(CommandStores commandStores, EpochSupplier minEpoch) { throw new IllegalStateException("Not impelemented"); } @Override public void replay(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org