This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 938ba19a Coordinate/ExecuteEphemeralRead should invoke coordinator metrics. Also improve: (1) TimeService.recentElapsed to reduce number of nanoTime calls; (2) support serializing prefixes before CommandsForKey
938ba19a is described below

commit 938ba19adaf70bf9901fb2dc177dce313d7ee15c
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Nov 10 17:33:07 2025 +0000

    Coordinate/ExecuteEphemeralRead should invoke coordinator metrics
    Also improve:
     - TimeService.recentElapsed to reduce number of nanoTime calls
     - Support serializing prefixes before CommandsForKey
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21017
---
 .../main/java/accord/api/ReplicaEventListener.java |  2 +-
 .../src/main/java/accord/api/TopologyService.java  |  1 -
 .../accord/coordinate/CoordinateEphemeralRead.java |  1 +
 .../accord/coordinate/ExecuteEphemeralRead.java    |  8 +++++++
 .../main/java/accord/impl/AbstractReplayer.java    |  2 +-
 .../main/java/accord/impl/AbstractTimeouts.java    |  6 ++---
 .../main/java/accord/impl/RequestCallbacks.java    |  2 +-
 .../impl/progresslog/DefaultProgressLog.java       |  6 ++---
 .../java/accord/impl/progresslog/TxnState.java     |  2 +-
 .../main/java/accord/local/CommandSummaries.java   | 10 ++++++---
 .../src/main/java/accord/local/Commands.java       | 16 +++++--------
 accord-core/src/main/java/accord/local/Node.java   |  2 +-
 .../src/main/java/accord/local/TimeService.java    |  6 +++++
 .../src/main/java/accord/local/cfk/Serialize.java  | 26 +++++++++++++++++-----
 14 files changed, 59 insertions(+), 31 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ReplicaEventListener.java b/accord-core/src/main/java/accord/api/ReplicaEventListener.java
index a46dbadc..a3169a0c 100644
--- a/accord-core/src/main/java/accord/api/ReplicaEventListener.java
+++ b/accord-core/src/main/java/accord/api/ReplicaEventListener.java
@@ -45,7 +45,7 @@ public interface ReplicaEventListener
 
     default void onPreApplied(SafeCommandStore safeStore, Command cmd) {}
     // startedApplyAt may be less than zero, indicating it has not been 
