This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 938ba19a Coordinate/ExecuteEphemeralRead should invoke coordinator metrics.
Also improve:
- TimeService.recentElapsed to reduce the number of nanoTime calls
- Support serializing prefixes before CommandsForKey
938ba19a is described below
commit 938ba19adaf70bf9901fb2dc177dce313d7ee15c
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Nov 10 17:33:07 2025 +0000
Coordinate/ExecuteEphemeralRead should invoke coordinator metrics
Also improve:
- TimeService.recentElapsed to reduce the number of nanoTime calls
- Support serializing prefixes before CommandsForKey
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21017
---
.../main/java/accord/api/ReplicaEventListener.java | 2 +-
.../src/main/java/accord/api/TopologyService.java | 1 -
.../accord/coordinate/CoordinateEphemeralRead.java | 1 +
.../accord/coordinate/ExecuteEphemeralRead.java | 8 +++++++
.../main/java/accord/impl/AbstractReplayer.java | 2 +-
.../main/java/accord/impl/AbstractTimeouts.java | 6 ++---
.../main/java/accord/impl/RequestCallbacks.java | 2 +-
.../impl/progresslog/DefaultProgressLog.java | 6 ++---
.../java/accord/impl/progresslog/TxnState.java | 2 +-
.../main/java/accord/local/CommandSummaries.java | 10 ++++++---
.../src/main/java/accord/local/Commands.java | 16 +++++--------
accord-core/src/main/java/accord/local/Node.java | 2 +-
.../src/main/java/accord/local/TimeService.java | 6 +++++
.../src/main/java/accord/local/cfk/Serialize.java | 26 +++++++++++++++++-----
14 files changed, 59 insertions(+), 31 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ReplicaEventListener.java
b/accord-core/src/main/java/accord/api/ReplicaEventListener.java
index a46dbadc..a3169a0c 100644
--- a/accord-core/src/main/java/accord/api/ReplicaEventListener.java
+++ b/accord-core/src/main/java/accord/api/ReplicaEventListener.java
@@ -45,7 +45,7 @@ public interface ReplicaEventListener
default void onPreApplied(SafeCommandStore safeStore, Command cmd) {}
// startedApplyAt may be less than zero, indicating it has not been
populated
- default void onApplied(SafeCommandStore safeStore, Command cmd, long
startedApplyAt) {}
+ default void onApplied(SafeCommandStore safeStore, Command cmd) {}
ReplicaEventListener NOOP = new ReplicaEventListener()
{
diff --git a/accord-core/src/main/java/accord/api/TopologyService.java
b/accord-core/src/main/java/accord/api/TopologyService.java
index d9815a7f..a3a4ad7a 100644
--- a/accord-core/src/main/java/accord/api/TopologyService.java
+++ b/accord-core/src/main/java/accord/api/TopologyService.java
@@ -19,7 +19,6 @@
package accord.api;
import accord.local.Node;
-import accord.topology.EpochReady;
import accord.topology.Topology;
import accord.utils.async.AsyncResult;
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
index 8f4ad7c2..774e0e35 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
@@ -172,6 +172,7 @@ public class CoordinateEphemeralRead extends
AbstractCoordinatePreAccept<Result,
@Override
void onPreAccepted(Topologies topologies)
{
+ node.agent().coordinatorEvents().onPreAccepted(txnId);
SortedListMap<Node.Id, GetEphemeralReadDepsOk> oks = finishOks();
Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue,
ok -> ok.deps);
topologies = node.topology().active().reselect(topologies,
QuorumEpochIntersections.preaccept.include, scope, executeAtEpoch,
executeAtEpoch, SHARE, Owned);
diff --git
a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
index ed936df3..009c9a99 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
@@ -41,6 +41,7 @@ import accord.messages.ReadData.ReadReply;
import accord.messages.ReadEphemeralTxnData;
import accord.messages.SafeCallback;
import accord.messages.StableThenRead;
+import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
import accord.primitives.Participants;
@@ -52,6 +53,7 @@ import accord.utils.Invariants;
import accord.utils.UnhandledEnum;
import static accord.api.ProtocolModifiers.Toggles.permitLocalExecution;
+import static accord.coordinate.ExecutePath.FAST;
import static accord.coordinate.ReadCoordinator.Action.Approve;
import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
import static accord.primitives.Status.Phase.Execute;
@@ -89,6 +91,7 @@ public class ExecuteEphemeralRead extends
ReadCoordinator<Result, ReadReply>
@Override
protected void startOnceInitialised()
{
+ node.agent().coordinatorEvents().onExecuting(txnId, Ballot.ZERO, deps,
FAST);
if (permitLocalExecution() && tryIfUniversal(node.id()))
{
new LocalExecute(txnId, node.id()).process(node,
node.agent().selfExpiresAt(txnId, Execute, MICROSECONDS));
@@ -158,9 +161,14 @@ public class ExecuteEphemeralRead extends
ReadCoordinator<Result, ReadReply>
protected void onDone(Success success, Throwable failure)
{
if (failure == null)
+ {
+ node.agent().coordinatorEvents().onExecuted(txnId, Ballot.ZERO);
invokeCallback(txn.result(txnId,
txnId.withEpochAtLeast(allTopologies.currentEpoch()), data), null);
+ }
else
+ {
invokeCallback(null, failure);
+ }
}
class LocalExecute extends ReadEphemeralTxnData
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index d42615ed..9fd37f57 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -81,7 +81,7 @@ public abstract class AbstractReplayer implements
Journal.Replayer
command.writes()
.apply(safeStore, executes, command.partialTxn())
.invoke(() ->
unsafeStore.chain(PreLoadContext.contextFor(txnId, "Replay"), ss -> {
- Commands.postApply(ss, txnId, -1, true);
+ Commands.postApply(ss, txnId, true);
}))
.begin(safeStore.agent());
}
diff --git a/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
b/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
index 69d89845..a37ce363 100644
--- a/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
+++ b/accord-core/src/main/java/accord/impl/AbstractTimeouts.java
@@ -123,7 +123,7 @@ public class AbstractTimeouts<S extends
AbstractTimeouts.Stripe> implements Time
@Override
public void run()
{
- long now = time.elapsed(MICROSECONDS);
+ long now = time.recentElapsed(MICROSECONDS);
lock();
unlock(now);
}
@@ -241,7 +241,7 @@ public class AbstractTimeouts<S extends
AbstractTimeouts.Stripe> implements Time
@Override
public RegisteredTimeout registerAt(Timeout timeout, long deadline,
TimeUnit units)
{
- long now = time.elapsed(MICROSECONDS);
+ long now = time.recentElapsed(MICROSECONDS);
deadline = units.toMicros(deadline);
return registerAt(timeout, now, deadline);
}
@@ -262,7 +262,7 @@ public class AbstractTimeouts<S extends
AbstractTimeouts.Stripe> implements Time
@Override
public void maybeNotify()
{
- long nowMicros = time.elapsed(MICROSECONDS);
+ long nowMicros = time.recentElapsed(MICROSECONDS);
for (Stripe stripe : stripes)
{
if (stripe.timeouts.shouldWake(nowMicros) && stripe.tryLock())
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index 746430f6..59dc1eb9 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -212,7 +212,7 @@ public class RequestCallbacks extends
AbstractTimeouts<RequestCallbacks.Callback
private <T, P> RegisteredCallback<T> safeInvoke(long callbackId,
Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean
remove)
{
RegisteredCallback<T> registered = null;
- long now = time.elapsed(MICROSECONDS);
+ long now = time.recentElapsed(MICROSECONDS);
lock();
try
{
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 35017ea5..0e3e8174 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -527,7 +527,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
if (stopped || processing)
return;
- long nowMicros = node.elapsed(TimeUnit.MICROSECONDS);
+ long nowMicros = node.recentElapsed(TimeUnit.MICROSECONDS);
processing = true;
try
{
@@ -815,7 +815,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
long nextCallbackId()
{
- long id = node.elapsed(NANOSECONDS);
+ long id = node.recentElapsed(NANOSECONDS);
if (id > prevCallbackId) prevCallbackId = id;
else id = ++prevCallbackId;
return id;
@@ -948,7 +948,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
}
else
{
- long now = node.elapsed(MICROSECONDS);
+ long now = node.recentElapsed(MICROSECONDS);
if (timers.shouldWake(now))
commandStore.execute((PreLoadContext.Empty) () -> "Run
ProgressLog", this, node.agent());
}
diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
index 7b84f4a9..1292061e 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
@@ -120,7 +120,7 @@ public final class TxnState extends WaitingState implements
PreLoadContext
}
else
{
- long nowMicros = owner.node().elapsed(MICROSECONDS);
+ long nowMicros = owner.node().recentElapsed(MICROSECONDS);
long newDeadline = nowMicros + newDelay;
if (otherDeadline == 0)
{
diff --git a/accord-core/src/main/java/accord/local/CommandSummaries.java
b/accord-core/src/main/java/accord/local/CommandSummaries.java
index 675b48f5..91ce1ff1 100644
--- a/accord-core/src/main/java/accord/local/CommandSummaries.java
+++ b/accord-core/src/main/java/accord/local/CommandSummaries.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
+import accord.api.RoutingKey;
import accord.local.MaxDecidedRX.DecidedRX;
import accord.local.RedundantBefore.Bounds;
import accord.local.cfk.CommandsForKey;
@@ -227,17 +228,20 @@ public interface CommandSummaries
if (cfk == null || cfk.size() == 0)
return false;
+ return isRelevant(cfk.key(), cfk.get(cfk.size() - 1),
cfk.minUndecided());
+ }
+
+ public boolean isRelevant(RoutingKey key, TxnId last, TxnId
minUndecided)
+ {
// NOTE: we CANNOT safely filter on first element, as we may have
pruned dependencies we need to witness
// and that will be populated on the receiving replicas as
necessary - that is,
// we must permit adopting future dependencies
- CommandsForKey.TxnInfo last = cfk.get(cfk.size() - 1);
if (last.compareTo(minTxnId) < 0)
return false;
if (maxDecidedRX == null)
return true;
- CommandsForKey.TxnInfo minUndecided = cfk.minUndecided();
if (minUndecided != null)
return true;
@@ -245,7 +249,7 @@ public interface CommandSummaries
if (decidedRx != null && decidedRx.excludeDecided(last))
return false;
- DecidedRX decidedRx = maxDecidedRX.forDeps(cfk.key(),
primaryTxnId);
+ DecidedRX decidedRx = maxDecidedRX.forDeps(key, primaryTxnId);
return decidedRx == null || decidedRx.includeDecided(last);
}
diff --git a/accord-core/src/main/java/accord/local/Commands.java
b/accord-core/src/main/java/accord/local/Commands.java
index b9789473..16db84fe 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -131,7 +131,6 @@ import static accord.primitives.Txn.Kind.Write;
import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.nonNull;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
public class Commands
{
@@ -616,7 +615,7 @@ public class Commands
{
Command.Executed executed = safeCommand.applied(safeStore,
participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result);
safeStore.agent().replicaEvents().onPreApplied(safeStore,
executed);
- safeStore.agent().replicaEvents().onApplied(safeStore,
executed, -1);
+ safeStore.agent().replicaEvents().onApplied(safeStore,
executed);
safeStore.notifyListeners(safeCommand, command);
break;
}
@@ -664,7 +663,7 @@ public class Commands
}
}
- public static void postApply(SafeCommandStore safeStore, TxnId txnId, long
startedApplyAt, boolean forceApply)
+ public static void postApply(SafeCommandStore safeStore, TxnId txnId,
boolean forceApply)
{
SafeCommand safeCommand = safeStore.get(txnId);
Command command = safeCommand.current();
@@ -673,7 +672,7 @@ public class Commands
return;
safeCommand.applied(safeStore, forceApply);
- safeStore.agent().replicaEvents().onApplied(safeStore, command,
startedApplyAt);
+ safeStore.agent().replicaEvents().onApplied(safeStore, command);
safeStore.notifyListeners(safeCommand, command);
}
@@ -682,16 +681,14 @@ public class Commands
final CommandStore commandStore;
final TxnId txnId;
final Participants<?> participants;
- final long startedApplyAt;
final boolean force;
- protected PostApply(Head<?> head, CommandStore commandStore, TxnId
txnId, Participants<?> participants, long startedApplyAt, boolean force)
+ protected PostApply(Head<?> head, CommandStore commandStore, TxnId
txnId, Participants<?> participants, boolean force)
{
super(head);
this.commandStore = commandStore;
this.txnId = txnId;
this.participants = participants;
- this.startedApplyAt = startedApplyAt;
this.force = force;
}
@@ -704,7 +701,7 @@ public class Commands
@Override
public void accept(SafeCommandStore safeStore)
{
- postApply(safeStore, txnId, startedApplyAt, force);
+ postApply(safeStore, txnId, force);
}
@Override public TxnId primaryTxnId() { return txnId; }
@@ -722,14 +719,13 @@ public class Commands
// TODO (required, API): do we care about tracking the write
persistence latency, when this is just a memtable write?
// the only reason it will be slow is because Memtable flushes are
backed-up (which will be reported elsewhere)
// TODO (required): this is anyway non-monotonic and milliseconds
granularity
- long startedApplyAt = safeStore.node().elapsed(MICROSECONDS);
TxnId txnId = command.txnId();
//noinspection DataFlowIssue
safeStore = safeStore; // disable reuse
Participants<?> executes = command.participants().stillExecutes(); //
including any keys we aren't writing
return command.writes()
.apply(safeStore, executes, command.partialTxn())
- .then(head -> new PostApply<>(head, unsafeStore, txnId,
executes, startedApplyAt, false));
+ .then(head -> new PostApply<>(head, unsafeStore, txnId,
executes, false));
}
public static boolean maybeExecute(SafeCommandStore safeStore, SafeCommand
safeCommand, boolean alwaysNotifyListeners, boolean notifyWaitingOn)
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index e2828ddc..effd6b44 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -872,7 +872,7 @@ public class Node implements NodeCommandStoreService
long mostRecent = coordinations.mostRecent(txnId,
COORDINATES_STATE_MACHINE, ballot);
if (mostRecent < 0)
return false;
- long ageNanos = Math.max(elapsed(NANOSECONDS) - mostRecent, 0);
+ long ageNanos = Math.max(recentElapsed(NANOSECONDS) - mostRecent, 0);
return !agent.isSlowCoordinator(ageNanos, NANOSECONDS, txnId, 1);
}
diff --git a/accord-core/src/main/java/accord/local/TimeService.java
b/accord-core/src/main/java/accord/local/TimeService.java
index b9a918f2..9dc7f0de 100644
--- a/accord-core/src/main/java/accord/local/TimeService.java
+++ b/accord-core/src/main/java/accord/local/TimeService.java
@@ -38,6 +38,12 @@ public interface TimeService
*/
long elapsed(TimeUnit unit);
+ /**
+ * {@link #elapsed(TimeUnit)} but accepts a "recently" updated value; this
must be no earlier than the start
+ * of processing the relevant task.
+ */
+ default long recentElapsed(TimeUnit unit) { return elapsed(unit); }
+
static TimeService ofNonMonotonic(LongSupplier now, TimeUnit units)
{
return of(now, elapsedWrapperFromNonMonotonicSource(units, now));
diff --git a/accord-core/src/main/java/accord/local/cfk/Serialize.java
b/accord-core/src/main/java/accord/local/cfk/Serialize.java
index 659aeb2e..610f2431 100644
--- a/accord-core/src/main/java/accord/local/cfk/Serialize.java
+++ b/accord-core/src/main/java/accord/local/cfk/Serialize.java
@@ -132,12 +132,17 @@ public class Serialize
* TODO (desired): determine timestamp resolution as a factor of 10
*/
public static ByteBuffer toBytesWithoutKey(CommandsForKey cfk)
+ {
+ return toBytesWithoutKey(0, cfk);
+ }
+
+ public static ByteBuffer toBytesWithoutKey(int prefixBytes, CommandsForKey
cfk)
{
Invariants.requireArgument(!cfk.isLoadingPruned());
- return unsafeToBytesWithoutKey(cfk);
+ return unsafeToBytesWithoutKey(prefixBytes, cfk);
}
- private static ByteBuffer unsafeToBytesWithoutKey(CommandsForKey cfk)
+ private static ByteBuffer unsafeToBytesWithoutKey(int prefixBytes,
CommandsForKey cfk)
{
Invariants.require(!cfk.isLoadingPruned());
@@ -145,10 +150,11 @@ public class Serialize
if (commandCount == 0)
{
if (!cfk.hasMaxUniqueHlc())
- return ByteBuffer.allocate(1);
+ return ByteBuffer.allocate(prefixBytes + 1);
int size = 1 + VIntCoding.sizeOfUnsignedVInt(cfk.maxUniqueHlc);
- ByteBuffer result = ByteBuffer.allocate(size);
+ ByteBuffer result = ByteBuffer.allocate(prefixBytes + size);
+ result.position(prefixBytes);
VIntCoding.writeUnsignedVInt32(1, result);
VIntCoding.writeUnsignedVInt(cfk.maxUniqueHlc, result);
Invariants.require(!result.hasRemaining());
@@ -458,7 +464,8 @@ public class Serialize
totalBytes += sizeOfUnsignedVInt(unmanagedPendingCommitCount);
totalBytes += sizeOfUnsignedVInt(cfk.unmanagedCount() -
unmanagedPendingCommitCount);
- ByteBuffer out = ByteBuffer.allocate(totalBytes);
+ ByteBuffer out = ByteBuffer.allocate(prefixBytes + totalBytes);
+ out.position(prefixBytes);
VIntCoding.writeUnsignedVInt32(commandCount + 1, out);
VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
VIntCoding.writeUnsignedVInt32(nodeIds[0], out);
@@ -756,11 +763,18 @@ public class Serialize
}
public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in)
+ {
+ return fromBytes(key, in, true);
+ }
+
+ public static CommandsForKey fromBytes(RoutingKey key, ByteBuffer in,
boolean duplicate)
{
if (!in.hasRemaining())
return null;
- in = in.duplicate();
+ if (duplicate)
+ in = in.duplicate();
+
int commandCount = VIntCoding.readUnsignedVInt32(in) - 1;
if (commandCount <= 0)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]