This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29f0ccf Add transaction expiration support
29f0ccf is described below
commit 29f0ccfcfac2829b1278fde6c00b9c5502bd53d6
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Sep 19 11:52:15 2022 +0100
Add transaction expiration support
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18041
---
accord-core/src/main/java/accord/api/Agent.java | 3 +
.../main/java/accord/coordinate/Coordinate.java | 27 +++++-
.../java/accord/coordinate/CoordinateFailed.java | 23 ++++-
.../java/accord/coordinate/InformHomeOfTxn.java | 1 -
.../main/java/accord/coordinate/Invalidate.java | 16 ++--
.../main/java/accord/coordinate/MaybeRecover.java | 2 +
.../src/main/java/accord/coordinate/Propose.java | 42 ++++++---
.../src/main/java/accord/coordinate/Recover.java | 101 ++++++++++-----------
.../src/main/java/accord/coordinate/Timeout.java | 7 --
.../java/accord/impl/InMemoryCommandStore.java | 18 ++--
.../src/main/java/accord/local/Command.java | 4 +-
.../src/main/java/accord/local/CommandsForKey.java | 4 +-
.../main/java/accord/local/NodeTimeService.java | 1 +
.../main/java/accord/local/SafeCommandStore.java | 3 +-
.../src/main/java/accord/primitives/Timestamp.java | 1 -
.../src/test/java/accord/burn/BurnTest.java | 2 +-
.../src/test/java/accord/burn/TopologyUpdates.java | 2 +-
.../src/test/java/accord/impl/TestAgent.java | 9 ++
.../src/test/java/accord/impl/basic/Cluster.java | 8 +-
.../src/test/java/accord/impl/list/ListAgent.java | 13 ++-
.../test/java/accord/impl/list/ListRequest.java | 4 +-
.../verify/StrictSerializabilityVerifier.java | 1 +
.../main/java/accord/maelstrom/MaelstromAgent.java | 10 +-
23 files changed, 193 insertions(+), 109 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Agent.java
b/accord-core/src/main/java/accord/api/Agent.java
index c7d105d..83f51a7 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -21,6 +21,7 @@ package accord.api;
import accord.local.Node;
import accord.local.Command;
import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
/**
* Facility for augmenting node behaviour at specific points
@@ -49,4 +50,6 @@ public interface Agent extends UncaughtExceptionListener
void onUncaughtException(Throwable t);
void onHandledException(Throwable t);
+
+ boolean isExpired(TxnId initiated, long now);
}
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java
b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index 8cddb37..a126dc9 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -45,6 +45,9 @@ import com.google.common.collect.Sets;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
+import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.messages.Commit.Invalidate.commitInvalidate;
+
/**
* Perform initial rounds of PreAccept and Accept until we have reached
agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them
complete (and notify us about the execution result)
@@ -276,15 +279,33 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
// TODO: perhaps don't submit Accept immediately if we almost have
enough for fast-path,
// but by sending accept we rule out hybrid fast-path
- node.withEpoch(executeAt.epoch, () -> Propose.propose(node,
tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+ // TODO: if we receive a MAX response, perhaps defer to permit at
least one other node to respond before invalidating
+ if (node.agent().isExpired(txnId, executeAt.real))
+ {
+ proposeInvalidate(node, Ballot.ZERO, txnId, route.homeKey,
(success, fail) -> {
+ if (fail != null)
+ {
+ accept(null, fail);
+ }
+ else
+ {
+ commitInvalidate(node, txnId, route, executeAt);
+ accept(null, new Timeout(txnId, route.homeKey));
+ }
+ });
+ }
+ else
+ {
+ node.withEpoch(executeAt.epoch, () -> Propose.propose(node,
tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+ }
}
}
@Override
public void accept(Result success, Throwable failure)
{
- if (failure instanceof Timeout)
- failure = ((Timeout) failure).with(txnId, route.homeKey);
+ if (failure instanceof CoordinateFailed)
+ ((CoordinateFailed) failure).set(txnId, route.homeKey);
if (success != null) trySuccess(success);
else tryFailure(failure);
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
b/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
index ee19884..ccd1e79 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
@@ -28,11 +28,30 @@ import accord.primitives.TxnId;
*/
public class CoordinateFailed extends Throwable
{
- public final TxnId txnId;
- public final @Nullable RoutingKey homeKey;
+ private @Nullable TxnId txnId;
+ private @Nullable RoutingKey homeKey;
public CoordinateFailed(TxnId txnId, @Nullable RoutingKey homeKey)
{
this.txnId = txnId;
this.homeKey = homeKey;
}
+
+ void set(TxnId txnId, RoutingKey homeKey)
+ {
+ if (this.txnId == null && txnId != null)
+ this.txnId = txnId;
+
+ if (this.homeKey == null && homeKey != null)
+ this.homeKey = homeKey;
+ }
+
+ public @Nullable TxnId txnId()
+ {
+ return txnId;
+ }
+
+ public @Nullable RoutingKey homeKey()
+ {
+ return homeKey;
+ }
}
diff --git a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
index b9a8c92..363e28f 100644
--- a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
@@ -46,7 +46,6 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Simpl
public static Future<Void> inform(Node node, TxnId txnId, RoutingKey
homeKey)
{
- // TODO: we should not need to send the Txn here, but to avoid that we
need to support no-ops
return node.withEpoch(txnId.epoch, () -> {
Shard homeShard = node.topology().forEpoch(homeKey, txnId.epoch);
InformHomeOfTxn inform = new InformHomeOfTxn(txnId, homeKey,
homeShard);
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java
b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index fe98c9a..1aae425 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -128,10 +128,17 @@ public class Invalidate implements Callback<InvalidateReply>
switch (maxStatus)
{
default: throw new IllegalStateException();
+ case AcceptedInvalidate:
+ // latest accept also invalidating, so we're on the same
page and should finish our invalidation
case NotWitnessed:
break;
- case PreAccepted:
+
+ case PreAccepted:
case Accepted:
+ // note: we do not attempt to calculate PreAccept outcome
here, we rely on the caller to tell us
+ // what is safe to do. If the caller knows no decision was
reached with PreAccept, we can safely
+ // invalidate if we see PreAccept, and only need to
recover if we see Accept
+ // TODO: if we see Accept, go straight to propose to save
some unnecessary work
if (recoverIfAtLeast.compareTo(maxStatus) > 0)
break;
@@ -139,6 +146,7 @@ public class Invalidate implements Callback<InvalidateReply>
case ReadyToExecute:
case PreApplied:
case Applied:
+ // TODO: if we see Committed or above, go straight to
Execute if we have assembled enough information
if (route != null)
{
RecoverWithRoute.recover(node, ballot, txnId, route,
callback);
@@ -157,9 +165,6 @@ public class Invalidate implements Callback<InvalidateReply>
}
return;
- case AcceptedInvalidate:
- break; // latest accept also invalidating, so we're on the
same page and should finish our invalidation
-
case Invalidated:
isDone = true;
node.forEachLocalSince(contextFor(txnId), informKeys,
txnId, safeStore -> {
@@ -173,9 +178,8 @@ public class Invalidate implements Callback<InvalidateReply>
// if we have witnessed the transaction, but are able to invalidate,
do we want to proceed?
// Probably simplest to do so, but perhaps better for user if we don't.
- proposeInvalidate(node, ballot, txnId,
invalidateWithKey).addCallback((success, fail) -> {
+ proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success,
fail) -> {
isDone = true;
-
if (fail != null)
{
callback.accept(null, fail);
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 88dec50..d360f2d 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -94,6 +94,8 @@ public class MaybeRecover extends CheckShards
{
// order important, as route could be a Route which
does not implement RoutingKeys.union
RoutingKeys someKeys =
reduceNonNull(RoutingKeys::union, this.contactKeys, merged.route, route);
+ // for correctness reasons, we have not necessarily
preempted the initial pre-accept round and
+ // may have raced with it, so we must attempt to
recover anything we see pre-accepted.
Invalidate.invalidateIfNotWitnessed(node, txnId,
someKeys, homeKey, callback);
break;
}
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 7340804..6126e41 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -40,7 +40,7 @@ import accord.primitives.TxnId;
import accord.messages.Accept;
import accord.messages.Accept.AcceptOk;
import accord.messages.Accept.AcceptReply;
-import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import com.google.common.base.Preconditions;
class Propose implements Callback<AcceptReply>
{
@@ -132,59 +132,73 @@ class Propose implements Callback<AcceptReply>
}
// A special version for proposing the invalidation of a transaction; only
needs to succeed on one shard
- static class Invalidate extends AsyncFuture<Void> implements
Callback<AcceptReply>
+ static class Invalidate implements Callback<AcceptReply>
{
final Node node;
final Ballot ballot;
final TxnId txnId;
- final RoutingKey someKey;
+ final RoutingKey invalidateWithKey;
+ final BiConsumer<Void, Throwable> callback;
private final QuorumShardTracker acceptTracker;
+ private boolean isDone;
- Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId,
RoutingKey someKey)
+ Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId,
RoutingKey invalidateWithKey, BiConsumer<Void, Throwable> callback)
{
this.node = node;
this.acceptTracker = new QuorumShardTracker(shard);
this.ballot = ballot;
this.txnId = txnId;
- this.someKey = someKey;
+ this.invalidateWithKey = invalidateWithKey;
+ this.callback = callback;
}
- public static Invalidate proposeInvalidate(Node node, Ballot ballot,
TxnId txnId, RoutingKey someKey)
+ public static Invalidate proposeInvalidate(Node node, Ballot ballot,
TxnId txnId, RoutingKey invalidateWithKey, BiConsumer<Void, Throwable> callback)
{
- Shard shard = node.topology().forEpochIfKnown(someKey,
txnId.epoch);
- Invalidate invalidate = new Invalidate(node, shard, ballot, txnId,
someKey);
- node.send(shard.nodes, to -> new Accept.Invalidate(ballot, txnId,
someKey), invalidate);
+ Shard shard = node.topology().forEpochIfKnown(invalidateWithKey,
txnId.epoch);
+ Invalidate invalidate = new Invalidate(node, shard, ballot, txnId,
invalidateWithKey, callback);
+ node.send(shard.nodes, to -> new Accept.Invalidate(ballot, txnId,
invalidateWithKey), invalidate);
return invalidate;
}
@Override
public void onSuccess(Id from, AcceptReply reply)
{
- if (isDone())
+ if (isDone)
return;
if (!reply.isOk())
{
- tryFailure(new Preempted(txnId, null));
+ isDone = true;
+ callback.accept(null, new Preempted(txnId, null));
return;
}
if (acceptTracker.success(from))
- trySuccess(null);
+ {
+ isDone = true;
+ callback.accept(null, null);
+ }
}
@Override
public void onFailure(Id from, Throwable failure)
{
if (acceptTracker.failure(from))
- tryFailure(new Timeout(txnId, null));
+ {
+ isDone = true;
+ callback.accept(null, new Timeout(txnId, null));
+ }
}
@Override
public void onCallbackFailure(Id from, Throwable failure)
{
- tryFailure(failure);
+ if (isDone)
+ return;
+
+ isDone = true;
+ callback.accept(null, failure);
}
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index fefda23..37aab37 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import accord.primitives.*;
+import accord.messages.Commit;
import com.google.common.base.Preconditions;
import accord.api.Result;
@@ -231,15 +232,8 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
{
default: throw new IllegalStateException();
case Invalidated:
- {
- Timestamp invalidateUntil = recoverOks.stream().map(ok ->
ok.executeAt).reduce(txnId, Timestamp::max);
- node.withEpoch(invalidateUntil.epoch, () -> {
- commitInvalidate(node, txnId, route, invalidateUntil);
- });
- isDone = true;
- callback.accept(ProgressToken.INVALIDATED, null);
+ commitInvalidate();
return;
- }
case Applied:
case PreApplied:
@@ -273,22 +267,11 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
case Accepted:
// no need to preaccept the later round, as future
operations always include every old epoch (until it is fully migrated)
- node.withEpoch(acceptOrCommit.executeAt.epoch, () -> {
- Propose.propose(node, ballot, txnId, txn, route,
acceptOrCommit.executeAt, mergeDeps(), this);
- });
+ propose(acceptOrCommit.executeAt, mergeDeps());
return;
case AcceptedInvalidate:
- proposeInvalidate(node, ballot, txnId,
route.homeKey).addCallback((success, fail) -> {
- if (fail != null) accept(null, fail);
- else
- {
- Timestamp invalidateUntil =
recoverOks.stream().map(ok -> ok.executeAt).reduce(txnId, Timestamp::max);
- commitInvalidate(node, txnId, route,
invalidateUntil);
- isDone = true;
- callback.accept(ProgressToken.INVALIDATED, null);
- }
- });
+ invalidate();
return;
case NotWitnessed:
@@ -297,48 +280,64 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
}
}
- // should all be PreAccept
- Deps deps = mergeDeps();
- Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok ->
ok.earlierAcceptedNoWitness);
- Deps earlierCommittedWitness = Deps.merge(recoverOks, ok ->
ok.earlierCommittedWitness);
- Timestamp maxExecuteAt = txnId;
- boolean rejectsFastPath = false;
- for (RecoverOk ok : recoverOks)
+ if (!tracker.hasMetFastPathCriteria())
{
- maxExecuteAt = Timestamp.max(maxExecuteAt, ok.executeAt);
- rejectsFastPath |= ok.rejectsFastPath;
+ invalidate();
+ return;
}
- Timestamp executeAt;
- if (rejectsFastPath || !tracker.hasMetFastPathCriteria())
- {
- executeAt = maxExecuteAt;
- }
- else
+ for (RecoverOk ok : recoverOks)
{
- earlierAcceptedNoWitness.without(earlierCommittedWitness::contains);
- if (!earlierAcceptedNoWitness.isEmpty())
+ if (ok.rejectsFastPath)
{
- // If there exist commands that were proposed an earlier
execution time than us that have not witnessed us,
- // we have to be certain these commands have not successfully
committed without witnessing us (thereby
- // ruling out a fast path decision for us and changing our
recovery decision).
- // So, we wait for these commands to finish committing before
retrying recovery.
- // TODO: check paper: do we assume that witnessing in
PreAccept implies witnessing in Accept? Not guaranteed.
- // See whitepaper for more details
- awaitCommits(node,
earlierAcceptedNoWitness).addCallback((success, failure) -> {
- if (failure != null) accept(null, failure);
- else retry();
- });
+ invalidate();
return;
}
- executeAt = txnId;
}
- node.withEpoch(executeAt.epoch, () -> {
- Propose.propose(node, ballot, txnId, txn, route, executeAt, deps,
this);
+ // should all be PreAccept
+ Deps deps = mergeDeps();
+ Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok ->
ok.earlierAcceptedNoWitness);
+ Deps earlierCommittedWitness = Deps.merge(recoverOks, ok ->
ok.earlierCommittedWitness);
+ earlierAcceptedNoWitness =
earlierAcceptedNoWitness.without(earlierCommittedWitness::contains);
+ if (!earlierAcceptedNoWitness.isEmpty())
+ {
+ // If there exist commands that were proposed an earlier execution
time than us that have not witnessed us,
+ // we have to be certain these commands have not successfully
committed without witnessing us (thereby
+ // ruling out a fast path decision for us and changing our
recovery decision).
+ // So, we wait for these commands to finish committing before
retrying recovery.
+ // TODO: check paper: do we assume that witnessing in PreAccept
implies witnessing in Accept? Not guaranteed.
+ // See whitepaper for more details
+ awaitCommits(node, earlierAcceptedNoWitness).addCallback((success,
failure) -> {
+ if (failure != null) accept(null, failure);
+ else retry();
+ });
+ return;
+ }
+ propose(txnId, deps);
+ }
+
+ private void invalidate()
+ {
+ proposeInvalidate(node, ballot, txnId, route.homeKey, (success, fail)
-> {
+ if (fail != null) accept(null, fail);
+ else commitInvalidate();
});
}
+ private void commitInvalidate()
+ {
+ Timestamp invalidateUntil = recoverOks.stream().map(ok ->
ok.executeAt).reduce(txnId, Timestamp::max);
+ node.withEpoch(invalidateUntil.epoch, () ->
Commit.Invalidate.commitInvalidate(node, txnId, route, invalidateUntil));
+ isDone = true;
+ callback.accept(ProgressToken.INVALIDATED, null);
+ }
+
+ private void propose(Timestamp executeAt, Deps deps)
+ {
+ node.withEpoch(executeAt.epoch, () -> Propose.propose(node, ballot,
txnId, txn, route, executeAt, deps, this));
+ }
+
private Deps mergeDeps()
{
KeyRanges ranges = recoverOks.stream().map(r ->
r.deps.covering).reduce(KeyRanges::union).orElseThrow(NoSuchElementException::new);
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java
b/accord-core/src/main/java/accord/coordinate/Timeout.java
index a896247..ef31707 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -32,11 +32,4 @@ public class Timeout extends CoordinateFailed
{
super(txnId, homeKey);
}
-
- Timeout with(TxnId txnId, RoutingKey homeKey)
- {
- if (this.txnId == null || (this.homeKey == null && homeKey != null))
- return new Timeout(txnId, homeKey);
- return this;
- }
}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 48ca781..dd3eaa6 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -132,12 +132,6 @@ public class InMemoryCommandStore
return commandStore;
}
- @Override
- public Timestamp uniqueNow(Timestamp atLeast)
- {
- return time.uniqueNow(atLeast);
- }
-
@Override
public Agent agent()
{
@@ -163,7 +157,17 @@ public class InMemoryCommandStore
}
@Override
- public Timestamp maxConflict(Keys keys)
+ public Timestamp preaccept(TxnId txnId, Keys keys)
+ {
+ Timestamp max = maxConflict(keys);
+ long epoch = latestEpoch();
+ if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch &&
!agent.isExpired(txnId, time.now()))
+ return txnId;
+
+ return time.uniqueNow(max);
+ }
+
+ private Timestamp maxConflict(Keys keys)
{
return keys.stream()
.map(this::maybeCommandsForKey)
diff --git a/accord-core/src/main/java/accord/local/Command.java
b/accord-core/src/main/java/accord/local/Command.java
index 6143367..865211e 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -246,13 +246,11 @@ public abstract class Command implements CommandListener, BiConsumer<SafeCommand
if (executeAt() == null)
{
- Timestamp max = safeStore.maxConflict(partialTxn.keys());
TxnId txnId = txnId();
// unlike in the Accord paper, we partition shards within a node,
so that to ensure a total order we must either:
// - use a global logical clock to issue new timestamps; or
// - assign each shard _and_ process a unique id, and use both as
components of the timestamp
- setExecuteAt(txnId.compareTo(max) > 0 && txnId.epoch >=
safeStore.latestEpoch()
- ? txnId : safeStore.uniqueNow(max));
+ setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
if (status() == NotWitnessed)
setStatus(PreAccepted);
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java
b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 6af70df..3779fe0 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -83,7 +83,7 @@ public abstract class CommandsForKey implements CommandListener
public abstract CommandTimeseries<TxnId> committedByExecuteAt();
public abstract Timestamp max();
- public abstract void updateMax(Timestamp timestamp);
+ protected abstract void updateMax(Timestamp timestamp);
@Override
public PreLoadContext listenerPreLoadContext(TxnId caller)
@@ -94,7 +94,7 @@ public abstract class CommandsForKey implements CommandListener
@Override
public void onChange(SafeCommandStore safeStore, Command command)
{
- logger.trace("cfk[{}]: updating as listener in response to change on
{} with status {} ({})",
+ logger.trace("[{}]: updating as listener in response to change on {}
with status {} ({})",
key(), command.txnId(), command.status(), command);
updateMax(command.executeAt());
switch (command.status())
diff --git a/accord-core/src/main/java/accord/local/NodeTimeService.java
b/accord-core/src/main/java/accord/local/NodeTimeService.java
index 161f2b6..c675717 100644
--- a/accord-core/src/main/java/accord/local/NodeTimeService.java
+++ b/accord-core/src/main/java/accord/local/NodeTimeService.java
@@ -6,5 +6,6 @@ public interface NodeTimeService
{
Node.Id id();
long epoch();
+ long now();
Timestamp uniqueNow(Timestamp atLeast);
}
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 9ca608c..8fda93f 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -60,12 +60,11 @@ public interface SafeCommandStore
CommandStore commandStore();
DataStore dataStore();
- Timestamp uniqueNow(Timestamp atLeast);
Agent agent();
ProgressLog progressLog();
CommandStore.RangesForEpoch ranges();
long latestEpoch();
- Timestamp maxConflict(Keys keys);
+ Timestamp preaccept(TxnId txnId, Keys keys);
Future<Void> execute(PreLoadContext context, Consumer<? super
SafeCommandStore> consumer);
<T> Future<T> submit(PreLoadContext context, Function<? super
SafeCommandStore, T> function);
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 0a02fab..c20d0e2 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -23,7 +23,6 @@ import accord.local.Node.Id;
public class Timestamp implements Comparable<Timestamp>
{
public static final Timestamp NONE = new Timestamp(0, 0, 0, Id.NONE);
- public static final Timestamp MAX = new Timestamp(Long.MAX_VALUE,
Long.MAX_VALUE, Integer.MAX_VALUE, Id.MAX);
public final long epoch;
public final long real;
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java
b/accord-core/src/test/java/accord/burn/BurnTest.java
index d761e56..1fa99ab 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -268,7 +268,7 @@ public class BurnTest
public static void main(String[] args) throws Exception
{
// Long overrideSeed = null;
- Long overrideSeed = 654750394218983453L;
+ Long overrideSeed = 1727794989115080196L;
do
{
run(overrideSeed != null ? overrideSeed :
ThreadLocalRandom.current().nextLong());
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 9178007..1593f01 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -192,7 +192,7 @@ public class TopologyUpdates
{
Topology syncTopology =
node.configService().getTopologyForEpoch(syncEpoch);
Topology localTopology = syncTopology.forNode(node.id());
- Function<CommandSync, Collection<Node.Id>> allNodes = cmd ->
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch).nodes();
+ Function<CommandSync, Collection<Node.Id>> allNodes = cmd ->
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
KeyRanges ranges = localTopology.ranges();
Stream<MessageTask> messageStream = Stream.empty();
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java
b/accord-core/src/test/java/accord/impl/TestAgent.java
index 13313e1..b417dbc 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -23,6 +23,9 @@ import accord.api.Agent;
import accord.api.Result;
import accord.local.Command;
import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import java.util.concurrent.TimeUnit;
public class TestAgent implements Agent
{
@@ -49,4 +52,10 @@ public class TestAgent implements Agent
public void onHandledException(Throwable t)
{
}
+
+ @Override
+ public boolean isExpired(TxnId initiated, long now)
+ {
+ return TimeUnit.SECONDS.convert(now - initiated.real,
TimeUnit.MICROSECONDS) >= 10;
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 9420b0b..a1f215e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -54,6 +54,8 @@ import accord.topology.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
public class Cluster implements Scheduler
{
public static final Logger trace =
LoggerFactory.getLogger("accord.impl.basic.Trace");
@@ -218,7 +220,7 @@ public class Cluster implements Scheduler
MessageSink messageSink = sinks.create(node,
randomSupplier.get());
BurnTestConfigurationService configService = new
BurnTestConfigurationService(node, messageSink, randomSupplier, topology,
lookup::get, topologyUpdates);
lookup.put(node, new Node(node, messageSink, configService,
- nowSupplier.get(), () -> new
ListStore(node), new ListAgent(onFailure),
+ nowSupplier.get(), () -> new
ListStore(node), new ListAgent(30L, onFailure),
randomSupplier.get(), sinks,
SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
}
@@ -228,8 +230,8 @@ public class Cluster implements Scheduler
Collections.shuffle(nodesList, shuffleRandom);
int partitionSize =
shuffleRandom.nextInt((topologyFactory.rf+1)/2);
sinks.partitionSet = new LinkedHashSet<>(nodesList.subList(0,
partitionSize));
- }, 5L, TimeUnit.SECONDS);
- Scheduled reconfigure =
sinks.recurring(configRandomizer::maybeUpdateTopology, 1L, TimeUnit.SECONDS);
+ }, 5L, SECONDS);
+ Scheduled reconfigure =
sinks.recurring(configRandomizer::maybeUpdateTopology, 1L, SECONDS);
Packet next;
while ((next = in.get()) != null)
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index c1f9d12..16fcc2f 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -26,13 +26,16 @@ import accord.api.Agent;
import accord.api.Result;
import accord.local.Command;
import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.TxnId;
public class ListAgent implements Agent
{
+ final long timeout;
final Consumer<Throwable> onFailure;
- public ListAgent(Consumer<Throwable> onFailure)
+
+ public ListAgent(long timeout, Consumer<Throwable> onFailure)
{
+ this.timeout = timeout;
this.onFailure = onFailure;
}
@@ -63,4 +66,10 @@ public class ListAgent implements Agent
public void onHandledException(Throwable t)
{
}
+
+ @Override
+ public boolean isExpired(TxnId initiated, long now)
+ {
+ return now - initiated.real >= timeout;
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 96d1a96..1d1c443 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -109,8 +109,8 @@ public class ListRequest implements Request
else if (fail instanceof CoordinateFailed)
{
((Cluster)node.scheduler()).onDone(() -> {
- RoutingKey homeKey = ((CoordinateFailed) fail).homeKey;
- TxnId txnId = ((CoordinateFailed) fail).txnId;
+ RoutingKey homeKey = ((CoordinateFailed) fail).homeKey();
+ TxnId txnId = ((CoordinateFailed) fail).txnId();
CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f)
-> {
switch (s)
{
diff --git
a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
index 71aa49e..0cd3a07 100644
--- a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
+++ b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
@@ -712,6 +712,7 @@ public class StrictSerializabilityVerifier
final int[] bufNewPeerSteps;
final UnknownStepHolder[] bufUnknownSteps;
+ // TODO (soon): verify operations with unknown outcomes are finalised by
the first operation that starts after the coordinator abandons the txn
public StrictSerializabilityVerifier(int keyCount)
{
this.keyCount = keyCount;
diff --git
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 9b29f91..729fa89 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -23,7 +23,9 @@ import accord.api.Agent;
import accord.api.Result;
import accord.local.Command;
import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.TxnId;
+
+import java.util.concurrent.TimeUnit;
public class MaelstromAgent implements Agent
{
@@ -54,4 +56,10 @@ public class MaelstromAgent implements Agent
public void onHandledException(Throwable t)
{
}
+
+ @Override
+ public boolean isExpired(TxnId initiated, long now)
+ {
+ return TimeUnit.SECONDS.convert(now - initiated.real,
TimeUnit.MICROSECONDS) >= 10;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]