This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new f40826db Accord Query Tracing Also improve: - Metric listeners f40826db is described below commit f40826db9571af286a689deada1613ffc872e5f8 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon Jun 30 13:29:06 2025 +0100 Accord Query Tracing Also improve: - Metric listeners patch by Benedict; reviewed by David Capwell for CASSANDRA-20773 --- accord-core/src/main/java/accord/api/Agent.java | 26 ++---- .../java/accord/api/CoordinatorEventListener.java | 11 +-- .../src/main/java/accord/api/EventListener.java | 84 ----------------- .../main/java/accord/api/LocalEventListener.java | 91 ++++++++++++++++++ .../coordinate/AbstractCoordinatePreAccept.java | 10 -- .../accord/coordinate/CoordinateSyncPoint.java | 8 +- .../accord/coordinate/CoordinateTransaction.java | 15 +-- .../java/accord/coordinate/ExecuteSyncPoint.java | 21 +++-- .../main/java/accord/coordinate/ExecuteTxn.java | 3 +- .../main/java/accord/coordinate/MaybeRecover.java | 2 +- .../src/main/java/accord/coordinate/Persist.java | 3 +- .../src/main/java/accord/coordinate/Propose.java | 9 +- .../src/main/java/accord/coordinate/Recover.java | 10 +- .../main/java/accord/impl/AbstractReplayer.java | 15 ++- .../impl/progresslog/DefaultProgressLog.java | 22 +++-- .../src/main/java/accord/local/Commands.java | 102 +++++++++++++-------- .../java/accord/messages/BeginInvalidation.java | 2 +- .../main/java/accord/messages/InformDurable.java | 4 +- .../src/main/java/accord/messages/ReadData.java | 2 + .../src/test/java/accord/impl/TestAgent.java | 21 ++--- .../src/test/java/accord/impl/list/ListAgent.java | 18 +++- .../java/accord/local/cfk/CommandsForKeyTest.java | 6 -- .../main/java/accord/maelstrom/MaelstromAgent.java | 12 ++- 23 files changed, 266 insertions(+), 231 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java index 89016626..4dae236c 100644 --- a/accord-core/src/main/java/accord/api/Agent.java +++ b/accord-core/src/main/java/accord/api/Agent.java @@ -40,22 +40,11 @@ import accord.utils.async.AsyncChain; /** * Facility for augmenting node behaviour at specific points - * - * TODO (expected): rationalise LocalConfig and Agent */ public interface Agent extends UncaughtExceptionListener { default @Nullable Tracing trace(TxnId txnId, TraceEventType eventType) { return null; } - /** - * For use by implementations to decide what to do about successfully recovered transactions. - * Specifically intended to define if and how they should inform clients of the result. - * e.g. in Maelstrom we send the full result directly, in other impls we may simply acknowledge success via the coordinator - * - * Note: may be invoked multiple times in different places - */ - void onRecover(Node node, Result success, Throwable fail); - /** * For use by implementations to decide what to do about timestamp inconsistency, i.e. two different timestamps * committed for the same transaction. This is a protocol consistency violation, potentially leading to non-linearizable @@ -70,6 +59,16 @@ public interface Agent extends UncaughtExceptionListener void onStale(Timestamp staleSince, Ranges ranges); + default CoordinatorEventListener coordinatorEvents() + { + return CoordinatorEventListener.NOOP; + } + + default LocalEventListener localEvents() + { + return LocalEventListener.NOOP; + } + @Override void onUncaughtException(Throwable t); void onCaughtException(Throwable t, String context); @@ -112,11 +111,6 @@ public interface Agent extends UncaughtExceptionListener */ Txn emptySystemTxn(Kind kind, Domain domain); - default EventListener eventListener() - { - return EventListener.NOOP; - } - /** * For each shard, select a small number of replicas that should be preferred for listening to progress updates * from {@code from}. This should be 1-2 nodes that will be contacted preferentially for progress to minimise diff --git a/accord-core/src/main/java/accord/api/CoordinatorEventListener.java b/accord-core/src/main/java/accord/api/CoordinatorEventListener.java index e44a7073..ed4dff11 100644 --- a/accord-core/src/main/java/accord/api/CoordinatorEventListener.java +++ b/accord-core/src/main/java/accord/api/CoordinatorEventListener.java @@ -27,22 +27,17 @@ import accord.primitives.Deps; import accord.primitives.Status.Durability; import accord.primitives.TxnId; -// TODO (required): revisit the call-sites, and boundary with C* public interface CoordinatorEventListener { - default void onPreAccepted(TxnId txnId, Deps deps, boolean isStable) + default void onPreAccepted(TxnId txnId) { } - default void onAccepted(TxnId txnId, Ballot ballot, Deps deps, boolean isStable) + default void onAccepted(TxnId txnId, Ballot ballot) { } - default void onStabilised(TxnId txnId, Ballot ballot, Deps deps) - { - } - - default void onExecuting(TxnId txnId, @Nullable Ballot ballot, @Nullable ExecutePath path) + default void onExecuting(TxnId txnId, @Nullable Ballot ballot, Deps deps, @Nullable ExecutePath path) { } diff --git a/accord-core/src/main/java/accord/api/EventListener.java b/accord-core/src/main/java/accord/api/EventListener.java deleted file mode 100644 index e1ed5089..00000000 --- a/accord-core/src/main/java/accord/api/EventListener.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package accord.api; - -import accord.local.Command; -import accord.primitives.Deps; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; - -// TODO (required): revisit the call-sites, and boundary with C* -public interface EventListener -{ - default void onCommitted(Command cmd) - { - } - - default void onStable(Command cmd) - { - } - - default void onPreApplied(Command cmd) - { - } - - default void onApplied(Command cmd, long applyStartTimestamp) - { - } - - default void onFastPathTaken(TxnId txnId, Deps deps) - { - } - - default void onMediumPathTaken(TxnId txnId, Deps deps) - { - } - - default void onSlowPathTaken(TxnId txnId, Deps deps) - { - } - - default void onRecover(TxnId txnId, Timestamp recoveryTimestamp) - { - } - - default void onPreempted(TxnId txnId) - { - } - - default void onTimeout(TxnId txnId) - { - } - - default void onInvalidated(TxnId txnId) - { - } - - default void onRejected(TxnId txnId) - { - } - - default void onProgressLogSizeChange(TxnId txnId, int delta) - { - } - - EventListener NOOP = new EventListener() - { - }; -} diff --git a/accord-core/src/main/java/accord/api/LocalEventListener.java b/accord-core/src/main/java/accord/api/LocalEventListener.java new file mode 100644 index 00000000..662b00af --- /dev/null +++ b/accord-core/src/main/java/accord/api/LocalEventListener.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.api; + +import accord.local.Command; +import accord.local.SafeCommandStore; +import accord.primitives.SaveStatus; + +public interface LocalEventListener +{ + default void onRejectPreAccept(SafeCommandStore safeStore, Command cmd, Object reason) + { + } + + default void onPreAccepted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onRejectPreNotAccept(SafeCommandStore safeStore, Command cmd, Object reason) + { + } + + default void onPreNotAccepted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onRejectAccept(SafeCommandStore safeStore, Command cmd, Object reason) + { + } + + default void onAccepted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onRejectNotAccept(SafeCommandStore safeStore, Command cmd, Object reason) + { + } + + default void onNotAccepted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onRejectCommitOrStable(SafeCommandStore safeStore, SaveStatus commitOrStable, Command cmd, Object reason) + { + } + + default void onReadWaiting(SafeCommandStore safeStore, Command cmd) + { + } + + default void onReadStarted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onCommitted(SafeCommandStore safeStore, Command cmd) + { + } + + default void onStable(SafeCommandStore safeStore, Command cmd) + { + } + + default void onPreApplied(SafeCommandStore safeStore, Command cmd) + { + } + + // t0 may be less than zero, indicating it has not been populated + default void onApplied(SafeCommandStore safeStore, Command cmd, long startedApplyAt) + { + } + + LocalEventListener NOOP = new LocalEventListener() + { + }; +} diff --git a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java index 5a9cb827..6fd2d0b6 100644 --- a/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java +++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java @@ -114,16 +114,6 @@ abstract class AbstractCoordinatePreAccept<T, R> implements Callback<R> // we may already be complete, as we may receive a failure from a later phase; but it's fine to redundantly mark done isDone = true; callback.accept(null, failure); - if (failure instanceof CoordinationFailed) - { - ((CoordinationFailed) failure).set(txnId, route.homeKey()); - if (failure instanceof Timeout) - node.agent().eventListener().onTimeout(txnId); - else if (failure instanceof Preempted) - node.agent().eventListener().onPreempted(txnId); - else if (failure instanceof Invalidated) - node.agent().eventListener().onInvalidated(txnId); - } } final void onPreAcceptedOrNewEpoch() diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java index 96e88457..c3321567 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import accord.api.Result; import accord.coordinate.CoordinationAdapter.Adapters; import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter; -import accord.coordinate.ExecuteFlag.CoordinationFlags; import accord.coordinate.ExecuteFlag.ExecuteFlags; import accord.local.Node; import accord.local.SequentialAsyncExecutor; @@ -54,7 +53,6 @@ import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import accord.utils.async.AsyncResults.SettableByCallback; -import static accord.coordinate.ExecutePath.FAST; import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate; import static accord.messages.Apply.Kind.Maximal; import static accord.messages.Apply.participates; @@ -160,6 +158,7 @@ public class CoordinateSyncPoint<R> extends CoordinatePreAccept<R> { if (executeAt.is(REJECTED)) { + node.agent().coordinatorEvents().onRejected(txnId); proposeAndCommitInvalidate(node, executor, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, callback); } else @@ -168,9 +167,8 @@ public class CoordinateSyncPoint<R> extends CoordinatePreAccept<R> if (txnId.is(ExclusiveSyncPoint) && txnId.epoch() == executeAt.epoch()) withFlags = txnId.addFlag(HLC_BOUND); Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); - if (tracker.hasFastPathAccepted()) - adapter.execute(node, executor, topologies, route, Ballot.ZERO, FAST, CoordinationFlags.none(), txnId, txn, withFlags, deps, deps, callback); - else if (tracker.hasMediumPathAccepted()) + node.agent().coordinatorEvents().onPreAccepted(txnId); + if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath()) adapter.propose(node, executor, topologies, route, Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, withFlags, deps, callback); else adapter.propose(node, executor, topologies, route, Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, callback); diff --git a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java index c8963918..75d57c3a 100644 --- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java +++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java @@ -39,7 +39,6 @@ import accord.local.SafeCommand; import accord.local.SafeCommandStore; import accord.local.SequentialAsyncExecutor; import accord.local.StoreParticipants; -import accord.messages.Accept; import accord.messages.PreAccept; import accord.messages.PreAccept.PreAcceptNack; import accord.messages.PreAccept.PreAcceptReply; @@ -69,6 +68,8 @@ import static accord.coordinate.ExecuteFlag.CoordinationFlags.empty; import static accord.coordinate.ExecutePath.FAST; import static accord.coordinate.Propose.NotAccept.proposeAndCommitInvalidate; import static accord.local.Commands.AcceptOutcome.Success; +import static accord.messages.Accept.Kind.MEDIUM; +import static accord.messages.Accept.Kind.SLOW; import static accord.primitives.Timestamp.Flag.REJECTED; import static accord.primitives.TxnId.FastPath.PrivilegedCoordinatorWithDeps; import static java.util.concurrent.TimeUnit.MICROSECONDS; @@ -128,8 +129,8 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result> // note: we merge all Deps regardless of witnessedAt. While we only need fast path votes, // we must include Deps from fast path votes from earlier epochs that may have witnessed later transactions // TODO (desired): we might mask some bugs by merging more responses than we strictly need, so optimise this to optionally merge minimal deps + node.agent().coordinatorEvents().onPreAccepted(txnId); executeAdapter().execute(node, executor, topologies, route, Ballot.ZERO, FAST, flags, txnId, txn, txnId, deps, deps, callback); - node.agent().eventListener().onFastPathTaken(txnId, deps); return; } } @@ -138,21 +139,21 @@ public class CoordinateTransaction extends CoordinatePreAccept<Result> Deps deps = mergeFastOrMediumDeps(oks); if (deps != null) { - proposeAdapter().propose(node, executor, topologies, route, Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, callback); - node.agent().eventListener().onMediumPathTaken(txnId, deps); + node.agent().coordinatorEvents().onPreAccepted(txnId); + proposeAdapter().propose(node, executor, topologies, route, MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, callback); return; } } else if (executeAt.is(REJECTED)) { proposeAndCommitInvalidate(node, executor, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, callback); - node.agent().eventListener().onRejected(txnId); + node.agent().coordinatorEvents().onRejected(txnId); return; } Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), List::get, ok -> ok.deps); - proposeAdapter().propose(node, executor, topologies, route, Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, callback); - node.agent().eventListener().onSlowPathTaken(txnId, deps); + node.agent().coordinatorEvents().onPreAccepted(txnId); + proposeAdapter().propose(node, executor, topologies, route, SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, callback); } private Deps mergeFastOrMediumDeps(SortedListMap<?, PreAcceptOk> oks) diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java index ce0d64fa..a37ff476 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java @@ -111,6 +111,17 @@ public class ExecuteSyncPoint extends SettableResult<DurabilityResult> implement return onQuorum; } + protected void start() + { + node.agent().coordinatorEvents().onExecuting(syncPoint.syncId, null, syncPoint.waitFor, null); + SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); + // TODO (desired): special Apply message that doesn't resend deps if path=MEDIUM + Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, syncPoint.syncId.domain()); + Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, null); + if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, syncPoint.route.homeKey(), null)); + else node.send(contact, to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result), executor, this); + } + @Override public void onSuccess(Node.Id from, ReadReply reply) { @@ -269,14 +280,4 @@ public class ExecuteSyncPoint extends SettableResult<DurabilityResult> implement return fail; } } - - protected void start() - { - SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); - // TODO (desired): special Apply message that doesn't resend deps if path=MEDIUM - Txn txn = node.agent().emptySystemTxn(Txn.Kind.ExclusiveSyncPoint, syncPoint.syncId.domain()); - Result result = txn.result(syncPoint.syncId, syncPoint.executeAt, null); - if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, syncPoint.route.homeKey(), null)); - else node.send(contact, to -> new ApplyThenWaitUntilApplied(to, tracker.topologies(), syncPoint.executeAt, tracker.topologies().currentEpoch(), syncPoint.route, syncPoint.syncId, txn, syncPoint.waitFor, syncPoint.route, null, result), executor, this); - } } diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java index 9c352de4..fe722a6d 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java @@ -98,7 +98,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> ExecuteTxn(Node node, SequentialAsyncExecutor executor, Topologies topologies, FullRoute<?> route, Ballot ballot, ExecutePath path, CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super Result, Throwable> callback) { super(node, executor, topologies.forEpoch(executeAt.epoch()), txnId); - this.path = path; + this.path = ballot == Ballot.ZERO ? path : RECOVER; this.txn = txn; this.route = route; this.ballot = ballot; @@ -116,6 +116,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> @Override protected void startOnceInitialised() { + node.agent().coordinatorEvents().onExecuting(txnId, ballot, stableDeps, path); Node.Id self = node.id(); if (permitLocalExecution() && tryIfUniversal(self)) { diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index 755e5d16..5c1fc3bf 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -136,7 +136,7 @@ public class MaybeRecover extends CheckShards<Route<?>> if (tracing != null) tracing.trace(null, "MaybeRecover found %s; reporting progress token %s", hasMadeProgress(full) ? "progress" : "no route", progressToken); if (full.durability.isDurable()) - InformDurable.informDefault(node, topologies, txnId, query, full.executeAtIfKnown(), full.durability); + InformDurable.informDefault(node, topologies, txnId, query, bumpBallot, full.executeAtIfKnown(), full.durability); callback.accept(full.toProgressToken(), null); } else diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java index d80a7ca9..cd0d4b01 100644 --- a/accord-core/src/main/java/accord/coordinate/Persist.java +++ b/accord-core/src/main/java/accord/coordinate/Persist.java @@ -99,7 +99,7 @@ public abstract class Persist implements Callback<ApplyReply> // since we should only invoke Persist with sendTo != route when the remainder of the route is already persisted and truncated // but we make this explicit for the caller with informDurableOnDone isDone = true; - InformDurable.informDefault(node, topologies, txnId, route, executeAt, Majority); + InformDurable.informDefault(node, topologies, txnId, route, ballot, executeAt, Majority); } case RaceWithRecovery: // don't count this towards durability; otherwise it is possible (though very unlikely) @@ -128,6 +128,7 @@ public abstract class Persist implements Callback<ApplyReply> public void start(Apply.Kind kind, Topologies all, Writes writes, Result result) { + node.agent().coordinatorEvents().onExecuted(txnId, ballot); // applyMinimal is used for transaction execution by the original coordinator so it's important to use // Node's Apply factory in case the factory has to do synchronous Apply. SortedArrays.SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty(); diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java index c59d9839..7cef6754 100644 --- a/accord-core/src/main/java/accord/coordinate/Propose.java +++ b/accord-core/src/main/java/accord/coordinate/Propose.java @@ -21,6 +21,7 @@ package accord.coordinate; import java.util.Map; import java.util.function.BiConsumer; +import accord.api.CoordinatorEventListener; import accord.api.ProtocolModifiers.Faults; import accord.api.RoutingKey; import accord.coordinate.ExecuteFlag.CoordinationFlags; @@ -197,9 +198,10 @@ abstract class Propose<R> implements Callback<AcceptReply> // In this case either id' needs to wait (which requires potentially more states like the alternative medium path) // Or we must pick it up as an Unstable dependency here. Deps newDeps = mergeNewDeps(); - Deps stableDeps = mergeDeps(newDeps); - if (kind == Kind.MEDIUM) adapter().execute(node, executor, acceptTracker.topologies(), route, ballot, MEDIUM, CoordinationFlags.none(), txnId, txn, executeAt, stableDeps, newDeps, callback); - else adapter().stabilise(node, executor, acceptTracker.topologies(), route, ballot, txnId, txn, executeAt, stableDeps, callback); + Deps deps = mergeDeps(newDeps); + node.agent().coordinatorEvents().onAccepted(txnId, ballot); + if (kind == Kind.MEDIUM) adapter().execute(node, executor, acceptTracker.topologies(), route, ballot, MEDIUM, CoordinationFlags.none(), txnId, txn, executeAt, deps, newDeps, callback); + else adapter().stabilise(node, executor, acceptTracker.topologies(), route, ballot, txnId, txn, executeAt, deps, callback); if (!Invariants.debug()) acceptOks.clear(); } @@ -288,6 +290,7 @@ abstract class Propose<R> implements Callback<AcceptReply> } else { + node.agent().coordinatorEvents().onInvalidated(txnId); node.withEpochExact(invalidateUntil.epoch(), executor, callback, t -> WrappableException.wrap(t), () -> { commitInvalidate(node, txnId, commitInvalidationTo, invalidateUntil); callback.accept(null, new Invalidated(txnId, invalidateWithParticipant)); diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index b0be4183..0ea7d252 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -151,7 +151,6 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw if (failure == null) { callback.accept(ProgressToken.APPLIED, null); - node.agent().eventListener().onRecover(txnId, ballot); } else if (failure instanceof Redundant) { @@ -163,15 +162,9 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw else { callback.accept(null, WrappableException.wrap(failure)); - if (failure instanceof Preempted) - node.agent().eventListener().onPreempted(txnId); - else if (failure instanceof Timeout) - node.agent().eventListener().onTimeout(txnId); - else if (failure instanceof Invalidated) // TODO (expected): should we tick this counter? we haven't invalidated anything - node.agent().eventListener().onInvalidated(txnId); } - node.agent().onRecover(node, result, failure); + node.agent().coordinatorEvents().onRecoveryStopped(node, txnId, ballot, result, failure); } public static Recover recover(Node node, TxnId txnId, Txn txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, @Nullable Tracing tracing) @@ -215,6 +208,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw void start(Collection<Id> nodes) { + node.agent().coordinatorEvents().onRecoveryStarted(txnId, ballot); node.send(nodes, to -> new BeginRecovery(to, tracker.topologies(), txnId, committedExecuteAt, txn, route, ballot), executor, this); } diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java b/accord-core/src/main/java/accord/impl/AbstractReplayer.java index 88281e8c..2fd0830d 100644 --- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java +++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java @@ -77,15 +77,12 @@ public abstract class AbstractReplayer implements Journal.Replayer { CommandStore unsafeStore = safeStore.commandStore(); Participants<?> executes = command.participants().stillExecutes(); - if (!executes.isEmpty()) - { - command.writes() - .apply(safeStore, executes, command.partialTxn()) - .invoke(() -> unsafeStore.build(PreLoadContext.contextFor(txnId, "Replay"), ss -> { - Commands.postApply(ss, txnId, -1, true); - })) - .begin(safeStore.agent()); - } + command.writes() + .apply(safeStore, executes, command.partialTxn()) + .invoke(() -> unsafeStore.build(PreLoadContext.contextFor(txnId, "Replay"), ss -> { + Commands.postApply(ss, txnId, -1, true); + })) + .begin(safeStore.agent()); } else Invariants.expect(command.hasBeen(Applied)); } diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index 48a9a3d6..be43381c 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -150,7 +150,6 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor if (result == null) { Invariants.require(debugDeleted == null || !debugDeleted.containsKey(txnId)); - node.agent().eventListener().onProgressLogSizeChange(txnId, 1); stateMap = BTree.update(stateMap, BTree.singleton(result = new TxnState(txnId)), TxnState::compareTo); } return result; @@ -159,7 +158,6 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor private TxnState insert(TxnId txnId) { Invariants.require(debugDeleted == null || !debugDeleted.containsKey(txnId)); - node.agent().eventListener().onProgressLogSizeChange(txnId, 1); TxnState result = new TxnState(txnId); stateMap = BTree.update(stateMap, BTree.singleton(result), TxnState::compareTo); return result; @@ -395,10 +393,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor void remove(TxnId txnId) { - Object[] newStateMap = BTreeRemoval.<TxnId, TxnState>remove(stateMap, (id, s) -> id.compareTo(s.txnId), txnId); - if (stateMap != newStateMap) - node.agent().eventListener().onProgressLogSizeChange(txnId, -1); - stateMap = newStateMap; + stateMap = BTreeRemoval.<TxnId, TxnState>remove(stateMap, (id, s) -> id.compareTo(s.txnId), txnId); if (debugDeleted != null) debugDeleted.put(txnId, Thread.currentThread().getStackTrace()); } @@ -824,6 +819,21 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor return maxConcurrency; } + public int size() + { + return BTree.size(stateMap); + } + + public int pendingHome() + { + return pendingHome.size(); + } + + public int pendingWaiting() + { + return pendingWaiting.size(); + } + public ImmutableView immutableView() { return new ImmutableView(commandStore.id(), stateMap); diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index cd511b8e..2f843fd7 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -156,28 +156,39 @@ public class Commands private static AcceptOutcome preacceptOrRecover(SafeCommandStore safeStore, SafeCommand safeCommand, StoreParticipants participants, SaveStatus newSaveStatus, TxnId txnId, Txn txn, @Nullable Deps deps, Ballot ballot) { final Command command = safeCommand.current(); - if (command.hasBeen(Truncated)) + int compareBallots = command.promised().compareTo(ballot); + if (command.hasBeen(Truncated) || compareBallots > 0) { - logger.trace("{}: skipping preaccept - command is truncated", txnId); - return command.is(Invalidated) ? AcceptOutcome.RejectedBallot : participants.owns().isEmpty() + AcceptOutcome outcome = !command.hasBeen(Truncated) + ? AcceptOutcome.RejectedBallot + : command.is(Invalidated) + ? AcceptOutcome.RejectedBallot + : participants.owns().isEmpty() ? AcceptOutcome.Retired : AcceptOutcome.Truncated; - } - int compareBallots = command.promised().compareTo(ballot); - if (compareBallots > 0) - { - logger.trace("{}: skipping preaccept - higher ballot witnessed ({})", txnId, command.promised()); - return AcceptOutcome.RejectedBallot; + logger.trace("{}: skipping preaccept - {}", txnId, outcome); + safeStore.agent().localEvents().onRejectPreAccept(safeStore, command, outcome); + return outcome; } if (command.known().definition().isKnown()) { Invariants.require(command.status() == Invalidated || command.executeAt() != null); - logger.trace("{}: skipping preaccept - already known ({})", txnId, command.status()); // in case of Ballot.ZERO, we must either have a competing recovery coordinator or have late delivery of the // preaccept; in the former case we should abandon coordination, and in the latter we have already completed - safeCommand.updatePromised(ballot); - return ballot.equals(Ballot.ZERO) ? AcceptOutcome.Redundant : AcceptOutcome.Success; + AcceptOutcome outcome; + if (ballot.equals(Ballot.ZERO)) + { + return AcceptOutcome.Redundant; + } + else + { + safeCommand.updatePromised(ballot); + outcome = AcceptOutcome.Success; + } + logger.trace("{}: skipping preaccept - {}", txnId, outcome); + safeStore.agent().localEvents().onRejectPreAccept(safeStore, command, outcome); + return outcome; } if (command.known().deps().hasProposedOrDecidedDeps()) participants = command.participants().supplement(participants); @@ -214,27 +225,24 @@ public class Commands safeCommand.markDefined(safeStore, participants, ballot, partialTxn); } + safeStore.agent().localEvents().onPreAccepted(safeStore, command); safeStore.notifyListeners(safeCommand, command); return AcceptOutcome.Success; } - public static boolean preacceptInvalidate(SafeCommand safeCommand, Ballot ballot) + public static boolean preacceptInvalidate(SafeCommandStore safeStore, SafeCommand safeCommand, Ballot ballot) { Command command = safeCommand.current(); - if (command.hasBeen(Status.Committed)) + if (command.hasBeen(Status.Committed) || command.promised().compareTo(ballot) > 0) { - if (command.is(Truncated)) logger.trace("{}: skipping preacceptInvalidate - already truncated", command.txnId()); - else if (command.is(Invalidated)) logger.trace("{}: skipping preacceptInvalidate - already invalidated", command.txnId()); - else logger.trace("{}: skipping preacceptInvalidate - already committed", command.txnId()); + AcceptOutcome outcome = command.hasBeen(Committed) ? AcceptOutcome.Redundant : AcceptOutcome.RejectedBallot; + logger.trace("{}: skipping preacceptInvalidate - {}", command.txnId(), outcome); + safeStore.agent().localEvents().onRejectPreNotAccept(safeStore, command, outcome); return false; } - if (command.promised().compareTo(ballot) > 0) - { - logger.trace("{}: skipping preacceptInvalidate - witnessed higher ballot ({})", command.txnId(), command.promised()); - return false; - } + safeStore.agent().localEvents().onPreNotAccepted(safeStore, command); safeCommand.updatePromised(ballot); return true; } @@ -274,7 +282,10 @@ public class Commands { AcceptOutcome reject = maybeRejectAccept(ballot, executeAt, command, false); if (reject != null) + { + safeStore.agent().localEvents().onRejectAccept(safeStore, command, reject); return reject; + } } SaveStatus newSaveStatus = SaveStatus.get(kind == Accept.Kind.MEDIUM ? Status.AcceptedMedium : Status.AcceptedSlow, command.known()); @@ -286,7 +297,8 @@ public class Commands PartialDeps partialDeps = prepareDeps(validated, participants, command, deps); participants = prepareParticipants(validated, participants, command); - safeCommand.accept(safeStore, newSaveStatus, participants, ballot, executeAt, partialTxn, partialDeps, ballot); + Command accepted = safeCommand.accept(safeStore, newSaveStatus, participants, ballot, executeAt, partialTxn, partialDeps, ballot); + safeStore.agent().localEvents().onAccepted(safeStore, accepted); safeStore.notifyListeners(safeCommand, command); return AcceptOutcome.Success; @@ -298,11 +310,15 @@ public class Commands { AcceptOutcome reject = maybeRejectAccept(ballot, null, command, true); if (reject != null) + { + safeStore.agent().localEvents().onRejectNotAccept(safeStore, command, reject); return reject; + } } logger.trace("{}: not accepted ({})", command.txnId(), status); - safeCommand.notAccept(safeStore, status, ballot); + Command notAccepted = safeCommand.notAccept(safeStore, status, ballot); + safeStore.agent().localEvents().onNotAccepted(safeStore, notAccepted); safeStore.notifyListeners(safeCommand, command); return AcceptOutcome.Success; } @@ -315,13 +331,20 @@ public class Commands { final Command command = safeCommand.current(); if (kind == StableFastPath && !command.promised().equals(Ballot.ZERO)) + { + safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Rejected); return CommitOutcome.Rejected; + } SaveStatus curStatus = command.saveStatus(); Invariants.requireArgument(newSaveStatus == SaveStatus.Committed || newSaveStatus == SaveStatus.Stable); if (newSaveStatus == SaveStatus.Committed && ballot.compareTo(command.promised()) < 0) - return curStatus.is(Truncated) || participants.owns().isEmpty() - ? CommitOutcome.Redundant : CommitOutcome.Rejected; + { + CommitOutcome outcome = curStatus.is(Truncated) || participants.owns().isEmpty() + ? CommitOutcome.Redundant : CommitOutcome.Rejected; + safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, outcome); + return outcome; + } if (curStatus.hasBeen(PreCommitted)) { @@ -334,13 +357,17 @@ public class Commands if (curStatus.compareTo(newSaveStatus) > 0 || curStatus.hasBeen(Stable)) { logger.trace("{}: skipping commit - already newer or stable ({})", txnId, command.status()); + safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Redundant); return CommitOutcome.Redundant; } if (curStatus == SaveStatus.Committed && newSaveStatus == SaveStatus.Committed) { if (ballot.equals(command.acceptedOrCommitted())) + { + safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Redundant); return CommitOutcome.Redundant; + } Invariants.require(ballot.compareTo(command.acceptedOrCommitted()) > 0); } @@ -349,7 +376,10 @@ public class Commands participants = participants.filter(UPDATE, safeStore, txnId, executeAt); Validated validated = validate(ballot, newSaveStatus, command, participants, route, txn, deps, kind, executeAt); if (validated == INSUFFICIENT) + { + safeStore.agent().localEvents().onRejectCommitOrStable(safeStore, newSaveStatus, command, CommitOutcome.Insufficient); return CommitOutcome.Insufficient; + } PartialTxn partialTxn = prepareTxn(newSaveStatus, participants, command, txn); PartialDeps partialDeps = prepareDeps(validated, participants, command, deps); @@ -362,15 +392,15 @@ public class Commands { WaitingOn waitingOn = initialiseWaitingOn(safeStore, txnId, executeAt, participants, partialDeps); committed = safeCommand.stable(safeStore, participants, ballot, executeAt, partialTxn, partialDeps, waitingOn); - safeStore.agent().eventListener().onStable(committed); + safeStore.agent().localEvents().onStable(safeStore, committed); maybeExecute(safeStore, safeCommand, true, true); } else { Invariants.requireArgument(command.acceptedOrCommitted().compareTo(ballot) <= 0); committed = safeCommand.commit(safeStore, participants, ballot, executeAt, partialTxn, partialDeps); + safeStore.agent().localEvents().onCommitted(safeStore, committed); safeStore.notifyListeners(safeCommand, committed); - safeStore.agent().eventListener().onCommitted(committed); } return CommitOutcome.Success; @@ -553,7 +583,7 @@ public class Commands Command.Executed executed = safeCommand.preapplied(safeStore, participants, ballot, executeAt, partialTxn, partialDeps, waitingOn, writes, result); logger.trace("{}: preapplied", executed.txnId()); // must signal preapplied first, else we may be applied (and have cleared progress log state) already before maybeExecute exits - safeStore.agent().eventListener().onPreApplied(executed); + safeStore.agent().localEvents().onPreApplied(safeStore, executed); maybeExecute(safeStore, safeCommand, true, true); break; } @@ -561,7 +591,7 @@ public class Commands { Invariants.require(!waitingOn.isWaiting()); Command.Executed executed = safeCommand.applying(safeStore, participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result); - safeStore.agent().eventListener().onPreApplied(executed); + safeStore.agent().localEvents().onPreApplied(safeStore, executed); safeStore.notifyListeners(safeCommand, command); logger.trace("{}: applying", executed.txnId()); applyChain(safeStore, executed).begin(safeStore.agent()); @@ -570,8 +600,8 @@ public class Commands case Applied: { Command.Executed executed = safeCommand.applied(safeStore, participants, executeAt, partialTxn, partialDeps, waitingOn, writes, result); - safeStore.agent().eventListener().onPreApplied(executed); - safeStore.agent().eventListener().onApplied(executed, -1); + safeStore.agent().localEvents().onPreApplied(safeStore, executed); + safeStore.agent().localEvents().onApplied(safeStore, executed, -1); safeStore.notifyListeners(safeCommand, command); break; } @@ -619,7 +649,7 @@ public class Commands } } - public static void postApply(SafeCommandStore safeStore, TxnId txnId, long t0, boolean forceApply) + public static void postApply(SafeCommandStore safeStore, TxnId txnId, long startedApplyAt, boolean forceApply) { SafeCommand safeCommand = safeStore.get(txnId); Command command = safeCommand.current(); @@ -628,7 +658,7 @@ public class Commands return; safeCommand.applied(safeStore, forceApply); - safeStore.agent().eventListener().onApplied(command, t0); + safeStore.agent().localEvents().onApplied(safeStore, command, startedApplyAt); safeStore.notifyListeners(safeCommand, command); } @@ -651,13 +681,13 @@ public class Commands // TODO (required, API): do we care about tracking the write persistence latency, when this is just a memtable write? // the only reason it will be slow is because Memtable flushes are backed-up (which will be reported elsewhere) // TODO (required): this is anyway non-monotonic and milliseconds granularity - long t0 = safeStore.node().elapsed(MICROSECONDS); + long startedApplyAt = safeStore.node().elapsed(MICROSECONDS); TxnId txnId = command.txnId(); Participants<?> executes = command.participants().stillExecutes(); // including any keys we aren't writing return command.writes().apply(safeStore, executes, command.partialTxn()) // TODO (expected): once we guarantee execution order KeyHistory can be ASYNC .flatMap(unused -> unsafeStore.build(contextFor(txnId, executes, SYNC, WRITE, "Post Apply"), ss -> { - postApply(ss, txnId, t0, false); + postApply(ss, txnId, startedApplyAt, false); return null; })); } diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java b/accord-core/src/main/java/accord/messages/BeginInvalidation.java index 747773e1..09e43460 100644 --- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java +++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java @@ -77,7 +77,7 @@ public class BeginInvalidation extends AbstractRequest<BeginInvalidation.Invalid } else { - boolean promised = Commands.preacceptInvalidate(safeCommand, ballot); + boolean promised = Commands.preacceptInvalidate(safeStore, safeCommand, ballot); supersededBy = promised ? null : safeCommand.current().promised(); truncated = null; } diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index d73ffde9..773c2dcc 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -26,6 +26,7 @@ import accord.local.Node.Id; import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.SafeCommandStore; +import accord.primitives.Ballot; import accord.primitives.Status; import accord.primitives.Status.Durability; import accord.local.StoreParticipants; @@ -75,8 +76,9 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext this.durability = durability; } - public static void informDefault(Node node, Topologies any, TxnId txnId, Route<?> route, Timestamp executeAt, Durability durability) + public static void informDefault(Node node, Topologies any, TxnId txnId, Route<?> route, @Nullable Ballot ballot, Timestamp executeAt, Durability durability) { + node.agent().coordinatorEvents().onDurable(durability, ballot, txnId); switch (informOfDurability()) { default: throw new AssertionError("Unhandled InformOfDurability: " + informOfDurability()); diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index 6b77a1c6..cb214c6d 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -312,6 +312,7 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons int c = status.compareTo(SaveStatus.Stable); if (c < 0) safeStore.progressLog().waiting(HasStableDeps, safeStore, safeCommand, null, null, participants); else if (c > 0 && status.compareTo(executeOn().min) >= 0 && status.compareTo(SaveStatus.PreApplied) < 0) safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, scope, null); + node.agent().localEvents().onReadWaiting(safeStore, command); return status.compareTo(SaveStatus.Stable) >= 0 ? null : Insufficient; case OBSOLETE: @@ -519,6 +520,7 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons else if (txnId.awaitsOnlyDeps()) this.executeAt = Timestamp.max(this.executeAt, executeAt); else Invariants.require(executeAt.equals(this.executeAt)); + node.agent().localEvents().onReadStarted(safeStore, command); if (executes.isEmpty()) readComplete(unsafeStore, null, unavailable); else beginRead(safeStore, executeAt, command.partialTxn(), executes) .begin(readCallback(unsafeStore, unavailable)); diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java index 8da92cc3..adb1a5a2 100644 --- a/accord-core/src/test/java/accord/impl/TestAgent.java +++ b/accord-core/src/test/java/accord/impl/TestAgent.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Agent; +import accord.api.CoordinatorEventListener; import accord.api.ProgressLog; import accord.api.Result; import accord.impl.mock.MockStore; @@ -32,6 +33,7 @@ import accord.local.Node; import accord.local.SafeCommandStore; import accord.local.TimeService; import accord.messages.ReplyContext; +import accord.primitives.Ballot; import accord.primitives.Keys; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; @@ -50,16 +52,21 @@ public class TestAgent implements Agent { private static final Logger logger = LoggerFactory.getLogger(TestAgent.class); - public static class RethrowAgent extends TestAgent + public static class RethrowAgent extends TestAgent implements CoordinatorEventListener { @Override - public void onRecover(Node node, Result success, Throwable fail) + public CoordinatorEventListener coordinatorEvents() + { + return this; + } + + @Override + public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, Result success, Throwable fail) { if (fail != null) throw new AssertionError("Unexpected exception", fail); } - @Override public void onFailedBootstrap(int attempt, String phase, Ranges ranges, Runnable retry, Throwable failure) { @@ -80,14 +87,6 @@ public class TestAgent implements Agent } } - @Override - public void onRecover(Node node, Result success, Throwable fail) - { - // do nothing, intended for use by implementations to decide what to do about recovered transactions - // specifically if and how they should inform clients of the result - // e.g. in Maelstrom we send the full result directly, in other impls we may simply acknowledge success via the coordinator - } - @Override public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next) { diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 24fde94f..56c81f77 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -30,7 +30,8 @@ import java.util.function.LongSupplier; import javax.annotation.Nullable; -import accord.api.ProgressLog; +import accord.api.CoordinatorEventListener; +import accord.api.ProgressLog.BlockedUntil; import accord.api.Result; import accord.api.Scheduler; import accord.api.TraceEventType; @@ -49,6 +50,7 @@ import accord.local.Node; import accord.local.SafeCommandStore; import accord.local.TimeService; import accord.messages.ReplyContext; +import accord.primitives.Ballot; import accord.primitives.Keys; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; @@ -71,7 +73,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -public class ListAgent implements InMemoryAgent +public class ListAgent implements InMemoryAgent, CoordinatorEventListener { final Scheduler scheduler; final RandomSource rnd; @@ -104,7 +106,7 @@ public class ListAgent implements InMemoryAgent } @Override - public void onRecover(Node node, Result success, Throwable fail) + public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, Result success, Throwable fail) { if (fail != null) { @@ -140,6 +142,12 @@ public class ListAgent implements InMemoryAgent retryBootstrap.accept(retry); } + @Override + public CoordinatorEventListener coordinatorEvents() + { + return this; + } + @Override public void onStale(Timestamp staleSince, Ranges ranges) { @@ -208,13 +216,13 @@ public class ListAgent implements InMemoryAgent } @Override - public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil, TimeUnit units) + public long slowReplicaDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil blockedUntil, TimeUnit units) { return units.convert(rnd.nextInt(100, 1000), MILLISECONDS); } @Override - public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying, TimeUnit units) + public long slowAwaitDelay(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil retrying, TimeUnit units) { int retryDelay = Math.min(16, 1 << retryCount); return units.convert(retryDelay, SECONDS); diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 17cd3c25..6d1a84d3 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -1026,12 +1026,6 @@ public class CommandsForKeyTest throw new UnsupportedOperationException(); } - @Override - public void onRecover(Node node, Result success, Throwable fail) - { - throw new UnsupportedOperationException(); - } - @Override public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next) { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java index 008ea3b0..a59731cd 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java @@ -21,6 +21,7 @@ package accord.maelstrom; import java.util.concurrent.TimeUnit; import accord.api.Agent; +import accord.api.CoordinatorEventListener; import accord.api.ProgressLog; import accord.api.Result; import accord.local.Command; @@ -28,6 +29,7 @@ import accord.local.Node; import accord.local.SafeCommandStore; import accord.local.TimeService; import accord.messages.ReplyContext; +import accord.primitives.Ballot; import accord.primitives.Keys; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; @@ -43,12 +45,12 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -public class MaelstromAgent implements Agent +public class MaelstromAgent implements Agent, CoordinatorEventListener { static final MaelstromAgent INSTANCE = new MaelstromAgent(); @Override - public void onRecover(Node node, Result success, Throwable fail) + public void onRecoveryStopped(Node node, TxnId txnId, Ballot ballot, Result success, Throwable fail) { if (fail != null) { @@ -79,6 +81,12 @@ public class MaelstromAgent implements Agent { } + @Override + public CoordinatorEventListener coordinatorEvents() + { + return this; + } + @Override public void onUncaughtException(Throwable t) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org