This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc14a154 Various fixes and improvements Improve: - InMemoryJournal
compaction should simulate files to ensure we compact the same cohorts of
records (so shadowing applied correctly) - Don't update CFK deps if already
known - avoid unnecessary heapification of LogGroupTimers - begin removal of
Guava (mostly unused) - consider medium path on fast path delayed - add
Seekables.indexOf to support faster serialization - unboxed Invariant.requires
variant - AsyncChains.addCallba [...]
fc14a154 is described below
commit fc14a154fd514d4ab40b37508fb9497f786835e0
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Mar 15 09:06:01 2025 +0000
Various fixes and improvements
Improve:
- InMemoryJournal compaction should simulate files to ensure we compact
the same cohorts of records (so shadowing is applied correctly)
- Don't update CFK deps if already known
- avoid unnecessary heapification of LogGroupTimers
- begin removal of Guava (mostly unused)
- consider medium path on fast path delayed
- add Seekables.indexOf to support faster serialization
- unboxed Invariant.requires variant
- AsyncChains.addCallback -> invoke
Fix:
- Recovery must wait for earlier transactions on shards that have not yet
been accepted, even if we recover a fast Stable record on another shard
- only skip Dep calculation on PreAccept if vote is required by coordinator
- ReadTracker must slice the first unavailable collection with Minimal
- If PreLoadContext cannot acquire shared caches, still consider existing
contents
- CFK: make sure to insert UNSTABLE into missing array
- Fix failed task stopping the DelayedCommandStore task queue from processing
further tasks
- short circuit AbstractRanges.equals()
- Handle edge case where we can take fast/medium path but one of the Deps
replies contains a future TxnId
- update executeAtEpoch when retryInFutureEpoch
- Fix incorrect validation in validateMissing (should only validate
newInfo is not in missing when in witnessedBy; all other additions should be
included)
patch by Benedict; reviewed by David Capwell for CASSANDRA-20522
---
accord-core/src/main/java/accord/api/Journal.java | 2 +-
.../accord/coordinate/CoordinateTransaction.java | 71 ++++--
.../java/accord/coordinate/ExecuteSyncPoint.java | 4 +-
.../main/java/accord/coordinate/MaybeRecover.java | 2 +-
.../src/main/java/accord/coordinate/Recover.java | 131 ++++++----
.../coordinate/tracking/FastPathTracker.java | 11 +-
.../accord/coordinate/tracking/ReadTracker.java | 2 +-
.../accord/impl/AbstractConfigurationService.java | 12 +-
.../java/accord/impl/AbstractSafeCommandStore.java | 17 +-
.../src/main/java/accord/local/CommandStores.java | 2 +-
accord-core/src/main/java/accord/local/Node.java | 10 +-
.../main/java/accord/local/RedundantBefore.java | 2 +-
.../main/java/accord/local/cfk/CommandsForKey.java | 64 +++--
.../src/main/java/accord/local/cfk/Pruning.java | 4 +-
.../src/main/java/accord/local/cfk/Updating.java | 79 +++---
.../src/main/java/accord/local/cfk/Utils.java | 37 ++-
.../local/durability/ConcurrencyControl.java | 4 +-
.../accord/local/durability/DurabilityQueue.java | 2 +-
.../accord/local/durability/ShardDurability.java | 2 +-
.../src/main/java/accord/messages/Apply.java | 13 +-
.../main/java/accord/messages/BeginRecovery.java | 2 +-
.../src/main/java/accord/messages/PreAccept.java | 2 +-
.../java/accord/messages/SetGloballyDurable.java | 2 +-
.../main/java/accord/messages/SetShardDurable.java | 2 +-
.../java/accord/messages/WaitUntilApplied.java | 2 +
.../java/accord/primitives/AbstractRanges.java | 8 +
.../main/java/accord/primitives/LatestDeps.java | 14 +-
.../src/main/java/accord/primitives/Routables.java | 1 +
.../src/main/java/accord/primitives/Timestamp.java | 3 +
.../main/java/accord/topology/TopologyManager.java | 4 +-
.../src/main/java/accord/utils/Functions.java | 17 --
.../src/main/java/accord/utils/Invariants.java | 6 +
.../src/main/java/accord/utils/LogGroupTimers.java | 2 +-
.../main/java/accord/utils/PersistentField.java | 2 +-
.../java/accord/utils/async/AsyncCallbacks.java | 4 +-
.../main/java/accord/utils/async/AsyncChain.java | 40 ++-
.../main/java/accord/utils/async/AsyncChains.java | 43 +++-
.../main/java/accord/utils/async/AsyncResult.java | 10 +-
.../main/java/accord/utils/async/AsyncResults.java | 8 +-
.../src/main/java/accord/utils/btree/BTreeSet.java | 6 +-
.../src/test/java/accord/impl/basic/Cluster.java | 2 +-
.../accord/impl/basic/DelayedCommandStores.java | 4 +-
.../java/accord/impl/basic/InMemoryJournal.java | 272 +++++++++++++++++----
.../java/accord/impl/basic/LoggingJournal.java | 4 +-
.../accord/impl/list/ListFetchCoordinator.java | 2 +-
.../test/java/accord/impl/list/ListRequest.java | 2 +-
.../src/test/java/accord/local/CommandsTest.java | 2 +-
.../java/accord/utils/async/AsyncChainsTest.java | 10 +-
.../src/main/java/accord/maelstrom/Cluster.java | 2 +-
.../java/accord/maelstrom/MaelstromRequest.java | 2 +-
50 files changed, 619 insertions(+), 332 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index a1246f76..1be59925 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -43,7 +43,7 @@ import org.agrona.collections.Int2ObjectHashMap;
*/
public interface Journal
{
- Journal start(Node node);
+ void start(Node node);
Command loadCommand(int store, TxnId txnId, RedundantBefore
redundantBefore, DurableBefore durableBefore);
Command.Minimal loadMinimal(int store, TxnId txnId, Load load,
RedundantBefore redundantBefore, DurableBefore durableBefore);
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index aceb8ee1..b27eeac9 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -104,38 +104,57 @@ public class CoordinateTransaction extends
CoordinatePreAccept<Result>
{
if (tracker.hasFastPathAccepted())
{
- Deps deps = Deps.merge(oks.valuesAsNullableList(),
oks.domainSize(), List::get, ok -> ok.deps);
- ExecuteFlags executeFlags =
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v :
v.and(ok.flags), ExecuteFlags.all());
- // note: we merge all Deps regardless of witnessedAt. While we
only need fast path votes,
- // we must include Deps from fast path votes from earlier epochs
that may have witnessed later transactions
- // TODO (desired): we might mask some bugs by merging more
responses than we strictly need, so optimise this to optionally merge minimal
deps
- executeAdapter().execute(node, topologies, route, FAST,
executeFlags, txnId, txn, txnId, deps, deps, settingCallback());
- node.agent().eventListener().onFastPathTaken(txnId, deps);
+ Deps deps = mergeFastOrMediumDeps(oks);
+ if (deps != null)
+ {
+ ExecuteFlags executeFlags =
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v :
v.and(ok.flags), ExecuteFlags.all());
+ // note: we merge all Deps regardless of witnessedAt. While we
only need fast path votes,
+ // we must include Deps from fast path votes from earlier
epochs that may have witnessed later transactions
+ // TODO (desired): we might mask some bugs by merging more
responses than we strictly need, so optimise this to optionally merge minimal
deps
+ executeAdapter().execute(node, topologies, route, FAST,
executeFlags, txnId, txn, txnId, deps, deps, settingCallback());
+ node.agent().eventListener().onFastPathTaken(txnId, deps);
+ return;
+ }
}
else if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath())
{
- Deps deps = Deps.merge(oks.valuesAsNullableList(),
oks.domainSize(), List::get, ok -> ok.deps);
- proposeAdapter().propose(node, topologies, route,
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this);
- node.agent().eventListener().onMediumPathTaken(txnId, deps);
- }
- else
- {
- // TODO (low priority, efficiency): perhaps don't submit Accept
immediately if we almost have enough for fast-path,
- // but by sending accept we rule
out hybrid fast-path
- // TODO (low priority, efficiency): if we receive an expired
response, perhaps defer to permit at least one other
- // node to respond before
invalidating
- if (executeAt.is(REJECTED))
+ Deps deps = mergeFastOrMediumDeps(oks);
+ if (deps != null)
{
- proposeAndCommitInvalidate(node, Ballot.ZERO, txnId,
route.homeKey(), route, executeAt,this);
- node.agent().eventListener().onRejected(txnId);
- }
- else
- {
- Deps deps = Deps.merge(oks.valuesAsNullableList(),
oks.domainSize(), List::get, ok -> ok.deps);
- proposeAdapter().propose(node, topologies, route,
Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, this);
- node.agent().eventListener().onSlowPathTaken(txnId, deps);
+ proposeAdapter().propose(node, topologies, route,
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this);
+ node.agent().eventListener().onMediumPathTaken(txnId, deps);
+ return;
}
}
+ else if (executeAt.is(REJECTED))
+ {
+ proposeAndCommitInvalidate(node, Ballot.ZERO, txnId,
route.homeKey(), route, executeAt,this);
+ node.agent().eventListener().onRejected(txnId);
+ return;
+ }
+
+ Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(),
List::get, ok -> ok.deps);
+ proposeAdapter().propose(node, topologies, route, Accept.Kind.SLOW,
Ballot.ZERO, txnId, txn, executeAt, deps, this);
+ node.agent().eventListener().onSlowPathTaken(txnId, deps);
+ }
+
+ private Deps mergeFastOrMediumDeps(SortedListMap<?, PreAcceptOk> oks)
+ {
+ // we must merge all Deps replies from prior topologies, but from the
latest topology we can safely merge only those replies that voted for the fast
path
+ // TODO (desired): actually merge these topologies separately, rather
than just switching behaviour when multiple topologies
+ if (tracker.topologies().size() == 1)
+ return Deps.merge(oks.valuesAsNullableList(), oks.domainSize(),
List::get, ok -> ok.witnessedAt.equals(ok.txnId) ? ok.deps : null);
+
+ Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(),
List::get, ok -> ok.deps);
+ // it is possible that one of the earlier epochs that did not need to
vote for the fast path
+ // was also unable to compute valid dependencies, and returned a
future TxnId as a proxy.
+ // In this case while it is still in principle safe to propose the
fast path, it is simpler not to,
+ // as it permits us to maintain safety validation logic that detects
unsafe behaviour and execution will
+ // need to wait for the future transaction to be agreed anyway (so we
can use its dependency calculation).
+ if (deps.maxTxnId(txnId).compareTo(txnId) > 0)
+ return null;
+
+ return deps;
}
protected CoordinationAdapter<Result> proposeAdapter()
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 0f37728c..79482b51 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -173,7 +173,7 @@ public class ExecuteSyncPoint extends
SettableResult<DurabilityResult> implement
{
node.withEpoch(retryInFutureEpoch, (ignore, failure) ->
tryFailure(WrappableException.wrap(failure)), () -> {
ExecuteSyncPoint continuation = new ExecuteSyncPoint(node,
syncPoint, node.topology().preciseEpochs(syncPoint.route(),
tracker.topologies().currentEpoch(), retryInFutureEpoch, SHARE),
excludeSuccess, executor, attempt, current());
- continuation.addCallback((success, failure) -> {
+ continuation.invoke((success, failure) -> {
if (failure == null) trySuccess(success);
else tryFailure(failure);
});
@@ -249,6 +249,6 @@ public class ExecuteSyncPoint extends
SettableResult<DurabilityResult> implement
{
SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
if (contact == null) tryFailure(new Exhausted(syncPoint.syncId,
syncPoint.route.homeKey(), null));
- else node.send(contact, to -> new WaitUntilApplied(to,
tracker.topologies(), syncPoint.syncId, syncPoint.route,
syncPoint.syncId.epoch()), executor, this);
+ else node.send(contact, to -> new WaitUntilApplied(to,
tracker.topologies(), syncPoint.syncId, syncPoint.route,
tracker.topologies().currentEpoch()), executor, this);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 81232d89..1a50486c 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -122,7 +122,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
else
{
Invariants.require(Route.isFullRoute(someRoute),
"Require a full route but given %s", full.route);
- node.recover(txnId, full.invalidIf,
Route.castToFullRoute(someRoute), reportLowEpoch,
reportHighEpoch).addCallback(callback);
+ node.recover(txnId, full.invalidIf,
Route.castToFullRoute(someRoute), reportLowEpoch,
reportHighEpoch).invoke(callback);
}
break;
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index d3d095d4..74e74f3a 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -67,6 +67,7 @@ import accord.utils.async.AsyncResults;
import static accord.api.ProgressLog.BlockedUntil.CommittedOrNotFastPathCommit;
import static accord.api.ProgressLog.BlockedUntil.HasCommittedDeps;
+import static accord.api.ProgressLog.BlockedUntil.HasDecidedExecuteAt;
import static accord.api.ProtocolModifiers.QuorumEpochIntersections;
import static accord.coordinate.CoordinationAdapter.Factory.Kind.Recovery;
import static accord.coordinate.ExecutePath.RECOVER;
@@ -253,18 +254,21 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
if (acceptOrCommitNotTruncated != null)
{
- Status status = acceptOrCommitNotTruncated.status;
Timestamp executeAt = acceptOrCommitNotTruncated.executeAt;
- if (committedExecuteAt != null)
- {
-
Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted)
< 0 || executeAt.equals(committedExecuteAt));
- // if we know from a prior Accept attempt that this is
committed we can go straight to the commit phase
- if (status == AcceptedMedium || status == AcceptedSlow)
- status = Status.Committed;
+ Status status; {
+ Status tmp = acceptOrCommitNotTruncated.status;
+ if (committedExecuteAt != null)
+ {
+
Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted)
< 0 || executeAt.equals(committedExecuteAt));
+ // if we know from a prior Accept attempt that this is
committed we can go straight to the commit phase
+ if (tmp == AcceptedMedium || tmp == AcceptedSlow)
+ tmp = Status.Committed;
+ }
+ status = tmp;
}
+
switch (status)
{
- default: throw new UnhandledEnum(status);
case Truncated: throw illegalState("Truncate should be
filtered");
case Invalidated:
{
@@ -272,45 +276,6 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
return;
}
- case Applied:
- case PreApplied:
- {
- withStableDeps(recoverOkList, executeAt, (i, t) ->
node.agent().acceptAndWrap(i, t), stableDeps -> {
- adapter.persist(node, tracker.topologies(), route,
txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes,
acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
- });
- accept(acceptOrCommitNotTruncated.result, null);
- return;
- }
-
- case Stable:
- {
- withStableDeps(recoverOkList, executeAt, this, stableDeps
-> {
- adapter.execute(node, tracker.topologies(), route,
RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps,
this);
- });
- return;
- }
-
- case PreCommitted:
- case Committed:
- {
- withCommittedDeps(recoverOkList, executeAt, this,
committedDeps -> {
- adapter.stabilise(node, tracker.topologies(), route,
ballot, txnId, txn, executeAt, committedDeps, this);
- });
- return;
- }
-
- case AcceptedSlow:
- case AcceptedMedium:
- {
- // TODO (desired): if we have a quorum of Accept with
matching ballot or proposal we can go straight to Commit
- // TODO (desired): if we didn't find Accepted in *every*
shard, consider invalidating for consistency of behaviour
- // however, note that we may have taken the fast path
and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
- // (otherwise recovery was attempted and did not
invalidate, so it must have determined it needed to complete)
- Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList,
ok -> ok == null ? null : ok.deps);
- propose(SLOW, acceptOrCommitNotTruncated.executeAt,
proposeDeps);
- return;
- }
-
case AcceptedInvalidate:
{
invalidate(recoverOks);
@@ -321,6 +286,53 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case PreAccepted:
throw illegalState("Should only be possible to have
Accepted or later commands");
}
+
+ LatestDeps.Merge merge = mergeDeps(recoverOkList);
+ Participants<?> await = merge.notAccepted(route);
+ awaitPartialEarlier(recoverOkList, await, () -> {
+ switch (status)
+ {
+ default: throw new UnhandledEnum(status);
+ case Applied:
+ case PreApplied:
+ {
+ withStableDeps(merge, executeAt, (i, t) ->
node.agent().acceptAndWrap(i, t), stableDeps -> {
+ adapter.persist(node, tracker.topologies(), route,
txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes,
acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
+ });
+ accept(acceptOrCommitNotTruncated.result, null);
+ return;
+ }
+
+ case Stable:
+ {
+ withStableDeps(merge, executeAt, this, stableDeps -> {
+ adapter.execute(node, tracker.topologies(), route,
RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps,
this);
+ });
+ return;
+ }
+
+ case PreCommitted:
+ case Committed:
+ {
+ withCommittedDeps(merge, executeAt, this,
committedDeps -> {
+ adapter.stabilise(node, tracker.topologies(),
route, ballot, txnId, txn, executeAt, committedDeps, this);
+ });
+ return;
+ }
+
+ case AcceptedSlow:
+ case AcceptedMedium:
+ {
+ // TODO (desired): if we have a quorum of Accept with
matching ballot or proposal we can go straight to Commit
+ // TODO (desired): if we didn't find Accepted in
*every* shard, consider invalidating for consistency of behaviour
+ // however, note that we may have taken the fast
path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
+ // (otherwise recovery was attempted and did not
invalidate, so it must have determined it needed to complete)
+ Deps proposeDeps = merge.mergeProposal();
+ propose(SLOW, acceptOrCommitNotTruncated.executeAt,
proposeDeps);
+ }
+ }
+ });
+ return;
}
if (acceptOrCommit != null && acceptOrCommit !=
acceptOrCommitNotTruncated)
@@ -417,6 +429,23 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
}
}
+ private static LatestDeps.Merge mergeDeps(List<RecoverOk>
nullableRecoverOkList)
+ {
+ return LatestDeps.merge(nullableRecoverOkList, ok -> ok == null ? null
: ok.deps);
+ }
+
+ private void awaitPartialEarlier(List<RecoverOk> nullableRecoverOkList,
Participants<?> participants, Runnable whenReady)
+ {
+ Deps earlierWait = Deps.merge(nullableRecoverOkList,
nullableRecoverOkList.size(), List::get, ok -> ok.earlierWait);
+ Deps earlierNoWait = Deps.merge(nullableRecoverOkList,
nullableRecoverOkList.size(), List::get, ok -> ok.earlierNoWait);
+ earlierWait = earlierWait.without(earlierNoWait);
+ earlierWait = earlierWait.intersecting(participants);
+ awaitEarlier(node, earlierWait, HasDecidedExecuteAt).begin((success,
fail) -> {
+ if (fail != null) accept(null, fail);
+ else whenReady.run();
+ });
+ }
+
private static boolean supersedingRejects(List<RecoverOk> oks)
{
for (RecoverOk ok : oks)
@@ -464,15 +493,13 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
return ok.laterCoordRejects.with(id -> from.equals(id.node));
}
- private void withCommittedDeps(List<RecoverOk> nullableRecoverOkList,
Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps>
withDeps)
+ private void withCommittedDeps(LatestDeps.Merge merge, Timestamp
executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps)
{
- LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok ->
ok == null ? null : ok.deps);
LatestDeps.withCommitted(adapter, node, merge, route, ballot, txnId,
executeAt, txn, failureCallback, withDeps);
}
- private void withStableDeps(List<RecoverOk> nullableRecoverOkList,
Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps>
withDeps)
+ private void withStableDeps(LatestDeps.Merge merge, Timestamp executeAt,
BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps)
{
- LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok ->
ok == null ? null : ok.deps);
LatestDeps.withStable(adapter, node, merge, Deps.NONE, route, null,
null, route, ballot, txnId, executeAt, txn, failureCallback, withDeps);
}
@@ -506,7 +533,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
private void propose(Accept.Kind kind, Timestamp executeAt,
List<RecoverOk> recoverOkList)
{
- Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList, ok -> ok ==
null ? null : ok.deps);
+ Deps proposeDeps = mergeDeps(recoverOkList).mergeProposal();
propose(kind, executeAt, proposeDeps);
}
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index d2168d0d..f1fa1eae 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -100,7 +100,7 @@ public class FastPathTracker extends
PreAcceptTracker<FastPathTracker.FastPathSh
++fastPathFailures;
if (hasRejectedFastPath() && hasReachedQuorum())
- return complete(Success);
+ return mediumOrSlowSuccess();
}
return NoChange;
@@ -116,7 +116,7 @@ public class FastPathTracker extends
PreAcceptTracker<FastPathTracker.FastPathSh
++fastPathDelayed;
if (isFastPathDelayed() && hasReachedQuorum())
- return complete(Success);
+ return mediumOrSlowSuccess();
}
return NoChange;
@@ -125,10 +125,15 @@ public class FastPathTracker extends
PreAcceptTracker<FastPathTracker.FastPathSh
final ShardOutcome<? super FastPathTracker>
quorumIfHasRejectedFastPath()
{
return hasReachedQuorum() && hasRejectedFastPath()
- ? hasMetMediumPathCriteria() ?
complete(NewMediumPathSuccess) : complete(Success)
+ ? mediumOrSlowSuccess()
: NoChange;
}
+ final ShardOutcome<? super FastPathTracker> mediumOrSlowSuccess()
+ {
+ return hasMetMediumPathCriteria() ? complete(NewMediumPathSuccess)
: complete(Success);
+ }
+
final boolean isFastPathDelayed()
{
return shard.rejectsFastPath(fastQuorumSize, fastPathDelayed);
diff --git
a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index ddec94a2..dd9c2c90 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -117,7 +117,7 @@ public class ReadTracker extends
AbstractTracker<ReadTracker.ReadShardTracker>
return NoChange;
// TODO (low priority, efficiency): support slice method accepting
a single Range
- if (unavailable == null) unavailable =
partialSuccess.unavailable.slice(Ranges.of(shard.range));
+ if (unavailable == null) unavailable =
partialSuccess.unavailable.slice(Ranges.of(shard.range), Minimal);
else unavailable = unavailable.slice(partialSuccess.unavailable,
Minimal);
if (!unavailable.isEmpty())
return ensureProgressOrFail();
diff --git
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 3cf7df2a..4fc927f0 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -334,9 +334,9 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
if (epochs.wasTruncated(ready.epoch))
return;
- ready.metadata.addCallback(() -> epochs.acknowledge(ready));
- ready.coordinate.addCallback(() ->
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
- ready.reads.addCallback(() ->
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
+ ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready));
+ ready.coordinate.invokeIfSuccess(() ->
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
+ ready.reads.invokeIfSuccess(() ->
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
}
protected void topologyUpdatePostListenerNotify(Topology topology) {}
@@ -350,7 +350,7 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
{
logger.debug("Epoch {} received; waiting to receive {} before
reporting", topology.epoch(), lastReceived + 1);
- epochs.receiveFuture(lastReceived + 1).addCallback(() ->
reportTopology(topology, isLoad, startSync));
+ epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() ->
reportTopology(topology, isLoad, startSync));
fetchTopologyForEpoch(lastReceived + 1);
return;
}
@@ -359,14 +359,14 @@ public abstract class
AbstractConfigurationService<EpochState extends AbstractCo
if (lastAcked == 0 && lastReceived > 0)
{
logger.debug("Epoch {} received; waiting for {} to ack before
reporting", topology.epoch(), epochs.minEpoch());
- epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() ->
reportTopology(topology, isLoad, startSync));
+ epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() ->
reportTopology(topology, isLoad, startSync));
return;
}
if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
{
logger.debug("Epoch {} received; waiting for {} to ack before
reporting", topology.epoch(), lastAcked + 1);
- epochs.acknowledgeFuture(lastAcked + 1).addCallback(() ->
reportTopology(topology, isLoad, startSync));
+ epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() ->
reportTopology(topology, isLoad, startSync));
return;
}
diff --git
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index d67b4105..0e9a1ebb 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -86,14 +86,14 @@ extends SafeCommandStore
try (Caches caches = tryGetCaches())
{
- if (caches == null)
- return with.isSubsetOf(this.context) ? with : null;
-
for (TxnId txnId : with.txnIds())
{
if (null != getInternal(txnId))
continue;
+ if (caches == null)
+ return null;
+
C safeCommand = caches.acquireIfLoaded(txnId);
if (safeCommand == null)
return null;
@@ -116,11 +116,14 @@ extends SafeCommandStore
if (null != getInternal(key))
continue; // already in working set
- CFK safeCfk = caches.acquireIfLoaded(key);
- if (safeCfk != null)
+ if (caches != null)
{
- add(safeCfk, caches);
- continue;
+ CFK safeCfk = caches.acquireIfLoaded(key);
+ if (safeCfk != null)
+ {
+ add(safeCfk, caches);
+ continue;
+ }
}
if (unavailable == null)
unavailable = new ArrayList<>();
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java
b/accord-core/src/main/java/accord/local/CommandStores.java
index abe95748..4ce5e59a 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -781,7 +781,7 @@ public abstract class CommandStores
for (ShardHolder shard : shards)
results.add(shard.store.build(context, mapper));
- return AsyncChains.all(results);
+ return AsyncChains.allOf(results);
}
protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream
commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce)
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 446633c5..a5c421c2 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -238,7 +238,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
public AsyncResult<Void> unsafeStart()
{
EpochReady ready =
onTopologyUpdateInternal(configService.currentTopology(), false);
- ready.coordinate.addCallback(() ->
this.topology.onEpochSyncComplete(id, topology.epoch()));
+ ready.coordinate.invokeIfSuccess(() ->
this.topology.onEpochSyncComplete(id, topology.epoch()));
configService.acknowledgeEpoch(ready, false);
return ready.metadata;
}
@@ -357,7 +357,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
if (topology.epoch() <= this.topology.epoch())
return AsyncResults.success(null);
EpochReady ready = onTopologyUpdateInternal(topology, startSync);
- ready.coordinate.addCallback(() ->
this.topology.onEpochSyncComplete(id, topology.epoch()));
+ ready.coordinate.invokeIfSuccess(() ->
this.topology.onEpochSyncComplete(id, topology.epoch()));
configService.acknowledgeEpoch(ready, startSync);
return ready.coordinate;
}
@@ -790,7 +790,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
{
AsyncResult<Result> result = withEpoch(Math.max(txnId.epoch(),
minEpoch), () -> initiateCoordination(txnId, txn)).beginAsResult();
coordinating.putIfAbsent(txnId, result);
- result.addCallback((success, fail) -> coordinating.remove(txnId,
result));
+ result.invoke((success, fail) -> coordinating.remove(txnId, result));
return result;
}
@@ -855,7 +855,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
return future;
}).beginAsResult();
coordinating.putIfAbsent(txnId, result);
- result.addCallback((success, fail) -> coordinating.remove(txnId,
result));
+ result.invoke((success, fail) -> coordinating.remove(txnId, result));
return result;
}
@@ -865,7 +865,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
if (waitForEpoch > topology.epoch())
{
configService.fetchTopologyForEpoch(waitForEpoch);
- topology().awaitEpoch(waitForEpoch).addCallback((ignored, failure)
-> {
+ topology().awaitEpoch(waitForEpoch).invoke((ignored, failure) -> {
if (failure != null)
agent().onUncaughtException(WrappableException.wrap(failure));
else
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 7efe5509..3a86e5cc 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -519,7 +519,7 @@ public class RedundantBefore extends
ReducingRangeMap<RedundantBefore.Bounds>
if (bounds == null)
return execute;
- Invariants.require(executeAt == null ? !bounds.outOfBounds(txnId)
: !bounds.outOfBounds(txnId, executeAt));
+ Invariants.require(txnId.isSyncPoint() || (executeAt == null ?
!bounds.outOfBounds(txnId) : !bounds.outOfBounds(txnId, executeAt)));
if (bounds.is(txnId, PRE_BOOTSTRAP_OR_STALE))
return without.apply(execute, Ranges.of(bounds.range));
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index c30a140f..628b5718 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -660,7 +660,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
return encoded;
}
- private static int encode(TxnId txnId, InternalStatus internalStatus, boolean mayExecute)
+ static int encode(TxnId txnId, InternalStatus internalStatus, boolean mayExecute)
{
int encoded = internalStatus.txnInfoEncoded | (mayExecute ?
MAY_EXECUTE : 0);
if (txnId.is(Key)) encoded |= MANAGED;
@@ -1262,10 +1262,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
int insertPos = Arrays.binarySearch(byId, testStartedAtTimestamp);
if (insertPos < 0)
{
- loadingFor = NO_TXNIDS;
+ loadingFor = NOT_LOADING_PRUNED;
insertPos = -1 - insertPos;
if (computeIsDep != IGNORE &&
testTxnId.compareTo(prunedBefore) < 0)
- loadingFor = loadingPrunedFor(loadingPruned, testTxnId,
NO_TXNIDS);
+ loadingFor = loadingPrunedFor(loadingPruned, testTxnId,
NOT_LOADING_PRUNED);
}
switch (testStartedAt)
@@ -1300,7 +1300,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
TxnId[] missing = txn.missing();
hasAsDep = missing == NO_TXNIDS ||
Arrays.binarySearch(txn.missing(), testTxnId) < 0;
}
- else if (loadingFor == NO_TXNIDS)
+ else if (loadingFor == NOT_LOADING_PRUNED)
{
hasAsDep = false;
}
@@ -1603,35 +1603,33 @@ public class CommandsForKey extends CommandsForKeyUpdate
{
// update
TxnInfo cur = byId[pos];
- if (cur != null)
+ int c = cur.compareTo(newStatus);
+ if (c > 0)
{
- int c = cur.compareTo(newStatus);
- if (c > 0)
- {
- // newStatus moves us backwards; we only permit this for
(Pre)?(Not)?Accepted states
- if (cur.compareTo(COMMITTED) >= 0 ||
newStatus.compareTo(PREACCEPTED) <= 0)
- return this;
+ // newStatus moves us backwards; we only permit this for (Pre)?(Not)?Accepted states
+ if (cur.compareTo(COMMITTED) >= 0 || newStatus.compareTo(PREACCEPTED) <= 0)
+ return this;
- // and only when the new ballot is strictly greater
- if (updated.acceptedOrCommitted().compareTo(cur.ballot())
<= 0)
- return this;
- }
- else if (c == 0)
- {
- // we're updating to the same state; we only do this with
a strictly greater ballot;
- // even so, if we have no executeAt or deps there's
nothing to record
- if (updated.acceptedOrCommitted().compareTo(cur.ballot())
<= 0 || !newStatus.hasExecuteAtOrDeps())
- return this;
- }
- else
- {
- // we're advancing to a higher status, but this is only
permitted either if the new state is stable or the ballot is higher
- if (cur.compareTo(STABLE) < 0 &&
updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0)
- return this;
- }
+ // and only when the new ballot is strictly greater
+ if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0)
+ return this;
+ }
+ else if (c == 0)
+ {
+ // we're updating to the same state; we only do this with a strictly greater ballot;
+ // even so, if we have no executeAt or deps there's nothing to record
+ if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0 || !newStatus.hasExecuteAtOrDeps())
+ return this;
+ }
+ else
+ {
+ // we're advancing to a higher status, but this is only permitted either if the new state is stable or the ballot is higher
+ if (cur.compareTo(STABLE) < 0 && updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0)
+ return this;
}
if (isOutOfRange) result = insertOrUpdateOutOfRange(pos, txnId,
cur, newStatus, mayExecute, updated, witnessedBy);
+ else if (cur.compareTo(STABLE) >= 0) result = update(pos, txnId,
cur, cur.withEncodedStatus(TxnInfo.encode(txnId, newStatus, cur.mayExecute(),
cur.statusOverrides())), updated, witnessedBy);
else if (newStatus.hasDeps()) result = update(pos, txnId, cur,
newStatus, mayExecute, updated, witnessedBy);
else result = update(pos, txnId, cur, TxnInfo.create(txnId,
newStatus, mayExecute, updated), updated, witnessedBy);
}
@@ -2000,10 +1998,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
return new CommandsForKeyUpdateWithPostProcess(newCfk,
newPostProcess);
}
- TxnInfo[] newById = removeRedundantById(byId, bounds, newBounds);
+ Object[] newLoadingPruned =
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
+ TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned !=
loadingPruned, bounds, newBounds);
int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(),
redundantBefore(newBounds));
Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 ||
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 :
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
- Object[] newLoadingPruned =
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
long maxUniqueHlc = this.maxUniqueHlc;
if (maxUniqueHlc <= newBounds.gcBefore.hlc() &&
newBounds.gcBefore.is(HLC_BOUND))
@@ -2022,10 +2020,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
{
QuickBounds newBoundsInfo =
bounds.withGcBeforeBeforeAtLeast(newRedundantBefore);
- TxnInfo[] newById = removeRedundantById(byId, bounds, newBoundsInfo);
+ Object[] newLoadingPruned =
Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore);
+ TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned !=
loadingPruned, bounds, newBoundsInfo);
int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(),
newRedundantBefore);
Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 :
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
- Object[] newLoadingPruned =
Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore);
return reconstruct(key, newBoundsInfo, true, newById, maxUniqueHlc,
newLoadingPruned, newPrunedBeforeById, unmanageds);
}
@@ -2216,7 +2214,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
{
Invariants.require(txn.witnesses(missingId));
TxnInfo missingInfo = get(missingId, byId);
- Invariants.require(missingInfo.status().compareTo(InternalStatus.COMMITTED) < 0);
+ Invariants.require(missingInfo == null ? missingId.is(UNSTABLE) && find(loadingPruned, missingId) != null : missingInfo.status().compareTo(InternalStatus.COMMITTED) < 0);
Invariants.require(txn.depsKnownBefore().compareTo(missingId) >= 0);
}
if (txn.isCommittedAndExecutes())
diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java
b/accord-core/src/main/java/accord/local/cfk/Pruning.java
index 08788c05..29aec370 100644
--- a/accord-core/src/main/java/accord/local/cfk/Pruning.java
+++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java
@@ -527,7 +527,7 @@ public class Pruning
return epochPrunedBefores;
}
- static TxnInfo[] removeRedundantById(TxnInfo[] byId, QuickBounds
prevBounds, QuickBounds newBounds)
+ static TxnInfo[] removeRedundantById(TxnInfo[] byId, boolean
hasRedundantLoadingPruned, QuickBounds prevBounds, QuickBounds newBounds)
{
TxnId newRedundantBefore = redundantBefore(newBounds);
TxnId newBootstrappedAt = bootstrappedAt(newBounds);
@@ -538,7 +538,7 @@ public class Pruning
TxnInfo[] newById = byId;
int pos = insertPos(byId, newRedundantBefore);
- if (pos != 0)
+ if (pos != 0 || hasRedundantLoadingPruned)
{
if (Invariants.isParanoid() && testParanoia(LINEAR, NONE, LOW))
{
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index a33d3b5b..d93f2735 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -47,6 +47,7 @@ import accord.primitives.RoutingKeys;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.utils.ArrayBuffers.ObjectBuffers;
import accord.utils.Invariants;
import accord.utils.RelationMultiMap;
import accord.utils.SortedArrays;
@@ -65,6 +66,7 @@ import static accord.local.cfk.CommandsForKey.NO_INFOS;
import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.COMMIT;
import static accord.local.cfk.CommandsForKey.executesIgnoringBootstrap;
+import static accord.local.cfk.CommandsForKey.manages;
import static accord.local.cfk.CommandsForKey.reportLinearizabilityViolation;
import static accord.local.cfk.CommandsForKey.mayExecute;
import static accord.local.cfk.Pruning.removeLoadingPruned;
@@ -120,7 +122,6 @@ class Updating
static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int
insertPos, int updatePos, TxnId plainTxnId, TxnInfo curInfo, InternalStatus
newStatus, boolean mayExecute, Command command, @Nonnull TxnId[] witnessedBy)
{
- // TODO (expected): do not calculate any deps or additions if we're transitioning from Stable to Applied; wasted effort and might trigger LoadPruned
Object newInfoObj = computeInfoAndAdditions(cfk, insertPos, updatePos,
plainTxnId, newStatus, mayExecute, command);
if (newInfoObj.getClass() != InfoWithAdditions.class)
return insertOrUpdate(cfk, insertPos, plainTxnId, curInfo,
(TxnInfo)newInfoObj, command, witnessedBy);
@@ -157,7 +158,7 @@ class Updating
TxnInfo[] newById = new TxnInfo[byId.length + additionCount +
(updatePos < 0 ? 1 : 0)];
insertOrUpdateWithAdditions(byId, insertPos, updatePos, plainTxnId,
newInfo, additions, additionCount, newById, newCommittedByExecuteAt,
witnessedBy, cfk.bounds);
if (testParanoia(SUPERLINEAR, NONE, LOW))
- validateMissing(newById, additions, additionCount, curInfo,
newInfo, NO_TXNIDS);
+ validateMissing(newById, additions, additionCount, curInfo,
newInfo, witnessedBy);
int newMinUndecidedById = updateMinUndecidedById(newInfo, insertPos,
updatePos, cfk, byId, newById, additions, additionCount);
// we don't insert anything before prunedBeforeById (we LoadPruned instead), so we can simply bump it by 0 or 1
@@ -299,14 +300,14 @@ class Updating
MergeCursor<TxnId, DepList> deps =
command.partialDeps().txnIds(cfk.key());
deps.find(cfk.redundantBefore());
- return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId,
newStatus, mayExecute, ballot, executeAt, depsKnownBefore, deps);
+ return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId,
newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore,
deps);
}
/**
* We return an Object here to avoid wasting allocations; most of the time we expect a new TxnInfo to be returned,
* but if we have transitive dependencies to insert we return an InfoWithAdditions
*/
- static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int
updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute,
Ballot ballot, Timestamp executeAt, Timestamp depsKnownBefore,
MergeCursor<TxnId, DepList> deps)
+ static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int
updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute,
Ballot ballot, Timestamp executeAt, TxnInfo prunedBefore, Timestamp
depsKnownBefore, MergeCursor<TxnId, DepList> deps)
{
TxnId[] additions = NO_TXNIDS, missing = NO_TXNIDS;
int additionCount = 0, missingCount = 0;
@@ -335,12 +336,8 @@ class Updating
// we should ensure any existing TRANSITIVE entries are upgraded.
// OR we should remove TRANSITIVE for simplicity,
// OR document/enforce that TRANSITIVE_VISIBLE can only be applied to dependencies of unmanaged transactions
- if (d.is(UNSTABLE) && t.compareTo(COMMITTED) < 0 &&
t.witnesses(d))
- {
- if (missingCount == missing.length)
- missing = cachedTxnIds().resize(missing, missingCount,
Math.max(8, missingCount * 2));
- missing[missingCount++] = d;
- }
+ if (d.is(UNSTABLE) && txnIdsIndex < depsKnownBeforePos &&
t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(d))
+ missing = append(missing, missingCount++, d,
cachedTxnIds());
++txnIdsIndex;
deps.advance();
@@ -350,21 +347,20 @@ class Updating
// we expect to be missing ourselves
// we also permit any transaction we have recorded as COMMITTED or later to be missing, as recovery will not need to consult our information
if (txnIdsIndex != updatePos && txnIdsIndex < depsKnownBeforePos && t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(t))
- {
- if (missingCount == missing.length)
- missing = cachedTxnIds().resize(missing, missingCount,
Math.max(8, missingCount * 2));
- missing[missingCount++] = t.plainTxnId();
- }
+ missing = append(missing, missingCount++, t.plainTxnId(),
cachedTxnIds());
txnIdsIndex++;
}
else
{
if (plainTxnId.witnesses(d))
{
- if (additionCount >= additions.length)
- additions = cachedTxnIds().resize(additions,
additionCount, Math.max(8, additionCount * 2));
-
- additions[additionCount++] = d;
+ if (d.is(UNSTABLE))
+ {
+ if (d.compareTo(depsKnownBefore) < 0 && (manages(d) ||
d.compareTo(prunedBefore) > 0))
+ missing = append(missing, missingCount++, d,
cachedTxnIds());
+ d = d.withoutNonIdentityFlags();
+ }
+ additions = append(additions, additionCount++, d,
cachedTxnIds());
}
else
{
@@ -384,9 +380,13 @@ class Updating
TxnId d = deps.cur();
if (plainTxnId.witnesses(d))
{
- if (additionCount >= additions.length)
- additions = cachedTxnIds().resize(additions,
additionCount, Math.max(8, additionCount * 2));
- additions[additionCount++] =
deps.cur().withoutNonIdentityFlags();
+ if (d.is(UNSTABLE))
+ {
+ if (d.compareTo(depsKnownBefore) < 0 && (manages(d) ||
d.compareTo(prunedBefore) > 0))
+ missing = append(missing, missingCount++, d,
cachedTxnIds());
+ d = d.withoutNonIdentityFlags();
+ }
+ additions = append(additions, additionCount++, d,
cachedTxnIds());
}
deps.advance();
}
@@ -398,13 +398,9 @@ class Updating
{
if (txnIdsIndex != updatePos &&
byId[txnIdsIndex].compareTo(COMMITTED) < 0)
{
- TxnId txnId = byId[txnIdsIndex].plainTxnId();
- if ((plainTxnId.witnesses(txnId)))
- {
- if (missingCount == missing.length)
- missing = cachedTxnIds().resize(missing,
missingCount, Math.max(8, missingCount * 2));
- missing[missingCount++] = txnId;
- }
+ TxnInfo txn = byId[txnIdsIndex];
+ if (plainTxnId.witnesses(txn))
+ missing = append(missing, missingCount++,
txn.plainTxnId(), cachedTxnIds());
}
txnIdsIndex++;
}
@@ -417,6 +413,14 @@ class Updating
return new InfoWithAdditions(info, additions, additionCount);
}
+ private static <T> T[] append(T[] array, int index, T add,
ObjectBuffers<T> cached)
+ {
+ if (index == array.length)
+ array = cached.resize(array, index, Math.max(8, index * 2));
+ array[index] = add;
+ return array;
+ }
+
static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int pos,
TxnId plainTxnId, TxnInfo curInfo, TxnInfo newInfo, Command command, @Nullable
TxnId[] witnessedBy)
{
if (curInfo == newInfo)
@@ -475,6 +479,10 @@ class Updating
// TODO (desired): for consistency, move this to insertOrUpdate
(without additions), while maintaining the efficiency
Utils.addToMissingArrays(newById, newCommittedByExecuteAt,
newInfo, plainTxnId, witnessedBy);
}
+ else if (witnessedBy != null && newInfo.compareTo(COMMITTED) >= 0)
+ {
+ Utils.removeFromWitnessMissingArrays(newById,
newCommittedByExecuteAt, plainTxnId, witnessedBy);
+ }
if (testParanoia(SUPERLINEAR, NONE, LOW) && curInfo == null &&
newInfo.compareTo(COMMITTED) < 0)
validateMissing(newById, NO_TXNIDS, 0, curInfo, newInfo,
witnessedBy);
@@ -499,7 +507,8 @@ class Updating
// we may need to insert or remove ourselves, depending on the new and
prior status
boolean insertSelfMissing = sourceUpdatePos < 0 &&
newInfo.compareTo(COMMITTED) < 0;
- boolean removeSelfMissing = sourceUpdatePos >= 0 &&
newInfo.compareTo(COMMITTED) >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED)
< 0;
+ boolean removeSelfMissing = newInfo.compareTo(COMMITTED) >= 0 &&
sourceUpdatePos >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED) < 0;
+ boolean removeWitnessedMissing = newInfo.compareTo(COMMITTED) >= 0 &&
witnessedBy.length > 0;
// missingSource starts as additions, but if we insertSelfMissing at
the relevant moment it becomes the merge of additions and plainTxnId
TxnId[] missingSource = additions;
@@ -536,20 +545,24 @@ class Updating
}
int to = missingTo(txn, depsKnownBefore, missingSource,
missingCount, missingLimit);
- if (to > 0 || removeSelfMissing)
+ if (to > 0 || removeSelfMissing || removeWitnessedMissing)
{
+ int witnessedByIndex = -1;
+ if (witnessedBy != NOT_LOADING_PRUNED && (to > 0 ||
!removeSelfMissing))
+ witnessedByIndex =
Arrays.binarySearch(witnessedBy, txn);
+
TxnId[] curMissing = txn.missing();
TxnId[] newMissing = curMissing;
if (to > 0)
{
TxnId skipInsertMissing = null;
- if (Arrays.binarySearch(witnessedBy, plainTxnId)
>= 0)
+ if (insertSelfMissing && witnessedByIndex >= 0)
skipInsertMissing = plainTxnId;
newMissing = mergeAndFilterMissing(txn,
curMissing, missingSource, to, skipInsertMissing);
}
- if (removeSelfMissing)
+ if (removeSelfMissing || (removeWitnessedMissing &&
witnessedByIndex >= 0))
newMissing = removeOneMissing(newMissing,
plainTxnId);
if (newMissing != curMissing)
diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java
b/accord-core/src/main/java/accord/local/cfk/Utils.java
index a8a0ae39..695e8f88 100644
--- a/accord-core/src/main/java/accord/local/cfk/Utils.java
+++ b/accord-core/src/main/java/accord/local/cfk/Utils.java
@@ -42,6 +42,7 @@ class Utils
{
static void validateMissing(TxnInfo[] byId, TxnId[] additions, int
additionCount, TxnInfo curInfo, TxnInfo newInfo, @Nonnull TxnId[]
shouldNotHaveMissing)
{
+ int newInfoAdditionIndex = Arrays.binarySearch(additions, 0,
additionCount, newInfo);
for (TxnInfo txn : byId)
{
if (txn == newInfo) continue;
@@ -54,7 +55,7 @@ class Utils
{
if (!txn.witnesses(additions[i])) continue;
j = SortedArrays.exponentialSearch(missing, j, missing.length,
additions[i]);
- if (shouldNotHaveMissing != NO_TXNIDS &&
Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0) Invariants.require(j < 0);
+ if (shouldNotHaveMissing != NO_TXNIDS && i ==
newInfoAdditionIndex && Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0)
Invariants.require(j < 0);
else Invariants.require(j >= 0);
}
if (curInfo == null && newInfo.compareTo(COMMITTED) < 0 &&
txn.witnesses(newInfo) && txn.depsKnownBefore().compareTo(newInfo) > 0 &&
(shouldNotHaveMissing == NO_TXNIDS || Arrays.binarySearch(shouldNotHaveMissing,
txn) < 0))
@@ -95,6 +96,40 @@ class Utils
removeFromMissingArraysById(byId, minSearchIndex, byId.length,
removeTxnId);
}
+ /**
+ * {@code removeTxnId} no longer needs to be tracked in missing arrays;
+ * remove it from byId and committedByExecuteAt, ensuring both arrays still reference the same TxnInfo where updated
+ */
+ static void removeFromWitnessMissingArrays(TxnInfo[] byId, TxnInfo[]
committedByExecuteAt, TxnId removeTxnId, TxnId[] witnessedBy)
+ {
+ if (witnessedBy.length == 0)
+ return;
+
+ int byIdIndex = Arrays.binarySearch(byId, witnessedBy[0]);
+ if (byIdIndex < 0)
+ byIdIndex = -1 - byIdIndex;
+
+ for (TxnId txnId : witnessedBy)
+ {
+ byIdIndex = SortedArrays.exponentialSearch(byId, byIdIndex,
byId.length, txnId);
+ if (byIdIndex < 0)
+ {
+ byIdIndex = -1 - byIdIndex;
+ continue;
+ }
+ TxnInfo curTxn = byId[byIdIndex];
+ TxnId[] curMissing = curTxn.missing();
+ if (curMissing == NO_TXNIDS) continue;
+ TxnId[] newMissing = removeOneMissing(curMissing, removeTxnId);
+ if (newMissing == curMissing) continue;
+ TxnInfo newTxn = curTxn.withMissing(newMissing);
+ byId[byIdIndex] = newTxn;
+ if (!curTxn.isCommittedAndExecutes()) continue;
+ int byExecuteAtIndex = Arrays.binarySearch(committedByExecuteAt,
curTxn, TxnInfo::compareExecuteAt);
+ committedByExecuteAt[byExecuteAtIndex] = newTxn;
+ }
+ }
+
/**
* {@code removeTxnId} no longer needs to be tracked in missing arrays;
* remove it from a range of byId ACCEPTED status entries only, that could
not be tracked via committedByExecuteAt
diff --git
a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
index ea840f09..202e7eff 100644
--- a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
+++ b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
@@ -49,7 +49,7 @@ class ConcurrencyControl implements BiConsumer<Object,
Throwable>
void start(ConcurrencyControl concurrencyControl)
{
- supplier.get().addCallback(result).begin(concurrencyControl);
+ supplier.get().invoke(result).begin(concurrencyControl);
}
}
@@ -80,7 +80,7 @@ class ConcurrencyControl implements BiConsumer<Object,
Throwable>
}
++inflight;
}
- return supplier.get().addCallback(this);
+ return supplier.get().invoke(this);
}
synchronized void setMaxConcurrency(int newMaxConcurrency)
diff --git
a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 431a396c..9f209e52 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -248,7 +248,7 @@ public class DurabilityQueue
if (request != null)
request.reportAttempt(exclusiveSyncPoint.syncId,
node.elapsed(MICROSECONDS), coordinate);
- coordinate.addCallback((success, fail) -> {
+ coordinate.invoke((success, fail) -> {
synchronized (this)
{
unregisterInProgress(exclusiveSyncPoint, coordinate);
diff --git
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index 4c17b8c4..f9abe277 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -321,7 +321,7 @@ public class ShardDurability
syncId -> node.withEpoch(syncId.epoch(),
() -> syncPointControl.submit(
() -> CoordinateSyncPoint.exclusive(node, syncId, (FullRoute<Range>) node.computeRoute(syncId, ranges))
- .addCallback(logSyncPoint(syncId, ranges))
+ .invoke(logSyncPoint(syncId, ranges))
)))
.begin((success, fail) -> {
scheduled = null;
diff --git a/accord-core/src/main/java/accord/messages/Apply.java
b/accord-core/src/main/java/accord/messages/Apply.java
index 379f0a7b..8b5f3855 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -52,9 +52,9 @@ public class Apply extends TxnRequest<ApplyReply>
public static final Factory FACTORY = Apply::new;
public static class SerializationSupport
{
- public static Apply create(TxnId txnId, Route<?> scope, long minEpoch,
long waitForEpoch, Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn
txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
+ public static Apply create(TxnId txnId, Route<?> scope, long minEpoch,
long waitForEpoch, long maxEpoch, Kind kind, Timestamp executeAt, PartialDeps
deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result
result)
{
- return new Apply(kind, txnId, scope, minEpoch, waitForEpoch,
executeAt, deps, txn, fullRoute, writes, result);
+ return new Apply(kind, txnId, scope, minEpoch, waitForEpoch,
maxEpoch, executeAt, deps, txn, fullRoute, writes, result);
}
}
@@ -71,6 +71,7 @@ public class Apply extends TxnRequest<ApplyReply>
public final @Nullable Writes writes;
public final Result result;
public final long minEpoch;
+ public final long maxEpoch;
public enum Kind { Minimal, Maximal }
@@ -86,6 +87,7 @@ public class Apply extends TxnRequest<ApplyReply>
this.writes = writes;
this.result = result;
this.minEpoch = participates.oldestEpoch();
+ this.maxEpoch = participates.currentEpoch();
}
public static Topologies participates(Node node, Unseekables<?> route,
TxnId txnId, Timestamp executeAt, Topologies executes)
@@ -98,7 +100,7 @@ public class Apply extends TxnRequest<ApplyReply>
return node.topology().preciseEpochs(route, txnId.epoch(),
executeAt.epoch(), SHARE);
}
- protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch,
long waitForEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn
txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
+ protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch,
long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps deps,
@Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes,
Result result)
{
super(txnId, route, waitForEpoch);
this.kind = kind;
@@ -109,19 +111,20 @@ public class Apply extends TxnRequest<ApplyReply>
this.writes = writes;
this.result = result;
this.minEpoch = minEpoch;
+ this.maxEpoch = maxEpoch;
}
@Override
public Cancellable submit()
{
- return node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(),
this);
+ return node.mapReduceConsumeLocal(this, minEpoch, maxEpoch, this);
}
@Override
public ApplyReply apply(SafeCommandStore safeStore)
{
Route<?> route = fullRoute != null ? fullRoute : scope;
- StoreParticipants participants = StoreParticipants.execute(safeStore,
route, minEpoch, txnId, executeAt.epoch());
+ StoreParticipants participants = StoreParticipants.execute(safeStore,
route, minEpoch, txnId, maxEpoch);
return apply(safeStore, participants);
}
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index d3c0da31..e858a659 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -479,7 +479,7 @@ public class BeginRecovery extends
TxnRequest.WithUnsynced<BeginRecovery.Recover
", deps:" + deps +
", earlierWait:" + earlierWait +
", earlierNoWait:" + earlierNoWait +
- ", laterNoVote:" + laterCoordRejects +
+ ", laterCoordRejects:" + laterCoordRejects +
", selfAcceptsFastPath:" + selfAcceptsFastPath +
(txnId.hasPrivilegedCoordinator() ? ", coordinatorFastPath:" + selfAcceptsFastPath : "") +
", supersedingRejects:" + supersedingRejects +
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java
b/accord-core/src/main/java/accord/messages/PreAccept.java
index a55201e6..dabff631 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -123,7 +123,7 @@ public class PreAccept extends
WithUnsynced<PreAccept.PreAcceptReply>
if (command.status().compareTo(Status.PreAccepted) > 0)
return PreAcceptNack.INSTANCE;
- if (command.executeAt().is(REJECTED))
+ if (command.executeAt().is(REJECTED) && !participants.owns().isEmpty()) // if our vote is required we don't need to compute deps
return new PreAcceptOk(txnId, command.executeAt(),
Deps.NONE, ExecuteFlags.none());
case Retired:
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index ff3a6c99..473ea98d 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -40,7 +40,7 @@ public class SetGloballyDurable implements Request,
PreLoadContext
@Override
public void process(Node node, Node.Id from, ReplyContext replyContext)
{
- node.markDurable(durableBefore).addCallback((success, fail) -> {
+ node.markDurable(durableBefore).invoke((success, fail) -> {
node.reply(from, replyContext, fail == null ? Ok : null, fail);
});
}
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 1ead882f..5f27c3e1 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -57,7 +57,7 @@ public class SetShardDurable extends
AbstractRequest<SimpleReply>
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
TxnId syncIdWithFlags = syncIdWithFlags();
node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags,
durability.compareTo(Durability.UniversalOrInvalidated) >= 0 ? syncIdWithFlags
: TxnId.NONE)
- .addCallback((success, fail) -> {
+ .invoke((success, fail) -> {
if (fail != null) node.reply(replyTo, replyContext, null, fail);
else node.mapReduceConsumeLocal(this, exclusiveSyncPoint.route,
waitForEpoch(), waitForEpoch(), this);
});
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index bc3d8dac..a2fa3c88 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -26,6 +26,7 @@ import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import accord.utils.Invariants;
import static
accord.messages.MessageType.StandardMessage.WAIT_UNTIL_APPLIED_REQ;
import static accord.primitives.SaveStatus.Applied;
@@ -52,6 +53,7 @@ public class WaitUntilApplied extends ReadData
{
super(to, topologies, txnId, scope, executeAtEpoch);
this.minEpoch = topologies.oldestEpoch();
+ Invariants.require(minEpoch <= executeAtEpoch);
}
protected WaitUntilApplied(TxnId txnId, Participants<?> scope, long
minEpoch, long executeAtEpoch)
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index dc19ff20..d7bcad48 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -222,6 +222,12 @@ public abstract class AbstractRanges implements
Iterable<Range>, Routables<Range
return SortedArrays.binarySearch(ranges, 0, size(), find,
Range::compareIntersecting, search);
}
+ @Override
+ public final int indexOf(Range find)
+ {
+ return Arrays.binarySearch(ranges, 0, size(), find, Range::compare);
+ }
+
@Override
public final int findNext(int thisIndex, Range find, SortedArrays.Search
search)
{
@@ -648,6 +654,8 @@ public abstract class AbstractRanges implements
Iterable<Range>, Routables<Range
@Override
public boolean equals(Object that)
{
+ if (this == that)
+ return true;
if (that == null || this.getClass() != that.getClass())
return false;
return Arrays.equals(this.ranges, ((AbstractRanges) that).ranges);
diff --git a/accord-core/src/main/java/accord/primitives/LatestDeps.java
b/accord-core/src/main/java/accord/primitives/LatestDeps.java
index cb6a656b..294df183 100644
--- a/accord-core/src/main/java/accord/primitives/LatestDeps.java
+++ b/accord-core/src/main/java/accord/primitives/LatestDeps.java
@@ -46,9 +46,11 @@ import accord.utils.UnhandledEnum;
import static accord.messages.Accept.Kind.SLOW;
import static accord.primitives.Known.KnownDeps.DepsCommitted;
import static accord.primitives.Known.KnownDeps.DepsErased;
+import static accord.primitives.Known.KnownDeps.DepsFromCoordinator;
import static accord.primitives.Known.KnownDeps.DepsKnown;
import static accord.primitives.Known.KnownDeps.DepsProposed;
import static accord.primitives.Known.KnownDeps.DepsProposedFixed;
+import static accord.primitives.Known.KnownDeps.DepsUnknown;
import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
public class LatestDeps extends ReducingRangeMap<LatestDeps.LatestEntry>
@@ -419,7 +421,17 @@ public class LatestDeps extends
ReducingRangeMap<LatestDeps.LatestEntry>
return result;
}
- Deps mergeProposal()
+ public Participants<?> notAccepted(Participants<?> participants)
+ {
+ for (int i = 0 ; i < values.length ; ++i)
+ {
+ if (values[i] != null && values[i].known != DepsUnknown &&
values[i].known != DepsFromCoordinator)
+ participants =
participants.without(Ranges.of(starts[i].rangeFactory().newRange(starts[i],
starts[i + 1])));
+ }
+ return participants;
+ }
+
+ public Deps mergeProposal()
{
return mergeProposal(null);
}
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java
b/accord-core/src/main/java/accord/primitives/Routables.java
index 16db6efb..cf08d5fb 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -51,6 +51,7 @@ public interface Routables<K extends Routable> extends
Iterable<K>
}
K get(int i);
+ int indexOf(K key);
int size();
boolean isEmpty();
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 41a33693..c1e0ceb4 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -360,6 +360,9 @@ public class Timestamp implements Comparable<Timestamp>,
EpochSupplier
public boolean equals(Timestamp that)
{
+ if (that == this)
+ return true;
+
return that != null && this.msb == that.msb
&& ((this.lsb ^ that.lsb) & IDENTITY_LSB) == 0
&& this.node.equals(that.node);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 39943296..72f12f53 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -1220,7 +1220,9 @@ public class TopologyManager
Epochs snapshot = epochs;
EpochState maxState = snapshot.get(maxEpoch);
- Invariants.require(maxState != null, "Unable to find epoch %d; known
epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
+ if (maxState == null)
+ throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch());
+
if (minEpoch == maxEpoch)
return new Single(sorter,
selectFunction.apply(snapshot.get(minEpoch).global, select,
selectNodeOwnership));
diff --git a/accord-core/src/main/java/accord/utils/Functions.java
b/accord-core/src/main/java/accord/utils/Functions.java
index 58551c41..8e25d80e 100644
--- a/accord-core/src/main/java/accord/utils/Functions.java
+++ b/accord-core/src/main/java/accord/utils/Functions.java
@@ -18,29 +18,12 @@
package accord.utils;
-import javax.annotation.Nonnull;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
public class Functions
{
-
- public static <T> T reduceNonNull(BiFunction<T, T, T> merge, T a, T b)
- {
- return a == null ? b : b == null ? a : merge.apply(a, b);
- }
-
- public static <T1, T2> T1 reduceNonNull(BiFunction<T1, T2, T1> merge,
@Nonnull T1 a, T2 ... bs)
- {
- for (T2 b : bs)
- {
- if (b != null)
- a = merge.apply(a, b);
- }
- return a;
- }
-
public static <I, O> O mapReduceNonNull(Function<I, O> map, BiFunction<O,
O, O> reduce, List<I> input)
{
O result = null;
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index fe6cae97..1e2b7bab 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -285,6 +285,12 @@ public class Invariants
throw illegalState(format(fmt, p1, p2, p3));
}
+ public static void require(boolean condition, String fmt, int p1,
@Nullable Object p2, @Nullable Object p3) // overload whose first format parameter is a primitive int
+ {
+ if (!condition) // nothing is formatted unless the check fails
+ throw illegalState(format(fmt, p1, p2, p3)); // p1 is only handed to format on the failure path (presumably boxed there - verify against format's signature)
+ }
+
public static <P> void require(boolean condition, String fmt, @Nullable
Object p1, @Nullable Object p2, @Nullable P p3, Function<? super P, Object>
transformP3)
{
if (!condition)
diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java
b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
index 4d45479d..338ef27b 100644
--- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java
+++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
@@ -339,7 +339,7 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer>
{
wakeAt = deadline;
}
- else if (prevDeadline == wakeAt && bucketsStart != bucketsEnd)
+ else if (prevDeadline == wakeAt && prevDeadline != Long.MAX_VALUE &&
bucketsStart != bucketsEnd)
{
Bucket<T> head = buckets[bucketsStart];
if (!head.isHeapified())
diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java
b/accord-core/src/main/java/accord/utils/PersistentField.java
index 6face04d..117f8157 100644
--- a/accord-core/src/main/java/accord/utils/PersistentField.java
+++ b/accord-core/src/main/java/accord/utils/PersistentField.java
@@ -105,7 +105,7 @@ public class PersistentField<Input, Saved>
pending.add(new Pending<>(id, newValue));
AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue);
- pendingWrite.addCallback((success, fail) -> {
+ pendingWrite.invoke((success, fail) -> {
synchronized (this)
{
complete.add(id);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
index d8950f94..d05fc1ec 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
@@ -49,8 +49,6 @@ public class AsyncCallbacks
}
public static <T> BiConsumer<T, Throwable> toCallback(Runnable runnable) {
- return (unused, failure) -> {
- if (failure == null) runnable.run();
- };
+ return (s, f) -> runnable.run();
}
}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index 0899b268..b2f6070f 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -27,13 +27,8 @@ import java.util.function.Function;
import javax.annotation.Nullable;
-import com.google.common.util.concurrent.ListenableFuture;
-
public interface AsyncChain<V>
{
- /**
- * Support {@link
com.google.common.util.concurrent.Futures#transform(ListenableFuture,
com.google.common.base.Function, Executor)} natively
- */
<T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper,
Executor executor)
@@ -41,9 +36,6 @@ public interface AsyncChain<V>
return AsyncChains.map(this, mapper, executor);
}
- /**
- * Support {@link
com.google.common.util.concurrent.Futures#transform(ListenableFuture,
com.google.common.base.Function, Executor)} natively
- */
<T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>>
mapper);
default <T> AsyncChain<T> flatMap(Function<? super V, ? extends
AsyncChain<T>> mapper, Executor executor)
@@ -78,19 +70,19 @@ public interface AsyncChain<V>
*/
AsyncChain<V> recover(Function<? super Throwable, ? extends AsyncChain<V>>
mapper);
- default AsyncChain<Void> accept(Consumer<? super V> action)
+ default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action)
{
return map(r -> {
action.accept(r);
- return null;
+ return r;
});
}
- default AsyncChain<Void> accept(Consumer<? super V> action, Executor
executor)
+ default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action, Executor
executor)
{
return map(r -> {
action.accept(r);
- return null;
+ return r;
}, executor);
}
@@ -101,27 +93,29 @@ public interface AsyncChain<V>
return map(a -> a, e);
}
- /**
- * Support {@link com.google.common.util.concurrent.Futures#addCallback}
natively
- */
- AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback);
+ AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback);
/**
- * Adds a callback that only listens to the successful case, a failed
chain will not trigger the callback
+ * Adds a callback that fires on success only
*/
- default AsyncChain<V> addCallback(Runnable runnable)
+ default AsyncChain<V> invokeIfSuccess(Runnable runnable)
{
- return addCallback(AsyncCallbacks.toCallback(runnable));
+ return invoke((success, fail) -> {
+ if (fail == null) runnable.run();
+ });
}
- default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable>
callback, ExecutorService es)
+ /**
+ * Adds a callback that fires on either success or failure
+ */
+ default AsyncChain<V> invoke(Runnable runnable)
{
- return addCallback(AsyncCallbacks.inExecutorService(callback, es));
+ return invoke((success, fail) -> runnable.run());
}
- default AsyncChain<V> addCallback(Runnable runnable, ExecutorService es)
+ default AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback,
ExecutorService es)
{
- return addCallback(AsyncCallbacks.inExecutorService(runnable, es));
+ return invoke(AsyncCallbacks.inExecutorService(callback, es));
}
/**
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index bc5256b6..060e4f7c 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -19,6 +19,7 @@
package accord.utils.async;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
@@ -39,7 +40,6 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +129,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
}
@Override
- public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable>
callback)
+ public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback)
{
if (value == null || value.getClass() != FailureHolder.class)
callback.accept((V) value, null);
@@ -141,7 +141,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
@Override
public Cancellable begin(BiConsumer<? super V, Throwable> callback)
{
- addCallback(callback);
+ invoke(callback);
return null;
}
}
@@ -454,7 +454,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
}
@Override
- public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+ public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback)
{
return add(EncapsulatedCallback::new, callback);
}
@@ -655,28 +655,45 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
});
}
- public static <V> AsyncChain<V[]> allOf(List<? extends AsyncChain<?
extends V>> chains)
+ public static <V> AsyncChain<List<V>> allOf(List<? extends AsyncChain<?
extends V>> chains)
{
- return new AsyncChainCombiner<>(chains);
+ return allOfInternal(chains).map(Arrays::asList);
}
- public static <V> AsyncChain<List<V>> all(List<? extends AsyncChain<?
extends V>> chains)
+ // cannot expose this as we're actually providing an Object[] to the next
in the chain
+ // which is not safe if receiver statically expecting the strongly typed
array
+ private static <V> AsyncChain<V[]> allOfInternal(List<? extends
AsyncChain<? extends V>> chains)
{
- return new AsyncChainCombiner<>(chains).map(Lists::newArrayList);
+ return new AsyncChainCombiner<>(chains);
}
public static <V> AsyncChain<V> reduce(List<? extends AsyncChain<? extends
V>> chains, BiFunction<? super V, ? super V, ? extends V> reducer)
{
if (chains.size() == 1)
return (AsyncChain<V>) chains.get(0);
- return allOf(chains).map(r -> reduceArray(r, reducer));
+ return allOfInternal(chains).map(r -> reduceArray(r, reducer));
+ }
+
+ public static <I, O> AsyncChain<O> reduce(List<? extends AsyncChain<?
extends I>> chains, BiFunction<? super I, ? super O, ? extends O> reducer, O
identity) // combines the results of chains into a single O using reducer, seeded with identity
+ {
+ if (chains.size() == 1) // single chain: apply the reducer once without going through the combiner
+ return chains.get(0).map(i -> reducer.apply(i, identity));
+ return allOfInternal(chains).map(r -> reduceArray(r, reducer,
identity)); // otherwise wait for all chains and hand the result array to reduceArray
}
- private static <V> V reduceArray(V[] results, BiFunction<? super V, ?
super V, ? extends V> reducer)
+ private static <V> V reduceArray(Object[] results, BiFunction<? super V, ?
super V, ? extends V> reducer)
{
- V result = results[0];
+ V result = (V) results[0];
for (int i=1; i< results.length; i++)
- result = reducer.apply(result, results[i]);
+ result = reducer.apply(result, (V)results[i]);
+ return result;
+ }
+
+ private static <I, O> O reduceArray(Object[] results, BiFunction<? super
I, ? super O, ? extends O> reducer, O identity) // folds every element of results into one O, starting from identity
+ {
+ O result = identity; // accumulator; returned unchanged when results is empty
+ for (int i=0; i< results.length; i++)
+ result = reducer.apply((I)results[i], result); // thread the running accumulator; passing identity here dropped all but the last element
return result;
}
@@ -710,7 +727,7 @@ public abstract class AsyncChains<V> implements
AsyncChain<V>
case 0: return AsyncChains.success(identity);
case 1: return chains.get(0).map(a -> reducer.apply(identity, a));
}
- return allOf(chains).map(results -> {
+ return allOfInternal(chains).map(results -> {
B result = identity;
for (A r : results)
result = reducer.apply(result, r);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
index 30fd524f..fae3bc9c 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
@@ -31,13 +31,7 @@ import static accord.utils.Invariants.illegalState;
public interface AsyncResult<V> extends AsyncChain<V>
{
@Override
- AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback);
-
- @Override
- default AsyncResult<V> addCallback(Runnable runnable)
- {
- return addCallback(AsyncCallbacks.toCallback(runnable));
- }
+ AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
boolean isDone();
boolean isSuccess();
@@ -46,7 +40,7 @@ public interface AsyncResult<V> extends AsyncChain<V>
default @Nullable Cancellable begin(BiConsumer<? super V, Throwable>
callback)
{
//TODO chain shouldn't allow double calling, but should result allow?
- addCallback(callback);
+ invoke(callback);
return null;
}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 01632db4..6c954376 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -136,7 +136,7 @@ public class AsyncResults
@Override
protected Cancellable start(BiConsumer<? super V, Throwable>
callback)
{
- AbstractResult.this.addCallback(callback);
+ AbstractResult.this.invoke(callback);
return null;
}
};
@@ -172,7 +172,7 @@ public class AsyncResults
}
@Override
- public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable>
callback)
+ public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
{
Listener<V> listener = null;
while (true)
@@ -290,7 +290,7 @@ public class AsyncResults
@Override
protected Cancellable start(BiConsumer<? super V, Throwable>
callback)
{
- AsyncResults.Immediate.this.addCallback(callback);
+ AsyncResults.Immediate.this.invoke(callback);
return null;
}
};
@@ -315,7 +315,7 @@ public class AsyncResults
}
@Override
- public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable>
callback)
+ public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
{
callback.accept(value, failure);
return this;
diff --git a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
index 1122da86..1d7dd15b 100644
--- a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
+++ b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
@@ -31,8 +31,6 @@ import java.util.SortedSet;
import java.util.Spliterator;
import java.util.Spliterators;
-import com.google.common.collect.Ordering;
-
import static accord.utils.btree.BTree.Dir;
import static accord.utils.btree.BTree.findIndex;
@@ -76,7 +74,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements
NavigableSet<V>, List
*/
public V get(int index)
{
- return BTree.<V>findByIndex(tree, index);
+ return BTree.findByIndex(tree, index);
}
public int lastIndexOf(Object o)
@@ -658,7 +656,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements
NavigableSet<V>, List
public static <V extends Comparable<V>> BTreeSet<V> of(V value)
{
- return new BTreeSet<>(BTree.singleton(value), Ordering.<V>natural());
+ return new BTreeSet<>(BTree.singleton(value),
Comparator.naturalOrder());
}
public static <V> BTreeSet<V> empty(Comparator<? super V> comparator)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 6a6a8f9e..54a8314a 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -915,7 +915,7 @@ public class Cluster
Command afterCommand = e.getValue().value();
if (beforeCommand == null)
{
-
Invariants.requireArgument(afterCommand.is(Status.NotDefined) ||
afterCommand.saveStatus() == SaveStatus.Vestigial);
+
Invariants.requireArgument(afterCommand.is(Status.NotDefined) ||
afterCommand.saveStatus().compareTo(SaveStatus.Vestigial) >= 0);
continue;
}
if (afterCommand.hasBeen(Status.Truncated))
diff --git
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 975cc727..b02f0744 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -360,8 +360,8 @@ public class DelayedCommandStores extends
InMemoryCommandStores.SingleThread
if (next == null)
return;
- next.addCallback(agent()); // used to track unexpected exceptions
and notify simulations
- next.addCallback(this::afterExecution);
+ next.invoke(agent()); // used to track unexpected exceptions and
notify simulations
+ next.invoke(this::afterExecution);
executor.execute(next);
}
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 32303e91..2b4edb8c 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,6 +20,8 @@ package accord.impl.basic;
import java.util.AbstractList;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
@@ -33,7 +35,6 @@ import com.google.common.collect.ImmutableSortedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Agent;
import accord.api.Journal;
import accord.api.Result;
import accord.impl.CommandChange;
@@ -110,35 +111,34 @@ import static accord.utils.Invariants.illegalState;
public class InMemoryJournal implements Journal
{
private static final Logger log =
LoggerFactory.getLogger(InMemoryJournal.class);
- private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>>
diffsPerCommandStore = new Int2ObjectHashMap<>();
+ private final Int2ObjectHashMap<NavigableMap<TxnId, Diffs>>
diffsPerCommandStore = new Int2ObjectHashMap<>();
private final List<TopologyUpdate> topologyUpdates = new ArrayList<>();
private final Int2ObjectHashMap<FieldUpdates> fieldStates = new
Int2ObjectHashMap<>();
private Node node;
- private Agent agent;
private final RandomSource random;
+ private final float partialCompactionChance;
public InMemoryJournal(Node.Id id, RandomSource random)
{
this.random = random;
+ this.partialCompactionChance = 1f - (random.nextFloat()/2);
}
- public Journal start(Node node)
+ public void start(Node node)
{
this.node = node;
- this.agent = node.agent();
- return this;
}
@Override
public Command loadCommand(int commandStoreId, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore)
{
- NavigableMap<TxnId, List<Diff>> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
+ NavigableMap<TxnId, Diffs> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
if (commandStore == null)
return null;
- List<Diff> saved =
this.diffsPerCommandStore.get(commandStoreId).get(txnId);
+ Diffs saved = this.diffsPerCommandStore.get(commandStoreId).get(txnId);
if (saved == null)
return null;
@@ -174,7 +174,7 @@ public class InMemoryJournal implements Journal
private Builder reconstruct(int commandStoreId, TxnId txnId, Load load)
{
- NavigableMap<TxnId, List<Diff>> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
+ NavigableMap<TxnId, Diffs> commandStore =
this.diffsPerCommandStore.get(commandStoreId);
if (commandStore == null)
return null;
@@ -182,12 +182,13 @@ public class InMemoryJournal implements Journal
return
reconstruct(this.diffsPerCommandStore.get(commandStoreId).get(txnId), load);
}
- private Builder reconstruct(List<Diff> saved, Load load)
+ private Builder reconstruct(Diffs files, Load load)
{
- if (saved == null)
+ if (files == null)
return null;
Builder builder = null;
+ List<Diff> saved = files.sorted(false);
for (int i = saved.size() - 1; i >= 0; i--)
{
Diff diff = saved.get(i);
@@ -212,8 +213,8 @@ public class InMemoryJournal implements Journal
}
diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new
TreeMap<>())
- .computeIfAbsent(update.txnId, (k_) -> new
ArrayList<>())
- .add(diff);
+ .computeIfAbsent(update.txnId, (k_) -> new Diffs())
+ .addFlushed(diff);
if (onFlush!= null)
onFlush.run();
@@ -323,50 +324,189 @@ public class InMemoryJournal implements Journal
onFlush.run();
}
+ static class DiffFile extends ArrayList<Diff> // a list of Diff entries kept together as one unit inside Diffs.files
+ {
+ DiffFile(){} // starts empty
+ DiffFile(List<Diff> diffs) // copies diffs in iteration order, dropping null entries
+ {
+ for (Diff diff : diffs)
+ {
+ if (diff != null) // null slots in the input are skipped, not stored
+ add(diff);
+ }
+ }
+ }
+
+ static class Diffs // container for Diff entries, split between grouped DiffFile lists (files) and a loose list (flushed)
+ {
+ final boolean subset; // false in every constructor except the (subset, files, flushed) one; when true, sorted() never takes its shortcut paths
+ final List<DiffFile> files; // grouped entries; may be an immutable empty list depending on the constructor used
+ final List<Diff> flushed; // entries appended one at a time via addFlushed
+ int nextId; // last rowId assigned by addFlushed
+
+ int size; // number of Diff entries expected across flushed and files; checked against sorted.size()
+ List<Diff> sorted; // cached rowId-ordered list built by sorted(false); null when nothing is cached
+
+ Diffs() // empty, mutable instance
+ {
+ this.subset = false;
+ this.files = new ArrayList<>();
+ this.flushed = new ArrayList<>();
+ }
+
+ Diffs(PurgedList purged) // wraps a PurgedList as flushed; size stays 0
+ {
+ this.subset = false;
+ this.files = Collections.emptyList();
+ this.flushed = purged;
+ }
+
+ Diffs(TruncatedList truncated) // wraps a TruncatedList as flushed; files remains a mutable list
+ {
+ this.subset = false;
+ this.files = new ArrayList<>();
+ this.flushed = truncated;
+ this.size = 1; // counts the single entry the list is created with
+ }
+
+ Diffs(ErasedList erased) // wraps an ErasedList as flushed
+ {
+ this.subset = false;
+ this.files = Collections.emptyList();
+ this.flushed = erased;
+ this.size = 1; // presumably the list exposes exactly one entry - TODO confirm against ErasedList
+ }
+
+ Diffs(boolean subset, List<DiffFile> files, List<Diff> flushed) // adopts the given lists without copying
+ {
+ this.subset = subset;
+ this.files = files;
+ this.flushed = flushed;
+ this.size = flushed.size(); // size = flushed entries + sum of file sizes, computed at construction time
+ for (DiffFile file : files)
+ size += file.size();
+ }
+
+ void addFlushed(Diff diff) // appends diff, stamping it with the next rowId
+ {
+ diff.rowId = ++nextId; // rowIds increase in insertion order; sorted() orders by them
+ flushed.add(diff);
+ if (sorted != null && sorted != flushed) // keep a cached sorted list in step instead of invalidating it
+ sorted.add(diff);
+ ++size;
+ }
+
+ List<Diff> sorted(boolean copy) // all entries ordered by rowId; copy=true always returns a list the caller may mutate
+ {
+ if (sorted != null) // cached result available
+ {
+ Invariants.require(sorted.size() == size);
+ return copy ? new ArrayList<>(sorted) : sorted;
+ }
+
+ if (!subset) // shortcuts: with a single backing list, return it (or a copy) without sorting
+ {
+ if (files.isEmpty())
+ return copy ? new ArrayList<>(flushed) : flushed;
+
+ if (flushed.isEmpty() && files.size() == 1)
+ return copy ? new ArrayList<>(files.get(0)) : files.get(0);
+ }
+
+ List<Diff> sorted = new ArrayList<>(size); // local list shadows the field until it is optionally cached below
+ for (Diff diff : flushed)
+ {
+ if (diff != null) // null entries in flushed are skipped
+ sorted.add(diff);
+ }
+ for (DiffFile file : this.files)
+ {
+ if (file != null) // null entries in files are skipped
+ sorted.addAll(file);
+ }
+ Invariants.require(sorted.size() == size);
+ sorted.sort(Comparator.comparingInt(d -> d.rowId));
+ if (!copy) // only cache when the caller is not going to own (and possibly mutate) the list
+ this.sorted = sorted;
+ return sorted;
+ }
+
+ void removeAll(Diffs diffs) // removes the files and flushed entries held by diffs from this instance
+ {
+ files.removeAll(diffs.files);
+ flushed.removeAll(diffs.flushed);
+ size -= diffs.size; // subtracts diffs.size as-is, regardless of how many elements were actually removed
+ sorted = null; // drop the cached ordering; it is rebuilt on the next sorted() call
+ }
+
+ boolean isEmpty() // based on the tracked size, not on the backing lists
+ {
+ return size == 0;
+ }
+ }
+
static int counter = 0;
@Override
public void purge(CommandStores commandStores, EpochSupplier minEpoch)
{
truncateTopologiesForTesting(minEpoch.epoch());
- boolean isPartialCompaction = random.nextBoolean();
- for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e :
diffsPerCommandStore.entrySet())
+ boolean isPartialCompaction = random.decide(0.9f);
+ for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> e :
diffsPerCommandStore.entrySet())
{
int commandStoreId = e.getKey();
- Map<TxnId, List<Diff>> localJournal = e.getValue();
+ Map<TxnId, Diffs> localJournal = e.getValue();
CommandStore store = commandStores.forId(commandStoreId);
if (store == null)
continue;
- for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
+ for (Map.Entry<TxnId, Diffs> e2 : localJournal.entrySet())
{
- List<Diff> diffs = e2.getValue();
-
+ Diffs diffs = e2.getValue();
if (diffs.isEmpty()) continue;
- List<Diff> subset = diffs;
- if (diffs.size() > 1 && isPartialCompaction)
+
+ Diffs subset = diffs;
{
- int removeCount = 1 + random.nextInt(diffs.size() - 1);
- int count = diffs.size();
- subset = new ArrayList<>(diffs);
- while (removeCount-- > 0)
+ int filesAndFlushed = subset.flushed.size() +
subset.files.size();
+ if (filesAndFlushed > 1 && isPartialCompaction)
{
- int removeIndex = random.nextInt(diffs.size());
- if (subset.get(removeIndex) == null)
+ int removeCount = 1 + random.nextInt(filesAndFlushed -
1);
+ int count = filesAndFlushed;
+ subset = new Diffs(true, new ArrayList<>(diffs.files),
new ArrayList<>(diffs.flushed));
+ List<DiffFile> files = subset.files;
+ List<Diff> flushed = subset.flushed;
+ while (removeCount-- > 0)
+ {
+ int removeIndex = random.nextInt(filesAndFlushed);
+ if (removeIndex < flushed.size())
+ {
+ if (flushed.get(removeIndex) == null)
+ continue;
+ --subset.size;
+ flushed.set(removeIndex, null);
+ }
+ else
+ {
+ removeIndex -= flushed.size();
+ if (files.get(removeIndex) == null)
+ continue;
+ subset.size -= files.get(removeIndex).size();
+ files.set(removeIndex, null);
+ }
+ --count;
+ }
+
+ if (count == 0)
continue;
- subset.set(removeIndex, null);
- --count;
}
-
- if (count == 0)
- continue;
}
- Builder[] builders = new Builder[diffs.size()];
- for (int i = 0 ; i < subset.size() ; ++i)
+ Builder[] builders = new Builder[subset.size];
+ List<Diff> sorted = subset.sorted(true);
+ for (int i = 0 ; i < sorted.size() ; ++i)
{
- if (subset.get(i) == null) continue;
+ if (sorted.get(i) == null) continue;
Builder builder = new Builder(e2.getKey(), ALL);
- builder.apply(subset.get(i));
+ builder.apply(sorted.get(i));
builders[i] = builder;
}
@@ -388,7 +528,8 @@ public class InMemoryJournal implements Journal
{
if (cleanup == EXPUNGE)
{
- if (input == FULL || subset == diffs) e2.setValue(new
PurgedList());
+ if (input == FULL) e2.setValue(new Diffs(new
PurgedList()));
+ else if (subset == diffs) e2.setValue(new Diffs());
else diffs.removeAll(subset);
continue;
}
@@ -411,20 +552,39 @@ public class InMemoryJournal implements Journal
else
{
Diff diff = builder.toDiff();
- e2.setValue(cleanup == ERASE ? new
ErasedList(diff) : new TruncatedList(diff));
+ e2.setValue(cleanup == ERASE ? new Diffs(new
ErasedList(diff)) : new Diffs(new TruncatedList(diff)));
continue;
}
}
}
+ if (diffs.flushed instanceof FinalList)
+ continue;
+
+ int removeCount = 0;
for (int i = 0 ; i < builders.length ; ++i)
{
if (builders[i] != null)
{
Diff diff = builders[i].toDiff();
- diffs.set(i, diff.flags == 0 ? null : diff);
+ if (diff.flags == 0)
+ {
+ ++removeCount;
+ sorted.set(i, null);
+ }
+ else
+ {
+ diff.rowId = sorted.get(i).rowId;
+ sorted.set(i, diff);
+ }
}
}
+
+ diffs.size -= removeCount;
+ diffs.flushed.removeAll(subset.flushed);
+ diffs.files.removeAll(subset.files);
+ diffs.files.add(new DiffFile(sorted));
+ diffs.sorted = null;
}
}
}
@@ -432,18 +592,18 @@ public class InMemoryJournal implements Journal
@Override
public void replay(CommandStores commandStores)
{
- for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry :
diffsPerCommandStore.entrySet())
+ for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> diffEntry :
diffsPerCommandStore.entrySet())
{
int commandStoreId = diffEntry.getKey();
// copy to avoid concurrent modification when appending to journal
- Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
+ Map<TxnId, List<Diff>> diffs = new TreeMap<>();
InMemoryCommandStore commandStore = (InMemoryCommandStore)
commandStores.forId(commandStoreId);
Loader loader = commandStore.loader();
- for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
- e.setValue(new ArrayList<>(e.getValue()));
+ for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
+ diffs.put(e.getKey(), e.getValue().sorted(true));
for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
{
@@ -455,7 +615,20 @@ public class InMemoryJournal implements Journal
}
}
- private static class ErasedList extends AbstractList<Diff>
+ static class TruncatedList extends ArrayList<Diff>
+ {
+ TruncatedList(Diff truncated)
+ {
+ add(truncated);
+ }
+ }
+
+ private static abstract class FinalList extends AbstractList<Diff>
+ {
+
+ }
+
+ private static class ErasedList extends FinalList
{
private Diff erased;
@@ -500,15 +673,7 @@ public class InMemoryJournal implements Journal
}
}
- static class TruncatedList extends ArrayList<Diff>
- {
- TruncatedList(Diff truncated)
- {
- add(truncated);
- }
- }
-
- private static class PurgedList extends AbstractList<Diff>
+ private static class PurgedList extends FinalList
{
@Override
public Diff get(int index)
@@ -558,6 +723,7 @@ public class InMemoryJournal implements Journal
public final TxnId txnId;
public final Map<Field, Object> changes;
public final int flags;
+ private int rowId;
private Diff(TxnId txnId, int flags, Map<Field, Object> changes)
{
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 3df406ec..32a94e5c 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -76,9 +76,9 @@ public class LoggingJournal implements Journal
@Override
- public Journal start(Node node)
+ public void start(Node node)
{
- return this;
+ delegate.start(node);
}
@Override
diff --git
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index 53b78354..0f2452bd 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -66,7 +66,7 @@ public class ListFetchCoordinator extends
AbstractFetchCoordinator
ListData listData = (ListData) data;
persisting.add(commandStore.build(PreLoadContext.empty(), safeStore ->
{
listData.forEach((key, value) -> listStore.writeUnsafe(key,
value));
- }).flatMap(ignore -> listStore.snapshot(true, received,
syncPoint.syncId)).addCallback((success, fail) -> {
+ }).flatMap(ignore -> listStore.snapshot(true, received,
syncPoint.syncId)).invoke((success, fail) -> {
if (fail == null) success(from, received);
else fail(from, received, fail);
}).beginAsResult());
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index ddc06379..dc95693d 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -300,7 +300,7 @@ public class ListRequest implements Request
txn = gen.apply(node);
id = txnIdGen.apply(node, txn);
listener.onClientAction(MessageListener.ClientAction.SUBMIT,
node.id(), id, txn);
- node.coordinate(id, txn).addCallback(new ResultCallback(node, client,
replyContext, listener, id, txn));
+ node.coordinate(id, txn).invoke(new ResultCallback(node, client,
replyContext, listener, id, txn));
}
@Override
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java
b/accord-core/src/test/java/accord/local/CommandsTest.java
index f10b9e7f..eab240a7 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -101,7 +101,7 @@ class CommandsTest
for (Node n : nodeMap.values())
((TestableConfigurationService)
n.configService()).reportTopology(updatedTopology);
- node.coordinate(txnId, txn).addCallback((success, failure)
-> {
+ node.coordinate(txnId, txn).invoke((success, failure) -> {
if (failure == null)
{
node.agent().onUncaughtException(new
AssertionError("Expected TopologyMismatch exception, but txn was success"));
diff --git a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
index 8f429a3e..f7893e5a 100644
--- a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
+++ b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
@@ -90,7 +90,7 @@ public class AsyncChainsTest
ResultCallback<Integer> intermediateCallback = new ResultCallback<>();
AsyncChain<Integer> chain =
AsyncChains.ofCallable(MoreExecutors.directExecutor(), () -> 5);
- chain = chain.addCallback(intermediateCallback);
+ chain = chain.invoke(intermediateCallback);
chain = chain.map(i -> i + 2);
chain.begin(finalCallback);
@@ -129,10 +129,10 @@ public class AsyncChainsTest
AsyncChain<Integer> chain1 = AsyncChains.success(1);
AsyncChain<Integer> chain2 = AsyncChains.success(2);
AsyncChain<Integer> chain3 = AsyncChains.success(3);
- AsyncChain<List<Integer>> reduced =
AsyncChains.all(Lists.newArrayList(chain1, chain2, chain3));
+ AsyncChain<List<Integer>> reduced =
AsyncChains.allOf(Lists.newArrayList(chain1, chain2, chain3));
ResultCallback<List<Integer>> callback = new ResultCallback<>();
reduced.begin(callback);
- Assertions.assertEquals(Lists.newArrayList(1, 2, 3), callback.value());
+ Assertions.assertEquals(Arrays.asList(1, 2, 3), callback.value());
}
@Test
@@ -164,7 +164,7 @@ public class AsyncChainsTest
}, () -> {})
.map(ignore -> 1)
.beginAsResult()
- .addCallback((success, failure) -> {
+ .invoke((success, failure) -> {
if (failure == null)
throw illegalState("Expected to fail");
});
@@ -243,7 +243,7 @@ public class AsyncChainsTest
AtomicBoolean sawCallback = new AtomicBoolean(false);
AsyncChains.failure(new NullPointerException("just kidding"))
.beginAsResult()
- .addCallback(() -> sawCallback.set(true))
+ .invokeIfSuccess(() -> sawCallback.set(true))
.begin((success, failure) -> {
if (failure != null) sawFailure.set(true);
else sawFailure.set(false);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index ffc0d3ef..8ab5ed99 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -388,7 +388,7 @@ public class Cluster implements Scheduler
public static class NoOpJournal implements Journal
{
- @Override public Journal start(Node node) { return null; }
+ @Override public void start(Node node) { }
@Override public Command loadCommand(int store, TxnId txnId,
RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new
IllegalStateException("Not impelemented"); }
@Override public Command.Minimal loadMinimal(int store, TxnId txnId,
Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) {
throw new IllegalStateException("Not impelemented"); }
@Override public void saveCommand(int store, CommandUpdate value,
Runnable onFlush) { throw new IllegalStateException("Not impelemented"); }
diff --git
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index f7987140..01621f72 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -51,7 +51,7 @@ public class MaelstromRequest extends Body implements Request
@Override
public void process(Node node, Id client, ReplyContext replyContext)
{
- node.coordinate(txn).addCallback((success, fail) -> {
+ node.coordinate(txn).invoke((success, fail) -> {
Reply reply = success != null ? new
MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext),
(MaelstromResult) success) : null;
node.reply(client, replyContext, reply, fail);
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]