This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29f0ccf  Add transaction expiration support
29f0ccf is described below

commit 29f0ccfcfac2829b1278fde6c00b9c5502bd53d6
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Sep 19 11:52:15 2022 +0100

    Add transaction expiration support
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18041
---
 accord-core/src/main/java/accord/api/Agent.java    |   3 +
 .../main/java/accord/coordinate/Coordinate.java    |  27 +++++-
 .../java/accord/coordinate/CoordinateFailed.java   |  23 ++++-
 .../java/accord/coordinate/InformHomeOfTxn.java    |   1 -
 .../main/java/accord/coordinate/Invalidate.java    |  16 ++--
 .../main/java/accord/coordinate/MaybeRecover.java  |   2 +
 .../src/main/java/accord/coordinate/Propose.java   |  42 ++++++---
 .../src/main/java/accord/coordinate/Recover.java   | 101 ++++++++++-----------
 .../src/main/java/accord/coordinate/Timeout.java   |   7 --
 .../java/accord/impl/InMemoryCommandStore.java     |  18 ++--
 .../src/main/java/accord/local/Command.java        |   4 +-
 .../src/main/java/accord/local/CommandsForKey.java |   4 +-
 .../main/java/accord/local/NodeTimeService.java    |   1 +
 .../main/java/accord/local/SafeCommandStore.java   |   3 +-
 .../src/main/java/accord/primitives/Timestamp.java |   1 -
 .../src/test/java/accord/burn/BurnTest.java        |   2 +-
 .../src/test/java/accord/burn/TopologyUpdates.java |   2 +-
 .../src/test/java/accord/impl/TestAgent.java       |   9 ++
 .../src/test/java/accord/impl/basic/Cluster.java   |   8 +-
 .../src/test/java/accord/impl/list/ListAgent.java  |  13 ++-
 .../test/java/accord/impl/list/ListRequest.java    |   4 +-
 .../verify/StrictSerializabilityVerifier.java      |   1 +
 .../main/java/accord/maelstrom/MaelstromAgent.java |  10 +-
 23 files changed, 193 insertions(+), 109 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
