This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new fe306fc5 Epoch/Topology Garbage Collection fe306fc5 is described below commit fe306fc5539b40d1c9d49f9afd0ca45bb74c49d3 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Feb 17 18:49:40 2025 +0100 Epoch/Topology Garbage Collection Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20347. --- .../main/java/accord/api/ConfigurationService.java | 11 +- accord-core/src/main/java/accord/api/Journal.java | 5 +- .../accord/impl/AbstractConfigurationService.java | 68 ++++-- accord-core/src/main/java/accord/local/Node.java | 26 +- .../accord/local/durability/DurabilityService.java | 12 +- .../accord/local/durability/ShardDurability.java | 18 ++ .../java/accord/primitives/AbstractRanges.java | 5 + .../src/main/java/accord/topology/Topologies.java | 2 +- .../main/java/accord/topology/TopologyManager.java | 261 ++++++++++++++++++--- .../accord/burn/BurnTestConfigurationService.java | 8 + .../accord/coordinate/CoordinateSyncPointTest.java | 4 +- .../impl/AbstractConfigurationServiceTest.java | 25 +- .../src/test/java/accord/impl/basic/Cluster.java | 4 +- .../java/accord/impl/basic/InMemoryJournal.java | 17 +- .../java/accord/impl/basic/LoggingJournal.java | 7 +- .../src/test/java/accord/impl/list/ListAgent.java | 13 +- .../test/java/accord/impl/list/ListRequest.java | 22 +- .../src/test/java/accord/impl/list/ListStore.java | 5 - .../accord/impl/mock/MockConfigurationService.java | 6 + .../java/accord/topology/TopologyManagerTest.java | 40 ++-- .../src/test/java/accord/utils/Property.java | 1 - .../src/main/java/accord/maelstrom/Cluster.java | 3 +- .../java/accord/maelstrom/SimpleConfigService.java | 5 + 23 files changed, 429 insertions(+), 139 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index b8bbcbfc..90d53468 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -142,12 +142,6 @@ public interface ConfigurationService */ void onRemoteSyncComplete(Node.Id node, long epoch); - /** - * Called when the configuration service is meant to truncate it's topology data up to (but not including) - * the given epoch - */ - void truncateTopologyUntil(long epoch); - /** * Called when no new TxnId may be agreed with an epoch less than or equal to the provided one. * This means future epochs are now aware of all TxnId with this epoch or earlier that may be executed @@ -197,4 +191,9 @@ public interface ConfigurationService void reportEpochClosed(Ranges ranges, long epoch); void reportEpochRetired(Ranges ranges, long epoch); + + /** + * Called after this epoch is garbage collected / removed on the current node. + */ + void reportEpochRemoved(long epoch); } diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index ac95bda4..00e3ffb4 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -28,6 +28,7 @@ import accord.local.Command; import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.RedundantBefore; +import accord.primitives.EpochSupplier; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -47,10 +48,10 @@ public interface Journal // TODO (required): propagate exceptions (i.e. using OnDone instead of Runnable) void saveCommand(int store, CommandUpdate value, Runnable onFlush); - Iterator<TopologyUpdate> replayTopologies(); + Iterator<? extends TopologyUpdate> replayTopologies(); void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush); - void purge(CommandStores commandStores); + void purge(CommandStores commandStores, EpochSupplier minEpoch); void replay(CommandStores commandStores); RedundantBefore loadRedundantBefore(int store); diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index fe8046fe..3cf7df2a 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -69,11 +69,25 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return epoch; } + public boolean isReady() + { + return reads != null && reads.isDone(); + } + @Override public String toString() { return "EpochState{" + epoch + '}'; } + + @VisibleForTesting + public synchronized void setReadyForTesting(Topology topology) + { + this.topology = topology; + received.setSuccess(topology); + acknowledged.setSuccess(null); + reads = AsyncResults.success(null); + } } /** @@ -96,6 +110,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo */ private volatile long lastReceived = 0; private volatile long lastAcknowledged = 0; + private volatile long lastTruncated = -1; protected abstract EpochState createEpochState(long epoch); @@ -132,6 +147,11 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return epochs.size(); } + public boolean wasTruncated(long epoch) + { + return lastTruncated > 0 && lastTruncated >= epoch; + } + public boolean isEmpty() { return lastReceived == 0; @@ -139,8 +159,9 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo synchronized EpochState getOrCreate(long epoch) { + Invariants.requireArgument(!wasTruncated(epoch), "Can not re-create truncated epoch %d. Last truncated: %d", epoch, lastTruncated); Invariants.requireArgument(epoch >= 0, "Epoch must be non-negative but given %d", epoch); - Invariants.requireArgument(epoch > 0 || (lastReceived == 0 && epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, epochsf; %s", lastReceived, epochs); + Invariants.requireArgument(epoch > 0 || (lastReceived == 0 && epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, epochs; %s", lastReceived, epochs); if (epochs.isEmpty()) { EpochState state = createEpochState(epoch); @@ -237,15 +258,31 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return getOrCreate(epoch).acknowledged; } - synchronized void truncateUntil(long epoch) + public synchronized void truncateUntil(long epoch) { Invariants.requireArgument(epoch <= maxEpoch(), "epoch %d > %d", epoch, maxEpoch()); long minEpoch = minEpoch(); - int toTrim = Ints.checkedCast(epoch - minEpoch); - if (toTrim <= 0) + int trimFrom = Ints.checkedCast(epoch - minEpoch); + + final int count = epochs.size(); + int highestReadyIdx = -1; + for (int i = trimFrom; i >= 0; i--) + { + if (epochs.get(i).isReady()) + { + highestReadyIdx = i; + break; + } + } + + // Always leave least 1 ready epoch after truncation + if (highestReadyIdx <= 0) return; - epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + List<EpochState> next = new ArrayList<>(epochs.subList(highestReadyIdx, count)); + Invariants.require(next.get(0).reads.isDone()); + epochs = next; + lastTruncated = next.get(0).epoch - 1; } } @@ -294,12 +331,14 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo @Override public void acknowledgeEpoch(EpochReady ready, boolean startSync) { + if (epochs.wasTruncated(ready.epoch)) + return; + ready.metadata.addCallback(() -> epochs.acknowledge(ready)); - ready.coordinate.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)); + ready.coordinate.addCallback(() -> localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync)); ready.reads.addCallback(() -> localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology)); } - protected void topologyUpdatePreListenerNotify(Topology topology) {} protected void topologyUpdatePostListenerNotify(Topology topology) {} public void reportTopology(Topology topology, boolean isLoad, boolean startSync) @@ -332,7 +371,6 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo } epochs.receive(topology); - topologyUpdatePreListenerNotify(topology); for (Listener listener : listeners) listener.onTopologyUpdate(topology, isLoad, startSync); topologyUpdatePostListenerNotify(topology); @@ -364,18 +402,6 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo listener.onEpochRetired(ranges, epoch); } - protected void truncateTopologiesPreListenerNotify(long epoch) {} - protected void truncateTopologiesPostListenerNotify(long epoch) {} - - public void truncateTopologiesUntil(long epoch) - { - truncateTopologiesPreListenerNotify(epoch); - for (Listener listener : listeners) - listener.truncateTopologyUntil(epoch); - truncateTopologiesPostListenerNotify(epoch); - epochs.truncateUntil(epoch); - } - // synchronized because state.reads is written public AsyncChain<Void> epochReady(long epoch) { @@ -419,4 +445,4 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo return new EpochHistory(); } } -} +} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 958a23c6..446633c5 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -335,7 +335,8 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ if (this.topology.isEmpty()) return bootstrap.get(); return orderFastPathReporting(this.topology.epochReady(topology.epoch() - 1), bootstrap.get()); }; - return this.topology.onTopologyUpdate(topology, orderFastPathReporting); + + return this.topology.onTopologyUpdate(topology, orderFastPathReporting, configService::reportEpochRemoved); } private static EpochReady orderFastPathReporting(EpochReady previous, EpochReady next) @@ -367,12 +368,6 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ topology.onEpochSyncComplete(node, epoch); } - @Override - public void truncateTopologyUntil(long epoch) - { - topology.truncateTopologyUntil(epoch); - } - @Override public void onEpochClosed(Ranges ranges, long epoch) { @@ -383,6 +378,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public void onEpochRetired(Ranges ranges, long epoch) { topology.onEpochRetired(ranges, epoch); + durabilityService.onEpochRetired(ranges, epoch); } // TODO (required): audit error handling, as the refactor to provide epoch timeouts appears to have broken a number of coordination @@ -397,6 +393,12 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public void withEpoch(long epoch, BiConsumer<Void, Throwable> callback) { + if (epoch < topology.minEpoch()) + { + callback.accept(null, new TopologyManager.TopologyRetiredException(epoch, topology.minEpoch())); + return; + } + if (topology.hasAtLeastEpoch(epoch)) { callback.accept(null, null); @@ -410,7 +412,9 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public void withEpoch(long epoch, BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess) { - if (topology.hasEpoch(epoch)) + if (epoch < topology.minEpoch()) + throw new TopologyManager.TopologyRetiredException(epoch, topology.minEpoch()); + if (topology.hasAtLeastEpoch(epoch)) { ifSuccess.run(); } @@ -426,6 +430,8 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public void withEpoch(long epoch, BiConsumer<?, Throwable> ifFailure, Function<Throwable, Throwable> onFailure, Runnable ifSuccess) { + if (epoch < topology.minEpoch()) + throw new TopologyManager.TopologyRetiredException(epoch, topology.minEpoch()); if (topology.hasEpoch(epoch)) { ifSuccess.run(); @@ -443,6 +449,8 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ @Inline public <T> AsyncChain<T> withEpoch(long epoch, Supplier<? extends AsyncChain<T>> supplier) { + if (epoch < topology.minEpoch()) + throw new TopologyManager.TopologyRetiredException(epoch, topology.minEpoch()); if (topology.hasEpoch(epoch)) { return supplier.get(); @@ -932,4 +940,4 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ { return time; } -} +} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityService.java b/accord-core/src/main/java/accord/local/durability/DurabilityService.java index e497b7ec..4502e084 100644 --- a/accord-core/src/main/java/accord/local/durability/DurabilityService.java +++ b/accord-core/src/main/java/accord/local/durability/DurabilityService.java @@ -177,18 +177,18 @@ public class DurabilityService implements ConfigurationService.Listener { } - @Override - public void truncateTopologyUntil(long epoch) - { - } - @Override public void onEpochClosed(Ranges ranges, long epoch) { } @Override - public void onEpochRetired(Ranges ranges, long epoch) + public void onEpochRetired(Ranges retiredRanges, long epoch) { + // No need to cancel work for ranges that are still active + if (!node.topology().isFullyRetired(retiredRanges)) + return; + + shards.retireRanges(retiredRanges, epoch); } } \ No newline at end of file diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java index 0bfa81c6..4c17b8c4 100644 --- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java +++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java @@ -496,6 +496,24 @@ public class ShardDurability } } + synchronized void retireRanges(Ranges retiredRanges, long epoch) + { + Map<Range, ShardScheduler> prev = new HashMap<>(this.shardSchedulers); + this.shardSchedulers.clear(); + + for (Map.Entry<Range, ShardScheduler> e : prev.entrySet()) + { + if (retiredRanges.contains(e.getKey())) + { + logger.info("Cancelling durability scheduling for {}, since it was retired in epoch {}", + e.getKey(), epoch); + e.getValue().markDefunct(); + } + else + this.shardSchedulers.put(e.getKey(), e.getValue()); + } + } + synchronized void updateTopology(Topology latestGlobal) { if (latestGlobal.epoch() <= latestEpoch) diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java index 6b483ca3..dc19ff20 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java +++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java @@ -127,6 +127,11 @@ public abstract class AbstractRanges implements Iterable<Range>, Routables<Range return ((int) supersetLinearMerge(this.ranges, that.ranges)) == that.size(); } + public boolean contains(Range range) + { + return indexOf(range, FAST) >= 0; + } + @Override public int size() { diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java index 3a5dc838..a727acf3 100644 --- a/accord-core/src/main/java/accord/topology/Topologies.java +++ b/accord-core/src/main/java/accord/topology/Topologies.java @@ -573,7 +573,7 @@ public interface Topologies extends TopologySorter public Builder(int initialCapacity) { - buffer = ArrayBuffers.cachedAny().get(4); + buffer = ArrayBuffers.cachedAny().get(initialCapacity); } public void add(Topology topology) diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 5e2ce80a..4c22806a 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -20,18 +20,24 @@ package accord.topology; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.ConfigurationService; @@ -40,6 +46,7 @@ import accord.api.ProtocolModifiers.QuorumEpochIntersections.Include; import accord.api.Timeouts; import accord.api.Timeouts.RegisteredTimeout; import accord.api.TopologySorter; +import accord.api.VisibleForImplementation; import accord.coordinate.EpochTimeout; import accord.coordinate.tracking.QuorumTracker; import accord.local.CommandStore; @@ -65,12 +72,13 @@ import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Owne import static accord.api.ProtocolModifiers.QuorumEpochIntersections.Include.Unsynced; import static accord.coordinate.tracking.RequestStatus.Success; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; -import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps; import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps; +import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithoutDeps; import static accord.primitives.TxnId.FastPath.Unoptimised; import static accord.utils.Invariants.illegalState; import static accord.utils.Invariants.nonNull; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.stream.Collectors.joining; /** * Manages topology state changes and update bookkeeping @@ -87,6 +95,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; */ public class TopologyManager { + private static final Logger logger = LoggerFactory.getLogger(TopologyManager.class); private static final FutureEpoch SUCCESS; static @@ -110,6 +119,21 @@ public class TopologyManager @GuardedBy("TopologyManager.this") Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; + private volatile boolean allRetired; + + public boolean allRetired() + { + if (allRetired) + return true; + + if (!retired.containsAll(global.ranges)) + return false; + + Invariants.require(closed.containsAll(global.ranges)); + allRetired = true; + return true; + } + EpochState(Id node, Topology global, TopologySorter sorter, Ranges prevRanges) { this.self = node; @@ -117,7 +141,7 @@ public class TopologyManager this.local = global.forNode(node).trim(); Invariants.requireArgument(!global().isSubset()); this.curShardSyncComplete = new BitSet(global.shards.length); - if (global().size() > 0) + if (!global().isEmpty()) this.syncTracker = new QuorumTracker(new Single(sorter, global())); else this.syncTracker = null; @@ -234,8 +258,10 @@ public class TopologyManager Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY; } - private static final Epochs EMPTY = new Epochs(new EpochState[0]); + private static final Epochs EMPTY = new Epochs(new EpochState[0], Collections.emptyList(), Collections.emptyList(), -1); private final long currentEpoch; + private final long firstNonEmptyEpoch; + // Epochs are sorted in _descending_ order private final EpochState[] epochs; // nodes we've received sync complete notifications from, for epochs we do not yet have topologies for. // Pending sync notifications are indexed by epoch, with the current epoch as index[0], and future epochs @@ -249,9 +275,16 @@ public class TopologyManager // NOTE: this is NOT copy-on-write. This is mutated in place! private final List<FutureEpoch> futureEpochs; - private Epochs(EpochState[] epochs, List<Notifications> pending, List<FutureEpoch> futureEpochs) + private Epochs(EpochState[] epochs, List<Notifications> pending, List<FutureEpoch> futureEpochs, long prevFirstNonEmptyEpoch) { this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; + if (prevFirstNonEmptyEpoch != -1) + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + else if (epochs.length > 0 && !epochs[0].global().isEmpty()) + this.firstNonEmptyEpoch = currentEpoch; + else + this.firstNonEmptyEpoch = prevFirstNonEmptyEpoch; + this.pending = pending; this.futureEpochs = futureEpochs; if (!futureEpochs.isEmpty()) @@ -261,12 +294,45 @@ public class TopologyManager Invariants.requireArgument(futureEpochs.get(i).epoch == futureEpochs.get(i - 1).epoch + 1); for (int i = 1; i < epochs.length; i++) Invariants.requireArgument(epochs[i].epoch() == epochs[i - 1].epoch() - 1); - this.epochs = epochs; - } + int truncateFrom = -1; + // > 0 because we do not want to be left without epochs in case they're all empty + for (int i = epochs.length - 1; i > 0; i--) + { + EpochState epochState = epochs[i]; + if (epochState.allRetired() && + (truncateFrom == -1 || truncateFrom == i + 1)) + { + Invariants.require(epochs[i].syncComplete()); + truncateFrom = i; + } + } - private Epochs(EpochState[] epochs) - { - this(epochs, new ArrayList<>(), new ArrayList<>()); + if (truncateFrom == -1) + { + this.epochs = epochs; + } + else + { + this.epochs = Arrays.copyOf(epochs, truncateFrom); + if (logger.isDebugEnabled()) + { + for (int i = truncateFrom; i < epochs.length; i++) + { + EpochState state = epochs[i]; + Invariants.require(epochs[i].syncComplete()); + logger.debug("Retired epoch {} with added/removed ranges {}/{}. Topology: {}. Closed: {}", state.epoch(), state.addedRanges, state.removedRanges, state.global.ranges, state.closed); + } + } + if (logger.isTraceEnabled()) + { + for (int i = 0; i < truncateFrom; i++) + { + EpochState state = epochs[i]; + Invariants.require(state.syncComplete()); + logger.trace("Leaving epoch {} with added/removed ranges {}/{}", state.epoch(), state.addedRanges, state.removedRanges); + } + } + } } private FutureEpoch awaitEpoch(long epoch, TopologyManager manager) @@ -365,30 +431,38 @@ public class TopologyManager { i = indexOf(epoch); } + + if (i == -1) + { + Invariants.require(epoch < minEpoch(), "Could not find epoch %d. Min: %d, current: %d", epoch, minEpoch(), currentEpoch); + return; // notification came for an already truncated epoch + } while (epochs[i].recordClosed(ranges) && ++i < epochs.length) {} } /** - * Mark the epoch as "redundant" for the provided ranges; this means that all transactions that can be + * Mark the epoch as "retired" for the provided ranges; this means that all transactions that can be * proposed for this epoch have now been executed globally. */ public void epochRetired(Ranges ranges, long epoch) { Invariants.requireArgument(epoch > 0); - int i; + int retiredIdx; if (epoch > currentEpoch) { Notifications notifications = pending(epoch); notifications.retired = notifications.retired.union(MERGE_ADJACENT, ranges); - i = 0; // record these ranges as complete for all earlier epochs as well + retiredIdx = 0; // record these ranges as complete for all earlier epochs as well } else { - i = indexOf(epoch); - if (i < 0) + retiredIdx = indexOf(epoch); + if (retiredIdx < 0) return; } - while (epochs[i].recordRetired(ranges) && ++i < epochs.length) {} + + for (int i = retiredIdx; i < epochs.length; i++) + epochs[i].recordRetired(ranges); } private Notifications pending(long epoch) @@ -633,33 +707,39 @@ public class TopologyManager return new EpochsSnapshot(builder.build()); } - public EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady> bootstrap) + public EpochReady onTopologyUpdate(Topology topology, Supplier<EpochReady> bootstrap, LongConsumer truncate) { FutureEpoch notifyDone; EpochReady ready; + Epochs prev; + Epochs next; synchronized (this) { - Epochs current = epochs; - Invariants.requireArgument(topology.epoch == current.nextEpoch() || epochs == Epochs.EMPTY, - "Expected topology update %d to be %d", topology.epoch, current.nextEpoch()); - EpochState[] nextEpochs = new EpochState[current.epochs.length + 1]; - List<Epochs.Notifications> pending = new ArrayList<>(current.pending); + prev = epochs; + Invariants.requireArgument(topology.epoch == prev.nextEpoch() || epochs == Epochs.EMPTY, + "Expected topology update %d to be %d", topology.epoch, prev.nextEpoch()); + EpochState[] nextEpochs = new EpochState[prev.epochs.length + 1]; + List<Epochs.Notifications> pending = new ArrayList<>(prev.pending); Epochs.Notifications notifications = pending.isEmpty() ? new Epochs.Notifications() : pending.remove(0); - System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length); + System.arraycopy(prev.epochs, 0, nextEpochs, 1, prev.epochs.length); - Ranges prevAll = current.epochs.length == 0 ? Ranges.EMPTY : current.epochs[0].global.ranges; + Ranges prevAll = prev.epochs.length == 0 ? Ranges.EMPTY : prev.epochs[0].global.ranges; nextEpochs[0] = new EpochState(self, topology, sorter.get(topology), prevAll); notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete); nextEpochs[0].recordClosed(notifications.closed); nextEpochs[0].recordRetired(notifications.retired); - List<FutureEpoch> futureEpochs = new ArrayList<>(current.futureEpochs); + List<FutureEpoch> futureEpochs = new ArrayList<>(prev.futureEpochs); notifyDone = !futureEpochs.isEmpty() ? futureEpochs.remove(0) : null; - epochs = new Epochs(nextEpochs, pending, futureEpochs); + next = new Epochs(nextEpochs, pending, futureEpochs, prev.firstNonEmptyEpoch); + epochs = next; ready = nextEpochs[0].ready = bootstrap.get(); } + if (next.minEpoch() != prev.minEpoch()) + truncate.accept(epochs.minEpoch()); + if (notifyDone != null) notifyDone.setDone(); @@ -709,7 +789,7 @@ public class TopologyManager return epochs.get(epoch).synced; } - public synchronized void truncateTopologyUntil(long epoch) + public synchronized void truncateTopologiesUntil(long epoch) { Epochs current = epochs; Invariants.requireArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch()); @@ -722,7 +802,7 @@ public class TopologyManager EpochState[] nextEpochs = new EpochState[newLen]; System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs); + epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs, current.firstNonEmptyEpoch); } public synchronized void onEpochClosed(Ranges ranges, long epoch) @@ -730,6 +810,28 @@ public class TopologyManager epochs.epochClosed(ranges, epoch); } + /** + * If ranges were added in epoch X, and are _not_ present in the current epoch, they + * are purged and durability scheduling for them should be cancelled. + */ + public synchronized boolean isFullyRetired(Ranges ranges) + { + Epochs epochs = this.epochs; + EpochState current = epochs.get(epochs.currentEpoch); + if (!current.addedRanges.containsAll(ranges)) + return false; + + long minEpoch = epochs.minEpoch(); + for (long i = minEpoch; i < epochs.currentEpoch; i++) + { + EpochState retiredIn = epochs.get(i); + if (retiredIn.allRetired() && retiredIn.addedRanges.containsAll(ranges)) + return true; + } + + return false; + } + public synchronized void onEpochRetired(Ranges ranges, long epoch) { epochs.epochRetired(ranges, epoch); @@ -760,8 +862,16 @@ public class TopologyManager return current().epoch; } + // TODO (desired): add tests for epoch GC and tracking + @VisibleForImplementation + public long firstNonEmpty() + { + return epochs.firstNonEmptyEpoch; + } + public long minEpoch() { + Epochs epochs = this.epochs; return epochs.minEpoch(); } @@ -771,6 +881,71 @@ public class TopologyManager return epochs.get(epoch); } + /** + * Fetch topologies between {@param minEpoch} (inclusive), and {@param maxEpoch} (inclusive). + */ + public TopologyRange between(long minEpoch, long maxEpoch) + { + Epochs epochs = this.epochs; + // No epochs known to Accord + if (epochs.firstNonEmptyEpoch == -1) + return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, Collections.emptyList()); + + minEpoch = Math.max(minEpoch, epochs.minEpoch()); + int diff = Math.toIntExact(epochs.currentEpoch - minEpoch + 1); + List<Topology> topologies = new ArrayList<>(diff); + for (int i = 0; epochs.minEpoch() + i <= maxEpoch && i < diff; i++) + topologies.add(epochs.get(minEpoch + i).global); + + return new TopologyRange(epochs.minEpoch(), epochs.currentEpoch, epochs.firstNonEmptyEpoch, topologies); + } + + public static class TopologyRange + { + public final long min; + public final long current; + public final long firstNonEmpty; + public final List<Topology> topologies; + + public TopologyRange(long min, long current, long firstNonEmpty, List<Topology> topologies) + { + this.min = min; + this.current = current; + this.topologies = topologies; + this.firstNonEmpty = firstNonEmpty; + } + + public void forEach(Consumer<Topology> forEach, long minEpoch, int count) + { + if (minEpoch == 0) // Bootstrap + minEpoch = this.min; + + long emptyUpTo = firstNonEmpty == -1 ? current : firstNonEmpty - 1; + // Report empty epochs + for (long epoch = minEpoch; epoch <= emptyUpTo && count > 0; epoch++, count--) + forEach.accept(new Topology(epoch)); + + // Report known non-empty epochs + for (int i = 0; i < topologies.size() && count > 0; i++, count--) + { + Topology topology = topologies.get(i); + Invariants.require(i > 0 || topology.epoch() == minEpoch || firstNonEmpty == topology.epoch(), + "Min epoch: %d. Range: %s", minEpoch, this); + forEach.accept(topology); + } + } + + @Override + public String toString() + { + return String.format("TopologyRange{min=%d, current=%d, firstNonEmpty=%d, topologies=[%s]}", + min, + current, + firstNonEmpty, + topologies.stream().map(t -> Long.toString(t.epoch())).collect(joining(","))); + } + } + public Topologies preciseEpochs(long epoch) { return new Single(sorter, epochs.get(epoch).global); @@ -864,9 +1039,8 @@ public class TopologyManager return atLeast(select, minEpoch, maxEpoch, isSufficientFor, collector); } - private <C, K extends Routables<?>, T> T atLeast(K select, long minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor, - Collectors<C, K, T> collectors) + Collectors<C, K, T> collectors) throws IllegalArgumentException { Invariants.requireArgument(minEpoch <= maxEpoch); Epochs snapshot = epochs; @@ -898,8 +1072,11 @@ public class TopologyManager if (i == snapshot.epochs.length) { - if (!select.isEmpty()) - throw new IllegalArgumentException("Ranges " + select + " could not be found"); + // Epochs earlier than minEpoch might have been GC'd, so we can not collect + // matching ranges for them. However, if ranges were still present in the min epoch, + // we have reported them. + if (!select.isEmpty() && !select.without(snapshot.get(minEpoch).global.ranges).isEmpty()) + throw Invariants.illegalArgument("Ranges %s could not be found", select); return collectors.multi(collector); } @@ -922,11 +1099,15 @@ public class TopologyManager collector = collectors.update(collector, next, select, false); prev = next; } while (i < snapshot.epochs.length); - // needd to remove sufficent / added else remaining may not be empty when the final matches are the last epoch + // need to remove sufficient / added else remaining may not be empty when the final matches are the last epoch remaining = remaining.without(isSufficientFor.apply(prev)); remaining = remaining.without(prev.addedRanges); - if (!remaining.isEmpty()) throw new IllegalArgumentException("Ranges " + remaining + " could not be found"); + // Epochs earlier than minEpoch might have been GC'd, so we can not collect + // matching ranges for them. However, if ranges were still present in the min epoch, + // we have reported them. + if (!remaining.isEmpty() && !select.without(snapshot.get(minEpoch).global.ranges).isEmpty()) + Invariants.illegalArgument("Ranges %s could not be found", remaining); return collectors.multi(collector); } @@ -1038,8 +1219,8 @@ public class TopologyManager public Topologies preciseEpochs(Unseekables<?> select, long minEpoch, long maxEpoch, SelectNodeOwnership selectNodeOwnership, SelectFunction selectFunction) { Epochs snapshot = epochs; - EpochState maxState = snapshot.get(maxEpoch); + Invariants.require(maxState != null, "Unable to find epoch %d; known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch); if (minEpoch == maxEpoch) return new Single(sorter, selectFunction.apply(snapshot.get(minEpoch).global, select, selectNodeOwnership)); @@ -1091,6 +1272,8 @@ public class TopologyManager public Topology localForEpoch(long epoch) { + if (epoch < minEpoch()) + throw new TopologyRetiredException(epoch, minEpoch()); EpochState epochState = epochs.get(epoch); if (epochState == null) throw illegalState("Unknown epoch " + epoch); @@ -1099,6 +1282,8 @@ public class TopologyManager public Ranges localRangesForEpoch(long epoch) { + if (epoch < minEpoch()) + throw new TopologyRetiredException(epoch, minEpoch()); return epochs.get(epoch).local().rangesForNode(self); } @@ -1299,4 +1484,12 @@ public class TopologyManager T one(EpochState epoch, K select, boolean permitMissing); T multi(C collector); } + + public static class TopologyRetiredException extends RuntimeException + { + public TopologyRetiredException(long epoch, long minEpoch) + { + super(String.format("Topology %s retired. Min topology %d", epoch, minEpoch)); + } + } } diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index 3495c0f9..ce6f18e1 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -139,7 +139,10 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M public void onSuccess(Node.Id from, FetchTopologyReply reply) { if (reply.topology != null) + { reportTopology(reply.topology); + pendingEpochs.remove(reply.topology.epoch()); + } else sendNext(); } @@ -224,4 +227,9 @@ public class BurnTestConfigurationService extends AbstractConfigurationService.M if (topology != null) topologyUpdates.epochRetired(lookup.apply(localId), topology.nodes(), ranges, epoch); } + + @Override + public void reportEpochRemoved(long epoch) + { + } } diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java index c7f727d2..50578cd5 100644 --- a/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java +++ b/accord-core/src/test/java/accord/coordinate/CoordinateSyncPointTest.java @@ -70,7 +70,7 @@ class CoordinateSyncPointTest Utils.shard(removed, new SortedArrayList<>(new Node.Id[] { N2 }))); Node n1 = Utils.createNode(N1, t1, happyPathMessaging(), new MockCluster.Clock(0), new TestAgent.RethrowAgent()); - n1.topology().onTopologyUpdate(t2, () -> null); + n1.topology().onTopologyUpdate(t2, () -> null, e -> {}); for (Node.Id node : ALL) n1.topology().onEpochSyncComplete(node, t1.epoch()); @@ -86,7 +86,7 @@ class CoordinateSyncPointTest Utils.shard(IntKey.range(0, 10), ALL)); Node n1 = Utils.createNode(N1, t1, happyPathMessaging(), new MockCluster.Clock(0), new TestAgent.RethrowAgent()); - n1.topology().onTopologyUpdate(t2, () -> null); + n1.topology().onTopologyUpdate(t2, () -> null, e -> {}); for (Node.Id node : ALL) n1.topology().onEpochSyncComplete(node, t1.epoch()); diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java index 6402d071..a26c2181 100644 --- a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java +++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java @@ -78,13 +78,6 @@ public class AbstractConfigurationServiceTest throw new AssertionError(String.format("Recieved multiple syncs for epoch %s from %s", epoch, node)); } - @Override - public void truncateTopologyUntil(long epoch) - { - if (!truncates.add(epoch)) - throw new AssertionError(String.format("Recieved multiple truncates for epoch", epoch)); - } - @Override public void onEpochClosed(Ranges ranges, long epoch) { @@ -170,6 +163,12 @@ public class AbstractConfigurationServiceTest public void reportEpochRetired(Ranges ranges, long epoch) { } + + @Override + public void reportEpochRemoved(long epoch) + { + + } } private static final Id ID1 = new Id(1); @@ -313,12 +312,12 @@ public class AbstractConfigurationServiceTest EpochHistory history = new EpochHistory(); Assertions.assertEquals(0, history.size()); - history.getOrCreate(1); - history.getOrCreate(2); - history.getOrCreate(3); - history.getOrCreate(4); - history.getOrCreate(5); - history.getOrCreate(6); + history.getOrCreate(1).setReadyForTesting(new Topology(1)); + history.getOrCreate(2).setReadyForTesting(new Topology(2)); + history.getOrCreate(3).setReadyForTesting(new Topology(3)); + history.getOrCreate(4).setReadyForTesting(new Topology(4)); + history.getOrCreate(5).setReadyForTesting(new Topology(5)); + history.getOrCreate(6).setReadyForTesting(new Topology(6)); assertHistoryEpochs(history, 1, 2, 3, 4, 5, 6); diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 7f71da11..752acfe5 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -760,7 +760,7 @@ public class Cluster // Replay journal Journal journal = journalMap.get(id); - Iterator<Journal.TopologyUpdate> iter = journal.replayTopologies(); + Iterator<? extends Journal.TopologyUpdate> iter = journal.replayTopologies(); Journal.TopologyUpdate lastUpdate = null; while (iter.hasNext()) { @@ -859,7 +859,7 @@ public class Cluster CommandStores stores = nodeMap.get(node.id()).commandStores(); // run on node scheduler so doesn't run during replay scheduled = node.scheduler().selfRecurring(() -> { - journal.purge(stores); + journal.purge(stores, node.topology()::minEpoch); schedule(clusterScheduler, rs, nodes, nodeMap, journalMap); }, 0, SECONDS); } diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 855c2f83..1f54dbfa 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -46,6 +46,7 @@ import accord.local.Node; import accord.local.RedundantBefore; import accord.local.StoreParticipants; import accord.primitives.Ballot; +import accord.primitives.EpochSupplier; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Ranges; @@ -105,6 +106,7 @@ public class InMemoryJournal implements Journal { this.id = id; this.agent = agent; + } @Override @@ -217,14 +219,14 @@ public class InMemoryJournal implements Journal public void truncateTopologiesForTesting(long minEpoch) { - List<TopologyUpdate> next = new ArrayList<>(); - for (int i = 0; i < topologyUpdates.size(); i++) + Iterator<TopologyUpdate> iter = topologyUpdates.iterator(); + while (iter.hasNext()) { - TopologyUpdate update = topologyUpdates.get(i); - if (update.global.epoch() >= minEpoch) - next.add(update); + TopologyUpdate current = iter.next(); + if (current.global.epoch() >= minEpoch) + break; + iter.remove(); } - topologyUpdates.retainAll(next); } @Override @@ -294,8 +296,9 @@ public class InMemoryJournal implements Journal } @Override - public void purge(CommandStores commandStores) + public void purge(CommandStores commandStores, EpochSupplier minEpoch) { + truncateTopologiesForTesting(minEpoch.epoch()); for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : diffsPerCommandStore.entrySet()) { int commandStoreId = e.getKey(); diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index 564fe00b..1469bc47 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -32,6 +32,7 @@ import accord.local.Command; import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.RedundantBefore; +import accord.primitives.EpochSupplier; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -92,7 +93,7 @@ public class LoggingJournal implements Journal } @Override - public Iterator<TopologyUpdate> replayTopologies() + public Iterator<? extends TopologyUpdate> replayTopologies() { log("REPLAY TOPOLOGIES\n"); return delegate.replayTopologies(); @@ -108,10 +109,10 @@ public class LoggingJournal implements Journal } @Override - public void purge(CommandStores commandStores) + public void purge(CommandStores commandStores, EpochSupplier minEpoch) { log("PURGE\n"); - delegate.purge(commandStores); + delegate.purge(commandStores, minEpoch); } @Override diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index a66b30a8..0a2083ff 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -18,6 +18,9 @@ package accord.impl.list; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -46,6 +49,7 @@ import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.topology.TopologyManager; import accord.utils.Invariants; import accord.utils.RandomSource; import accord.utils.async.AsyncChain; @@ -122,12 +126,17 @@ public class ListAgent implements Agent onStale.accept(staleSince, ranges); } + private static final Set<Class<?>> expectedExceptions = new HashSet<>(Arrays.asList(SimulatedFault.class, ExecuteSyncPoint.SyncPointErased.class, CancellationException.class, TopologyManager.TopologyRetiredException.class)); @Override public void onUncaughtException(Throwable t) { + if (expectedExceptions.contains(t.getClass())) + return; + // TODO (required): why are we now seeing SnapshotAborted? Nothing inherently wrong with it, but should find out what has changed. - if (!(t instanceof CoordinationFailed) && !(t instanceof SimulatedFault) && !(t instanceof ExecuteSyncPoint.SyncPointErased) - && !(t instanceof CancellationException) && !(t.getCause() instanceof CancellationException) && !(t instanceof ListStore.SnapshotAborted)) + if (!(t instanceof CoordinationFailed) + && !(t.getCause() instanceof CancellationException) + && !(t instanceof ListStore.SnapshotAborted)) onFailure.accept(t); } diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index 18f7b07b..ddc06379 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -49,6 +49,7 @@ import accord.messages.Request; import accord.primitives.RoutingKeys; import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.topology.TopologyManager; import javax.annotation.Nullable; @@ -95,8 +96,15 @@ public class ListRequest implements Request static void checkOnResult(Node node, TxnId txnId, RoutingKey homeKey, BiConsumer<Outcome, Throwable> callback) { - CheckOnResult result = new CheckOnResult(node, txnId, homeKey, callback); - result.start(); + try + { + CheckOnResult result = new CheckOnResult(node, txnId, homeKey, callback); + result.start(); + } + catch (Throwable t) + { + callback.accept(null, t); + } } @Override @@ -104,7 +112,7 @@ public class ListRequest implements Request { ++count; // this method is called for each reply, so if we see a reply where the status is not known, it may be known on others; - // once all status are merged, then onDone will apply aditional logic to make sure things are safe. + // once all status are merged, then onDone will apply additional logic to make sure things are safe. if (ok.maxKnowledgeSaveStatus == SaveStatus.Uninitialised) return Action.ApproveIfQuorum; return ok.maxKnowledgeSaveStatus.hasBeen(PreApplied) ? Action.Approve : Action.Reject; @@ -205,6 +213,14 @@ public class ListRequest implements Request node.withEpoch(txnId.epoch(), (success, fail) -> checkOnResult(hk, txnId, attempt + 1, t)); return; } + + if (txnId.epoch() < node.topology().minEpoch()) + { + node.reply(client, replyContext, ListResult.failure(client, ((Packet)replyContext).requestId, txnId), null); + node.agent().onUncaughtException(new TopologyManager.TopologyRetiredException(txnId.epoch(), node.topology().minEpoch())); + return; + } + if (homeKey == null) homeKey = node.computeRoute(txnId, txn.keys()).homeKey(); RoutingKey finalHomeKey = homeKey; diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 848f4292..ac1f3496 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -596,11 +596,6 @@ public class ListStore implements DataStore, ConfigurationService.Listener { } - @Override - public void truncateTopologyUntil(long epoch) - { - } - @Override public void onEpochClosed(Ranges ranges, long epoch) { diff --git a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java index 0ed285d6..b90d5cda 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java +++ b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java @@ -104,6 +104,12 @@ public class MockConfigurationService implements TestableConfigurationService { } + @Override + public void reportEpochRemoved(long epoch) + { + + } + @Override public synchronized void reportTopology(Topology topology) { diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index 221a91ac..3752a953 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -85,8 +85,8 @@ public class TopologyManagerTest int[] unmoved = { 1, 3, 5 }; int[] moved = { 2, 4 }; TopologyManager service = testTopologyManager(SUPPLIER, ID); - service.onTopologyUpdate(t1, () -> null); - service.onTopologyUpdate(t2, () -> null); + service.onTopologyUpdate(t1, () -> null, e -> {}); + service.onTopologyUpdate(t2, () -> null, e -> {}); for (Unseekables<?> select : Arrays.asList(Ranges.ofSortedAndDeoverlapped(range(10, 20)), Ranges.ofSortedAndDeoverlapped(range(110, 120)))) { @@ -113,8 +113,8 @@ public class TopologyManagerTest TopologyManager service = testTopologyManager(SUPPLIER, ID); Assertions.assertSame(Topology.EMPTY, service.current()); - service.onTopologyUpdate(topology1, () -> null); - service.onTopologyUpdate(topology2, () -> null); + service.onTopologyUpdate(topology1, () -> null, e -> {}); + service.onTopologyUpdate(topology2, () -> null, e -> {}); Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete()); Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete()); @@ -136,8 +136,8 @@ public class TopologyManagerTest shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); TopologyManager service = testTopologyManager(SUPPLIER, ID); - service.onTopologyUpdate(topology1, () -> null); - service.onTopologyUpdate(topology2, () -> null); + service.onTopologyUpdate(topology1, () -> null, e -> {}); + service.onTopologyUpdate(topology2, () -> null, e -> {}); return service; } @@ -164,9 +164,9 @@ public class TopologyManagerTest Shard[] shards = { shard(range(0, 100), idList(1, 2, 3), idSet(1, 2, 3)), shard(range(100, 200), idList(3, 4, 5), idSet(3, 4, 5)) }; - service.onTopologyUpdate(topology(1, shards), () -> null); - service.onTopologyUpdate(topology(2, shards), () -> null); - service.onTopologyUpdate(topology(3, shards), () -> null); + service.onTopologyUpdate(topology(1, shards), () -> null, e -> {}); + service.onTopologyUpdate(topology(2, shards), () -> null, e -> {}); + service.onTopologyUpdate(topology(3, shards), () -> null, e -> {}); for (int i = 1; i <= 5; i++) service.onEpochSyncComplete(id(i), service.epoch()); @@ -186,20 +186,18 @@ public class TopologyManagerTest Range range = range(100, 200); Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))); Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))); -// Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(3, 4))); TopologyManager service = testTopologyManager(SUPPLIER, ID); - service.onTopologyUpdate(topology1, () -> null); + service.onTopologyUpdate(topology1, () -> null, e -> {}); // sync epoch 2 service.onEpochSyncComplete(id(2), 2); service.onEpochSyncComplete(id(3), 2); // learn of epoch 2 - service.onTopologyUpdate(topology2, () -> null); + service.onTopologyUpdate(topology2, () -> null, e -> {}); Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete()); Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete()); -// Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete()); } @Test @@ -213,9 +211,9 @@ public class TopologyManagerTest TopologyManager service = testTopologyManager(SUPPLIER, ID); Assertions.assertSame(Topology.EMPTY, service.current()); - service.onTopologyUpdate(topology1, () -> null); - service.onTopologyUpdate(topology2, () -> null); - service.onTopologyUpdate(topology3, () -> null); + service.onTopologyUpdate(topology1, () -> null, e -> {}); + service.onTopologyUpdate(topology2, () -> null, e -> {}); + service.onTopologyUpdate(topology3, () -> null, e -> {}); Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete()); RoutingKeys keys = keys(150).toParticipants(); @@ -241,8 +239,8 @@ public class TopologyManagerTest shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); TopologyManager service = testTopologyManager(SUPPLIER, ID); - service.onTopologyUpdate(topology5, () -> null); - service.onTopologyUpdate(topology6, () -> null); + service.onTopologyUpdate(topology5, () -> null, e -> {}); + service.onTopologyUpdate(topology6, () -> null, e -> {}); Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global()); Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global()); @@ -260,7 +258,7 @@ public class TopologyManagerTest private static void addAndMarkSynced(TopologyManager service, Topology topology) { - service.onTopologyUpdate(topology, () -> null); + service.onTopologyUpdate(topology, () -> null, e -> {}); markTopologySynced(service, topology.epoch()); } @@ -279,7 +277,7 @@ public class TopologyManagerTest Assertions.assertTrue(service.hasEpoch(3)); Assertions.assertTrue(service.hasEpoch(4)); - service.truncateTopologyUntil(3); + service.truncateTopologiesUntil(3); Assertions.assertFalse(service.hasEpoch(1)); Assertions.assertFalse(service.hasEpoch(2)); Assertions.assertTrue(service.hasEpoch(3)); @@ -550,7 +548,7 @@ public class TopologyManagerTest case OnTopologyUpdate: Topology t = next.next(); preTopologyUpdate(id, t); - tm.onTopologyUpdate(t, () -> null); + tm.onTopologyUpdate(t, () -> null, e -> {}); pendingSyncComplete.put(t.epoch, new HashSet<>(t.nodes())); postTopologyUpdate(id, t); break; diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java index 4a7c2703..bef889e3 100644 --- a/accord-core/src/test/java/accord/utils/Property.java +++ b/accord-core/src/test/java/accord/utils/Property.java @@ -495,7 +495,6 @@ public class Property } catch (Throwable t) { - throw new PropertyError(statefulPropertyError(this, t, state, maybeRewriteHistory(history, historyTiming)), t); } if (pure) diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index f26cc7dc..01161363 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -67,6 +67,7 @@ import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; import accord.messages.SafeCallback; +import accord.primitives.EpochSupplier; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -392,7 +393,7 @@ public class Cluster implements Scheduler @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } @Override public Iterator<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } - @Override public void purge(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } + @Override public void purge(CommandStores commandStores, EpochSupplier minEpoch) { throw new IllegalStateException("Not impelemented"); } @Override public void replay(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } @Override public RedundantBefore loadRedundantBefore(int store) { throw new IllegalStateException("Not impelemented"); } @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store) { throw new IllegalStateException("Not impelemented"); } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java index 6371c0cf..e4e36644 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java @@ -69,5 +69,10 @@ public class SimpleConfigService implements ConfigurationService public void reportEpochRetired(Ranges ranges, long epoch) { } + + @Override + public void reportEpochRemoved(long epoch) + { + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org