This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 449b2b4d Fix CompactionIteratorTest, switch to streaming serialization of SavedCommand 449b2b4d is described below commit 449b2b4d0bf4bb44d55a3c57f712a4d5a15e7220 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Wed Aug 28 12:01:53 2024 +0200 Fix CompactionIteratorTest, switch to streaming serialization of SavedCommand Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-19865 Co-authored-by: dcapwell <dcapw...@gmail.com> --- .../src/main/java/accord/local/Command.java | 18 ++++++-- .../src/main/java/accord/local/Commands.java | 2 +- .../main/java/accord/utils/ImmutableBitSet.java | 8 ++++ .../test/java/accord/messages/ReadDataTest.java | 2 +- .../src/test/java/accord/utils/AccordGens.java | 51 +++++++++++++++++++++- .../test/java/accord/utils/ReflectionUtils.java | 6 +++ .../src/main/java/accord/maelstrom/Packet.java | 1 + 7 files changed, 80 insertions(+), 8 deletions(-) diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index 8c98882d..1a886c35 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -39,6 +39,7 @@ import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.RangeDeps; import accord.primitives.Ranges; +import accord.primitives.Routable; import accord.primitives.Route; import accord.primitives.Seekables; import accord.primitives.Timestamp; @@ -1223,7 +1224,15 @@ public abstract class Command implements CommonAttributes public static class WaitingOn { - public static final WaitingOn EMPTY = new WaitingOn(Keys.EMPTY, RangeDeps.NONE, KeyDeps.NONE, ImmutableBitSet.EMPTY, ImmutableBitSet.EMPTY); + private static final WaitingOn EMPTY_FOR_KEY = new WaitingOn(Keys.EMPTY, RangeDeps.NONE, KeyDeps.NONE, ImmutableBitSet.EMPTY, null); + private static final WaitingOn EMPTY_FOR_RANGE = new WaitingOn(Keys.EMPTY, RangeDeps.NONE, KeyDeps.NONE, ImmutableBitSet.EMPTY, ImmutableBitSet.EMPTY); + + public static WaitingOn empty(Routable.Domain domain) + { + if (domain == Range) + return EMPTY_FOR_RANGE; + return EMPTY_FOR_KEY; + } public final Keys keys; public final RangeDeps directRangeDeps; @@ -1288,10 +1297,11 @@ public abstract class Command implements CommonAttributes return ifNull; } - public static WaitingOn none(Deps deps) + public static WaitingOn none(Routable.Domain domain, Deps deps) { - ImmutableBitSet empty = new ImmutableBitSet(deps.rangeDeps.txnIdCount() + deps.directKeyDeps.txnIdCount() + deps.keyDeps.keys().size()); - return new WaitingOn(deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, empty, empty); + return new WaitingOn(deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, + new ImmutableBitSet(deps.directKeyDeps.txnIdCount() + deps.keyDeps.keys().size()), + domain == Range ? new ImmutableBitSet(deps.rangeDeps.txnIdCount()) : null); } public boolean isWaiting() diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index d3387a6b..bb8c8ba4 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -395,7 +395,7 @@ public class Commands PartialTxn partialTxn = emptyTxn.slice(coordinateRanges, true); Invariants.checkState(validate(SaveStatus.Stable, command, coordinateRanges, route, partialTxn, none, null)); CommonAttributes newAttributes = set(SaveStatus.Stable, command, command, coordinateRanges, Ballot.ZERO, route, partialTxn, none); - safeCommand.stable(safeStore, newAttributes, Ballot.ZERO, localSyncId, WaitingOn.EMPTY); + safeCommand.stable(safeStore, newAttributes, Ballot.ZERO, localSyncId, WaitingOn.empty(emptyTxn.keys().domain())); safeStore.notifyListeners(safeCommand); } diff --git a/accord-core/src/main/java/accord/utils/ImmutableBitSet.java b/accord-core/src/main/java/accord/utils/ImmutableBitSet.java index 281c0d16..ef8a33a7 100644 --- a/accord-core/src/main/java/accord/utils/ImmutableBitSet.java +++ b/accord-core/src/main/java/accord/utils/ImmutableBitSet.java @@ -77,4 +77,12 @@ public class ImmutableBitSet extends SimpleBitSet { throw new UnsupportedOperationException(); } + + @Override + public String toString() + { + return "ImmutableBitSet{" + + "count=" + count + + '}'; + } } diff --git a/accord-core/src/test/java/accord/messages/ReadDataTest.java b/accord-core/src/test/java/accord/messages/ReadDataTest.java index f55b8898..72735d11 100644 --- a/accord-core/src/test/java/accord/messages/ReadDataTest.java +++ b/accord-core/src/test/java/accord/messages/ReadDataTest.java @@ -151,7 +151,7 @@ class ReadDataTest CheckedCommands.accept(safe, state.txnId, Ballot.ZERO, state.partialRoute, state.partialTxn.keys(), state.progressKey, state.executeAt, state.deps); SafeCommand safeCommand = safe.ifInitialised(state.txnId); - safeCommand.stable(safe, safeCommand.current(), Ballot.ZERO, state.executeAt, Command.WaitingOn.EMPTY); + safeCommand.stable(safe, safeCommand.current(), Ballot.ZERO, state.executeAt, Command.WaitingOn.empty(state.txnId.domain())); }))); ReplyContext replyContext = state.process(); diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java b/accord-core/src/test/java/accord/utils/AccordGens.java index 6c32f2aa..751a42b6 100644 --- a/accord-core/src/test/java/accord/utils/AccordGens.java +++ b/accord-core/src/test/java/accord/utils/AccordGens.java @@ -30,6 +30,8 @@ import java.util.function.ToIntBiFunction; import javax.annotation.Nullable; +import com.google.common.collect.Iterators; + import accord.api.Key; import accord.api.RoutingKey; import accord.impl.IntHashKey; @@ -41,6 +43,7 @@ import accord.local.RedundantBefore; import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.KeyDeps; +import accord.primitives.Keys; import accord.primitives.Range; import accord.primitives.RangeDeps; import accord.primitives.Ranges; @@ -63,6 +66,11 @@ public class AccordGens return Gens.longs().between(0, Timestamp.MAX_EPOCH); } + public static Gen.LongGen epochs(long min) + { + return Gens.longs().between(min, Timestamp.MAX_EPOCH); + } + public static Gen<Node.Id> nodes() { return nodes(RandomSource::nextInt); @@ -83,6 +91,11 @@ public class AccordGens return rs -> rs.nextLong(0, Long.MAX_VALUE); } + public static Gen.LongGen hlcs(long min) + { + return rs -> rs.nextLong(0, Long.MAX_VALUE); + } + public static Gen<Timestamp> timestamps() { return timestamps(epochs()::nextLong, hlcs(), flags(), RandomSource::nextInt); @@ -544,6 +557,40 @@ public class AccordGens return depsFromKey(keyGen, rangeGen, keyGen); } + public static Gen<Deps> depsFor(TxnId txnId, Txn txn) + { + Gen<KeyDeps> keyDepsGen; + Gen<RangeDeps> rangeDepsGen; + Gen<KeyDeps> directKeyDepsGen; + switch (txnId.kind()) + { + case Write: + case Read: + case EphemeralRead: + { + Gen<? extends Key> keyGen = Gens.pick(Iterators.toArray(((Keys) txn.keys()).iterator(), Key.class)); + keyDepsGen = AccordGens.keyDeps(keyGen, AccordGens.txnIds(Gens.longs().between(0, txnId.epoch()), + Gens.longs().between(0, txnId.hlc()), + RandomSource::nextInt, + Gens.pick(Txn.Kind.Write, Txn.Kind.Read), + ignore -> Routable.Domain.Key)); + rangeDepsGen = i -> RangeDeps.NONE; + directKeyDepsGen = i -> KeyDeps.NONE; + } + break; + case ExclusiveSyncPoint: + case SyncPoint: + case LocalOnly: + //TODO (coverage, now): + keyDepsGen = i -> KeyDeps.NONE; + rangeDepsGen = i -> RangeDeps.NONE; + directKeyDepsGen = i -> KeyDeps.NONE; + break; + default:throw new UnsupportedOperationException(txn.kind().name()); + } + return AccordGens.deps(keyDepsGen, rangeDepsGen, directKeyDepsGen); + } + public static Gen<Command.WaitingOn> waitingOn(Gen<Deps> depsGen, Gen<Boolean> emptyGen, Gen<Boolean> rangeSetGen, Gen<Boolean> directKeySetGen, @@ -551,8 +598,8 @@ public class AccordGens { return rs -> { Deps deps = depsGen.next(rs); - if (deps.isEmpty()) return Command.WaitingOn.EMPTY; - if (emptyGen.next(rs)) return Command.WaitingOn.none(deps); + if (deps.isEmpty()) return Command.WaitingOn.empty(Routable.Domain.Key); + if (emptyGen.next(rs)) return Command.WaitingOn.none(Routable.Domain.Key, deps); int size = deps.rangeDeps.txnIdCount() + deps.directKeyDeps.txnIdCount() + deps.keyDeps.keys().size(); SimpleBitSet set = new SimpleBitSet(size); int directKeyOffset = deps.rangeDeps.txnIdCount(); diff --git a/accord-core/src/test/java/accord/utils/ReflectionUtils.java b/accord-core/src/test/java/accord/utils/ReflectionUtils.java index 0745daa7..facbfd1e 100644 --- a/accord-core/src/test/java/accord/utils/ReflectionUtils.java +++ b/accord-core/src/test/java/accord/utils/ReflectionUtils.java @@ -104,6 +104,12 @@ public class ReflectionUtils accum.add(new Difference<>(path, lhs, rhs)); return; } + if (!lhs.getClass().equals(rhs.getClass())) + { + // when types don't match the field walking won't walk... we know this isn't a match, so just return early + accum.add(new Difference<>(path, lhs, rhs)); + return; + } List<Field> fields = getFields(lhs.getClass()); if (fields.isEmpty()) { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java b/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java index 788d5bdc..3d827814 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Packet.java @@ -64,6 +64,7 @@ public class Packet implements ReplyContext CheckStatusOkFull(CheckStatus.CheckStatusOkFull.class), InformOfTxnId(InformOfTxnId.class, Json.DEFAULT_ADAPTER), SimpleReply(accord.messages.SimpleReply.class, Json.DEFAULT_ADAPTER), + FailureReply(Reply.FailureReply.class) ; private static final Map<Class<?>, Type> LOOKUP_MAP = Arrays.stream(Type.values()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org