index c7d105d..83f51a7 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -21,6 +21,7 @@ package accord.api;
 import accord.local.Node;
 import accord.local.Command;
 import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 
 /**
  * Facility for augmenting node behaviour at specific points
@@ -49,4 +50,6 @@ public interface Agent extends UncaughtExceptionListener
     void onUncaughtException(Throwable t);
 
     void onHandledException(Throwable t);
+
+    boolean isExpired(TxnId initiated, long now);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index 8cddb37..a126dc9 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -45,6 +45,9 @@ import com.google.common.collect.Sets;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
+import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.messages.Commit.Invalidate.commitInvalidate;
+
 /**
  * Perform initial rounds of PreAccept and Accept until we have reached 
agreement about when we should execute.
  * If we are preempted by a recovery coordinator, we abort and let them 
complete (and notify us about the execution result)
@@ -276,15 +279,33 @@ public class Coordinate extends AsyncFuture<Result> 
implements Callback<PreAccep
 
             // TODO: perhaps don't submit Accept immediately if we almost have 
enough for fast-path,
             //       but by sending accept we rule out hybrid fast-path
-            node.withEpoch(executeAt.epoch, () -> Propose.propose(node, 
tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+            // TODO: if we receive a MAX response, perhaps defer to permit at 
least one other node to respond before invalidating
+            if (node.agent().isExpired(txnId, executeAt.real))
+            {
+                proposeInvalidate(node, Ballot.ZERO, txnId, route.homeKey, 
(success, fail) -> {
+                    if (fail != null)
+                    {
+                        accept(null, fail);
+                    }
+                    else
+                    {
+                        commitInvalidate(node, txnId, route, executeAt);
+                        accept(null, new Timeout(txnId, route.homeKey));
+                    }
+                });
+            }
+            else
+            {
+                node.withEpoch(executeAt.epoch, () -> Propose.propose(node, 
tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+            }
         }
     }
 
     @Override
     public void accept(Result success, Throwable failure)
     {
-        if (failure instanceof Timeout)
-            failure = ((Timeout) failure).with(txnId, route.homeKey);
+        if (failure instanceof CoordinateFailed)
+            ((CoordinateFailed) failure).set(txnId, route.homeKey);
 
         if (success != null) trySuccess(success);
         else tryFailure(failure);
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java b/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
index ee19884..ccd1e79 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
@@ -28,11 +28,30 @@ import accord.primitives.TxnId;
  */
 public class CoordinateFailed extends Throwable
 {
-    public final TxnId txnId;
-    public final @Nullable RoutingKey homeKey;
+    private @Nullable TxnId txnId;
+    private @Nullable RoutingKey homeKey;
     public CoordinateFailed(TxnId txnId, @Nullable RoutingKey homeKey)
     {
         this.txnId = txnId;
         this.homeKey = homeKey;
     }
+
+    void set(TxnId txnId, RoutingKey homeKey)
+    {
+        if (this.txnId == null && txnId != null)
+            this.txnId = txnId;
+
+        if (this.homeKey == null && homeKey != null)
+            this.homeKey = homeKey;
+    }
+
+    public @Nullable TxnId txnId()
+    {
+        return txnId;
+    }
+
+    public @Nullable RoutingKey homeKey()
+    {
+        return homeKey;
+    }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
index b9a8c92..363e28f 100644
--- a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
@@ -46,7 +46,6 @@ public class InformHomeOfTxn extends AsyncFuture<Void> 
implements Callback<Simpl
 
     public static Future<Void> inform(Node node, TxnId txnId, RoutingKey 
homeKey)
     {
-        // TODO: we should not need to send the Txn here, but to avoid that we 
need to support no-ops
         return node.withEpoch(txnId.epoch, () -> {
             Shard homeShard = node.topology().forEpoch(homeKey, txnId.epoch);
             InformHomeOfTxn inform = new InformHomeOfTxn(txnId, homeKey, 
homeShard);
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index fe98c9a..1aae425 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -128,10 +128,17 @@ public class Invalidate implements 
Callback<InvalidateReply>
             switch (maxStatus)
             {
                 default: throw new IllegalStateException();
+                case AcceptedInvalidate:
+                    // latest accept also invalidating, so we're on the same 
page and should finish our invalidation
                 case NotWitnessed:
                     break;
-                case PreAccepted:
+
+                    case PreAccepted:
                 case Accepted:
+                    // note: we do not attempt to calculate PreAccept outcome 
here, we rely on the caller to tell us
+                    // what is safe to do. If the caller knows no decision was 
reached with PreAccept, we can safely
+                    // invalidate if we see PreAccept, and only need to 
recover if we see Accept
+                    // TODO: if we see Accept, go straight to propose to save 
some unnecessary work
                     if (recoverIfAtLeast.compareTo(maxStatus) > 0)
                         break;
 
@@ -139,6 +146,7 @@ public class Invalidate implements Callback<InvalidateReply>
                 case ReadyToExecute:
                 case PreApplied:
                 case Applied:
+                    // TODO: if we see Committed or above, go straight to 
Execute if we have assembled enough information
                     if (route != null)
                     {
                         RecoverWithRoute.recover(node, ballot, txnId, route, 
callback);
@@ -157,9 +165,6 @@ public class Invalidate implements Callback<InvalidateReply>
                     }
                     return;
 
-                case AcceptedInvalidate:
-                    break; // latest accept also invalidating, so we're on the 
same page and should finish our invalidation
-
                 case Invalidated:
                     isDone = true;
                     node.forEachLocalSince(contextFor(txnId), informKeys, 
txnId, safeStore -> {
@@ -173,9 +178,8 @@ public class Invalidate implements Callback<InvalidateReply>
 
         // if we have witnessed the transaction, but are able to invalidate, 
do we want to proceed?
         // Probably simplest to do so, but perhaps better for user if we don't.
-        proposeInvalidate(node, ballot, txnId, 
invalidateWithKey).addCallback((success, fail) -> {
+        proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, 
fail) -> {
             isDone = true;
-
             if (fail != null)
             {
                 callback.accept(null, fail);
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 88dec50..d360f2d 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -94,6 +94,8 @@ public class MaybeRecover extends CheckShards
                     {
                         // order important, as route could be a Route which 
does not implement RoutingKeys.union
                         RoutingKeys someKeys = 
reduceNonNull(RoutingKeys::union, this.contactKeys, merged.route, route);
+                        // for correctness reasons, we have not necessarily 
preempted the initial pre-accept round and
+                        // may have raced with it, so we must attempt to 
recover anything we see pre-accepted.
                         Invalidate.invalidateIfNotWitnessed(node, txnId, 
someKeys, homeKey, callback);
                         break;
                     }
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 7340804..6126e41 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -40,7 +40,7 @@ import accord.primitives.TxnId;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptOk;
 import accord.messages.Accept.AcceptReply;
-import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import com.google.common.base.Preconditions;
 
 class Propose implements Callback<AcceptReply>
 {
@@ -132,59 +132,73 @@ class Propose implements Callback<AcceptReply>
     }
 
     // A special version for proposing the invalidation of a transaction; only 
needs to succeed on one shard
-    static class Invalidate extends AsyncFuture<Void> implements 
Callback<AcceptReply>
+    static class Invalidate implements Callback<AcceptReply>
     {
         final Node node;
         final Ballot ballot;
         final TxnId txnId;
-        final RoutingKey someKey;
+        final RoutingKey invalidateWithKey;
+        final BiConsumer<Void, Throwable> callback;
 
         private final QuorumShardTracker acceptTracker;
+        private boolean isDone;
 
-        Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, 
RoutingKey someKey)
+        Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, 
RoutingKey invalidateWithKey, BiConsumer<Void, Throwable> callback)
         {
             this.node = node;
             this.acceptTracker = new QuorumShardTracker(shard);
             this.ballot = ballot;
             this.txnId = txnId;
-            this.someKey = someKey;
+            this.invalidateWithKey = invalidateWithKey;
+            this.callback = callback;
         }
 
-        public static Invalidate proposeInvalidate(Node node, Ballot ballot, 
TxnId txnId, RoutingKey someKey)
+        public static Invalidate proposeInvalidate(Node node, Ballot ballot, 
TxnId txnId, RoutingKey invalidateWithKey, BiConsumer<Void, Throwable> callback)
         {
-            Shard shard = node.topology().forEpochIfKnown(someKey, 
txnId.epoch);
-            Invalidate invalidate = new Invalidate(node, shard, ballot, txnId, 
someKey);
-            node.send(shard.nodes, to -> new Accept.Invalidate(ballot, txnId, 
someKey), invalidate);
+            Shard shard = node.topology().forEpochIfKnown(invalidateWithKey, 
txnId.epoch);
+            Invalidate invalidate = new Invalidate(node, shard, ballot, txnId, 
invalidateWithKey, callback);
+            node.send(shard.nodes, to -> new Accept.Invalidate(ballot, txnId, 
invalidateWithKey), invalidate);
             return invalidate;
         }
 
         @Override
         public void onSuccess(Id from, AcceptReply reply)
         {
-            if (isDone())
+            if (isDone)
                 return;
 
             if (!reply.isOk())
             {
-                tryFailure(new Preempted(txnId, null));
+                isDone = true;
+                callback.accept(null, new Preempted(txnId, null));
                 return;
             }
 
             if (acceptTracker.success(from))
-                trySuccess(null);
+            {
+                isDone = true;
+                callback.accept(null, null);
+            }
         }
 
         @Override
         public void onFailure(Id from, Throwable failure)
         {
             if (acceptTracker.failure(from))
-                tryFailure(new Timeout(txnId, null));
+            {
+                isDone = true;
+                callback.accept(null, new Timeout(txnId, null));
+            }
         }
 
         @Override
         public void onCallbackFailure(Id from, Throwable failure)
         {
-            tryFailure(failure);
+            if (isDone)
+                return;
+
+            isDone = true;
+            callback.accept(null, failure);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index fefda23..37aab37 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
 import accord.primitives.*;
+import accord.messages.Commit;
 import com.google.common.base.Preconditions;
 
 import accord.api.Result;
@@ -231,15 +232,8 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
-                {
-                    Timestamp invalidateUntil = recoverOks.stream().map(ok -> 
ok.executeAt).reduce(txnId, Timestamp::max);
-                    node.withEpoch(invalidateUntil.epoch, () -> {
-                        commitInvalidate(node, txnId, route, invalidateUntil);
-                    });
-                    isDone = true;
-                    callback.accept(ProgressToken.INVALIDATED, null);
+                    commitInvalidate();
                     return;
-                }
 
                 case Applied:
                 case PreApplied:
@@ -273,22 +267,11 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
                 case Accepted:
                     // no need to preaccept the later round, as future 
operations always include every old epoch (until it is fully migrated)
-                    node.withEpoch(acceptOrCommit.executeAt.epoch, () -> {
-                        Propose.propose(node, ballot, txnId, txn, route, 
acceptOrCommit.executeAt, mergeDeps(), this);
-                    });
+                    propose(acceptOrCommit.executeAt, mergeDeps());
                     return;
 
                 case AcceptedInvalidate:
-                    proposeInvalidate(node, ballot, txnId, 
route.homeKey).addCallback((success, fail) -> {
-                        if (fail != null) accept(null, fail);
-                        else
-                        {
-                            Timestamp invalidateUntil = 
recoverOks.stream().map(ok -> ok.executeAt).reduce(txnId, Timestamp::max);
-                            commitInvalidate(node, txnId, route, 
invalidateUntil);
-                            isDone = true;
-                            callback.accept(ProgressToken.INVALIDATED, null);
-                        }
-                    });
+                    invalidate();
                     return;
 
                 case NotWitnessed:
@@ -297,48 +280,64 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
             }
         }
 
-        // should all be PreAccept
-        Deps deps = mergeDeps();
-        Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> 
ok.earlierAcceptedNoWitness);
-        Deps earlierCommittedWitness = Deps.merge(recoverOks, ok -> 
ok.earlierCommittedWitness);
-        Timestamp maxExecuteAt = txnId;
-        boolean rejectsFastPath = false;
-        for (RecoverOk ok : recoverOks)
+        if (!tracker.hasMetFastPathCriteria())
         {
-            maxExecuteAt = Timestamp.max(maxExecuteAt, ok.executeAt);
-            rejectsFastPath |= ok.rejectsFastPath;
+            invalidate();
+            return;
         }
 
-        Timestamp executeAt;
-        if (rejectsFastPath || !tracker.hasMetFastPathCriteria())
-        {
-            executeAt = maxExecuteAt;
-        }
-        else
+        for (RecoverOk ok : recoverOks)
         {
-            
earlierAcceptedNoWitness.without(earlierCommittedWitness::contains);
-            if (!earlierAcceptedNoWitness.isEmpty())
+            if (ok.rejectsFastPath)
             {
-                // If there exist commands that were proposed an earlier 
execution time than us that have not witnessed us,
-                // we have to be certain these commands have not successfully 
committed without witnessing us (thereby
-                // ruling out a fast path decision for us and changing our 
recovery decision).
-                // So, we wait for these commands to finish committing before 
retrying recovery.
-                // TODO: check paper: do we assume that witnessing in 
PreAccept implies witnessing in Accept? Not guaranteed.
-                // See whitepaper for more details
-                awaitCommits(node, 
earlierAcceptedNoWitness).addCallback((success, failure) -> {
-                    if (failure != null) accept(null, failure);
-                    else retry();
-                });
+                invalidate();
                 return;
             }
-            executeAt = txnId;
         }
 
-        node.withEpoch(executeAt.epoch, () -> {
-            Propose.propose(node, ballot, txnId, txn, route, executeAt, deps, 
this);
+        // should all be PreAccept
+        Deps deps = mergeDeps();
+        Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> 
ok.earlierAcceptedNoWitness);
+        Deps earlierCommittedWitness = Deps.merge(recoverOks, ok -> 
ok.earlierCommittedWitness);
+        earlierAcceptedNoWitness = 
earlierAcceptedNoWitness.without(earlierCommittedWitness::contains);
+        if (!earlierAcceptedNoWitness.isEmpty())
+        {
+            // If there exist commands that were proposed an earlier execution 
time than us that have not witnessed us,
+            // we have to be certain these commands have not successfully 
committed without witnessing us (thereby
+            // ruling out a fast path decision for us and changing our 
recovery decision).
+            // So, we wait for these commands to finish committing before 
retrying recovery.
+            // TODO: check paper: do we assume that witnessing in PreAccept 
implies witnessing in Accept? Not guaranteed.
+            // See whitepaper for more details
+            awaitCommits(node, earlierAcceptedNoWitness).addCallback((success, 
failure) -> {
+                if (failure != null) accept(null, failure);
+                else retry();
+            });
+            return;
+        }
+        propose(txnId, deps);
+    }
+
+    private void invalidate()
+    {
+        proposeInvalidate(node, ballot, txnId, route.homeKey, (success, fail) 
-> {
+            if (fail != null) accept(null, fail);
+            else commitInvalidate();
         });
     }
 
+    private void commitInvalidate()
+    {
+        Timestamp invalidateUntil = recoverOks.stream().map(ok -> 
ok.executeAt).reduce(txnId, Timestamp::max);
+        node.withEpoch(invalidateUntil.epoch, () -> 
Commit.Invalidate.commitInvalidate(node, txnId, route, invalidateUntil));
+        isDone = true;
+        callback.accept(ProgressToken.INVALIDATED, null);
+    }
+
+    private void propose(Timestamp executeAt, Deps deps)
+    {
+        node.withEpoch(executeAt.epoch, () -> Propose.propose(node, ballot, 
txnId, txn, route, executeAt, deps, this));
+    }
+
     private Deps mergeDeps()
     {
         KeyRanges ranges = recoverOks.stream().map(r -> 
r.deps.covering).reduce(KeyRanges::union).orElseThrow(NoSuchElementException::new);
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java b/accord-core/src/main/java/accord/coordinate/Timeout.java
index a896247..ef31707 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -32,11 +32,4 @@ public class Timeout extends CoordinateFailed
     {
         super(txnId, homeKey);
     }
-
-    Timeout with(TxnId txnId, RoutingKey homeKey)
-    {
-        if (this.txnId == null || (this.homeKey == null && homeKey != null))
-            return new Timeout(txnId, homeKey);
-        return this;
-    }
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 48ca781..dd3eaa6 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -132,12 +132,6 @@ public class InMemoryCommandStore
             return commandStore;
         }
 
-        @Override
-        public Timestamp uniqueNow(Timestamp atLeast)
-        {
-            return time.uniqueNow(atLeast);
-        }
-
         @Override
         public Agent agent()
         {
@@ -163,7 +157,17 @@ public class InMemoryCommandStore
         }
 
         @Override
-        public Timestamp maxConflict(Keys keys)
+        public Timestamp preaccept(TxnId txnId, Keys keys)
+        {
+            Timestamp max = maxConflict(keys);
+            long epoch = latestEpoch();
+            if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && 
!agent.isExpired(txnId, time.now()))
+                return txnId;
+
+            return time.uniqueNow(max);
+        }
+
+        private Timestamp maxConflict(Keys keys)
         {
             return keys.stream()
                     .map(this::maybeCommandsForKey)
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 6143367..865211e 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -246,13 +246,11 @@ public abstract class Command implements CommandListener, 
BiConsumer<SafeCommand
 
         if (executeAt() == null)
         {
-            Timestamp max = safeStore.maxConflict(partialTxn.keys());
             TxnId txnId = txnId();
             // unlike in the Accord paper, we partition shards within a node, 
so that to ensure a total order we must either:
             //  - use a global logical clock to issue new timestamps; or
             //  - assign each shard _and_ process a unique id, and use both as 
components of the timestamp
-            setExecuteAt(txnId.compareTo(max) > 0 && txnId.epoch >= 
safeStore.latestEpoch()
-                    ? txnId : safeStore.uniqueNow(max));
+            setExecuteAt(safeStore.preaccept(txnId, partialTxn.keys()));
 
             if (status() == NotWitnessed)
                 setStatus(PreAccepted);
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 6af70df..3779fe0 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -83,7 +83,7 @@ public abstract class CommandsForKey implements 
CommandListener
     public abstract CommandTimeseries<TxnId> committedByExecuteAt();
 
     public abstract Timestamp max();
-    public abstract void updateMax(Timestamp timestamp);
+    protected abstract void updateMax(Timestamp timestamp);
 
     @Override
     public PreLoadContext listenerPreLoadContext(TxnId caller)
@@ -94,7 +94,7 @@ public abstract class CommandsForKey implements 
CommandListener
     @Override
     public void onChange(SafeCommandStore safeStore, Command command)
     {
-        logger.trace("cfk[{}]: updating as listener in response to change on 
{} with status {} ({})",
+        logger.trace("[{}]: updating as listener in response to change on {} 
with status {} ({})",
                      key(), command.txnId(), command.status(), command);
         updateMax(command.executeAt());
         switch (command.status())
diff --git a/accord-core/src/main/java/accord/local/NodeTimeService.java b/accord-core/src/main/java/accord/local/NodeTimeService.java
index 161f2b6..c675717 100644
--- a/accord-core/src/main/java/accord/local/NodeTimeService.java
+++ b/accord-core/src/main/java/accord/local/NodeTimeService.java
@@ -6,5 +6,6 @@ public interface NodeTimeService
 {
     Node.Id id();
     long epoch();
+    long now();
     Timestamp uniqueNow(Timestamp atLeast);
 }
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 9ca608c..8fda93f 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -60,12 +60,11 @@ public interface SafeCommandStore
 
     CommandStore commandStore();
     DataStore dataStore();
-    Timestamp uniqueNow(Timestamp atLeast);
     Agent agent();
     ProgressLog progressLog();
     CommandStore.RangesForEpoch ranges();
     long latestEpoch();
-    Timestamp maxConflict(Keys keys);
+    Timestamp preaccept(TxnId txnId, Keys keys);
 
     Future<Void> execute(PreLoadContext context, Consumer<? super 
SafeCommandStore> consumer);
     <T> Future<T> submit(PreLoadContext context, Function<? super 
SafeCommandStore, T> function);
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 0a02fab..c20d0e2 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -23,7 +23,6 @@ import accord.local.Node.Id;
 public class Timestamp implements Comparable<Timestamp>
 {
     public static final Timestamp NONE = new Timestamp(0, 0, 0, Id.NONE);
-    public static final Timestamp MAX = new Timestamp(Long.MAX_VALUE, 
Long.MAX_VALUE, Integer.MAX_VALUE, Id.MAX);
 
     public final long epoch;
     public final long real;
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index d761e56..1fa99ab 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -268,7 +268,7 @@ public class BurnTest
     public static void main(String[] args) throws Exception
     {
 //        Long overrideSeed = null;
-        Long overrideSeed = 654750394218983453L;
+        Long overrideSeed = 1727794989115080196L;
         do
         {
             run(overrideSeed != null ? overrideSeed : 
ThreadLocalRandom.current().nextLong());
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index 9178007..1593f01 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -192,7 +192,7 @@ public class TopologyUpdates
     {
         Topology syncTopology = 
node.configService().getTopologyForEpoch(syncEpoch);
         Topology localTopology = syncTopology.forNode(node.id());
-        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> 
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch).nodes();
+        Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> 
node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes();
 
         KeyRanges ranges = localTopology.ranges();
         Stream<MessageTask> messageStream = Stream.empty();
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
index 13313e1..b417dbc 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -23,6 +23,9 @@ import accord.api.Agent;
 import accord.api.Result;
 import accord.local.Command;
 import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+import java.util.concurrent.TimeUnit;
 
 public class TestAgent implements Agent
 {
@@ -49,4 +52,10 @@ public class TestAgent implements Agent
     public void onHandledException(Throwable t)
     {
     }
+
+    @Override
+    public boolean isExpired(TxnId initiated, long now)
+    {
+        return TimeUnit.SECONDS.convert(now - initiated.real, 
TimeUnit.MICROSECONDS) >= 10;
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 9420b0b..a1f215e 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -54,6 +54,8 @@ import accord.topology.Topology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 public class Cluster implements Scheduler
 {
     public static final Logger trace = 
LoggerFactory.getLogger("accord.impl.basic.Trace");
@@ -218,7 +220,7 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
                 lookup.put(node, new Node(node, messageSink, configService,
-                                          nowSupplier.get(), () -> new ListStore(node), new ListAgent(onFailure),
+                                          nowSupplier.get(), () -> new ListStore(node), new ListAgent(30L, onFailure),
                                           randomSupplier.get(), sinks, SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
             }
 
@@ -228,8 +230,8 @@ public class Cluster implements Scheduler
                 Collections.shuffle(nodesList, shuffleRandom);
                 int partitionSize = shuffleRandom.nextInt((topologyFactory.rf+1)/2);
                 sinks.partitionSet = new LinkedHashSet<>(nodesList.subList(0, partitionSize));
-            }, 5L, TimeUnit.SECONDS);
-            Scheduled reconfigure = sinks.recurring(configRandomizer::maybeUpdateTopology, 1L, TimeUnit.SECONDS);
+            }, 5L, SECONDS);
+            Scheduled reconfigure = sinks.recurring(configRandomizer::maybeUpdateTopology, 1L, SECONDS);
 
             Packet next;
             while ((next = in.get()) != null)
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index c1f9d12..16fcc2f 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -26,13 +26,16 @@ import accord.api.Agent;
 import accord.api.Result;
 import accord.local.Command;
 import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.TxnId;
 
 public class ListAgent implements Agent
 {
+    final long timeout;
     final Consumer<Throwable> onFailure;
-    public ListAgent(Consumer<Throwable> onFailure)
+
+    public ListAgent(long timeout, Consumer<Throwable> onFailure)
     {
+        this.timeout = timeout;
         this.onFailure = onFailure;
     }
 
@@ -63,4 +66,10 @@ public class ListAgent implements Agent
     public void onHandledException(Throwable t)
     {
     }
+
+    @Override
+    public boolean isExpired(TxnId initiated, long now)
+    {
+        return now - initiated.real >= timeout;
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 96d1a96..1d1c443 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -109,8 +109,8 @@ public class ListRequest implements Request
             else if (fail instanceof CoordinateFailed)
             {
                 ((Cluster)node.scheduler()).onDone(() -> {
-                    RoutingKey homeKey = ((CoordinateFailed) fail).homeKey;
-                    TxnId txnId = ((CoordinateFailed) fail).txnId;
+                    RoutingKey homeKey = ((CoordinateFailed) fail).homeKey();
+                    TxnId txnId = ((CoordinateFailed) fail).txnId();
                     CheckOnResult.checkOnResult(node, txnId, homeKey, (s, f) -> {
                         switch (s)
                         {
diff --git a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
index 71aa49e..0cd3a07 100644
--- a/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
+++ b/accord-core/src/test/java/accord/verify/StrictSerializabilityVerifier.java
@@ -712,6 +712,7 @@ public class StrictSerializabilityVerifier
     final int[] bufNewPeerSteps;
     final UnknownStepHolder[] bufUnknownSteps;
 
+    // TODO (soon): verify operations with unknown outcomes are finalised by the first operation that starts after the coordinator abandons the txn
     public StrictSerializabilityVerifier(int keyCount)
     {
         this.keyCount = keyCount;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 9b29f91..729fa89 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -23,7 +23,9 @@ import accord.api.Agent;
 import accord.api.Result;
 import accord.local.Command;
 import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.TxnId;
+
+import java.util.concurrent.TimeUnit;
 
 public class MaelstromAgent implements Agent
 {
@@ -54,4 +56,10 @@ public class MaelstromAgent implements Agent
     public void onHandledException(Throwable t)
     {
     }
+
+    @Override
+    public boolean isExpired(TxnId initiated, long now)
+    {
+        return TimeUnit.SECONDS.convert(now - initiated.real, TimeUnit.MICROSECONDS) >= 10;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to