populated
-    default void onApplied(SafeCommandStore safeStore, Command cmd, long 
startedApplyAt) {}
+    default void onApplied(SafeCommandStore safeStore, Command cmd) {}
 
     ReplicaEventListener NOOP = new ReplicaEventListener()
     {
diff --git a/accord-core/src/main/java/accord/api/TopologyService.java b/accord-core/src/main/java/accord/api/TopologyService.java
index d9815a7f..a3a4ad7a 100644
--- a/accord-core/src/main/java/accord/api/TopologyService.java
+++ b/accord-core/src/main/java/accord/api/TopologyService.java
@@ -19,7 +19,6 @@
 package accord.api;
 
 import accord.local.Node;
-import accord.topology.EpochReady;
 import accord.topology.Topology;
 import accord.utils.async.AsyncResult;
 
diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
index 8f4ad7c2..774e0e35 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
@@ -172,6 +172,7 @@ public class CoordinateEphemeralRead extends AbstractCoordinatePreAccept<Result,
     @Override
     void onPreAccepted(Topologies topologies)
     {
+        node.agent().coordinatorEvents().onPreAccepted(txnId);
         SortedListMap<Node.Id, GetEphemeralReadDepsOk> oks = finishOks();
         Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue, 
ok -> ok.deps);
         topologies = node.topology().active().reselect(topologies, 
QuorumEpochIntersections.preaccept.include, scope, executeAtEpoch, 
executeAtEpoch, SHARE, Owned);
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
index ed936df3..009c9a99 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
@@ -41,6 +41,7 @@ import accord.messages.ReadData.ReadReply;
 import accord.messages.ReadEphemeralTxnData;
 import accord.messages.SafeCallback;
 import accord.messages.StableThenRead;
+import accord.primitives.Ballot;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
 import accord.primitives.Participants;
@@ -52,6 +53,7 @@ import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
 
 import static accord.api.ProtocolModifiers.Toggles.permitLocalExecution;
+import static accord.coordinate.ExecutePath.FAST;
 import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
 import static accord.primitives.Status.Phase.Execute;
@@ -89,6 +91,7 @@ public class ExecuteEphemeralRead extends ReadCoordinator<Result, ReadReply>
     @Override
     protected void startOnceInitialised()
     {
+        node.agent().coordinatorEvents().onExecuting(txnId, Ballot.ZERO, deps, 
FAST);
         if (permitLocalExecution() && tryIfUniversal(node.id()))
         {
             new LocalExecute(txnId, node.id()).process(node, 
node.agent().selfExpiresAt(txnId, Execute, MICROSECONDS));
@@ -158,9 +161,14 @@ public class ExecuteEphemeralRead extends ReadCoordinator<Result, ReadReply>
     protected void onDone(Success success, Throwable failure)
     {
         if (failure == null)
+        {
+            node.agent().coordinatorEvents().onExecuted(txnId, Ballot.ZERO);
             invokeCallback(txn.result(txnId, 
txnId.withEpochAtLeast(allTopologies.currentEpoch()), data), null);
+        }
         else
+        {
             invokeCallback(null, failure);
+        }
     }
 
     class LocalExecute extends ReadEphemeralTxnData
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index d42615ed..9fd37f57 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -81,7 +81,7 @@ public abstract class AbstractReplayer implements Journal.Replayer
                     command.writes()
                            .apply(safeStore, executes, command.partialTxn())
                            .invoke(() -> 
unsafeStore.chain(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
-                               Commands.postApply(ss, txnId, -1, true);
+                               Commands.postApply(ss, txnId, true);
                            }))
                            .begin(safeStore.agent());
                 }
diff --git a/accord-core/src/main/java/accord/impl/AbstractTimeouts.java b/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
index 69d89845..a37ce363 100644
--- a/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
+++ b/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
@@ -123,7 +123,7 @@ public class AbstractTimeouts<S extends AbstractTimeouts.Stripe> implements Time
         @Override
         public void run()
         {
-            long now = time.elapsed(MICROSECONDS);
+            long now = time.recentElapsed(MICROSECONDS);
             lock();
             unlock(now);
         }
@@ -241,7 +241,7 @@ public class AbstractTimeouts<S extends AbstractTimeouts.Stripe> implements Time
     @Override
     public RegisteredTimeout registerAt(Timeout timeout, long deadline, 
TimeUnit units)
     {
-        long now = time.elapsed(MICROSECONDS);
+        long now = time.recentElapsed(MICROSECONDS);
         deadline = units.toMicros(deadline);
         return registerAt(timeout, now, deadline);
     }
@@ -262,7 +262,7 @@ public class AbstractTimeouts<S extends AbstractTimeouts.Stripe> implements Time
     @Override
     public void maybeNotify()
     {
-        long nowMicros = time.elapsed(MICROSECONDS);
+        long nowMicros = time.recentElapsed(MICROSECONDS);
         for (Stripe stripe : stripes)
         {
             if (stripe.timeouts.shouldWake(nowMicros) && stripe.tryLock())
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index 746430f6..59dc1eb9 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -212,7 +212,7 @@ public class RequestCallbacks extends AbstractTimeouts<RequestCallbacks.Callback
         private <T, P> RegisteredCallback<T> safeInvoke(long callbackId, 
Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean 
remove)
         {
             RegisteredCallback<T> registered = null;
-            long now = time.elapsed(MICROSECONDS);
+            long now = time.recentElapsed(MICROSECONDS);
             lock();
             try
             {
diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 35017ea5..0e3e8174 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -527,7 +527,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
         if (stopped || processing)
             return;
 
-        long nowMicros = node.elapsed(TimeUnit.MICROSECONDS);
+        long nowMicros = node.recentElapsed(TimeUnit.MICROSECONDS);
         processing = true;
         try
         {
@@ -815,7 +815,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
 
     long nextCallbackId()
     {
-        long id = node.elapsed(NANOSECONDS);
+        long id = node.recentElapsed(NANOSECONDS);
         if (id > prevCallbackId) prevCallbackId = id;
         else id = ++prevCallbackId;
         return id;
@@ -948,7 +948,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
         }
         else
         {
-            long now = node.elapsed(MICROSECONDS);
+            long now = node.recentElapsed(MICROSECONDS);
             if (timers.shouldWake(now))
                 commandStore.execute((PreLoadContext.Empty) () -> "Run 
ProgressLog", this, node.agent());
         }
diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
index 7b84f4a9..1292061e 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
@@ -120,7 +120,7 @@ public final class TxnState extends WaitingState implements PreLoadContext
         }
         else
         {
-            long nowMicros = owner.node().elapsed(MICROSECONDS);
+            long nowMicros = owner.node().recentElapsed(MICROSECONDS);
             long newDeadline = nowMicros + newDelay;
             if (otherDeadline == 0)
             {
diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java b/accord-core/src/main/java/accord/local/CommandSummaries.java
index 675b48f5..91ce1ff1 100644
--- a/accord-core/src/main/java/accord/local/CommandSummaries.java
+++ b/accord-core/src/main/java/accord/local/CommandSummaries.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import accord.api.RoutingKey;
 import accord.local.MaxDecidedRX.DecidedRX;
 import accord.local.RedundantBefore.Bounds;
 import accord.local.cfk.CommandsForKey;
@@ -227,17 +228,20 @@ public interface CommandSummaries
             if (cfk == null || cfk.size() == 0)
                 return false;
 
+            return isRelevant(cfk.key(), cfk.get(cfk.size() - 1), 
cfk.minUndecided());
+        }
+
+        public boolean isRelevant(RoutingKey key, TxnId last, TxnId 
minUndecided)
+        {
             // NOTE: we CANNOT safely filter on first element, as we may have 
pruned dependencies we need to witness
             //  and that will be populated on the receiving replicas as 
necessary - that is,
             //  we must permit adopting future dependencies
-            CommandsForKey.TxnInfo last = cfk.get(cfk.size() - 1);
             if (last.compareTo(minTxnId) < 0)
                 return false;
 
             if (maxDecidedRX == null)
                 return true;
 
-            CommandsForKey.TxnInfo minUndecided = cfk.minUndecided();
             if (minUndecided != null)
                 return true;
 
@@ -245,7 +249,7 @@ public interface CommandSummaries
             if (decidedRx != null && decidedRx.excludeDecided(last))
                 return false;
 
-            DecidedRX decidedRx = maxDecidedRX.forDeps(cfk.key(), 
primaryTxnId);
+            DecidedRX decidedRx = maxDecidedRX.forDeps(key, primaryTxnId);
             return decidedRx == null || decidedRx.includeDecided(last);
         }
 
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index b9789473..16db84fe 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -131,7 +131,6 @@ import static accord.primitives.Txn.Kind.Write;
 import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.nonNull;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 public class Commands
 {
@@ -616,7 +615,7 @@ public class Commands
             {
                 Command.Executed executed = safeCommand.applied(safeStore, 
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
                 safeStore.agent().replicaEvents().onPreApplied(safeStore, 
executed);
-                safeStore.agent().replicaEvents().onApplied(safeStore, 
executed, -1);
+                safeStore.agent().replicaEvents().onApplied(safeStore, 
executed);
                 safeStore.notifyListeners(safeCommand, command);
                 break;
             }
@@ -664,7 +663,7 @@ public class Commands
         }
     }
 
-    public static void postApply(SafeCommandStore safeStore, TxnId txnId, long 
startedApplyAt, boolean forceApply)
+    public static void postApply(SafeCommandStore safeStore, TxnId txnId, 
boolean forceApply)
     {
         SafeCommand safeCommand = safeStore.get(txnId);
         Command command = safeCommand.current();
@@ -673,7 +672,7 @@ public class Commands
             return;
 
         safeCommand.applied(safeStore, forceApply);
-        safeStore.agent().replicaEvents().onApplied(safeStore, command, 
startedApplyAt);
+        safeStore.agent().replicaEvents().onApplied(safeStore, command);
         safeStore.notifyListeners(safeCommand, command);
     }
 
@@ -682,16 +681,14 @@ public class Commands
         final CommandStore commandStore;
         final TxnId txnId;
         final Participants<?> participants;
-        final long startedApplyAt;
         final boolean force;
 
-        protected PostApply(Head<?> head, CommandStore commandStore, TxnId 
txnId, Participants<?> participants, long startedApplyAt, boolean force)
+        protected PostApply(Head<?> head, CommandStore commandStore, TxnId 
txnId, Participants<?> participants, boolean force)
         {
             super(head);
             this.commandStore = commandStore;
             this.txnId = txnId;
             this.participants = participants;
-            this.startedApplyAt = startedApplyAt;
             this.force = force;
         }
 
@@ -704,7 +701,7 @@ public class Commands
         @Override
         public void accept(SafeCommandStore safeStore)
         {
-            postApply(safeStore, txnId, startedApplyAt, force);
+            postApply(safeStore, txnId, force);
         }
 
         @Override public TxnId primaryTxnId() { return txnId; }
@@ -722,14 +719,13 @@ public class Commands
         // TODO (required, API): do we care about tracking the write 
persistence latency, when this is just a memtable write?
         //  the only reason it will be slow is because Memtable flushes are 
backed-up (which will be reported elsewhere)
         // TODO (required): this is anyway non-monotonic and milliseconds 
granularity
-        long startedApplyAt = safeStore.node().elapsed(MICROSECONDS);
         TxnId txnId = command.txnId();
         //noinspection DataFlowIssue
         safeStore = safeStore; // disable reuse
         Participants<?> executes = command.participants().stillExecutes(); // 
including any keys we aren't writing
         return command.writes()
                       .apply(safeStore, executes, command.partialTxn())
-                      .then(head -> new PostApply<>(head, unsafeStore, txnId, 
executes, startedApplyAt, false));
+                      .then(head -> new PostApply<>(head, unsafeStore, txnId, 
executes, false));
     }
 
     public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand 
safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index e2828ddc..effd6b44 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -872,7 +872,7 @@ public class Node implements NodeCommandStoreService
         long mostRecent = coordinations.mostRecent(txnId, 
COORDINATES_STATE_MACHINE, ballot);
         if (mostRecent < 0)
             return false;
-        long ageNanos = Math.max(elapsed(NANOSECONDS) - mostRecent, 0);
+        long ageNanos = Math.max(recentElapsed(NANOSECONDS) - mostRecent, 0);
         return !agent.isSlowCoordinator(ageNanos, NANOSECONDS, txnId, 1);
     }
 
diff --git a/accord-core/src/main/java/accord/local/TimeService.java b/accord-core/src/main/java/accord/local/TimeService.java
index b9a918f2..9dc7f0de 100644
--- a/accord-core/src/main/java/accord/local/TimeService.java
+++ b/accord-core/src/main/java/accord/local/TimeService.java
@@ -38,6 +38,12 @@ public interface TimeService
      */
     long elapsed(TimeUnit unit);
 
+    /**
+     * {@link #elapsed(TimeUnit)} but accepts a "recently" updated value; this 
must be no earlier than the start
+     * of processing the relevant task.
+     */
+    default long recentElapsed(TimeUnit unit) { return elapsed(unit); }
+
     static TimeService ofNonMonotonic(LongSupplier now, TimeUnit units)
     {
         return of(now, elapsedWrapperFromNonMonotonicSource(units, now));
diff --git a/accord-core/src/main/java/accord/local/cfk/Serialize.java b/accord-core/src/main/java/accord/local/cfk/Serialize.java
index 659aeb2e..610f2431 100644
--- a/accord-core/src/main/java/accord/local/cfk/Serialize.java
+++ b/accord-core/src/main/java/accord/local/cfk/Serialize.java
@@ -132,12 +132,17 @@ public class Serialize
      * TODO (desired): determine timestamp resolution as a factor of 10
      */
     public static ByteBuffer toBytesWithoutKey(CommandsForKey cfk)
+    {
+        return toBytesWithoutKey(0, cfk);
+    }
+
+    public static ByteBuffer toBytesWithoutKey(int prefixBytes, CommandsForKey 
cfk)
     {
         Invariants.requireArgument(!cfk.isLoadingPruned());
-        return unsafeToBytesWithoutKey(cfk);
+        return unsafeToBytesWithoutKey(prefixBytes, cfk);
     }
 
-    private static ByteBuffer unsafeToBytesWithoutKey(CommandsForKey cfk)
+    private static ByteBuffer unsafeToBytesWithoutKey(int prefixBytes, 
CommandsForKey cfk)
     {
         Invariants.require(!cfk.isLoadingPruned());
 
@@ -145,10 +150,11 @@ public class Serialize
         if (commandCount == 0)
         {
             if (!cfk.hasMaxUniqueHlc())
-                return ByteBuffer.allocate(1);
+                return ByteBuffer.allocate(prefixBytes + 1);
 
             int size = 1 + VIntCoding.sizeOfUnsignedVInt(cfk.maxUniqueHlc);
-            ByteBuffer result = ByteBuffer.allocate(size);
+            ByteBuffer result = ByteBuffer.allocate(prefixBytes + size);
+            result.position(prefixBytes);
             VIntCoding.writeUnsignedVInt32(1, result);
             VIntCoding.writeUnsignedVInt(cfk.maxUniqueHlc, result);
             Invariants.require(!result.hasRemaining());
@@ -458,7 +464,8 @@ public class Serialize
             totalBytes += sizeOfUnsignedVInt(unmanagedPendingCommitCount);
             totalBytes += sizeOfUnsignedVInt(cfk.unmanagedCount() - 
unmanagedPendingCommitCount);
 
-            ByteBuffer out = ByteBuffer.allocate(totalBytes);
+            ByteBuffer out = ByteBuffer.allocate(prefixBytes + totalBytes);
+            out.position(prefixBytes);
             VIntCoding.writeUnsignedVInt32(commandCount + 1, out);
             VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
             VIntCoding.writeUnsignedVInt32(nodeIds[0], out);
@@ -756,11 +763,18 @@ public class Serialize
     }
 
     public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in)
+    {
+        return fromBytes(key, in, true);
+    }
+
+    public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in, 
boolean duplicate)
     {
         if (!in.hasRemaining())
             return null;
 
-        in = in.duplicate();
+        if (duplicate)
+            in = in.duplicate();
+
         int commandCount = VIntCoding.readUnsignedVInt32(in) - 1;
         if (commandCount <= 0)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to