This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f40826db Accord Query Tracing Also improve:  - Metric listeners
f40826db is described below

commit f40826db9571af286a689deada1613ffc872e5f8
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Mon Jun 30 13:29:06 2025 +0100

    Accord Query Tracing
    Also improve:
     - Metric listeners
    
    patch by Benedict; reviewed by David Capwell for CASSANDRA-20773
---
 accord-core/src/main/java/accord/api/Agent.java    |  26 ++----
 .../java/accord/api/CoordinatorEventListener.java  |  11 +--
 .../src/main/java/accord/api/EventListener.java    |  84 -----------------
 .../main/java/accord/api/LocalEventListener.java   |  91 ++++++++++++++++++
 .../coordinate/AbstractCoordinatePreAccept.java    |  10 --
 .../accord/coordinate/CoordinateSyncPoint.java     |   8 +-
 .../accord/coordinate/CoordinateTransaction.java   |  15 +--
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  21 +++--
 .../main/java/accord/coordinate/ExecuteTxn.java    |   3 +-
 .../main/java/accord/coordinate/MaybeRecover.java  |   2 +-
 .../src/main/java/accord/coordinate/Persist.java   |   3 +-
 .../src/main/java/accord/coordinate/Propose.java   |   9 +-
 .../src/main/java/accord/coordinate/Recover.java   |  10 +-
 .../main/java/accord/impl/AbstractReplayer.java    |  15 ++-
 .../impl/progresslog/DefaultProgressLog.java       |  22 +++--
 .../src/main/java/accord/local/Commands.java       | 102 +++++++++++++--------
 .../java/accord/messages/BeginInvalidation.java    |   2 +-
 .../main/java/accord/messages/InformDurable.java   |   4 +-
 .../src/main/java/accord/messages/ReadData.java    |   2 +
 .../src/test/java/accord/impl/TestAgent.java       |  21 ++---
 .../src/test/java/accord/impl/list/ListAgent.java  |  18 +++-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |   6 --
 .../main/java/accord/maelstrom/MaelstromAgent.java |  12 ++-
 23 files changed, 266 insertions(+), 231 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java 
