This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 973335bf Follow-up to CASSANDRA-21042: SyncPoints should operate
against the last epoch containing their ranges, which should refuse other
transactions
973335bf is described below
commit 973335bfe7e930c6646016a4a48cff084f49b660
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Nov 26 10:30:58 2025 +0000
Follow-up to CASSANDRA-21042: SyncPoints should operate against the last
epoch containing their ranges, which should refuse other transactions
---
.../coordinate/AbstractCoordinatePreAccept.java | 6 +-
.../accord/coordinate/CoordinateMaxConflict.java | 6 +-
.../accord/coordinate/CoordinateSyncPoint.java | 2 +-
.../accord/coordinate/CoordinationAdapter.java | 4 +-
.../java/accord/coordinate/ExecuteSyncPoint.java | 2 +-
.../src/main/java/accord/local/Bootstrap.java | 2 +-
accord-core/src/main/java/accord/local/Node.java | 56 ++++++++++-------
.../accord/local/durability/DurabilityService.java | 19 +++---
.../accord/local/durability/ShardDurability.java | 3 +-
.../main/java/accord/messages/NoWaitRequest.java | 1 +
.../src/main/java/accord/topology/ActiveEpoch.java | 9 ++-
.../main/java/accord/topology/ActiveEpochs.java | 12 +++-
.../src/main/java/accord/topology/Topology.java | 37 ++++++++++-
.../main/java/accord/topology/TopologyManager.java | 73 +++++++++++++++++++++-
.../java/accord/topology/TopologyMismatch.java | 69 ++++++++------------
.../src/test/java/accord/burn/TopologyUpdates.java | 1 +
.../coordinate/CoordinateTransactionTest.java | 17 +++--
.../java/accord/coordinate/TopologyChangeTest.java | 6 +-
.../src/test/java/accord/impl/list/ListAgent.java | 3 +-
.../src/test/java/accord/local/CommandsTest.java | 2 +-
.../java/accord/local/ImmutableCommandTest.java | 4 +-
.../test/java/accord/messages/ReadDataTest.java | 2 +-
.../java/accord/topology/TopologyRandomizer.java | 7 ++-
23 files changed, 226 insertions(+), 117 deletions(-)
diff --git
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index 1bb115ee..f31ab8a0 100644
---
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -27,12 +27,8 @@ import accord.messages.Callback;
import accord.primitives.FullRoute;
import accord.primitives.TxnId;
import accord.topology.Topologies;
-import accord.topology.Topology;
import accord.topology.TopologyException;
import accord.topology.TopologyMismatch;
-import accord.topology.TopologyRetiredException;
-
-import static accord.topology.TopologyMismatch.TopologyMatch.LATEST;
/**
* Abstract parent class for implementing preaccept-like operations where we
may need to fetch additional replies
@@ -64,7 +60,7 @@ abstract class AbstractCoordinatePreAccept<Result, Reply
extends accord.messages
TopologyMismatch mismatch;
try
{
- mismatch = TopologyMismatch.checkForMismatch(latestEpoch, scope,
node.topology().active(), LATEST);
+ mismatch = TopologyMismatch.checkForMismatch(latestEpoch, scope,
node.topology().active(), txnId.kind());
}
catch (TopologyException t)
{
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
b/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
index 2f0f589c..a4a42cd1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateMaxConflict.java
@@ -33,6 +33,7 @@ import accord.messages.GetMaxConflict.GetMaxConflictOk;
import accord.primitives.FullRoute;
import accord.primitives.Routables;
import accord.primitives.Timestamp;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.ActiveEpochs;
import accord.topology.Topologies;
@@ -43,7 +44,6 @@ import accord.utils.async.Cancellable;
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
-import static accord.topology.TopologyMismatch.TopologyMatch.ANY;
/**
* Calculate the maximum TxnId that could have been agreed before this
operation started
@@ -82,8 +82,8 @@ public class CoordinateMaxConflict extends
AbstractCoordinatePreAccept<Timestamp
try
{
ActiveEpochs active = node.topology().active();
- long epoch = active.epoch();
- FullRoute<?> route = node.computeRoute(epoch, keysOrRanges,
active, ANY);
+ long epoch = active.maxEpoch(Long.MIN_VALUE, keysOrRanges);
+ FullRoute<?> route = node.computeRoute(epoch, keysOrRanges,
active, Txn.Kind.ExclusiveSyncPoint);
Topologies topologies = active.withUnsyncedEpochs(route, epoch,
epoch);
coordinate = new CoordinateMaxConflict(node,
node.someSequentialExecutor(), topologies, route, epoch, callback);
}
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 88b2f2aa..2c8ae89c 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -96,7 +96,7 @@ public class CoordinateSyncPoint<R> extends
CoordinatePreAccept<R>
public static AsyncChain<SyncPoint> coordinate(Node node, Txn.Kind kind,
Ranges ranges, SyncPointAdapter<SyncPoint> adapter)
{
Invariants.requireArgument(kind.isSyncPoint());
- TxnId txnId = node.nextTxnIdWithDefaultFlags(kind, ranges.domain(),
cardinality(ranges));
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(ranges, kind,
ranges.domain(), cardinality(ranges));
return node.withEpochExact(txnId.epoch(), null, () -> coordinate(node,
txnId, ranges, adapter));
}
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
index 7d8e8510..bf169935 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -462,7 +462,7 @@ public interface CoordinationAdapter<R>
if (txnId.is(Routable.Domain.Range))
{
MinimalSyncPoint syncPoint = new MinimalSyncPoint(txnId,
executeAt, (RangeRoute) route);
- node.topology().onEpochClosed(syncPoint.route.toRanges(),
syncPoint.syncId.epoch() - 1);
+ node.topology().onEpochClosed(syncPoint.route.toRanges(),
syncPoint.syncId);
node.durability().report(new DurabilityResult(syncPoint,
DurabilityLevel.NONE, null));
}
}
@@ -482,7 +482,7 @@ public interface CoordinationAdapter<R>
}
if (txnId.is(Routable.Domain.Range))
{
- node.topology().onEpochClosed(syncPoint.route.toRanges(),
syncPoint.syncId.epoch() - 1);
+ node.topology().onEpochClosed(syncPoint.route.toRanges(),
syncPoint.syncId);
node.durability().report(new DurabilityResult(syncPoint,
DurabilityLevel.NONE, null));
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 986f9bc8..7cac839e 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -240,7 +240,7 @@ public class ExecuteSyncPoint extends
AbstractCoordination<Route<Range>, Durabil
DurabilityResult result = current();
if (result.min.remote == SyncRemote.All)
{
- node.topology().onEpochRetired(scope.toRanges(),
syncPoint.syncId.epoch() - 1);
+ node.topology().onEpochRetired(scope.toRanges(),
syncPoint.syncId);
node.send(tracker.topologies(), new SetShardDurable(syncPoint,
Universal));
}
else if (result.min.remote == SyncRemote.Quorum)
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java
b/accord-core/src/main/java/accord/local/Bootstrap.java
index 56661dbd..27aba94c 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -118,7 +118,7 @@ class Bootstrap
TxnId start(SafeCommandStore safeStore)
{
- globalSyncId = node.nextTxnIdWithDefaultFlags(epoch, 0,
ExclusiveSyncPoint, Domain.Range, Cardinality.Any);
+ globalSyncId = node.nextTxnIdWithDefaultFlags(epoch, 0, valid,
ExclusiveSyncPoint, Domain.Range, Cardinality.Any);
Invariants.require(epoch <= globalSyncId.epoch(), "Attempting to
use local epoch %d which is larger than global epoch %d", epoch,
globalSyncId.epoch());
if (valid.isEmpty())
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 36cd6710..0ff73086 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -82,7 +82,6 @@ import accord.primitives.TxnId;
import accord.primitives.TxnId.Cardinality;
import accord.topology.TopologyException;
import accord.topology.TopologyManager;
-import accord.topology.TopologyMismatch.TopologyMatch;
import accord.topology.TopologyRetiredException;
import accord.utils.Invariants;
import accord.utils.PersistentField;
@@ -567,47 +566,56 @@ public class Node implements NodeCommandStoreService
messageSink.reply(replyingToNode, replyContext, send);
}
- public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain)
+ public TxnId nextTxnIdWithDefaultFlags(Seekables<?, ?> keys, Txn.Kind
kind, Domain domain)
{
- return nextTxnIdWithFlags(rw, domain, Any, defaultMediumPath().bit());
+ return nextTxnIdWithFlags(keys, kind, domain, Any,
defaultMediumPath().bit());
}
- public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc,
Txn.Kind rw, Domain domain)
+ public TxnId nextStaleTxnIdWithDefaultFlags(long minEpoch, long minHlc,
Seekables<?, ?> keys, Txn.Kind kind, Domain domain)
{
- return nextStaleTxnIdWithFlags(minEpoch, minHlc, rw, domain, Any,
defaultMediumPath().bit());
+ return nextStaleTxnIdWithFlags(minEpoch, minHlc, keys, kind, domain,
Any, defaultMediumPath().bit());
}
- public TxnId nextTxnIdWithDefaultFlags(Txn.Kind rw, Domain domain,
Cardinality cardinality)
+ public TxnId nextTxnIdWithDefaultFlags(Seekables<?, ?> keys, Txn.Kind
kind, Domain domain, Cardinality cardinality)
{
- return nextTxnIdWithFlags(rw, domain, cardinality,
defaultMediumPath().bit());
+ return nextTxnIdWithFlags(keys, kind, domain, cardinality,
defaultMediumPath().bit());
}
- public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc,
Txn.Kind rw, Domain domain, Cardinality cardinality)
+ private long epoch(long minEpoch, Seekables<?, ?> keys, Txn.Kind kind)
{
- return newTxnId(Math.max(minEpoch, epoch()), uniqueNow(minHlc), rw,
domain, cardinality, defaultMediumPath().bit(), id);
+ if (!kind.isSyncPoint())
+ return Math.max(minEpoch, epoch());
+
+ return topology.active().maxEpoch(minEpoch, keys);
+ }
+
+ public TxnId nextTxnIdWithDefaultFlags(long minEpoch, long minHlc,
Seekables<?, ?> keys, Txn.Kind kind, Domain domain, Cardinality cardinality)
+ {
+ long epoch = epoch(minEpoch, keys, kind);
+ return newTxnId(epoch, uniqueNow(minHlc), kind, domain, cardinality,
defaultMediumPath().bit(), id);
}
/**
* TODO (required): Make sure we cannot re-issue the same txnid on startup
* TODO (required): Don't use new epoch for TxnId until a quorum is ready
to coordinate it
*/
- public TxnId nextTxnIdWithFlags(Txn.Kind rw, Domain domain, Cardinality
cardinality, int flags)
+ public TxnId nextTxnIdWithFlags(Seekables<?, ?> keys, Txn.Kind kind,
Domain domain, Cardinality cardinality, int flags)
{
- return newTxnId(epoch(), uniqueNow(), rw, domain, cardinality, flags,
id);
+ return newTxnId(epoch(Long.MIN_VALUE, keys, kind), uniqueNow(), kind,
domain, cardinality, flags, id);
}
- public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc, Txn.Kind
rw, Domain domain, Cardinality cardinality, int flags)
+ public TxnId nextStaleTxnIdWithFlags(long minEpoch, long minHlc,
Seekables<?, ?> keys, Txn.Kind kind, Domain domain, Cardinality cardinality,
int flags)
{
- long epoch = Math.max(minEpoch, epoch());
+ long epoch = epoch(minEpoch, keys, kind);
long hlc = uniqueStale(minHlc);
- return newTxnId(epoch, hlc, rw, domain, cardinality, flags, id);
+ return newTxnId(epoch, hlc, kind, domain, cardinality, flags, id);
}
- private static TxnId newTxnId(long epoch, long now, Txn.Kind rw, Domain
domain, Cardinality cardinality, int flags, Node.Id node)
+ private static TxnId newTxnId(long epoch, long now, Txn.Kind kind, Domain
domain, Cardinality cardinality, int flags, Node.Id node)
{
- Invariants.require(domain == Key || rw != Write, "Range writes not
supported without forwarding uniqueHlc information to WaitingOn for direct
dependencies");
- Invariants.require(domain == Range || !rw.isSyncPoint, "Key
ExclusiveSyncPoint not supported without improvements to CommandsForKey for
managing execution");
- TxnId txnId = new TxnId(epoch, now, flags, rw, domain, cardinality,
node);
+ Invariants.require(domain == Key || kind != Write, "Range writes not
supported without forwarding uniqueHlc information to WaitingOn for direct
dependencies");
+ Invariants.require(domain == Range || !kind.isSyncPoint, "Key
ExclusiveSyncPoint not supported without improvements to CommandsForKey for
managing execution");
+ TxnId txnId = new TxnId(epoch, now, flags, kind, domain, cardinality,
node);
Invariants.require((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) ==
0);
return txnId;
}
@@ -635,9 +643,9 @@ public class Node implements NodeCommandStoreService
Cardinality cardinality = cardinality(domain, keys);
if (!usePrivilegedCoordinator() || (kind != Read && kind != Write))
- return nextTxnIdWithDefaultFlags(minEpoch, minHlc, kind, domain,
cardinality);
+ return nextTxnIdWithDefaultFlags(minEpoch, minHlc, keys, kind,
domain, cardinality);
- long epoch = Math.max(minEpoch, epoch());
+ long epoch = epoch(minEpoch, keys, kind);
long hlc = uniqueNow(minHlc);
int flags = computeBestDefaultTxnIdFlags(keys, epoch);
TxnId txnId = new TxnId(epoch, hlc, flags, kind, domain, cardinality,
id);
@@ -662,7 +670,7 @@ public class Node implements NodeCommandStoreService
Txn.Kind kind = txn.kind();
Domain domain = keys.domain();
- long epoch = epoch();
+ long epoch = epoch(Long.MIN_VALUE, keys, kind);
long now = uniqueNow();
fastPath = ensurePermitted(fastPath);
if (fastPath != Unoptimised && (!epochs.hasEpoch(epoch) ||
!epochs.supportsPrivilegedFastPath(keys, epoch)))
@@ -699,17 +707,17 @@ public class Node implements NodeCommandStoreService
public FullRoute<?> computeRoute(TxnId txnId, Routables<?> keysOrRanges)
throws TopologyException
{
- return computeRoute(txnId.epoch(), keysOrRanges, topology.active(),
txnId.isSyncPoint() ? TopologyMatch.ANY : TopologyMatch.LATEST);
+ return computeRoute(txnId.epoch(), keysOrRanges, topology.active(),
txnId.kind());
}
- public FullRoute<?> computeRoute(long epoch, Routables<?> keysOrRanges,
ActiveEpochs active, TopologyMatch match) throws TopologyException
+ public FullRoute<?> computeRoute(long epoch, Routables<?> keysOrRanges,
ActiveEpochs active, Txn.Kind kind) throws TopologyException
{
Invariants.requireArgument(!keysOrRanges.isEmpty(), "Attempted to
compute a route from empty keys or ranges");
RoutingKey homeKey = selectHomeKey(active.get(epoch), keysOrRanges);
FullRoute<?> route = keysOrRanges.toRoute(homeKey);
- TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(epoch,
keysOrRanges, active, match);
+ TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(epoch,
keysOrRanges, active, kind);
if (mismatch != null)
throw mismatch;
diff --git
a/accord-core/src/main/java/accord/local/durability/DurabilityService.java
b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
index 18adccee..eba7b53a 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityService.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityService.java
@@ -40,6 +40,7 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.ActiveEpoch;
import accord.topology.ActiveEpochs;
+import accord.topology.Shard;
import accord.topology.Topology;
import accord.topology.TopologyException;
import accord.utils.Invariants;
@@ -239,12 +240,11 @@ public class DurabilityService implements TopologyListener
next.reportSuccess();
}
-
@Override
- public void onReceived(Topology topology)
+ public void onActive(ActiveEpoch epoch)
{
- shards.updateTopology(topology);
- global.updateTopology(topology);
+ shards.updateTopology(epoch.global());
+ global.updateTopology(epoch.global());
}
@Override
@@ -261,10 +261,13 @@ public class DurabilityService implements TopologyListener
e = epochs.getKnown(epochs.minEpoch());
}
+ Ranges retiredAndRemoved = e.global().foldl(retiredRanges, (shard, rs,
i) -> {
+ if (shard.is(Shard.Flag.PENDING_REMOVAL))
+ return rs.with(Ranges.of(shard.range));
+ return rs;
+ }, Ranges.EMPTY);
// if the ranges are retired and have been removed in the epoch in
which they're retired, then we can retire the associated scheduler(s)
- retiredRanges = retiredRanges.without(e.global().ranges());
- if (retiredRanges.isEmpty())
- return;
- shards.retireRanges(retiredRanges, epoch);
+ if (!retiredAndRemoved.isEmpty())
+ shards.retireRanges(retiredAndRemoved, epoch);
}
}
\ No newline at end of file
diff --git
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index ca269279..fb287db4 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -38,7 +38,6 @@ import accord.coordinate.CoordinationFailed;
import accord.local.DurableBefore;
import accord.local.Node;
import accord.local.ShardDistributor;
-import accord.primitives.FullRoute;
import accord.primitives.SyncPoint;
import accord.primitives.Range;
import accord.primitives.Ranges;
@@ -369,7 +368,7 @@ public class ShardDurability
}
}
minHlc = Math.max(minHlc, node.agent().minStaleHlc(node,
activeRequest != null));
- TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch,
minHlc, kind, Domain.Range);
+ TxnId staleId = node.nextStaleTxnIdWithDefaultFlags(minEpoch,
minHlc, ranges, kind, Domain.Range);
if (activeRequest != null) logger.info("Initiating RX requested by
{} for {} with TxnId {}. Remaining: {}.", activeRequest.requestedBy, ranges,
staleId, active);
else logger.debug("Initiating RX for durability of {} with TxnId
{}.", ranges, staleId);
diff --git a/accord-core/src/main/java/accord/messages/NoWaitRequest.java
b/accord-core/src/main/java/accord/messages/NoWaitRequest.java
index fa58359e..ea6a252d 100644
--- a/accord-core/src/main/java/accord/messages/NoWaitRequest.java
+++ b/accord-core/src/main/java/accord/messages/NoWaitRequest.java
@@ -124,6 +124,7 @@ public abstract class NoWaitRequest<P extends
Participants<?>, R extends Reply>
protected void acceptInternal(R reply, Throwable failure)
{
+ Invariants.require(reply != null || failure != null, "No reply
produced for %s", this);
if (failure != null || reply.isFinal())
{
Invariants.require(!hasSentFinalReply);
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
index 6beadf49..89ef2b73 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpoch.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpoch.java
@@ -30,6 +30,7 @@ import accord.utils.SimpleBitSet;
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
+import static accord.primitives.Routables.Slice.Minimal;
public final class ActiveEpoch
{
@@ -165,7 +166,8 @@ public final class ActiveEpoch
ranges = ranges.without(closed);
if (ranges.isEmpty())
return ranges;
- closed = closed.union(MERGE_ADJACENT, ranges);
+ Ranges add = ranges.slice(global.ranges, Minimal);
+ closed = closed.union(MERGE_ADJACENT, add);
Invariants.require(closed.mergeTouching() == closed);
return ranges.without(addedRanges);
}
@@ -177,8 +179,9 @@ public final class ActiveEpoch
if (ranges.isEmpty())
return ranges;
quorumReady = quorumReady.union(MERGE_ADJACENT, ranges);
- closed = closed.union(MERGE_ADJACENT, ranges);
- retired = retired.union(MERGE_ADJACENT, ranges);
+ Ranges add = ranges.slice(global.ranges, Minimal);
+ closed = closed.union(MERGE_ADJACENT, add);
+ retired = retired.union(MERGE_ADJACENT, add);
Invariants.require(closed.mergeTouching() == closed);
Invariants.require(retired.mergeTouching() == retired);
return ranges.without(addedRanges);
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
index c5883a10..a0e16752 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
@@ -132,6 +132,16 @@ public final class ActiveEpochs implements
Iterable<ActiveEpoch>
return currentEpoch;
}
+ public long maxEpoch(long minEpoch, Routables<?> keys)
+ {
+ long epoch = Math.max(epoch(), minEpoch);
+ while (!getKnown(epoch).global().ranges().containsAll(keys))
+ {
+ if (--epoch < minEpoch())
+ throw new IllegalArgumentException(keys + " not found in any
active epoch");
+ }
+ return epoch;
+ }
public Topology current()
{
return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
@@ -606,7 +616,7 @@ public final class ActiveEpochs implements
Iterable<ActiveEpoch>
ActiveEpoch e = ifExists(epoch);
if (e == null)
return null;
- return e.global().forKey(key);
+ return e.global().forKeyIfKnown(key);
}
public Shard forEpoch(RoutableKey key, long epoch) throws TopologyException
diff --git a/accord-core/src/main/java/accord/topology/Topology.java
b/accord-core/src/main/java/accord/topology/Topology.java
index c79fb762..4f8e4103 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -348,12 +348,20 @@ public class Topology
// TODO (low priority, efficiency): optimised HomeKey concept containing
the Key, Shard and Topology to avoid lookups when topology hasn't changed
public Shard forKey(RoutableKey key)
{
- int i = subsetOfRanges.indexOf(key);
+ int i = indexForKey(key);
if (i < 0)
throw illegalArgument("Range not found for " + key);
return shards[supersetIndexes[i]];
}
+ public Shard forKeyIfKnown(RoutableKey key)
+ {
+ int i = indexForKey(key);
+ if (i < 0)
+ return null;
+ return shards[supersetIndexes[i]];
+ }
+
public int indexForKey(RoutableKey key)
{
return subsetOfRanges.indexOf(key);
@@ -497,6 +505,7 @@ public class Topology
if (abi < 0)
break;
+ ai = (int)abi;
bi = (int)(abi >>> 32);
if (count == newSubset.length)
newSubset = cachedInts.resize(newSubset, count, count
* 2);
@@ -547,6 +556,32 @@ public class Topology
return accumulator;
}
+ public <P, T> T foldlWithDefault(Routables<?> select,
IndexedTriFunction<Shard, P, T, T> function, Shard ifNull, P param, T
accumulator)
+ {
+ Routables<?> as = select;
+ Ranges bs = subsetOfRanges;
+ int ai = 0, amax = 0, bi = 0;
+
+ while (true)
+ {
+ long abi = as.findNextIntersection(ai, bs, bi);
+ if (abi < 0)
+ break;
+
+ int nextai = (int)(abi);
+ bi = (int)(abi >>> 32);
+
+ if (nextai > amax + 1)
+ accumulator = function.apply(null, param, accumulator, -1 -
bi);
+ accumulator = function.apply(shards[supersetIndexes[bi]], param,
accumulator, bi);
+ amax = nextai + 1;
+ ai = nextai;
+ ++bi;
+ }
+
+ return accumulator;
+ }
+
public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
{
NodeInfo info = nodeLookup.get(on.id);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 1b4a1e89..2e56d077 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -41,6 +41,7 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.local.TimeService;
import accord.primitives.Ranges;
+import accord.primitives.TxnId;
import accord.topology.Topologies.SelectNodeOwnership;
import accord.topology.TopologyCollector.BestFastPath;
import accord.topology.TopologyCollector.Simple;
@@ -116,11 +117,37 @@ public class TopologyManager
}
public void onEpochClosed(Ranges ranges, long epoch)
+ {
+ onEpochClosed(ranges, epoch, null);
+ }
+
+ public void onEpochClosed(Ranges ranges, TxnId txnId)
+ {
+ onEpochClosed(ranges, txnId.epoch(), txnId);
+ }
+
+ private void onEpochClosed(Ranges ranges, long epoch, @Nullable TxnId
txnId)
{
Topology topology;
synchronized (this)
{
- topology = active.maybeGlobalForEpoch(epoch);
+ if (txnId == null)
+ {
+ topology = active.maybeGlobalForEpoch(epoch);
+ }
+ else
+ {
+ ActiveEpoch e = active.ifExists(epoch);
+ if (e != null)
+ {
+ ranges = ranges.without(e.addedRanges);
+ if (ranges.isEmpty())
+ return;
+ }
+
+ --epoch;
+ topology = active.maybeGlobalForEpoch(epoch);
+ }
if (epoch > active.currentEpoch)
ranges = pending.closed(ranges, epoch);
ranges = active.closed(ranges, epoch);
@@ -134,10 +161,41 @@ public class TopologyManager
public void onEpochRetired(Ranges ranges, long epoch)
{
- Topology topology;
+ onEpochRetired(ranges, epoch, null);
+ }
+
+ public void onEpochRetired(Ranges ranges, TxnId txnId)
+ {
+ onEpochRetired(ranges, txnId.epoch(), txnId);
+ }
+
+ private void onEpochRetired(Ranges ranges, long epoch, @Nullable TxnId
txnId)
+ {
+ Topology topology = null;
synchronized (this)
{
- topology = active.maybeGlobalForEpoch(epoch);
+ if (txnId == null)
+ {
+ topology = active.maybeGlobalForEpoch(epoch);
+ }
+ else
+ {
+ ActiveEpoch e = active.ifExists(epoch);
+ if (e != null)
+ {
+ ranges = ranges.without(e.addedRanges);
+ if (ranges.isEmpty())
+ return;
+
+ topology = active.maybeGlobalForEpoch(epoch);
+ }
+
+ if (topology == null || notPendingRemoval(ranges, topology))
+ {
+ --epoch;
+ topology = active.maybeGlobalForEpoch(epoch);
+ }
+ }
if (epoch > active.currentEpoch)
ranges = pending.retired(ranges, epoch);
ranges = active.retired(ranges, epoch);
@@ -149,6 +207,15 @@ public class TopologyManager
}
}
+ private boolean notPendingRemoval(Ranges ranges, Topology topology)
+ {
+ return topology.foldlWithDefault(ranges, (shard, p, v, i) -> {
+ if (shard == null || !shard.is(Shard.Flag.PENDING_REMOVAL))
+ return Boolean.TRUE;
+ return v;
+ }, null, null, Boolean.FALSE);
+ }
+
public synchronized void truncateTopologiesUntil(long epoch)
{
ActiveEpochs current = active;
diff --git a/accord-core/src/main/java/accord/topology/TopologyMismatch.java
b/accord-core/src/main/java/accord/topology/TopologyMismatch.java
index a299a82c..b3e0f989 100644
--- a/accord-core/src/main/java/accord/topology/TopologyMismatch.java
+++ b/accord-core/src/main/java/accord/topology/TopologyMismatch.java
@@ -20,28 +20,12 @@ package accord.topology;
import javax.annotation.Nullable;
-import accord.primitives.Ranges;
import accord.primitives.Routables;
+import accord.primitives.Txn;
import accord.utils.UnhandledEnum;
public final class TopologyMismatch extends TopologyException
{
- public enum TopologyMatch
- {
- /**
- * All participating topologies are expected to contain all
participating keys.
- * This is for user transactions, which must operate at all times on
topologies
- * consistent with the operation.
- */
- LATEST,
-
- /**
- * All participating keys are expected to be contained in SOME
participating epoch.
- * This is used for sync points which may be run after some range has
been removed.
- */
- ANY
- }
-
private TopologyMismatch(String message)
{
super(message);
@@ -58,37 +42,34 @@ public final class TopologyMismatch extends
TopologyException
return new TopologyMismatch(getMessage(), this);
}
+ private enum Mismatch { NOT_KNOWN, PENDING_REMOVAL }
+
@Nullable
- public static TopologyMismatch checkForMismatch(long epoch, Routables<?>
keysOrRanges, ActiveEpochs active, TopologyMatch match) throws TopologyException
+ public static TopologyMismatch checkForMismatch(long epoch, Routables<?>
keysOrRanges, ActiveEpochs active, Txn.Kind kind) throws TopologyException
{
- switch (match)
- {
- default: throw new UnhandledEnum(match);
- case ANY:
- {
- long e = Math.min(active.currentEpoch, epoch);
- while (e >= active.minEpoch())
- {
- Ranges rs = active.getKnown(e).global.ranges();
- if (rs.containsAll(keysOrRanges))
- return null;
+ Topology topology = active.globalForEpoch(epoch);
+ Mismatch result = topology.foldlWithDefault(keysOrRanges, (shard, k,
v, i) -> {
+ if (shard == null)
+ return Mismatch.NOT_KNOWN;
+ if (shard.is(Shard.Flag.PENDING_REMOVAL) && !k.isSyncPoint())
+ return Mismatch.PENDING_REMOVAL;
+ return v;
+ }, null, kind, null);
- keysOrRanges = keysOrRanges.without(rs);
- --e;
- }
+ if (result == null)
+ return null;
- String message = String.format("Txn attempted to access keys
or ranges that are not known in any epoch (%s)", keysOrRanges);
- return new TopologyMismatch(message);
- }
- case LATEST:
- {
- Topology topology = active.globalForEpoch(epoch);
- if (topology.ranges().containsAll(keysOrRanges))
- return null;
-
- String message = String.format("Txn attempted to access keys
or ranges that are not known in the epoch %d (%s)", topology.epoch(),
keysOrRanges);
- return new TopologyMismatch(message);
- }
+ String message;
+ switch (result)
+ {
+ default: throw new UnhandledEnum(result);
+ case PENDING_REMOVAL:
+ message = String.format("Txn attempted to access keys or
ranges that are being removed in epoch %d (%s)", topology.epoch(),
keysOrRanges);
+ break;
+ case NOT_KNOWN:
+ message = String.format("Txn attempted to access keys or
ranges that are not known in the epoch %d (%s)", topology.epoch(),
keysOrRanges);
+ break;
}
+ return new TopologyMismatch(message);
}
}
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 98ac4da1..50501d9b 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -21,6 +21,7 @@ package accord.burn;
import accord.api.AsyncExecutor;
import accord.local.Node;
import accord.primitives.Ranges;
+import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.MessageTask;
diff --git
a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
index d27d65e2..e5c836e8 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
@@ -30,8 +30,6 @@ import accord.impl.mock.MockStore;
import accord.local.Node;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
-import accord.primitives.FullKeyRoute;
-import accord.primitives.FullRangeRoute;
import accord.primitives.Keys;
import accord.primitives.Ranges;
import accord.primitives.Txn;
@@ -63,8 +61,8 @@ public class CoordinateTransactionTest
Node node = cluster.get(1);
assertNotNull(node);
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
Keys keys = keys(10);
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
Txn txn = writeTxn(keys);
Result result =
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
assertEquals(MockStore.RESULT, result);
@@ -79,8 +77,8 @@ public class CoordinateTransactionTest
Node node = cluster.get(1);
assertNotNull(node);
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Read, Range);
Ranges keys = ranges(range(1, 2));
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Read, Range);
Txn txn = writeTxn(keys);
Result result =
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
assertEquals(MockStore.RESULT, result);
@@ -95,10 +93,11 @@ public class CoordinateTransactionTest
Node node = cluster.get(1);
assertNotNull(node);
- TxnId oldId1 = node.nextTxnIdWithDefaultFlags(Write, Key);
- TxnId oldId2 = node.nextTxnIdWithDefaultFlags(Write, Key);
+ Ranges syncRanges = ranges(range(0, 1));
+ TxnId oldId1 = node.nextTxnIdWithDefaultFlags(syncRanges, Write,
Key);
+ TxnId oldId2 = node.nextTxnIdWithDefaultFlags(syncRanges, Write,
Key);
- getUninterruptibly(CoordinateSyncPoint.exclusive(node,
ranges(range(0, 1))));
+ getUninterruptibly(CoordinateSyncPoint.exclusive(node,
syncRanges));
try
{
Keys keys = keys(1);
@@ -135,7 +134,7 @@ public class CoordinateTransactionTest
private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
{
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
txnId = new TxnId(txnId.epoch(), txnId.hlc() + clock, 0, Write, Key,
txnId.node);
Txn txn = writeTxn(keys);
Result result =
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
@@ -199,9 +198,9 @@ public class CoordinateTransactionTest
Node node = cluster.get(1);
assertNotNull(node);
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
Keys oneKey = keys(10);
Keys twoKeys = keys(10, 20);
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(twoKeys, Write, Key);
Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey),
MockStore.QUERY, MockStore.update(twoKeys));
Result result =
getUninterruptibly(CoordinateTransaction.coordinate(node, txnId, txn));
assertEquals(MockStore.RESULT, result);
diff --git
a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index 98c27375..0acc3997 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -69,7 +69,7 @@ public class TopologyChangeTest
.build())
{
Node node1 = cluster.get(1);
- TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(Write, Key);
+ TxnId txnId1 = node1.nextTxnIdWithDefaultFlags(keys, Write, Key);
Txn txn1 = writeTxn(keys);
getUninterruptibly(node1.coordinate(txnId1, txn1));
@@ -93,7 +93,7 @@ public class TopologyChangeTest
});
Node node4 = cluster.get(4);
- TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(Write, Key);
+ TxnId txnId2 = node4.nextTxnIdWithDefaultFlags(keys, Write, Key);
Txn txn2 = writeTxn(keys);
getUninterruptibly(node4.coordinate(txnId2, txn2));
@@ -202,7 +202,7 @@ public class TopologyChangeTest
});
Node node4 = cluster.get(4);
- TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(Write, Key);
+ TxnId epoch2txnId = node4.nextTxnIdWithDefaultFlags(keys, Write,
Key);
Assertions.assertEquals(2, epoch2txnId.epoch());
cluster.nodes(1, 2, 3, 4, 5).forEach(node ->
node.topology().reportTopology(topology3));
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 266900a0..c5a54bb2 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -63,6 +63,7 @@ import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.topology.TopologyMismatch;
import accord.topology.TopologyRetiredException;
import accord.utils.Invariants;
import accord.utils.RandomSource;
@@ -167,7 +168,7 @@ public class ListAgent implements InMemoryAgent,
CoordinatorEventListener, Owner
ownershipEventListener.onFailedBootstrap(attempt, phase, ranges,
retry, fail, failure);
}
- private static final Set<Class<?>> expectedExceptions = new
HashSet<>(Arrays.asList(SimulatedFault.class,
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class,
TopologyRetiredException.class, Snapshotter.SnapshotAborted.class,
TimeoutException.class, LogUnavailableException.class));
+ private static final Set<Class<?>> expectedExceptions = new
HashSet<>(Arrays.asList(SimulatedFault.class,
ExecuteSyncPoint.SyncPointErased.class, CancellationException.class,
TopologyRetiredException.class, TopologyMismatch.class,
Snapshotter.SnapshotAborted.class, TimeoutException.class,
LogUnavailableException.class));
@Override
public void onException(Throwable t)
{
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java
b/accord-core/src/test/java/accord/local/CommandsTest.java
index 0c912297..492b43de 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -95,7 +95,7 @@ class CommandsTest
Keys keys =
Keys.of(Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs));
Txn txn = listWriteTxn(from, keys);
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Write,
Routable.Domain.Key);
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write,
Routable.Domain.Key);
for (Node n : nodeMap.values())
node.topology().reportTopology(updatedTopology);
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index dc41d3e9..037325fb 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -157,9 +157,9 @@ public class ImmutableCommandTest
CommandStoreSupport support = new CommandStoreSupport();
Node node = createNode(ID1, support);
CommandStore commands = node.unsafeByIndex(0);
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Write, Key);
- ((MockCluster.Clock)node.time()).increment(10);
Keys keys = Keys.of(KEY);
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Write, Key);
+ ((MockCluster.Clock)node.time()).increment(10);
Txn txn = writeTxn(keys);
{
diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java
b/accord-core/src/test/java/accord/messages/ReadDataTest.java
index 89533fd4..535ab41f 100644
--- a/accord-core/src/test/java/accord/messages/ReadDataTest.java
+++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java
@@ -97,8 +97,8 @@ class ReadDataTest
MessageSink sink = Mockito.mock(MessageSink.class);
Node node = createNode(ID1, TOPOLOGY, sink, new
MockCluster.Clock(100));
- TxnId txnId = node.nextTxnIdWithDefaultFlags(Txn.Kind.Write, Key);
Keys keys = Keys.of(IntKey.key(1), IntKey.key(43));
+ TxnId txnId = node.nextTxnIdWithDefaultFlags(keys, Txn.Kind.Write,
Key);
AsyncResults.SettableResult<Data> readResult = new
AsyncResults.SettableResult<>();
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 1a3c6d42..1031ab78 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -103,6 +103,8 @@ public class TopologyRandomizer
private final Map<Id, Map<Long, Ranges>> bootstrapping = new HashMap<>();
private final Map<Id, Integer> bootstrappingGeneration = new HashMap<>();
private final ConcurrentLinkedQueue<Integer> newPrefixes = new
ConcurrentLinkedQueue<>();
+ // TODO (required): remove this restriction, we should be able to
replicate previously owned ranges just fine
+ private final Map<Id, Ranges> previouslyReplicated = new HashMap<>();
private final TopologyUpdates topologyUpdates;
private final Listener listener;
@@ -112,6 +114,8 @@ public class TopologyRandomizer
this.topologyUpdates = topologyUpdates;
this.epochs.add(Topology.EMPTY);
this.epochs.add(initialTopology);
+ for (Id node : initialTopology.nodes())
+ previouslyReplicated.put(node,
initialTopology.rangesForNode(node));
this.nodeLookup = nodeLookup;
this.listener = listener;
for (int prefix : prefixes)
@@ -466,7 +470,8 @@ public class TopologyRandomizer
state.shards = newShards;
Shard[] testShards = type.apply(state, random);
Arrays.sort(testShards, (a, b) -> a.range.compareTo(b.range));
- if (!everyShardHasQuorumOverlaps(oldShards, testShards))
+ if (!everyShardHasQuorumOverlaps(oldShards, testShards)
+ || reassignsRanges(current, testShards, previouslyReplicated))
{
++rejectedMutations;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]