b/accord-core/src/main/java/accord/api/Agent.java
index 89016626..4dae236c 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -40,22 +40,11 @@ import accord.utils.async.AsyncChain;
 
 /**
  * Facility for augmenting node behaviour at specific points
- *
- * TODO (expected): rationalise LocalConfig and Agent
  */
 public interface Agent extends UncaughtExceptionListener
 {
     default @Nullable Tracing trace(TxnId txnId, TraceEventType eventType) { 
return null; }
 
-    /**
-     * For use by implementations to decide what to do about successfully 
recovered transactions.
-     * Specifically intended to define if and how they should inform clients 
of the result.
-     * e.g. in Maelstrom we send the full result directly, in other impls we 
may simply acknowledge success via the coordinator
-     *
-     * Note: may be invoked multiple times in different places
-     */
-    void onRecover(Node node, Result success, Throwable fail);
-
     /**
      * For use by implementations to decide what to do about timestamp 
inconsistency, i.e. two different timestamps
      * committed for the same transaction. This is a protocol consistency 
violation, potentially leading to non-linearizable
@@ -70,6 +59,16 @@ public interface Agent extends UncaughtExceptionListener
 
     void onStale(Timestamp staleSince, Ranges ranges);
 
+    default CoordinatorEventListener coordinatorEvents()
+    {
+        return CoordinatorEventListener.NOOP;
+    }
+
+    default LocalEventListener localEvents()
+    {
+        return LocalEventListener.NOOP;
+    }
+
     @Override
     void onUncaughtException(Throwable t);
     void onCaughtException(Throwable t, String context);
@@ -112,11 +111,6 @@ public interface Agent extends UncaughtExceptionListener
      */
     Txn emptySystemTxn(Kind kind, Domain domain);
 
-    default EventListener eventListener()
-    {
-        return EventListener.NOOP;
-    }
-
     /**
      * For each shard, select a small number of replicas that should be 
preferred for listening to progress updates
      * from {@code from}. This should be 1-2 nodes that will be contacted 
preferentially for progress to minimise
diff --git a/accord-core/src/main/java/accord/api/CoordinatorEventListener.java 
b/accord-core/src/main/java/accord/api/CoordinatorEventListener.java
index e44a7073..ed4dff11 100644
--- a/accord-core/src/main/java/accord/api/CoordinatorEventListener.java
+++ b/accord-core/src/main/java/accord/api/CoordinatorEventListener.java
@@ -27,22 +27,17 @@ import accord.primitives.Deps;
 import accord.primitives.Status.Durability;
 import accord.primitives.TxnId;
 
-// TODO (required): revisit the call-sites, and boundary with C*
 public interface CoordinatorEventListener
 {
-    default void onPreAccepted(TxnId txnId, Deps deps, boolean isStable)
+    default void onPreAccepted(TxnId txnId)
     {
     }
 
-    default void onAccepted(TxnId txnId, Ballot ballot, Deps deps, boolean 
isStable)
+    default void onAccepted(TxnId txnId, Ballot ballot)
     {
     }
 
-    default void onStabilised(TxnId txnId, Ballot ballot, Deps deps)
-    {
-    }
-
-    default void onExecuting(TxnId txnId, @Nullable Ballot ballot, @Nullable 
ExecutePath path)
+    default void onExecuting(TxnId txnId, @Nullable Ballot ballot, Deps deps, 
@Nullable ExecutePath path)
     {
     }
 
diff --git a/accord-core/src/main/java/accord/api/EventListener.java 
b/accord-core/src/main/java/accord/api/EventListener.java
deleted file mode 100644
index e1ed5089..00000000
--- a/accord-core/src/main/java/accord/api/EventListener.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.api;
-
-import accord.local.Command;
-import accord.primitives.Deps;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-
-// TODO (required): revisit the call-sites, and boundary with C*
-public interface EventListener
-{
-    default void onCommitted(Command cmd)
-    {
-    }
-
-    default void onStable(Command cmd)
-    {
-    }
-
-    default void onPreApplied(Command cmd)
-    {
-    }
-
-    default void onApplied(Command cmd, long applyStartTimestamp)
-    {
-    }
-
-    default void onFastPathTaken(TxnId txnId, Deps deps)
-    {
-    }
-
-    default void onMediumPathTaken(TxnId txnId, Deps deps)
-    {
-    }
-
-    default void onSlowPathTaken(TxnId txnId, Deps deps)
-    {
-    }
-
-    default void onRecover(TxnId txnId, Timestamp recoveryTimestamp)
-    {
-    }
-
-    default void onPreempted(TxnId txnId)
-    {
-    }
-
-    default void onTimeout(TxnId txnId)
-    {
-    }
-
-    default void onInvalidated(TxnId txnId)
-    {
-    }
-
-    default void onRejected(TxnId txnId)
-    {
-    }
-
-    default void onProgressLogSizeChange(TxnId txnId, int delta)
-    {
-    }
-
-    EventListener NOOP = new EventListener()
-    {
-    };
-}
diff --git a/accord-core/src/main/java/accord/api/LocalEventListener.java 
b/accord-core/src/main/java/accord/api/LocalEventListener.java
new file mode 100644
index 00000000..662b00af
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/LocalEventListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import accord.local.Command;
+import accord.local.SafeCommandStore;
+import accord.primitives.SaveStatus;
+
+public interface LocalEventListener
+{
+    default void onRejectPreAccept(SafeCommandStore safeStore, Command cmd, 
Object reason)
+    {
+    }
+
+    default void onPreAccepted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onRejectPreNotAccept(SafeCommandStore safeStore, Command cmd, 
Object reason)
+    {
+    }
+
+    default void onPreNotAccepted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onRejectAccept(SafeCommandStore safeStore, Command cmd, 
Object reason)
+    {
+    }
+
+    default void onAccepted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onRejectNotAccept(SafeCommandStore safeStore, Command cmd, 
Object reason)
+    {
+    }
+
+    default void onNotAccepted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onRejectCommitOrStable(SafeCommandStore safeStore, SaveStatus 
commitOrStable, Command cmd, Object reason)
+    {
+    }
+
+    default void onReadWaiting(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onReadStarted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onCommitted(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onStable(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    default void onPreApplied(SafeCommandStore safeStore, Command cmd)
+    {
+    }
+
+    // startedApplyAt may be less than zero, indicating it has not been populated
+    default void onApplied(SafeCommandStore safeStore, Command cmd, long 
startedApplyAt)
+    {
+    }
+
+    LocalEventListener NOOP = new LocalEventListener()
+    {
+    };
+}
diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
index 5a9cb827..6fd2d0b6 100644
--- 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
+++ 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java
@@ -114,16 +114,6 @@ abstract class AbstractCoordinatePreAccept<T, R> 
implements Callback<R>
         // we may already be complete, as we may receive a failure from a 
later phase; but it's fine to redundantly mark done
         isDone = true;
         callback.accept(null, failure);
-        if (failure instanceof CoordinationFailed)
-        {
-            ((CoordinationFailed) failure).set(txnId, route.homeKey());
-            if (failure instanceof Timeout)
-                node.agent().eventListener().onTimeout(txnId);
-            else if (failure instanceof Preempted)
-                node.agent().eventListener().onPreempted(txnId);
-            else if (failure instanceof Invalidated)
-                node.agent().eventListener().onInvalidated(txnId);
-        }
     }
 
     final void onPreAcceptedOrNewEpoch()
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
index 96e88457..c3321567 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 import accord.api.Result;
 import accord.coordinate.CoordinationAdapter.Adapters;
 import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
-import accord.coordinate.ExecuteFlag.CoordinationFlags;
 import accord.coordinate.ExecuteFlag.ExecuteFlags;
 import accord.local.Node;
 import accord.local.SequentialAsyncExecutor;
@@ -54,7 +53,6 @@ import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 import accord.utils.async.AsyncResults.SettableByCallback;
 
-import static accord.coordinate.ExecutePath.FAST;
 import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate;
 import static accord.messages.Apply.Kind.Maximal;
 import static accord.messages.Apply.participates;
@@ -160,6 +158,7 @@ public class CoordinateSyncPoint<R> extends 
CoordinatePreAccept<R>
     {
         if (executeAt.is(REJECTED))
         {
+            node.agent().coordinatorEvents().onRejected(txnId);
             proposeAndCommitInvalidate(node, executor, Ballot.ZERO, txnId, 
route.homeKey(), route, executeAt, callback);
         }
         else
@@ -168,9 +167,8 @@ public class CoordinateSyncPoint<R> extends 
CoordinatePreAccept<R>
             if (txnId.is(ExclusiveSyncPoint) && txnId.epoch() == 
executeAt.epoch())
                 withFlags = txnId.addFlag(HLC_BOUND);
             Deps deps = Deps.merge(oks.valuesAsNullableList(), 
oks.domainSize(), List::get, ok -> ok.deps);
-            if (tracker.hasFastPathAccepted())
-                adapter.execute(node, executor, topologies, route, 
Ballot.ZERO, FAST, CoordinationFlags.none(), txnId, txn, withFlags, deps, deps, 
callback);
-            else if (tracker.hasMediumPathAccepted())
+            node.agent().coordinatorEvents().onPreAccepted(txnId);
+            if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath())
                 adapter.propose(node, executor, topologies, route, 
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, withFlags, deps, callback);
             else
                 adapter.propose(node, executor, topologies, route, 
Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, callback);
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index c8963918..75d57c3a 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -39,7 +39,6 @@ import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.local.SequentialAsyncExecutor;
 import accord.local.StoreParticipants;
-import accord.messages.Accept;
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptNack;
 import accord.messages.PreAccept.PreAcceptReply;
@@ -69,6 +68,8 @@ import static 
accord.coordinate.ExecuteFlag.CoordinationFlags.empty;
 import static accord.coordinate.ExecutePath.FAST;
 import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate;
 import static accord.local.Commands.AcceptOutcome.Success;
+import static accord.messages.Accept.Kind.MEDIUM;
+import static accord.messages.Accept.Kind.SLOW;
 import static accord.primitives.Timestamp.Flag.REJECTED;
 import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
@@ -128,8 +129,8 @@ public class CoordinateTransaction extends 
CoordinatePreAccept<Result>
                 // note: we merge all Deps regardless of witnessedAt. While we 
only need fast path votes,
                 // we must include Deps from fast path votes from earlier 
epochs that may have witnessed later transactions
                 // TODO (desired): we might mask some bugs by merging more 
responses than we strictly need, so optimise this to optionally merge minimal 
deps
+                node.agent().coordinatorEvents().onPreAccepted(txnId);
                 executeAdapter().execute(node, executor, topologies, route, 
Ballot.ZERO, FAST, flags, txnId, txn, txnId, deps, deps, callback);
-                node.agent().eventListener().onFastPathTaken(txnId, deps);
                 return;
             }
         }
@@ -138,21 +139,21 @@ public class CoordinateTransaction extends 
CoordinatePreAccept<Result>
             Deps deps = mergeFastOrMediumDeps(oks);
             if (deps != null)
             {
-                proposeAdapter().propose(node, executor, topologies, route, 
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, callback);
-                node.agent().eventListener().onMediumPathTaken(txnId, deps);
+                node.agent().coordinatorEvents().onPreAccepted(txnId);
+                proposeAdapter().propose(node, executor, topologies, route, 
MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, callback);
                 return;
             }
         }
         else if (executeAt.is(REJECTED))
         {
             proposeAndCommitInvalidate(node, executor, Ballot.ZERO, txnId, 
route.homeKey(), route, executeAt, callback);
-            node.agent().eventListener().onRejected(txnId);
+            node.agent().coordinatorEvents().onRejected(txnId);
             return;
         }
 
         Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), 
List::get, ok -> ok.deps);
-        proposeAdapter().propose(node, executor, topologies, route, 
Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, callback);
-        node.agent().eventListener().onSlowPathTaken(txnId, deps);
+        node.agent().coordinatorEvents().onPreAccepted(txnId);
+        proposeAdapter().propose(node, executor, topologies, route, SLOW, 
Ballot.ZERO, txnId, txn, executeAt, deps, callback);
     }
 
     private Deps mergeFastOrMediumDeps(SortedListMap<?, PreAcceptOk> oks)
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index ce0d64fa..a37ff476 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -111,6 +111,17 @@ public class ExecuteSyncPoint extends 
SettableResult<DurabilityResult> implement
         return onQuorum;
     }
 
+    protected void start()
+    {
+        node.agent().coordinatorEvents().onExecuting(syncPoint.syncId, null, 
syncPoint.waitFor, null);
+        SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
+        // TODO (desired): special Apply message that doesn't resend deps if 
path=MEDIUM
+        Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, 
syncPoint.syncId.domain());
+        Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, 
null);
+        if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, 
syncPoint.route.homeKey(), null));
+        else node.send(contact, to -> new ApplyThenWaitUntilApplied(to, 
tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), 
syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, 
null, result), executor, this);
+    }
+
     @Override
     public void onSuccess(Node.Id from, ReadReply reply)
     {
@@ -269,14 +280,4 @@ public class ExecuteSyncPoint extends 
SettableResult<DurabilityResult> implement
             return fail;
         }
     }
-
-    protected void start()
-    {
-        SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
-        // TODO (desired): special Apply message that doesn't resend deps if 
path=MEDIUM
-        Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, 
syncPoint.syncId.domain());
-        Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, 
null);
-        if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, 
syncPoint.route.homeKey(), null));
-        else node.send(contact, to -> new ApplyThenWaitUntilApplied(to, 
tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), 
syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, 
null, result), executor, this);
-    }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
index 9c352de4..fe722a6d 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
@@ -98,7 +98,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
     ExecuteTxn(Node node, SequentialAsyncExecutor executor, Topologies 
topologies, FullRoute<?> route, Ballot ballot, ExecutePath path, 
CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt, Deps 
stableDeps, Deps sendDeps, BiConsumer<? super Result, Throwable> callback)
     {
         super(node, executor, topologies.forEpoch(executeAt.epoch()), txnId);
-        this.path = path;
+        this.path = ballot == Ballot.ZERO ? path : RECOVER;
         this.txn = txn;
         this.route = route;
         this.ballot = ballot;
@@ -116,6 +116,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
     @Override
     protected void startOnceInitialised()
     {
+        node.agent().coordinatorEvents().onExecuting(txnId, ballot, 
stableDeps, path);
         Node.Id self = node.id();
         if (permitLocalExecution() && tryIfUniversal(self))
         {
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 755e5d16..5c1fc3bf 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -136,7 +136,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
                         if (tracing != null)
                             tracing.trace(null, "MaybeRecover found %s; 
reporting progress token %s", hasMadeProgress(full) ? "progress" : "no route", 
progressToken);
                         if (full.durability.isDurable())
-                            InformDurable.informDefault(node, topologies, 
txnId, query, full.executeAtIfKnown(), full.durability);
+                            InformDurable.informDefault(node, topologies, 
txnId, query, bumpBallot, full.executeAtIfKnown(), full.durability);
                         callback.accept(full.toProgressToken(), null);
                     }
                     else
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java 
b/accord-core/src/main/java/accord/coordinate/Persist.java
index d80a7ca9..cd0d4b01 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -99,7 +99,7 @@ public abstract class Persist implements Callback<ApplyReply>
                     // since we should only invoke Persist with sendTo != 
route when the remainder of the route is already persisted and truncated
                     // but we make this explicit for the caller with 
informDurableOnDone
                     isDone = true;
-                    InformDurable.informDefault(node, topologies, txnId, 
route, executeAt, Majority);
+                    InformDurable.informDefault(node, topologies, txnId, 
route, ballot, executeAt, Majority);
                 }
             case RaceWithRecovery:
                 // don't count this towards durability; otherwise it is 
possible (though very unlikely)
@@ -128,6 +128,7 @@ public abstract class Persist implements 
Callback<ApplyReply>
 
     public void start(Apply.Kind kind, Topologies all, Writes writes, Result 
result)
     {
+        node.agent().coordinatorEvents().onExecuted(txnId, ballot);
         // applyMinimal is used for transaction execution by the original 
coordinator so it's important to use
         // Node's Apply factory in case the factory has to do synchronous 
Apply.
         SortedArrays.SortedArrayList<Node.Id> contact = 
tracker.filterAndRecordFaulty();
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java 
b/accord-core/src/main/java/accord/coordinate/Propose.java
index c59d9839..7cef6754 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -21,6 +21,7 @@ package accord.coordinate;
 import java.util.Map;
 import java.util.function.BiConsumer;
 
+import accord.api.CoordinatorEventListener;
 import accord.api.ProtocolModifiers.Faults;
 import accord.api.RoutingKey;
 import accord.coordinate.ExecuteFlag.CoordinationFlags;
@@ -197,9 +198,10 @@ abstract class Propose<R> implements Callback<AcceptReply>
         //  In this case either id' needs to wait (which requires potentially 
more states like the alternative medium path)
         //  Or we must pick it up as an Unstable dependency here.
         Deps newDeps = mergeNewDeps();
-        Deps stableDeps = mergeDeps(newDeps);
-        if (kind == Kind.MEDIUM) adapter().execute(node, executor, 
acceptTracker.topologies(), route, ballot, MEDIUM, CoordinationFlags.none(), 
txnId, txn, executeAt, stableDeps, newDeps, callback);
-        else adapter().stabilise(node, executor, acceptTracker.topologies(), 
route, ballot, txnId, txn, executeAt, stableDeps, callback);
+        Deps deps = mergeDeps(newDeps);
+        node.agent().coordinatorEvents().onAccepted(txnId, ballot);
+        if (kind == Kind.MEDIUM) adapter().execute(node, executor, 
acceptTracker.topologies(), route, ballot, MEDIUM, CoordinationFlags.none(), 
txnId, txn, executeAt, deps, newDeps, callback);
+        else adapter().stabilise(node, executor, acceptTracker.topologies(), 
route, ballot, txnId, txn, executeAt, deps, callback);
         if (!Invariants.debug()) acceptOks.clear();
     }
 
@@ -288,6 +290,7 @@ abstract class Propose<R> implements Callback<AcceptReply>
                 }
                 else
                 {
+                    node.agent().coordinatorEvents().onInvalidated(txnId);
                     node.withEpochExact(invalidateUntil.epoch(), executor, 
callback, t -> WrappableException.wrap(t), () -> {
                         commitInvalidate(node, txnId, commitInvalidationTo, 
invalidateUntil);
                         callback.accept(null, new Invalidated(txnId, 
invalidateWithParticipant));
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java
index b0be4183..0ea7d252 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -151,7 +151,6 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         if (failure == null)
         {
             callback.accept(ProgressToken.APPLIED, null);
-            node.agent().eventListener().onRecover(txnId, ballot);
         }
         else if (failure instanceof Redundant)
         {
@@ -163,15 +162,9 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         else
         {
             callback.accept(null, WrappableException.wrap(failure));
-            if (failure instanceof Preempted)
-                node.agent().eventListener().onPreempted(txnId);
-            else if (failure instanceof Timeout)
-                node.agent().eventListener().onTimeout(txnId);
-            else if (failure instanceof Invalidated) // TODO (expected): 
should we tick this counter? we haven't invalidated anything
-                node.agent().eventListener().onInvalidated(txnId);
         }
 
-        node.agent().onRecover(node, result, failure);
+        node.agent().coordinatorEvents().onRecoveryStopped(node, txnId, 
ballot, result, failure);
     }
 
     public static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, @Nullable Tracing 
tracing)
@@ -215,6 +208,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
     void start(Collection<Id> nodes)
     {
+        node.agent().coordinatorEvents().onRecoveryStarted(txnId, ballot);
         node.send(nodes, to -> new BeginRecovery(to, tracker.topologies(), 
txnId, committedExecuteAt, txn, route, ballot), executor, this);
     }
 
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java 
b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index 88281e8c..2fd0830d 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -77,15 +77,12 @@ public abstract class AbstractReplayer implements 
Journal.Replayer
             {
                 CommandStore unsafeStore = safeStore.commandStore();
                 Participants<?> executes = 
command.participants().stillExecutes();
-                if (!executes.isEmpty())
-                {
-                    command.writes()
-                           .apply(safeStore, executes, command.partialTxn())
-                           .invoke(() -> 
unsafeStore.build(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
-                               Commands.postApply(ss, txnId, -1, true);
-                           }))
-                           .begin(safeStore.agent());
-                }
+                command.writes()
+                       .apply(safeStore, executes, command.partialTxn())
+                       .invoke(() -> 
unsafeStore.build(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
+                           Commands.postApply(ss, txnId, -1, true);
+                       }))
+                       .begin(safeStore.agent());
             }
             else Invariants.expect(command.hasBeen(Applied));
         }
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 48a9a3d6..be43381c 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -150,7 +150,6 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         if (result == null)
         {
             Invariants.require(debugDeleted == null || 
!debugDeleted.containsKey(txnId));
-            node.agent().eventListener().onProgressLogSizeChange(txnId, 1);
             stateMap = BTree.update(stateMap, BTree.singleton(result = new 
TxnState(txnId)), TxnState::compareTo);
         }
         return result;
@@ -159,7 +158,6 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     private TxnState insert(TxnId txnId)
     {
         Invariants.require(debugDeleted == null || 
!debugDeleted.containsKey(txnId));
-        node.agent().eventListener().onProgressLogSizeChange(txnId, 1);
         TxnState result = new TxnState(txnId);
         stateMap = BTree.update(stateMap, BTree.singleton(result), 
TxnState::compareTo);
         return result;
@@ -395,10 +393,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
 
     void remove(TxnId txnId)
     {
-        Object[] newStateMap = BTreeRemoval.<TxnId, TxnState>remove(stateMap, 
(id, s) -> id.compareTo(s.txnId), txnId);
-        if (stateMap != newStateMap)
-            node.agent().eventListener().onProgressLogSizeChange(txnId, -1);
-        stateMap = newStateMap;
+        stateMap = BTreeRemoval.<TxnId, TxnState>remove(stateMap, (id, s) -> 
id.compareTo(s.txnId), txnId);
         if (debugDeleted != null)
             debugDeleted.put(txnId, Thread.currentThread().getStackTrace());
     }
@@ -824,6 +819,21 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
         return maxConcurrency;
     }
 
+    public int size()
+    {
+        return BTree.size(stateMap);
+    }
+
+    public int pendingHome()
+    {
+        return pendingHome.size();
+    }
+
+    public int pendingWaiting()
+    {
+        return pendingWaiting.size();
+    }
+
     public ImmutableView immutableView()
     {
         return new ImmutableView(commandStore.id(), stateMap);
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index cd511b8e..2f843fd7 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -156,28 +156,39 @@ public class Commands
     private static AcceptOutcome preacceptOrRecover(SafeCommandStore 
safeStore, SafeCommand safeCommand, StoreParticipants participants, SaveStatus 
newSaveStatus, TxnId txnId, Txn txn, @Nullable Deps deps, Ballot ballot)
     {
         final Command command = safeCommand.current();
-        if (command.hasBeen(Truncated))
+        int compareBallots = command.promised().compareTo(ballot);
+        if (command.hasBeen(Truncated) || compareBallots > 0)
         {
-            logger.trace("{}: skipping preaccept - command is truncated", 
txnId);
-            return command.is(Invalidated) ? AcceptOutcome.RejectedBallot : 
participants.owns().isEmpty()
+            AcceptOutcome outcome = !command.hasBeen(Truncated)
+                                        ? AcceptOutcome.RejectedBallot
+                                        : command.is(Invalidated)
+                                           ? AcceptOutcome.RejectedBallot
+                                           : participants.owns().isEmpty()
                                              ? AcceptOutcome.Retired : 
AcceptOutcome.Truncated;
-        }
 
-        int compareBallots = command.promised().compareTo(ballot);
-        if (compareBallots > 0)
-        {
-            logger.trace("{}: skipping preaccept - higher ballot witnessed 
({})", txnId, command.promised());
-            return AcceptOutcome.RejectedBallot;
+            logger.trace("{}: skipping preaccept - {}", txnId, outcome);
+            safeStore.agent().localEvents().onRejectPreAccept(safeStore, 
command, outcome);
+            return outcome;
         }
 
         if (command.known().definition().isKnown())
         {
             Invariants.require(command.status() == Invalidated || 
command.executeAt() != null);
-            logger.trace("{}: skipping preaccept - already known ({})", txnId, 
command.status());
             // in case of Ballot.ZERO, we must either have a competing 
recovery coordinator or have late delivery of the
             // preaccept; in the former case we should abandon coordination, 
and in the latter we have already completed
-            safeCommand.updatePromised(ballot);
-            return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : 
AcceptOutcome.Success;
+            AcceptOutcome outcome;
+            if (ballot.equals(Ballot.ZERO))
+            {
+                return AcceptOutcome.Redundant;
+            }
+            else
+            {
+                safeCommand.updatePromised(ballot);
+                outcome = AcceptOutcome.Success;
+            }
+            logger.trace("{}: skipping preaccept - {}", txnId, outcome);
+            safeStore.agent().localEvents().onRejectPreAccept(safeStore, 
command, outcome);
+            return outcome;
         }
 
         if (command.known().deps().hasProposedOrDecidedDeps()) participants = 
command.participants().supplement(participants);
@@ -214,27 +225,24 @@ public class Commands
             safeCommand.markDefined(safeStore, participants, ballot, 
partialTxn);
         }
 
+        safeStore.agent().localEvents().onPreAccepted(safeStore, command);
         safeStore.notifyListeners(safeCommand, command);
         return AcceptOutcome.Success;
     }
 
-    public static boolean preacceptInvalidate(SafeCommand safeCommand, Ballot 
ballot)
+    public static boolean preacceptInvalidate(SafeCommandStore safeStore, 
SafeCommand safeCommand, Ballot ballot)
     {
         Command command = safeCommand.current();
 
-        if (command.hasBeen(Status.Committed))
+        if (command.hasBeen(Status.Committed) || 
command.promised().compareTo(ballot) > 0)
         {
-            if (command.is(Truncated)) logger.trace("{}: skipping 
preacceptInvalidate - already truncated", command.txnId());
-            else if (command.is(Invalidated)) logger.trace("{}: skipping 
preacceptInvalidate - already invalidated", command.txnId());
-            else logger.trace("{}: skipping preacceptInvalidate - already 
committed", command.txnId());
+            AcceptOutcome outcome = command.hasBeen(Committed) ? 
AcceptOutcome.Redundant : AcceptOutcome.RejectedBallot;
+            logger.trace("{}: skipping preacceptInvalidate - {}", 
command.txnId(), outcome);
+            safeStore.agent().localEvents().onRejectPreNotAccept(safeStore, 
command, outcome);
             return false;
         }
 
-        if (command.promised().compareTo(ballot) > 0)
-        {
-            logger.trace("{}: skipping preacceptInvalidate - witnessed higher 
ballot ({})", command.txnId(), command.promised());
-            return false;
-        }
+        safeStore.agent().localEvents().onPreNotAccepted(safeStore, command);
         safeCommand.updatePromised(ballot);
         return true;
     }
@@ -274,7 +282,10 @@ public class Commands
         {
             AcceptOutcome reject = maybeRejectAccept(ballot, executeAt, 
command, false);
             if (reject != null)
+            {
+                safeStore.agent().localEvents().onRejectAccept(safeStore, 
command, reject);
                 return reject;
+            }
         }
 
         SaveStatus newSaveStatus = SaveStatus.get(kind == Accept.Kind.MEDIUM ? 
Status.AcceptedMedium : Status.AcceptedSlow, command.known());
@@ -286,7 +297,8 @@ public class Commands
         PartialDeps partialDeps = prepareDeps(validated, participants, 
command, deps);
         participants = prepareParticipants(validated, participants, command);
 
-        safeCommand.accept(safeStore, newSaveStatus, participants, ballot, 
executeAt, partialTxn, partialDeps, ballot);
+        Command accepted = safeCommand.accept(safeStore, newSaveStatus, 
participants, ballot, executeAt, partialTxn, partialDeps, ballot);
+        safeStore.agent().localEvents().onAccepted(safeStore, accepted);
         safeStore.notifyListeners(safeCommand, command);
 
         return AcceptOutcome.Success;
@@ -298,11 +310,15 @@ public class Commands
         {
             AcceptOutcome reject = maybeRejectAccept(ballot, null, command, 
true);
             if (reject != null)
+            {
+                safeStore.agent().localEvents().onRejectNotAccept(safeStore, 
command, reject);
                 return reject;
+            }
         }
 
         logger.trace("{}: not accepted ({})", command.txnId(), status);
-        safeCommand.notAccept(safeStore, status, ballot);
+        Command notAccepted = safeCommand.notAccept(safeStore, status, ballot);
+        safeStore.agent().localEvents().onNotAccepted(safeStore, notAccepted);
         safeStore.notifyListeners(safeCommand, command);
         return AcceptOutcome.Success;
     }
@@ -315,13 +331,20 @@ public class Commands
     {
         final Command command = safeCommand.current();
         if (kind == StableFastPath && !command.promised().equals(Ballot.ZERO))
+        {
+            safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, 
newSaveStatus, command, CommitOutcome.Rejected);
             return CommitOutcome.Rejected;
+        }
 
         SaveStatus curStatus = command.saveStatus();
         Invariants.requireArgument(newSaveStatus == SaveStatus.Committed || 
newSaveStatus == SaveStatus.Stable);
         if (newSaveStatus == SaveStatus.Committed && 
ballot.compareTo(command.promised()) < 0)
-            return curStatus.is(Truncated) || participants.owns().isEmpty()
-                   ? CommitOutcome.Redundant : CommitOutcome.Rejected;
+        {
+            CommitOutcome outcome = curStatus.is(Truncated) || 
participants.owns().isEmpty()
+                                    ? CommitOutcome.Redundant : 
CommitOutcome.Rejected;
+            safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, 
newSaveStatus, command, outcome);
+            return outcome;
+        }
 
         if (curStatus.hasBeen(PreCommitted))
         {
@@ -334,13 +357,17 @@ public class Commands
             if (curStatus.compareTo(newSaveStatus) > 0 || 
curStatus.hasBeen(Stable))
             {
                 logger.trace("{}: skipping commit - already newer or stable 
({})", txnId, command.status());
+                safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Redundant);
                 return CommitOutcome.Redundant;
             }
 
             if (curStatus == SaveStatus.Committed && newSaveStatus == 
SaveStatus.Committed)
             {
                 if (ballot.equals(command.acceptedOrCommitted()))
+                {
+                    safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Redundant);
                     return CommitOutcome.Redundant;
+                }
 
                 Invariants.require(ballot.compareTo(command.acceptedOrCommitted()) > 0);
             }
@@ -349,7 +376,10 @@ public class Commands
         participants = participants.filter(UPDATE, safeStore, txnId, 
executeAt);
         Validated validated = validate(ballot, newSaveStatus, command, 
participants, route, txn, deps, kind, executeAt);
         if (validated == INSUFFICIENT)
+        {
+            safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, 
newSaveStatus, command, CommitOutcome.Insufficient);
             return CommitOutcome.Insufficient;
+        }
 
         PartialTxn partialTxn = prepareTxn(newSaveStatus, participants, 
command, txn);
         PartialDeps partialDeps = prepareDeps(validated, participants, 
command, deps);
@@ -362,15 +392,15 @@ public class Commands
         {
             WaitingOn waitingOn = initialiseWaitingOn(safeStore, txnId, 
executeAt, participants, partialDeps);
             committed = safeCommand.stable(safeStore, participants, ballot, 
executeAt, partialTxn, partialDeps, waitingOn);
-            safeStore.agent().eventListener().onStable(committed);
+            safeStore.agent().localEvents().onStable(safeStore, committed);
             maybeExecute(safeStore, safeCommand, true, true);
         }
         else
         {
             Invariants.requireArgument(command.acceptedOrCommitted().compareTo(ballot) <= 0);
             committed = safeCommand.commit(safeStore, participants, ballot, 
executeAt, partialTxn, partialDeps);
+            safeStore.agent().localEvents().onCommitted(safeStore, committed);
             safeStore.notifyListeners(safeCommand, committed);
-            safeStore.agent().eventListener().onCommitted(committed);
         }
 
         return CommitOutcome.Success;
@@ -553,7 +583,7 @@ public class Commands
                 Command.Executed executed = safeCommand.preapplied(safeStore, 
participants, ballot, executeAt, partialTxn, partialDeps, waitingOn, writes, 
result);
                 logger.trace("{}: preapplied", executed.txnId());
                 // must signal preapplied first, else we may be applied (and 
have cleared progress log state) already before maybeExecute exits
-                safeStore.agent().eventListener().onPreApplied(executed);
+                safeStore.agent().localEvents().onPreApplied(safeStore, 
executed);
                 maybeExecute(safeStore, safeCommand, true, true);
                 break;
             }
@@ -561,7 +591,7 @@ public class Commands
             {
                 Invariants.require(!waitingOn.isWaiting());
                 Command.Executed executed = safeCommand.applying(safeStore, 
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
-                safeStore.agent().eventListener().onPreApplied(executed);
+                safeStore.agent().localEvents().onPreApplied(safeStore, 
executed);
                 safeStore.notifyListeners(safeCommand, command);
                 logger.trace("{}: applying", executed.txnId());
                 applyChain(safeStore, executed).begin(safeStore.agent());
@@ -570,8 +600,8 @@ public class Commands
             case Applied:
             {
                 Command.Executed executed = safeCommand.applied(safeStore, 
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
-                safeStore.agent().eventListener().onPreApplied(executed);
-                safeStore.agent().eventListener().onApplied(executed, -1);
+                safeStore.agent().localEvents().onPreApplied(safeStore, 
executed);
+                safeStore.agent().localEvents().onApplied(safeStore, executed, 
-1);
                 safeStore.notifyListeners(safeCommand, command);
                 break;
             }
@@ -619,7 +649,7 @@ public class Commands
         }
     }
 
-    public static void postApply(SafeCommandStore safeStore, TxnId txnId, long 
t0, boolean forceApply)
+    public static void postApply(SafeCommandStore safeStore, TxnId txnId, long 
startedApplyAt, boolean forceApply)
     {
         SafeCommand safeCommand = safeStore.get(txnId);
         Command command = safeCommand.current();
@@ -628,7 +658,7 @@ public class Commands
             return;
 
         safeCommand.applied(safeStore, forceApply);
-        safeStore.agent().eventListener().onApplied(command, t0);
+        safeStore.agent().localEvents().onApplied(safeStore, command, 
startedApplyAt);
         safeStore.notifyListeners(safeCommand, command);
     }
 
@@ -651,13 +681,13 @@ public class Commands
         // TODO (required, API): do we care about tracking the write 
persistence latency, when this is just a memtable write?
         //  the only reason it will be slow is because Memtable flushes are 
backed-up (which will be reported elsewhere)
         // TODO (required): this is anyway non-monotonic and milliseconds 
granularity
-        long t0 = safeStore.node().elapsed(MICROSECONDS);
+        long startedApplyAt = safeStore.node().elapsed(MICROSECONDS);
         TxnId txnId = command.txnId();
         Participants<?> executes = command.participants().stillExecutes(); // 
including any keys we aren't writing
         return command.writes().apply(safeStore, executes, 
command.partialTxn())
                       // TODO (expected): once we guarantee execution order 
KeyHistory can be ASYNC
                .flatMap(unused -> unsafeStore.build(contextFor(txnId, 
executes, SYNC, WRITE, "Post Apply"), ss -> {
-                   postApply(ss, txnId, t0, false);
+                   postApply(ss, txnId, startedApplyAt, false);
                    return null;
                }));
     }
diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
index 747773e1..09e43460 100644
--- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java
+++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
@@ -77,7 +77,7 @@ public class BeginInvalidation extends AbstractRequest<BeginInvalidation.Invalid
         }
         else
         {
-            boolean promised = Commands.preacceptInvalidate(safeCommand, 
ballot);
+            boolean promised = Commands.preacceptInvalidate(safeStore, 
safeCommand, ballot);
             supersededBy = promised ? null : safeCommand.current().promised();
             truncated = null;
         }
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index d73ffde9..773c2dcc 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -26,6 +26,7 @@ import accord.local.Node.Id;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.primitives.Ballot;
 import accord.primitives.Status;
 import accord.primitives.Status.Durability;
 import accord.local.StoreParticipants;
@@ -75,8 +76,9 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext
         this.durability = durability;
     }
 
-    public static void informDefault(Node node, Topologies any, TxnId txnId, 
Route<?> route, Timestamp executeAt, Durability durability)
+    public static void informDefault(Node node, Topologies any, TxnId txnId, 
Route<?> route, @Nullable Ballot ballot, Timestamp executeAt, Durability 
durability)
     {
+        node.agent().coordinatorEvents().onDurable(durability, ballot, txnId);
         switch (informOfDurability())
         {
             default: throw new AssertionError("Unhandled InformOfDurability: " 
+ informOfDurability());
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 6b77a1c6..cb214c6d 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -312,6 +312,7 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons
                     int c = status.compareTo(SaveStatus.Stable);
                     if (c < 0) safeStore.progressLog().waiting(HasStableDeps, 
safeStore, safeCommand, null, null, participants);
                     else if (c > 0 && status.compareTo(executeOn().min) >= 0 
&& status.compareTo(SaveStatus.PreApplied) < 0) 
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, scope, 
null);
+                    node.agent().localEvents().onReadWaiting(safeStore, 
command);
                     return status.compareTo(SaveStatus.Stable) >= 0 ? null : 
Insufficient;
 
                 case OBSOLETE:
@@ -519,6 +520,7 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons
         else if (txnId.awaitsOnlyDeps()) this.executeAt = 
Timestamp.max(this.executeAt, executeAt);
         else Invariants.require(executeAt.equals(this.executeAt));
 
+        node.agent().localEvents().onReadStarted(safeStore, command);
         if (executes.isEmpty()) readComplete(unsafeStore, null, unavailable);
         else beginRead(safeStore, executeAt, command.partialTxn(), executes)
              .begin(readCallback(unsafeStore, unavailable));
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
index 8da92cc3..adb1a5a2 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
+import accord.api.CoordinatorEventListener;
 import accord.api.ProgressLog;
 import accord.api.Result;
 import accord.impl.mock.MockStore;
@@ -32,6 +33,7 @@ import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.local.TimeService;
 import accord.messages.ReplyContext;
+import accord.primitives.Ballot;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
@@ -50,16 +52,21 @@ public class TestAgent implements Agent
 {
     private static final Logger logger = 
LoggerFactory.getLogger(TestAgent.class);
 
-    public static class RethrowAgent extends TestAgent
+    public static class RethrowAgent extends TestAgent implements 
CoordinatorEventListener
     {
         @Override
-        public void onRecover(Node node, Result success, Throwable fail)
+        public CoordinatorEventListener coordinatorEvents()
+        {
+            return this;
+        }
+
+        @Override
+        public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, 
Result success, Throwable fail)
         {
             if (fail != null)
                 throw new AssertionError("Unexpected exception", fail);
         }
 
-
         @Override
         public void onFailedBootstrap(int attempt, String phase, Ranges 
ranges, Runnable retry, Throwable failure)
         {
@@ -80,14 +87,6 @@ public class TestAgent implements Agent
         }
     }
 
-    @Override
-    public void onRecover(Node node, Result success, Throwable fail)
-    {
-        // do nothing, intended for use by implementations to decide what to 
do about recovered transactions
-        // specifically if and how they should inform clients of the result
-        // e.g. in Maelstrom we send the full result directly, in other impls 
we may simply acknowledge success via the coordinator
-    }
-
     @Override
     public void onInconsistentTimestamp(Command command, Timestamp prev, 
Timestamp next)
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 24fde94f..56c81f77 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -30,7 +30,8 @@ import java.util.function.LongSupplier;
 
 import javax.annotation.Nullable;
 
-import accord.api.ProgressLog;
+import accord.api.CoordinatorEventListener;
+import accord.api.ProgressLog.BlockedUntil;
 import accord.api.Result;
 import accord.api.Scheduler;
 import accord.api.TraceEventType;
@@ -49,6 +50,7 @@ import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.local.TimeService;
 import accord.messages.ReplyContext;
+import accord.primitives.Ballot;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
@@ -71,7 +73,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-public class ListAgent implements InMemoryAgent
+public class ListAgent implements InMemoryAgent, CoordinatorEventListener
 {
     final Scheduler scheduler;
     final RandomSource rnd;
@@ -104,7 +106,7 @@ public class ListAgent implements InMemoryAgent
     }
 
     @Override
-    public void onRecover(Node node, Result success, Throwable fail)
+    public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, 
Result success, Throwable fail)
     {
         if (fail != null)
         {
@@ -140,6 +142,12 @@ public class ListAgent implements InMemoryAgent
         retryBootstrap.accept(retry);
     }
 
+    @Override
+    public CoordinatorEventListener coordinatorEvents()
+    {
+        return this;
+    }
+
     @Override
     public void onStale(Timestamp staleSince, Ranges ranges)
     {
@@ -208,13 +216,13 @@ public class ListAgent implements InMemoryAgent
     }
 
     @Override
-    public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil, TimeUnit units)
+    public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, BlockedUntil blockedUntil, TimeUnit units)
     {
         return units.convert(rnd.nextInt(100, 1000), MILLISECONDS);
     }
 
     @Override
-    public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, ProgressLog.BlockedUntil retrying, TimeUnit units)
+    public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId 
txnId, int retryCount, BlockedUntil retrying, TimeUnit units)
     {
         int retryDelay = Math.min(16, 1 << retryCount);
         return units.convert(retryDelay, SECONDS);
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 17cd3c25..6d1a84d3 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -1026,12 +1026,6 @@ public class CommandsForKeyTest
             throw new UnsupportedOperationException();
         }
 
-        @Override
-        public void onRecover(Node node, Result success, Throwable fail)
-        {
-            throw new UnsupportedOperationException();
-        }
-
         @Override
         public void onInconsistentTimestamp(Command command, Timestamp prev, 
Timestamp next)
         {
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index 008ea3b0..a59731cd 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -21,6 +21,7 @@ package accord.maelstrom;
 import java.util.concurrent.TimeUnit;
 
 import accord.api.Agent;
+import accord.api.CoordinatorEventListener;
 import accord.api.ProgressLog;
 import accord.api.Result;
 import accord.local.Command;
@@ -28,6 +29,7 @@ import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.local.TimeService;
 import accord.messages.ReplyContext;
+import accord.primitives.Ballot;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
@@ -43,12 +45,12 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
-public class MaelstromAgent implements Agent
+public class MaelstromAgent implements Agent, CoordinatorEventListener
 {
     static final MaelstromAgent INSTANCE = new MaelstromAgent();
 
     @Override
-    public void onRecover(Node node, Result success, Throwable fail)
+    public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, 
Result success, Throwable fail)
     {
         if (fail != null)
         {
@@ -79,6 +81,12 @@ public class MaelstromAgent implements Agent
     {
     }
 
+    @Override
+    public CoordinatorEventListener coordinatorEvents()
+    {
+        return this;
+    }
+
     @Override
     public void onUncaughtException(Throwable t)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to