This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d4648ac969b6399ee96129015dd00e2c8c085568 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Fri Oct 4 10:25:14 2024 +0100 visit journal backwards to save time parsing don't load range commands that are redundant, and load least possible use MISC verb handler for maintenance tasks --- .gitmodules | 4 +- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 20 +- src/java/org/apache/cassandra/dht/Token.java | 8 + .../cassandra/index/accord/RoutesSearcher.java | 20 +- .../apache/cassandra/journal/InMemoryIndex.java | 27 +- src/java/org/apache/cassandra/journal/Index.java | 2 +- src/java/org/apache/cassandra/journal/Journal.java | 15 +- .../org/apache/cassandra/journal/OnDiskIndex.java | 69 +--- .../apache/cassandra/journal/RecordConsumer.java | 1 - src/java/org/apache/cassandra/journal/Segment.java | 10 +- .../org/apache/cassandra/journal/Segments.java | 5 +- .../apache/cassandra/journal/StaticSegment.java | 4 +- src/java/org/apache/cassandra/net/Verb.java | 8 +- src/java/org/apache/cassandra/schema/TableId.java | 7 +- .../cassandra/service/accord/AccordJournal.java | 1 + .../service/accord/AccordJournalTable.java | 13 +- .../accord/AccordJournalValueSerializers.java | 4 + .../cassandra/service/accord/AccordKeyspace.java | 3 +- .../service/accord/AccordSafeCommandStore.java | 3 +- .../service/accord/AccordSegmentCompactor.java | 23 +- .../service/accord/CommandsForRanges.java | 7 +- .../service/accord/CommandsForRangesLoader.java | 105 +++--- .../cassandra/service/accord/SavedCommand.java | 372 ++++++++++++--------- .../cassandra/service/accord/TokenRange.java | 11 +- .../service/accord/api/AccordRoutableKey.java | 8 + .../service/accord/api/AccordRoutingKey.java | 38 ++- .../cassandra/service/accord/api/PartitionKey.java | 10 +- .../service/accord/async/AsyncLoader.java | 16 +- .../service/accord/async/AsyncOperation.java | 2 +- .../accord/serializers/CommandSerializers.java | 12 + .../service/accord/serializers/KeySerializers.java | 49 
++- .../accord/AccordJournalCompactionTest.java | 29 +- .../test/AccordJournalSimulationTest.java | 2 +- .../index/accord/AccordIndexStressTest.java | 25 +- .../cassandra/index/accord/RouteIndexTest.java | 3 +- .../org/apache/cassandra/journal/IndexTest.java | 25 +- .../org/apache/cassandra/journal/JournalTest.java | 16 +- .../org/apache/cassandra/journal/SegmentTest.java | 16 +- .../service/accord/CommandsForRangesTest.java | 2 +- .../service/accord/async/AsyncLoaderTest.java | 24 +- .../accord/async/SimulatedAsyncOperationTest.java | 2 +- 42 files changed, 611 insertions(+), 412 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610..1e61f63e19 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = followup3 diff --git a/modules/accord b/modules/accord index 3846a378bf..17314e15d4 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 3846a378bfec8d28b312b40cd8541fad2a76e840 +Subproject commit 17314e15d45f46a1572cd10867efc0660169de13 diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index fc993e98da..3a83394495 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -150,7 +150,6 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte { private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class); private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100; - private static Object[] TRUNCATE_CLUSTERING_VALUE = new Object[] { Long.MAX_VALUE, Integer.MAX_VALUE }; private final OperationType type; private final AbstractCompactionController controller; @@ -1027,7 +1026,7 @@ 
public class CompactionIterator extends CompactionInfo.Holder implements Unfilte JournalKey key = null; Object builder = null; FlyweightSerializer<Object, Object> serializer = null; - Object[] lastClustering = null; + Object[] firstClustering = null; long maxSeenTimestamp = -1; final int userVersion; long lastDescriptor = -1; @@ -1058,6 +1057,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte maxSeenTimestamp = -1; lastDescriptor = -1; lastOffset = -1; + firstClustering = null; } @Override @@ -1084,7 +1084,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get()) { serializer.reserialize(key, builder, out, userVersion); - newVersion.row(lastClustering) + newVersion.row(firstClustering) .add("record", out.asNewBuffer()) .add("user_version", userVersion); } @@ -1117,12 +1117,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte PartitionUpdate.SimpleBuilder newVersion = PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey()); - Row.SimpleBuilder rowBuilder; - if (cleanup == TRUNCATE || cleanup == TRUNCATE_WITH_OUTCOME) - rowBuilder = newVersion.row(TRUNCATE_CLUSTERING_VALUE); - else - rowBuilder = newVersion.row(lastClustering); - + Row.SimpleBuilder rowBuilder = newVersion.row(firstClustering); rowBuilder.add("record", commandBuilder.asByteBuffer(userVersion)) .add("user_version", userVersion); @@ -1154,10 +1149,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte if (lastOffset != -1) { - Invariants.checkState(descriptor >= lastDescriptor, + Invariants.checkState(descriptor <= lastDescriptor, "Descriptors were accessed out of order: %d was accessed after %d", descriptor, lastDescriptor); Invariants.checkState(descriptor != lastDescriptor || - offset > lastOffset, + offset < lastOffset, "Offsets within %s were accessed out of order: %d was accessed 
after %s", offset, lastOffset); } lastDescriptor = descriptor; @@ -1167,7 +1162,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte { int userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer()); serializer.deserialize(key, builder, in, userVersion); - lastClustering = row.clustering().getBufferArray(); + if (firstClustering == null) + firstClustering = row.clustering().getBufferArray(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java index 92481745f4..048d1a0394 100644 --- a/src/java/org/apache/cassandra/dht/Token.java +++ b/src/java/org/apache/cassandra/dht/Token.java @@ -223,6 +223,14 @@ public abstract class Token implements RingPosition<Token>, Serializable return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes)); } + public void skip(DataInputPlus in, IPartitioner p, int version) throws IOException + { + int size = p.isFixedLength() ? p.getMaxTokenSize() : in.readUnsignedVInt32(); + if (logPartitioner && deserializePartitioners.add(p.getClass())) + logger.debug("Deserializing token with partitioner " + p); + in.skipBytesFully(size); + } + public Token deserialize(DataInputPlus in, IPartitioner p, int version) throws IOException { int size = p.isFixedLength() ? 
p.getMaxTokenSize() : in.readUnsignedVInt32(); diff --git a/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java b/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java index b90e1278c0..268739290a 100644 --- a/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java +++ b/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java @@ -19,10 +19,11 @@ package org.apache.cassandra.index.accord; import java.util.Collections; -import java.util.HashSet; import java.util.Set; +import accord.primitives.Timestamp; import accord.primitives.TxnId; +import org.agrona.collections.ObjectHashSet; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataRange; @@ -67,10 +68,10 @@ public class RoutesSearcher limits, dataRange); Index.Searcher s = index.searcherFor(cmd); - try (var controler = cmd.executionController()) + try (var controller = cmd.executionController()) { - UnfilteredPartitionIterator partitionIterator = s.search(controler); - return new CloseableIterator<Entry>() + UnfilteredPartitionIterator partitionIterator = s.search(controller); + return new CloseableIterator<>() { private final Entry entry = new Entry(); @Override @@ -98,21 +99,22 @@ public class RoutesSearcher } } - public Set<TxnId> intersects(int store, TokenRange range) + public Set<TxnId> intersects(int store, TokenRange range, TxnId minTxnId, Timestamp maxTxnId) { - return intersects(store, (AccordRoutingKey) range.start(), (AccordRoutingKey) range.end()); + return intersects(store, range.start(), range.end(), minTxnId, maxTxnId); } - public Set<TxnId> intersects(int store, AccordRoutingKey start, AccordRoutingKey end) + public Set<TxnId> intersects(int store, AccordRoutingKey start, AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId) { - var set = new HashSet<TxnId>(); + var set = new ObjectHashSet<TxnId>(); try (var it = searchKeysAccord(store, start, end)) { while (it.hasNext()) { Entry next = 
it.next(); if (next.store_id != store) continue; // the index should filter out, but just in case... - set.add(next.txnId); + if (next.txnId.compareTo(minTxnId) >= 0 && next.txnId.compareTo(maxTxnId) < 0) + set.add(next.txnId); } } return set.isEmpty() ? Collections.emptySet() : set; diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java index 2c71d8c4ff..1f0da7fd28 100644 --- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java +++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java @@ -18,7 +18,6 @@ package org.apache.cassandra.journal; import java.io.IOException; -import java.util.Arrays; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -61,16 +60,24 @@ final class InMemoryIndex<K> extends Index<K> index.merge(id, new long[] { currentOffsetAndSize }, (current, value) -> { - int idx = Arrays.binarySearch(current, currentOffsetAndSize); - if (idx >= 0) // repeat update() call; shouldn't occur, but we might as well allow this NOOP - return current; + long inserting = value[0]; + int idx = 0; + while (idx < current.length) + { + long cur = current[idx]; + if (cur <= inserting) + { + if (cur == inserting) + return current; // TODO (expected): throw exception? + break; + } + ++idx; + } - /* Merge the new offset with existing values */ - int pos = -idx - 1; long[] merged = new long[current.length + 1]; - System.arraycopy(current, 0, merged, 0, pos); - merged[pos] = currentOffsetAndSize; - System.arraycopy(current, pos, merged, pos + 1, current.length - pos); + System.arraycopy(current, 0, merged, 0, idx); + merged[idx] = inserting; + System.arraycopy(current, idx, merged, idx + 1, current.length - idx); return merged; }); @@ -98,7 +105,7 @@ final class InMemoryIndex<K> extends Index<K> } @Override - public long lookUpFirst(K id) + public long lookUpLast(K id) { long[] offsets = lookUp(id); return offsets.length == 0 ? 
-1 : offsets[0]; diff --git a/src/java/org/apache/cassandra/journal/Index.java b/src/java/org/apache/cassandra/journal/Index.java index bf6ab5d0c1..ac1e7c1d91 100644 --- a/src/java/org/apache/cassandra/journal/Index.java +++ b/src/java/org/apache/cassandra/journal/Index.java @@ -51,7 +51,7 @@ abstract class Index<K> implements Closeable * * @return the first offset into the segment, or -1 is none were found */ - abstract long lookUpFirst(K id); + abstract long lookUpLast(K id); abstract long[] lookUpAll(K id); diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index 5e91c7d3d3..ba4bf503b2 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -306,15 +306,15 @@ public class Journal<K, V> implements Shutdownable * @return deserialized record if found, null otherwise */ @SuppressWarnings("unused") - public V readFirst(K id) + public V readLast(K id) { EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); try (ReferencedSegments<K, V> segments = selectAndReference(id)) { - for (Segment<K, V> segment : segments.allSorted()) + for (Segment<K, V> segment : segments.allSorted(true)) { - if (segment.readFirst(id, holder)) + if (segment.readLast(id, holder)) { try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) { @@ -336,8 +336,7 @@ public class Journal<K, V> implements Shutdownable EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); try (ReferencedSegments<K, V> segments = selectAndReference(id)) { - consumer.init(); - for (Segment<K, V> segment : segments.allSorted()) + for (Segment<K, V> segment : segments.allSorted(false)) segment.readAll(id, holder, consumer); } } @@ -422,12 +421,12 @@ public class Journal<K, V> implements Shutdownable * @return true if the record was found, false otherwise */ @SuppressWarnings("unused") - public boolean readFirst(K id, RecordConsumer<K> 
consumer) + public boolean readLast(K id, RecordConsumer<K> consumer) { try (ReferencedSegments<K, V> segments = selectAndReference(id)) { for (Segment<K, V> segment : segments.all()) - if (segment.readFirst(id, consumer)) + if (segment.readLast(id, consumer)) return true; } return false; @@ -448,7 +447,7 @@ public class Journal<K, V> implements Shutdownable { for (K id : test) { - if (segment.index().lookUpFirst(id) != -1) + if (segment.index().lookUpLast(id) != -1) { present.add(id); if (test.size() == present.size()) diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java index b2f1487ad0..4994e3a087 100644 --- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; -import java.util.Arrays; import java.util.Map; import java.util.NavigableMap; import java.util.zip.CRC32; @@ -172,8 +171,8 @@ final class OnDiskIndex<K> extends Index<K> if (prev != -1) { long tmp = prev; - Invariants.checkState(readOffset(offsetAndSize) > readOffset(prev), - () -> String.format("Offsets should be strictly monotonic, but found %d following %d", + Invariants.checkState(readOffset(offsetAndSize) < readOffset(prev), + () -> String.format("Offsets should be strictly reverse monotonic, but found %d following %d", readOffset(offsetAndSize), readOffset(tmp))); } out.writeLong(offsetAndSize); @@ -202,53 +201,16 @@ final class OnDiskIndex<K> extends Index<K> @Override public long[] lookUp(K id) { - if (!mayContainId(id)) - return EMPTY; - - int keyIndex = binarySearch(id); - if (keyIndex < 0) - return EMPTY; - - long[] records = new long[] { recordAtIndex(keyIndex) }; - - /* - * Duplicate entries are possible within one segment (but should be rare). 
- * Check and add entries before and after the found result (not guaranteed to be first). - */ - - for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) - { - int length = records.length; - records = Arrays.copyOf(records, length + 1); - records[length] = recordAtIndex(i); - } - - for (int i = keyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); i++) - { - int length = records.length; - records = Arrays.copyOf(records, length + 1); - records[length] = recordAtIndex(i); - } - - Arrays.sort(records); - return records; + return lookUpAll(id); } @Override - public long lookUpFirst(K id) + public long lookUpLast(K id) { if (!mayContainId(id)) return -1L; int keyIndex = binarySearch(id); - - /* - * Duplicate entries are possible within one segment (but should be rare). - * Check and add entries before until we find the first occurrence of key. - */ - for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) - keyIndex = i; - return keyIndex < 0 ? -1 : recordAtIndex(keyIndex); } @@ -258,27 +220,22 @@ final class OnDiskIndex<K> extends Index<K> if (!mayContainId(id)) return EMPTY; - int start = binarySearch(id); - int firstKeyIndex = start; - - for (int i = firstKeyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--) - firstKeyIndex = i; - - if (firstKeyIndex < 0) + int someIndex = binarySearch(id); + if (someIndex < 0) return EMPTY; - int lastKeyIndex = start; + int firstKeyIndex = someIndex; + while (firstKeyIndex > 0 && id.equals(keyAtIndex(firstKeyIndex - 1))) + --firstKeyIndex; - for (int i = lastKeyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); i++) - lastKeyIndex = i; + int lastKeyIndex = someIndex; + while (lastKeyIndex + 1 < entryCount && id.equals(keyAtIndex(lastKeyIndex + 1))) + ++lastKeyIndex; long[] all = new long[lastKeyIndex - firstKeyIndex + 1]; int idx = firstKeyIndex; for (int i = 0; i < all.length; i++) - { - all[i] = recordAtIndex(idx); - idx++; - } + all[i] = recordAtIndex(idx++); return all; } diff --git 
a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java index 3403cd0f23..e16194001d 100644 --- a/src/java/org/apache/cassandra/journal/RecordConsumer.java +++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java @@ -24,6 +24,5 @@ import org.agrona.collections.IntHashSet; @FunctionalInterface public interface RecordConsumer<K> { - default void init() {} void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion); } diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java index 0da59118b7..7f955669cd 100644 --- a/src/java/org/apache/cassandra/journal/Segment.java +++ b/src/java/org/apache/cassandra/journal/Segment.java @@ -56,9 +56,9 @@ public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, * Reading entries (by id, by offset, iterate) */ - boolean readFirst(K id, RecordConsumer<K> consumer) + boolean readLast(K id, RecordConsumer<K> consumer) { - long offsetAndSize = index().lookUpFirst(id); + long offsetAndSize = index().lookUpLast(id); if (offsetAndSize == -1) return false; @@ -74,9 +74,9 @@ public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, return false; } - boolean readFirst(K id, EntrySerializer.EntryHolder<K> into) + boolean readLast(K id, EntrySerializer.EntryHolder<K> into) { - long offsetAndSize = index().lookUpFirst(id); + long offsetAndSize = index().lookUpLast(id); if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize), into)) return false; Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key); @@ -86,10 +86,12 @@ public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntry) { long[] all = index().lookUpAll(id); + int 
prevOffset = Integer.MAX_VALUE; for (int i = 0; i < all.length; i++) { int offset = Index.readOffset(all[i]); int size = Index.readSize(all[i]); + Invariants.checkState(offset < prevOffset); Invariants.checkState(read(offset, size, into), "Read should always return true"); onEntry.accept(descriptor.timestamp, offset, into.key, into.value, into.hosts, into.userVersion); } diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index 94282e9d87..cc98750fc4 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -106,10 +106,11 @@ class Segments<K, V> /** * Returns segments in timestamp order. Will allocate and sort the segment collection. */ - List<Segment<K, V>> allSorted() + List<Segment<K, V>> allSorted(boolean asc) { List<Segment<K, V>> segments = new ArrayList<>(this.segments.values()); - segments.sort(Comparator.comparing(s -> s.descriptor)); + if (asc) segments.sort(Comparator.comparing(s -> s.descriptor)); + else segments.sort((o1, o2) -> -o1.descriptor.compareTo(o2.descriptor)); return segments; } diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java index f5f15ee13c..3a8c03bb1a 100644 --- a/src/java/org/apache/cassandra/journal/StaticSegment.java +++ b/src/java/org/apache/cassandra/journal/StaticSegment.java @@ -475,10 +475,10 @@ public final class StaticSegment<K, V> extends Segment<K, V> int cmp = keySupport.compare(this.key(), that.key()); if (cmp != 0) return cmp; - cmp = Long.compare(this.descriptor.timestamp, that.descriptor.timestamp); + cmp = Long.compare(that.descriptor.timestamp, this.descriptor.timestamp); if (cmp != 0) return cmp; - return Integer.compare(this.offset, that.offset); + return Integer.compare(that.offset, this.offset); } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java index 87b366300f..7cc9fe9dfa 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -336,10 +336,10 @@ public enum Verb ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE, () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_EPHMRL_READ_DEPS_RSP), ACCORD_GET_MAX_CONFLICT_RSP (163, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.reply, RESPONSE_HANDLER ), ACCORD_GET_MAX_CONFLICT_REQ (164, P2, writeTimeout, IMMEDIATE, () -> GetMaxConflictSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_GET_MAX_CONFLICT_RSP), - ACCORD_FETCH_DATA_RSP (145, P2, writeTimeout,IMMEDIATE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), - ACCORD_FETCH_DATA_REQ (146, P2, writeTimeout,IMMEDIATE, () -> FetchSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), - ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.shardDurable, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), - ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE, () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), + ACCORD_FETCH_DATA_RSP (145, P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.reply, RESPONSE_HANDLER ), + ACCORD_FETCH_DATA_REQ (146, P2, writeTimeout, IMMEDIATE, () -> FetchSerializers.request, AccordService::verbHandlerOrNoop, ACCORD_FETCH_DATA_RSP ), + ACCORD_SET_SHARD_DURABLE_REQ (147, P2, writeTimeout, MISC, () -> SetDurableSerializers.shardDurable, AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), + ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, MISC, () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, ACCORD_SIMPLE_RSP ), ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.reply, RESPONSE_HANDLER ), 
ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE, () -> QueryDurableBeforeSerializers.request,AccordService::verbHandlerOrNoop, ACCORD_QUERY_DURABLE_BEFORE_RSP ), diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java index ac486f71d9..302d7db6bf 100644 --- a/src/java/org/apache/cassandra/schema/TableId.java +++ b/src/java/org/apache/cassandra/schema/TableId.java @@ -185,6 +185,11 @@ public class TableId implements Comparable<TableId> return 16; } + public static int staticSerializedSize() + { + return 16; + } + public static TableId deserialize(DataInput in) throws IOException { return new TableId(new UUID(in.readLong(), in.readLong())); @@ -201,7 +206,7 @@ public class TableId implements Comparable<TableId> return id.compareTo(o.id); } - public static final IVersionedSerializer<TableId> serializer = new IVersionedSerializer<TableId>() + public static final IVersionedSerializer<TableId> serializer = new IVersionedSerializer<>() { @Override public void serialize(TableId t, DataOutputPlus out, int version) throws IOException diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 310bbeb636..49af8042a8 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -323,6 +323,7 @@ public class AccordJournal implements IJournal, Shutdownable BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key); // TODO: this can be further improved to avoid allocating lambdas AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) key.type.serializer; + // TODO (expected): for those where we store an image, read only the first entry we find in DESC order journalTable.readAll(key, (in, userVersion) -> serializer.deserialize(key, builder, in, 
userVersion)); return builder; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java index ef3ac9eff3..5935a910b5 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java @@ -143,8 +143,7 @@ public class AccordJournalTable<K, V> this.tableRecordConsumer = new TableRecordConsumer(reader); } - @Override - public void init() + void readTable() { readAllFromTable(key, tableRecordConsumer); } @@ -164,7 +163,9 @@ public class AccordJournalTable<K, V> */ public void readAll(K key, Reader reader) { - journal.readAll(key, new JournalAndTableRecordConsumer(key, reader)); + JournalAndTableRecordConsumer consumer = new JournalAndTableRecordConsumer(key, reader); + journal.readAll(key, consumer); + consumer.readTable(); } private void readAllFromTable(K key, TableRecordConsumer onEntry) @@ -332,15 +333,15 @@ public class AccordJournalTable<K, V> { K tableKey = tableIterator.key(); K journalKey = staticSegmentIterator.key(); - if (tableKey != null && keySupport.compare(tableKey, key) == 0) - tableIterator.readAllForKey(key, reader); - if (journalKey != null && keySupport.compare(journalKey, key) == 0) staticSegmentIterator.readAllForKey(key, (segment, position, key1, buffer, hosts, userVersion) -> { if (!tableIterator.visited(segment)) reader.accept(segment, position, key1, buffer, hosts, userVersion); }); + if (tableKey != null && keySupport.compare(tableKey, key) == 0) + tableIterator.readAllForKey(key, reader); + tableIterator.clear(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java index 289b5c1b98..60a1ef4f58 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java +++ 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java @@ -116,6 +116,7 @@ public class AccordJournalValueSerializers public static class IdentityAccumulator<T> extends Accumulator<T, T> { + boolean hasRead; public IdentityAccumulator(T initial) { super(initial); @@ -124,6 +125,9 @@ public class AccordJournalValueSerializers @Override protected T accumulate(T oldValue, T newValue) { + if (hasRead) + return oldValue; + hasRead = true; return newValue; } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index ca1326156e..0a8cb085fb 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -174,6 +174,7 @@ public class AccordKeyspace TOPOLOGIES, EPOCH_METADATA, JOURNAL); + // TODO (desired): implement a custom type so we can get correct sort order private static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance)); private static final String TIMESTAMP_TUPLE = TIMESTAMP_TYPE.asCQL3Type().toString(); private static final TupleType KEY_TYPE = new TupleType(Arrays.asList(UUIDType.instance, BytesType.instance)); @@ -235,7 +236,7 @@ public class AccordKeyspace + "user_version int," + "record blob," + "PRIMARY KEY(key, descriptor, offset)" - + ") WITH compression = {'class':'NoopCompressor'};") + + ") WITH CLUSTERING ORDER BY (descriptor DESC, offset DESC) WITH compression = {'class':'NoopCompressor'};") .partitioner(new LocalPartitioner(BytesType.instance)) .build(); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index 0b256574f4..624fcc378b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -48,6 +48,7 @@ import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Unseekables; +import accord.utils.Invariants; public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, AccordSafeCommandsForKey> { @@ -74,7 +75,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC this.commandStore = commandStore; commandStore.updateRangesForEpoch(this); if (this.ranges == null) - this.ranges = commandStore.unsafeRangesForEpoch(); + this.ranges = Invariants.nonNull(commandStore.unsafeRangesForEpoch()); } public static AccordSafeCommandStore create(PreLoadContext preLoadContext, diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java index f94510b8b8..c6fba012a5 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java @@ -85,8 +85,8 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V JournalKey key = null; Object builder = null; FlyweightSerializer<Object, Object> serializer = null; - long lastDescriptor = -1; - int lastOffset = -1; + long firstDescriptor = -1, lastDescriptor = -1; + int firstOffset = -1, lastOffset = -1; try { KeyOrderReader<JournalKey> reader; @@ -94,13 +94,13 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V { if (key == null || !reader.key().equals(key)) { - maybeWritePartition(cfs, writer, key, builder, serializer, lastDescriptor, lastOffset); + maybeWritePartition(cfs, writer, key, builder, serializer, firstDescriptor, firstOffset); key = reader.key(); serializer = (FlyweightSerializer<Object, Object>) key.type.serializer; builder = 
serializer.mergerFor(key); - lastOffset = -1; - lastDescriptor = -1; + firstDescriptor = lastDescriptor = -1; + firstOffset = lastOffset = -1; } boolean advanced; @@ -110,15 +110,20 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V { if (lastDescriptor != -1) { - Invariants.checkState(reader.descriptor.timestamp >= lastDescriptor, + Invariants.checkState(reader.descriptor.timestamp <= lastDescriptor, "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor); Invariants.checkState(reader.descriptor.timestamp != lastDescriptor || - reader.offset() > lastOffset, - "Offsets within %s were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset); + reader.offset() < lastOffset, + "Offsets were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset); } serializer.deserialize(key, builder, in, reader.descriptor.userVersion); lastDescriptor = reader.descriptor.timestamp; lastOffset = reader.offset(); + if (firstDescriptor == -1) + { + firstDescriptor = lastDescriptor; + firstOffset = lastOffset; + } } } while ((advanced = reader.advance()) && reader.key().equals(key)); @@ -126,7 +131,7 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V if (advanced) readers.offer(reader); // there is more to this reader, but not with this key } - maybeWritePartition(cfs, writer, key, builder, serializer, lastDescriptor, lastOffset); + maybeWritePartition(cfs, writer, key, builder, serializer, firstDescriptor, firstOffset); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java index c664ff62c3..edfde0fb1f 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java @@ -39,6 +39,7 @@ import 
accord.primitives.SaveStatus; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.utils.Invariants; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH; @@ -107,7 +108,7 @@ public class CommandsForRanges implements CommandsSummary // range specific logic... ranges don't update CommandsForRange based off the life cycle and instead // merge the cache with the disk state; so exclude states that should get removed from CommandsFor* - if (summary.saveStatus.compareTo(SaveStatus.Erased) >= 0) + if (summary.saveStatus != null && summary.saveStatus.compareTo(SaveStatus.Erased) >= 0) return; switch (testStatus) @@ -153,8 +154,10 @@ public class CommandsForRanges implements CommandsSummary // and so it is safe to execute, when in fact it is only a dependency on a different shard // (and that other shard, perhaps, does not know that it is a dependency - and so it is not durably known) // TODO (required): consider this some more - if ((testDep == WITH) == !summary.depsIds.contains(testTxnId)) + if ((testDep == WITH) == !summary.hasAsDep) return; + + Invariants.checkState(testTxnId.equals(summary.findAsDep)); } // TODO (required): ensure we are excluding any ranges that are now shard-redundant (not sure if this is enforced yet) diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java index 6324735883..5afa153fcd 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java +++ b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java @@ -21,9 +21,6 @@ package org.apache.cassandra.service.accord; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import 
java.util.NavigableMap; import java.util.Set; @@ -35,7 +32,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import accord.local.Command; -import accord.local.DurableBefore; +import accord.local.KeyHistory; +import accord.local.RedundantBefore; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.primitives.Range; @@ -46,16 +44,19 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; +import org.agrona.collections.ObjectHashSet; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.index.accord.RoutesSearcher; import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.utils.Pair; +import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; + public class CommandsForRangesLoader { private final RoutesSearcher searcher = new RoutesSearcher(); //TODO (now, durability): find solution for this... - private final Map<TxnId, Ranges> historicalTransaction = new HashMap<>(); + private final NavigableMap<TxnId, Ranges> historicalTransaction = new TreeMap<>(); private final AccordCommandStore store; public CommandsForRangesLoader(AccordCommandStore store) @@ -63,34 +64,38 @@ public class CommandsForRangesLoader this.store = store; } - public AsyncResult<Pair<Watcher, NavigableMap<TxnId, Summary>>> get(Ranges ranges) + public AsyncResult<Pair<Watcher, NavigableMap<TxnId, Summary>>> get(@Nullable TxnId primaryTxnId, KeyHistory keyHistory, Ranges ranges) { - var watcher = fromCache(ranges); + RedundantBefore redundantBefore = store.unsafeGetRedundantBefore(); + TxnId minTxnId = redundantBefore.minGcBefore(ranges); + Timestamp maxTxnId = primaryTxnId == null || keyHistory == KeyHistory.RECOVERY || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId; + TxnId findAsDep = primaryTxnId != null && keyHistory == KeyHistory.RECOVERY ? 
primaryTxnId : null; + var watcher = fromCache(findAsDep, ranges, minTxnId, maxTxnId, redundantBefore); var before = ImmutableMap.copyOf(watcher.get()); - return AsyncChains.ofCallable(Stage.READ.executor(), () -> get(ranges, before)) + return AsyncChains.ofCallable(Stage.READ.executor(), () -> get(ranges, before, findAsDep, minTxnId, maxTxnId, redundantBefore)) .map(map -> Pair.create(watcher, map), store) .beginAsResult(); } - private NavigableMap<TxnId, Summary> get(Ranges ranges, Map<TxnId, Summary> cacheHits) + private NavigableMap<TxnId, Summary> get(Ranges ranges, Map<TxnId, Summary> cacheHits, @Nullable TxnId findAsDep, TxnId minTxnId, Timestamp maxTxnId, RedundantBefore redundantBefore) { - Set<TxnId> matches = new HashSet<>(); + Set<TxnId> matches = new ObjectHashSet<>(); for (Range range : ranges) - matches.addAll(intersects(range)); + matches.addAll(intersects(range, minTxnId, maxTxnId)); if (matches.isEmpty()) return new TreeMap<>(); - return load(ranges, cacheHits, matches); + return load(ranges, cacheHits, matches, findAsDep, redundantBefore); } - private Collection<TxnId> intersects(Range range) + private Collection<TxnId> intersects(Range range, TxnId minTxnId, Timestamp maxTxnId) { assert range instanceof TokenRange : "Require TokenRange but given " + range.getClass(); - Set<TxnId> intersects = searcher.intersects(store.id(), (TokenRange) range); + Set<TxnId> intersects = searcher.intersects(store.id(), (TokenRange) range, minTxnId, maxTxnId); if (!historicalTransaction.isEmpty()) { if (intersects.isEmpty()) - intersects = new HashSet<>(); - for (var e : historicalTransaction.entrySet()) + intersects = new ObjectHashSet<>(); + for (var e : historicalTransaction.tailMap(minTxnId, true).entrySet()) { if (e.getValue().intersects(range)) intersects.add(e.getKey()); @@ -104,13 +109,21 @@ public class CommandsForRangesLoader public class Watcher implements AccordStateCache.Listener<TxnId, Command>, AutoCloseable { private final Ranges ranges; + private 
final @Nullable TxnId findAsDep; + private final TxnId minTxnId; + private final Timestamp maxTxnId; + private final RedundantBefore redundantBefore; private NavigableMap<TxnId, Summary> summaries = null; - private List<AccordCachingState<TxnId, Command>> needToDoubleCheck = null; + private Set<AccordCachingState<TxnId, Command>> needToDoubleCheck = null; - public Watcher(Ranges ranges) + public Watcher(Ranges ranges, @Nullable TxnId findAsDep, TxnId minTxnId, Timestamp maxTxnId, RedundantBefore redundantBefore) { this.ranges = ranges; + this.findAsDep = findAsDep; + this.minTxnId = minTxnId; + this.maxTxnId = maxTxnId; + this.redundantBefore = redundantBefore; } public NavigableMap<TxnId, Summary> get() @@ -123,15 +136,18 @@ public class CommandsForRangesLoader { if (n.key().domain() != Routable.Domain.Range) return; + if (n.key().compareTo(minTxnId) < 0 || n.key().compareTo(maxTxnId) >= 0) + return; + var state = n.state(); if (state instanceof AccordCachingState.Loading) { if (needToDoubleCheck == null) - needToDoubleCheck = new ArrayList<>(); + needToDoubleCheck = new ObjectHashSet<>(); needToDoubleCheck.add(n); return; } - //TODO (now): include FailedToSave? Most likely need to, but need to improve test coverage to have failed writes + //TODO (required): include FailedToSave? 
Most likely need to, but need to improve test coverage to have failed writes if (!(state instanceof AccordCachingState.Loaded || state instanceof AccordCachingState.Modified || state instanceof AccordCachingState.Saving)) @@ -140,7 +156,7 @@ public class CommandsForRangesLoader var cmd = state.get(); if (cmd == null) return; - Summary summary = create(cmd, ranges, null); + Summary summary = create(cmd, ranges, findAsDep, redundantBefore); if (summary != null) { if (summaries == null) @@ -175,19 +191,18 @@ public class CommandsForRangesLoader } } - private Watcher fromCache(Ranges ranges) + private Watcher fromCache(@Nullable TxnId findAsDep, Ranges ranges, TxnId minTxnId, Timestamp maxTxnId, RedundantBefore redundantBefore) { - Watcher watcher = new Watcher(ranges); + Watcher watcher = new Watcher(ranges, findAsDep, minTxnId, maxTxnId, redundantBefore); store.commandCache().stream().forEach(watcher::onAdd); store.commandCache().register(watcher); return watcher; } - private NavigableMap<TxnId, Summary> load(Ranges ranges, Map<TxnId, Summary> cacheHits, Collection<TxnId> possibleTxns) + private NavigableMap<TxnId, Summary> load(Ranges ranges, Map<TxnId, Summary> cacheHits, Collection<TxnId> possibleTxns, @Nullable TxnId findAsDep, RedundantBefore redundantBefore) { - //TODO (now): this logic is kinda duplicate of org.apache.cassandra.service.accord.CommandsForRange.mapReduce + //TODO (required): this logic is kinda duplicate of org.apache.cassandra.service.accord.CommandsForRange.mapReduce // should figure out if this can be improved... also what is correct? 
- var durableBefore = store.durableBefore(); NavigableMap<TxnId, Summary> map = new TreeMap<>(); for (TxnId txnId : possibleTxns) { @@ -196,7 +211,7 @@ public class CommandsForRangesLoader var cmd = store.loadCommand(txnId); if (cmd == null) continue; // unknown command - var summary = create(cmd, ranges, durableBefore); + var summary = create(cmd, ranges, findAsDep, redundantBefore); if (summary == null) continue; map.put(txnId, summary); @@ -204,9 +219,9 @@ public class CommandsForRangesLoader return map; } - private static Summary create(Command cmd, Ranges cacheRanges, @Nullable DurableBefore durableBefore) + private static Summary create(Command cmd, Ranges cacheRanges, @Nullable TxnId findAsDep, @Nullable RedundantBefore redundantBefore) { - //TODO (now, correctness): C* did Invalidated, accord-core did Erased... what is correct? + //TODO (required, correctness): C* did Invalidated, accord-core did Erased... what is correct? SaveStatus saveStatus = cmd.saveStatus(); if (saveStatus == SaveStatus.Invalidated || saveStatus == SaveStatus.Erased @@ -223,10 +238,10 @@ public class CommandsForRangesLoader if (!ranges.intersects(cacheRanges)) return null; - if (durableBefore != null) + if (redundantBefore != null) { - Ranges durableAlready = Ranges.of(durableBefore.foldlWithBounds(ranges, (e, accum, start, end) -> { - if (e.universalBefore.compareTo(cmd.txnId()) < 0) + Ranges durableAlready = Ranges.of(redundantBefore.foldlWithBounds(ranges, (e, accum, start, end) -> { + if (e.gcBefore.compareTo(cmd.txnId()) < 0) return accum; accum.add(new TokenRange((AccordRoutingKey) start, (AccordRoutingKey) end)); return accum; @@ -238,8 +253,8 @@ public class CommandsForRangesLoader } var partialDeps = cmd.partialDeps(); - List<TxnId> deps = partialDeps == null ? 
Collections.emptyList() : partialDeps.txnIds(); - return new Summary(cmd.txnId(), cmd.executeAt(), saveStatus, ranges, deps); + boolean hasAsDep = findAsDep != null && partialDeps.rangeDeps.intersects(findAsDep, ranges); + return new Summary(cmd.txnId(), cmd.executeAt(), saveStatus, ranges, findAsDep, hasAsDep); } public void mergeHistoricalTransaction(TxnId txnId, Ranges ranges, BiFunction<? super Ranges, ? super Ranges, ? extends Ranges> remappingFunction) @@ -250,25 +265,28 @@ public class CommandsForRangesLoader public static class Summary { public final TxnId txnId; - @Nullable - public final Timestamp executeAt; - public final SaveStatus saveStatus; - public final Ranges ranges; - public final List<TxnId> depsIds; + @Nullable public final Timestamp executeAt; + @Nullable public final SaveStatus saveStatus; + @Nullable public final Ranges ranges; + + // TODO (required): this logic is still broken (was already): needs to consider exact range matches + public final TxnId findAsDep; + public final boolean hasAsDep; @VisibleForTesting - Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus saveStatus, Ranges ranges, List<TxnId> depsIds) + Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus saveStatus, Ranges ranges, TxnId findAsDep, boolean hasAsDep) { this.txnId = txnId; this.executeAt = executeAt; this.saveStatus = saveStatus; this.ranges = ranges; - this.depsIds = depsIds; + this.findAsDep = findAsDep; + this.hasAsDep = hasAsDep; } public Summary slice(Ranges slice) { - return new Summary(txnId, executeAt, saveStatus, ranges.slice(slice, Routables.Slice.Minimal), depsIds); + return new Summary(txnId, executeAt, saveStatus, ranges.slice(slice, Routables.Slice.Minimal), findAsDep, hasAsDep); } @Override @@ -279,7 +297,8 @@ public class CommandsForRangesLoader ", executeAt=" + executeAt + ", saveStatus=" + saveStatus + ", ranges=" + ranges + - ", depsIds=" + depsIds + + ", findAsDep=" + findAsDep + + ", hasAsDep=" + hasAsDep + '}'; } } diff 
--git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java index 209208989f..a0cd86bb5b 100644 --- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java +++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java @@ -161,43 +161,63 @@ public class SavedCommand public static void serialize(Command before, Command after, DataOutputPlus out, int userVersion) throws IOException { int flags = getFlags(before, after); - out.writeInt(flags); - // We encode all changed fields unless their value is null - if (getFieldChanged(Fields.EXECUTE_AT, flags) && after.executeAt() != null) - CommandSerializers.timestamp.serialize(after.executeAt(), out, userVersion); - // TODO (desired): check if this can fold into executeAt - if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags) && after.executesAtLeast() != null) - CommandSerializers.timestamp.serialize(after.executesAtLeast(), out, userVersion); - if (getFieldChanged(Fields.SAVE_STATUS, flags)) - out.writeInt(after.saveStatus().ordinal()); - if (getFieldChanged(Fields.DURABILITY, flags) && after.durability() != null) - out.writeInt(after.durability().ordinal()); - - if (getFieldChanged(Fields.ACCEPTED, flags) && after.acceptedOrCommitted() != null) - CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out, userVersion); - if (getFieldChanged(Fields.PROMISED, flags) && after.promised() != null) - CommandSerializers.ballot.serialize(after.promised(), out, userVersion); - - if (getFieldChanged(Fields.PARTICIPANTS, flags) && after.participants() != null) - CommandSerializers.participants.serialize(after.participants(), out, userVersion); - if (getFieldChanged(Fields.PARTIAL_TXN, flags) && after.partialTxn() != null) - CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion); - if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && after.partialDeps() != null) - 
DepsSerializer.partialDeps.serialize(after.partialDeps(), out, userVersion); - - Command.WaitingOn waitingOn = getWaitingOn(after); - if (getFieldChanged(Fields.WAITING_ON, flags) && waitingOn != null) - { - long size = WaitingOnSerializer.serializedSize(waitingOn); - ByteBuffer serialized = WaitingOnSerializer.serialize(after.txnId(), waitingOn); - out.writeInt((int) size); - out.write(serialized); - } - - if (getFieldChanged(Fields.WRITES, flags) && after.writes() != null) - CommandSerializers.writes.serialize(after.writes(), out, userVersion); + int iterable = toIterableSetFields(flags); + while (iterable != 0) + { + Fields field = nextSetField(iterable); + if (getFieldIsNull(field, flags)) + { + iterable = unsetIterableFields(field, iterable); + continue; + } + + switch (field) + { + case EXECUTE_AT: + CommandSerializers.timestamp.serialize(after.executeAt(), out, userVersion); + break; + case EXECUTES_AT_LEAST: + CommandSerializers.timestamp.serialize(after.executesAtLeast(), out, userVersion); + break; + case SAVE_STATUS: + out.writeShort(after.saveStatus().ordinal()); + break; + case DURABILITY: + out.writeByte(after.durability().ordinal()); + break; + case ACCEPTED: + CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out, userVersion); + break; + case PROMISED: + CommandSerializers.ballot.serialize(after.promised(), out, userVersion); + break; + case PARTICIPANTS: + CommandSerializers.participants.serialize(after.participants(), out, userVersion); + break; + case PARTIAL_TXN: + CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion); + break; + case PARTIAL_DEPS: + DepsSerializer.partialDeps.serialize(after.partialDeps(), out, userVersion); + break; + case WAITING_ON: + Command.WaitingOn waitingOn = getWaitingOn(after); + long size = WaitingOnSerializer.serializedSize(waitingOn); + ByteBuffer serialized = WaitingOnSerializer.serialize(after.txnId(), waitingOn); + out.writeInt((int) size); + out.write(serialized); + 
break; + case WRITES: + CommandSerializers.writes.serialize(after.writes(), out, userVersion); + break; + case CLEANUP: + throw new IllegalStateException(); + } + + iterable = unsetIterableFields(field, iterable); + } } @VisibleForTesting @@ -258,13 +278,29 @@ public class SavedCommand private static int setFieldChanged(Fields field, int oldFlags) { - return oldFlags | (1 << (field.ordinal() + Short.SIZE)); + return oldFlags | (0x10000 << field.ordinal()); } @VisibleForTesting static boolean getFieldChanged(Fields field, int oldFlags) { - return (oldFlags & (1 << (field.ordinal() + Short.SIZE))) != 0; + return (oldFlags & (0x10000 << field.ordinal())) != 0; + } + + static int toIterableSetFields(int flags) + { + return flags >>> 16; + } + + static Fields nextSetField(int iterable) + { + int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable)); + return i == 32 ? null : Fields.FIELDS[i]; + } + + static int unsetIterableFields(Fields field, int iterable) + { + return iterable & ~(1 << field.ordinal()); } @VisibleForTesting @@ -547,43 +583,61 @@ public class SavedCommand { out.writeInt(flags); - // We encode all changed fields unless their value is null - if (getFieldChanged(Fields.EXECUTE_AT, flags) && !getFieldIsNull(Fields.EXECUTE_AT, flags)) - CommandSerializers.timestamp.serialize(executeAt(), out, userVersion); - // TODO (desired): check if this can fold into executeAt - if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags) && !getFieldIsNull(Fields.EXECUTES_AT_LEAST, flags)) - CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion); - if (getFieldChanged(Fields.SAVE_STATUS, flags) && !getFieldIsNull(Fields.SAVE_STATUS, flags)) - out.writeInt(saveStatus().ordinal()); - if (getFieldChanged(Fields.DURABILITY, flags) && !getFieldIsNull(Fields.DURABILITY, flags)) - out.writeInt(durability().ordinal()); - - if (getFieldChanged(Fields.ACCEPTED, flags) && !getFieldIsNull(Fields.ACCEPTED, flags)) - 
CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion); - if (getFieldChanged(Fields.PROMISED, flags) && !getFieldIsNull(Fields.PROMISED, flags)) - CommandSerializers.ballot.serialize(promised(), out, userVersion); - - if (getFieldChanged(Fields.PARTICIPANTS, flags) && !getFieldIsNull(Fields.PARTICIPANTS, flags)) - CommandSerializers.participants.serialize(participants(), out, userVersion); - if (getFieldChanged(Fields.PARTIAL_TXN, flags) && !getFieldIsNull(Fields.PARTIAL_TXN, flags)) - CommandSerializers.partialTxn.serialize(partialTxn(), out, userVersion); - if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && !getFieldIsNull(Fields.PARTIAL_DEPS, flags)) - DepsSerializer.partialDeps.serialize(partialDeps(), out, userVersion); - - if (getFieldChanged(Fields.WAITING_ON, flags) && !getFieldIsNull(Fields.WAITING_ON, flags)) + int iterable = toIterableSetFields(flags); + while (iterable != 0) { - out.writeInt(waitingOnBytes.length); - out.write(waitingOnBytes); - } + Fields field = nextSetField(iterable); + if (getFieldIsNull(field, flags)) + { + iterable = unsetIterableFields(field, iterable); + continue; + } - if (getFieldChanged(Fields.WRITES, flags) && !getFieldIsNull(Fields.WRITES, flags)) - CommandSerializers.writes.serialize(writes(), out, userVersion); + switch (field) + { + case EXECUTE_AT: + CommandSerializers.timestamp.serialize(executeAt(), out, userVersion); + break; + case EXECUTES_AT_LEAST: + CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion); + break; + case SAVE_STATUS: + out.writeShort(saveStatus().ordinal()); + break; + case DURABILITY: + out.writeByte(durability().ordinal()); + break; + case ACCEPTED: + CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion); + break; + case PROMISED: + CommandSerializers.ballot.serialize(promised(), out, userVersion); + break; + case PARTICIPANTS: + CommandSerializers.participants.serialize(participants(), out, userVersion); + break; + case 
PARTIAL_TXN: + CommandSerializers.partialTxn.serialize(partialTxn(), out, userVersion); + break; + case PARTIAL_DEPS: + DepsSerializer.partialDeps.serialize(partialDeps(), out, userVersion); + break; + case WAITING_ON: + out.writeInt(waitingOnBytes.length); + out.write(waitingOnBytes); + break; + case WRITES: + CommandSerializers.writes.serialize(writes(), out, userVersion); + break; + case CLEANUP: + out.writeByte(cleanup.ordinal()); + break; + } - if (getFieldChanged(Fields.CLEANUP, flags)) - out.writeByte(cleanup.ordinal()); + iterable = unsetIterableFields(field, iterable); + } } - // TODO: we seem to be writing some form of empty transaction @SuppressWarnings({ "rawtypes", "unchecked" }) public void deserializeNext(DataInputPlus in, int userVersion) throws IOException @@ -593,97 +647,65 @@ public class SavedCommand nextCalled = true; count++; - for (Fields field : Fields.FIELDS) + int iterable = toIterableSetFields(flags); + while (iterable != 0) { - if (getFieldChanged(field, flags)) + Fields field = nextSetField(iterable); + if (getFieldChanged(field, this.flags)) { - this.flags = setFieldChanged(field, this.flags); - if (getFieldIsNull(field, flags)) - this.flags = setFieldIsNull(field, this.flags); - else - this.flags = unsetFieldIsNull(field, this.flags); - } - } + if (!getFieldIsNull(field, flags)) + skip(field, in, userVersion); - if (getFieldChanged(Fields.EXECUTE_AT, flags)) - { - if (getFieldIsNull(Fields.EXECUTE_AT, flags)) - executeAt = null; - else - executeAt = CommandSerializers.timestamp.deserialize(in, userVersion); - } + iterable = unsetIterableFields(field, iterable); + continue; + } + this.flags = setFieldChanged(field, this.flags); - if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags)) - { - if (getFieldIsNull(Fields.EXECUTES_AT_LEAST, flags)) - executeAtLeast = null; + if (getFieldIsNull(field, flags)) + { + this.flags = setFieldIsNull(field, this.flags); + } else - executeAtLeast = CommandSerializers.timestamp.deserialize(in, 
userVersion); - } + { + deserialize(field, in, userVersion); + } - if (getFieldChanged(Fields.SAVE_STATUS, flags)) - { - if (getFieldIsNull(Fields.SAVE_STATUS, flags)) - saveStatus = null; - else - saveStatus = SaveStatus.values()[in.readInt()]; - } - if (getFieldChanged(Fields.DURABILITY, flags)) - { - if (getFieldIsNull(Fields.DURABILITY, flags)) - durability = null; - else - durability = Status.Durability.values()[in.readInt()]; + iterable = unsetIterableFields(field, iterable); } + } - if (getFieldChanged(Fields.ACCEPTED, flags)) + private void deserialize(Fields field, DataInputPlus in, int userVersion) throws IOException + { + switch (field) { - if (getFieldIsNull(Fields.ACCEPTED, flags)) - acceptedOrCommitted = null; - else + case EXECUTE_AT: + executeAt = CommandSerializers.timestamp.deserialize(in, userVersion); + break; + case EXECUTES_AT_LEAST: + executeAtLeast = CommandSerializers.timestamp.deserialize(in, userVersion); + break; + case SAVE_STATUS: + saveStatus = SaveStatus.values()[in.readShort()]; + break; + case DURABILITY: + durability = Status.Durability.values()[in.readByte()]; + break; + case ACCEPTED: acceptedOrCommitted = CommandSerializers.ballot.deserialize(in, userVersion); - } - - if (getFieldChanged(Fields.PROMISED, flags)) - { - if (getFieldIsNull(Fields.PROMISED, flags)) - promised = null; - else + break; + case PROMISED: promised = CommandSerializers.ballot.deserialize(in, userVersion); - } - - if (getFieldChanged(Fields.PARTICIPANTS, flags)) - { - if (getFieldIsNull(Fields.PARTICIPANTS, flags)) - participants = null; - else + break; + case PARTICIPANTS: participants = CommandSerializers.participants.deserialize(in, userVersion); - } - - if (getFieldChanged(Fields.PARTIAL_TXN, flags)) - { - if (getFieldIsNull(Fields.PARTIAL_TXN, flags)) - partialTxn = null; - else + break; + case PARTIAL_TXN: partialTxn = CommandSerializers.partialTxn.deserialize(in, userVersion); - } - - if (getFieldChanged(Fields.PARTIAL_DEPS, flags)) - { - if 
(getFieldIsNull(Fields.PARTIAL_DEPS, flags)) - partialDeps = null; - else + break; + case PARTIAL_DEPS: partialDeps = DepsSerializer.partialDeps.deserialize(in, userVersion); - } - - if (getFieldChanged(Fields.WAITING_ON, flags)) - { - if (getFieldIsNull(Fields.WAITING_ON, flags)) - { - waitingOn = null; - } - else - { + break; + case WAITING_ON: int size = in.readInt(); waitingOnBytes = new byte[size]; in.readFully(waitingOnBytes); @@ -699,22 +721,56 @@ public class SavedCommand throw Throwables.unchecked(e); } }; - } - } - - if (getFieldChanged(Fields.WRITES, flags)) - { - if (getFieldIsNull(Fields.WRITES, flags)) - writes = null; - else + break; + case WRITES: writes = CommandSerializers.writes.deserialize(in, userVersion); + break; + case CLEANUP: + Cleanup newCleanup = Cleanup.forOrdinal(in.readByte()); + if (cleanup == null || newCleanup.compareTo(cleanup) > 0) + cleanup = newCleanup; + break; } + } - if (getFieldChanged(Fields.CLEANUP, flags)) + private void skip(Fields field, DataInputPlus in, int userVersion) throws IOException + { + switch (field) { - Cleanup newCleanup = Cleanup.forOrdinal(in.readByte()); - if (cleanup == null || newCleanup.compareTo(cleanup) > 0) - cleanup = newCleanup; + case EXECUTE_AT: + case EXECUTES_AT_LEAST: + CommandSerializers.timestamp.skip(in, userVersion); + break; + case SAVE_STATUS: + in.readShort(); + break; + case DURABILITY: + in.readByte(); + break; + case ACCEPTED: + case PROMISED: + CommandSerializers.ballot.skip(in, userVersion); + break; + case PARTICIPANTS: + CommandSerializers.participants.deserialize(in, userVersion); + break; + case PARTIAL_TXN: + CommandSerializers.partialTxn.deserialize(in, userVersion); + break; + case PARTIAL_DEPS: + DepsSerializer.partialDeps.deserialize(in, userVersion); + break; + case WAITING_ON: + int size = in.readInt(); + in.skipBytesFully(size); + break; + case WRITES: + // TODO (expected): skip + CommandSerializers.writes.deserialize(in, userVersion); + break; + case CLEANUP: + 
in.readByte(); + break; } } diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java index aed0270979..70135b7a36 100644 --- a/src/java/org/apache/cassandra/service/accord/TokenRange.java +++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java @@ -104,7 +104,10 @@ public class TokenRange extends Range.EndInclusive return new org.apache.cassandra.dht.Range<>(left, right); } - public static final IVersionedSerializer<TokenRange> serializer = new IVersionedSerializer<>() + + public static final Serializer serializer = new Serializer(); + + public static final class Serializer implements IVersionedSerializer<TokenRange> { @Override public void serialize(TokenRange range, DataOutputPlus out, int version) throws IOException @@ -113,6 +116,12 @@ public class TokenRange extends Range.EndInclusive AccordRoutingKey.serializer.serialize(range.end(), out, version); } + public void skip(DataInputPlus in, int version) throws IOException + { + AccordRoutingKey.serializer.skip(in, version); + AccordRoutingKey.serializer.skip(in, version); + } + @Override public TokenRange deserialize(DataInputPlus in, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java index 1869310f5b..18d6926bc5 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java @@ -18,16 +18,24 @@ package org.apache.cassandra.service.accord.api; +import java.io.IOException; import java.util.Objects; import accord.primitives.RoutableKey; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.schema.TableId; import 
org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; public abstract class AccordRoutableKey implements RoutableKey { + public interface AccordKeySerializer<T> extends IVersionedSerializer<T> + { + void skip(DataInputPlus in, int version) throws IOException; + } + final TableId table; // TODO (desired): use an id (TrM) protected AccordRoutableKey(TableId table) diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java index 1f29c16d96..6d8d2b8184 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java @@ -37,7 +37,6 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.ByteBufferAccessor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -168,7 +167,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout return isMin ? 
"-Inf" : "+Inf"; } - public static final IVersionedSerializer<SentinelKey> serializer = new IVersionedSerializer<SentinelKey>() + public static final AccordKeySerializer<SentinelKey> serializer = new AccordKeySerializer<SentinelKey>() { @Override public void serialize(SentinelKey key, DataOutputPlus out, int version) throws IOException @@ -177,6 +176,12 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout out.writeBoolean(key.isMin); } + @Override + public void skip(DataInputPlus in, int version) throws IOException + { + in.skipBytesFully(TableId.staticSerializedSize() + 1); + } + @Override public SentinelKey deserialize(DataInputPlus in, int version) throws IOException { @@ -260,7 +265,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout } public static final Serializer serializer = new Serializer(); - public static class Serializer implements IVersionedSerializer<TokenKey> + public static class Serializer implements AccordKeySerializer<TokenKey> { private Serializer() {} @@ -271,6 +276,14 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout Token.compactSerializer.serialize(key.token, out, version); } + @Override + public void skip(DataInputPlus in, int version) throws IOException + { + in.skipBytesFully(TableId.staticSerializedSize()); + // TODO (expected): should we be using the TableId partitioner here? 
+ Token.compactSerializer.skip(in, getPartitioner(), version); + } + @Override public TokenKey deserialize(DataInputPlus in, int version) throws IOException { @@ -306,7 +319,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout } } - public static class Serializer implements IVersionedSerializer<AccordRoutingKey> + public static class Serializer implements AccordKeySerializer<AccordRoutingKey> { static final RoutingKeyKind[] kinds = RoutingKeyKind.values(); @@ -358,6 +371,23 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout } } + @Override + public void skip(DataInputPlus in, int version) throws IOException + { + RoutingKeyKind kind = kinds[in.readByte()]; + switch (kind) + { + case TOKEN: + TokenKey.serializer.skip(in, version); + break; + case SENTINEL: + SentinelKey.serializer.skip(in, version); + break; + default: + throw new IllegalArgumentException(); + } + } + @Override public AccordRoutingKey deserialize(DataInputPlus in, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java index 71feb7d88e..28a180d7f1 100644 --- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java @@ -119,7 +119,7 @@ public final class PartitionKey extends AccordRoutableKey implements Key } public static final Serializer serializer = new Serializer(); - public static class Serializer implements IVersionedSerializer<PartitionKey> + public static class Serializer implements AccordKeySerializer<PartitionKey> { // TODO: add vint to value accessor and use vints private Serializer() {} @@ -144,6 +144,14 @@ public final class PartitionKey extends AccordRoutableKey implements Key } + @Override + public void skip(DataInputPlus in, int version) throws IOException + { + TableId tableId = TableId.deserialize(in); + 
IPartitioner partitioner = Schema.instance.getExistingTablePartitioner(tableId); + ByteBufferUtil.skipShortLength(in); + } + @Override public PartitionKey deserialize(DataInputPlus in, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java index d33805a091..5c622ee680 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java @@ -42,6 +42,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import javax.annotation.Nullable; public class AsyncLoader { @@ -126,6 +127,7 @@ public class AsyncLoader referenceAndAssembleReadsForKey(key, context.timestampsForKey, commandStore.timestampsForKeyCache(), listenChains); break; case COMMANDS: + case RECOVERY: referenceAndAssembleReadsForKey(key, context.commandsForKey, commandStore.commandsForKeyCache(), listenChains); case NONE: break; @@ -141,7 +143,7 @@ public class AsyncLoader keys.forEach(key -> referenceAndAssembleReadsForKey(key, context, cache, listenChains)); } - private AsyncResult<?> referenceAndDispatchReads(AsyncOperation.Context context) + private AsyncResult<?> referenceAndDispatchReads(@Nullable TxnId primaryTxnId, AsyncOperation.Context context) { List<AsyncChain<?>> chains = new ArrayList<>(); @@ -154,7 +156,7 @@ public class AsyncLoader keys.forEach(key -> referenceAndAssembleReadsForKey(key, context, chains)); break; case Range: - chains.add(referenceAndDispatchReadsForRange(context)); + chains.add(referenceAndDispatchReadsForRange(primaryTxnId, context)); break; default: throw new UnsupportedOperationException("Unable to process keys of " + keysOrRanges.domain()); @@ -163,7 +165,7 @@ public class AsyncLoader return !chains.isEmpty() ? 
AsyncChains.reduce(chains, (a, b) -> null).beginAsResult() : null; } - private AsyncChain<?> referenceAndDispatchReadsForRange(AsyncOperation.Context context) + private AsyncChain<?> referenceAndDispatchReadsForRange(@Nullable TxnId primaryTxnId, AsyncOperation.Context context) { Ranges ranges = ((AbstractRanges) keysOrRanges).toRanges(); @@ -183,6 +185,8 @@ public class AsyncLoader cached.add(pk); } } + + // TODO (required): this needs to be optimised (e.g. to not load redundant commands, but maybe to be avoided altogether with async evaluation) Watcher watcher = new Watcher(); commandStore.commandsForKeyCache().register(watcher); root.add(findOverlappingKeys(ranges).flatMap(keys -> { @@ -195,7 +199,7 @@ public class AsyncLoader return chains.isEmpty() ? AsyncChains.success(null) : AsyncChains.reduce(chains, (a, b) -> null); }, commandStore)); - var chain = commandStore.diskCommandsForRanges().get(ranges); + var chain = commandStore.diskCommandsForRanges().get(primaryTxnId, keyHistory, ranges); root.add(chain); context.commandsForRanges = new AccordSafeCommandsForRanges(ranges, chain); @@ -234,7 +238,7 @@ public class AsyncLoader this.state = state; } - public boolean load(AsyncOperation.Context context, BiConsumer<Object, Throwable> callback) + public boolean load(@Nullable TxnId primaryTxnId, AsyncOperation.Context context, BiConsumer<Object, Throwable> callback) { logger.trace("Running load for {} with state {}: {} {}", callback, state, txnIds, keysOrRanges); commandStore.checkInStoreThread(); @@ -244,7 +248,7 @@ public class AsyncLoader state(State.SETUP); case SETUP: - readResult = referenceAndDispatchReads(context); + readResult = referenceAndDispatchReads(primaryTxnId, context); state(State.LOADING); case LOADING: diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index 2c2867aa65..cc5bee6d32 100644 --- 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -240,7 +240,7 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R case INITIALIZED: state(LOADING); case LOADING: - if (!loader.load(context, this::onLoaded)) + if (!loader.load(preLoadContext.primaryTxnId(), context, this::onLoaded)) return false; state(PREPARING); if (loadOnly) diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index 16acd2ab0b..a1205d74a2 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -111,6 +111,13 @@ public class CommandSerializers if (!ownsEqualsTouches) KeySerializers.participants.serialize(t.owns(), out, version); } + public void skip(DataInputPlus in, int version) throws IOException + { + int flags = in.readByte(); + if (0 != (flags & HAS_ROUTE)) + KeySerializers.route.deserialize(in, version); + } + @Override public StoreParticipants deserialize(DataInputPlus in, int version) throws IOException { @@ -194,6 +201,11 @@ public class CommandSerializers TopologySerializers.nodeId.serialize(ts.node, out); } + public void skip(DataInputPlus in, int version) throws IOException + { + in.skipBytesFully(serializedSize()); + } + @Override public T deserialize(DataInputPlus in, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java index f2f9f8ac1a..fa1cfe1697 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java @@ -50,6 +50,8 @@ import 
org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.AccordRoutableKey; +import org.apache.cassandra.service.accord.api.AccordRoutableKey.AccordKeySerializer; import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.NullableSerializer; @@ -58,11 +60,11 @@ public class KeySerializers { private KeySerializers() {} - public static final IVersionedSerializer<Key> key = (IVersionedSerializer<Key>) (IVersionedSerializer<?>) PartitionKey.serializer; - public static final IVersionedSerializer<RoutingKey> routingKey = (IVersionedSerializer<RoutingKey>) (IVersionedSerializer<?>) AccordRoutingKey.serializer; + public static final AccordKeySerializer<Key> key = (AccordKeySerializer<Key>) (AccordKeySerializer<?>) PartitionKey.serializer; + public static final IVersionedSerializer<RoutingKey> routingKey = (AccordKeySerializer<RoutingKey>) (AccordKeySerializer<?>) AccordRoutingKey.serializer; public static final IVersionedSerializer<RoutingKey> nullableRoutingKey = NullableSerializer.wrap(routingKey); - public static final IVersionedSerializer<RoutingKeys> routingKeys = new AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new) + public static final AbstractKeysSerializer<RoutingKey, RoutingKeys> routingKeys = new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new) { @Override RoutingKeys deserialize(DataInputPlus in, int version, RoutingKey[] keys) { @@ -78,7 +80,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<Ranges> ranges = new AbstractRangesSerializer<Ranges>() + public static final AbstractRangesSerializer<Ranges> ranges = new AbstractRangesSerializer<Ranges>() { @Override public Ranges deserialize(DataInputPlus in, 
int version, Range[] ranges) @@ -87,7 +89,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<PartialKeyRoute> partialKeyRoute = new AbstractKeysSerializer<RoutingKey, PartialKeyRoute>(routingKey, RoutingKey[]::new) + public static final AbstractKeysSerializer<?, PartialKeyRoute> partialKeyRoute = new AbstractKeysSerializer<RoutingKey, PartialKeyRoute>(routingKey, RoutingKey[]::new) { @Override PartialKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException { @@ -110,7 +112,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new) + public static final AbstractKeysSerializer<?, FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new) { @Override FullKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException { @@ -133,7 +135,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<PartialRangeRoute> partialRangeRoute = new AbstractRangesSerializer<PartialRangeRoute>() + public static final AbstractRangesSerializer<PartialRangeRoute> partialRangeRoute = new AbstractRangesSerializer<>() { @Override PartialRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException { @@ -157,7 +159,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<FullRangeRoute> fullRangeRoute = new AbstractRangesSerializer<FullRangeRoute>() + public static final AbstractRangesSerializer<FullRangeRoute> fullRangeRoute = new AbstractRangesSerializer<>() { @Override FullRangeRoute deserialize(DataInputPlus in, int version, Range[] Ranges) throws IOException { @@ -180,7 +182,7 @@ public class KeySerializers } }; - public static final IVersionedSerializer<Route<?>> route = new AbstractRoutablesSerializer<>( + public static final AbstractRoutablesSerializer<Route<?>> route = new 
AbstractRoutablesSerializer<>( EnumSet.of(UnseekablesKind.PartialKeyRoute, UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute, UnseekablesKind.FullRangeRoute) ); public static final IVersionedSerializer<Route<?>> nullableRoute = NullableSerializer.wrap(route); @@ -271,6 +273,21 @@ public class KeySerializers return result; } + public void skip(DataInputPlus in, int version) throws IOException + { + byte b = in.readByte(); + switch (b) + { + default: throw new IOException("Corrupted input: expected byte 1, 2, 3, 4, 5 or 6; received " + b); + case 1: routingKeys.skip(in, version); break; + case 2: partialKeyRoute.skip(in, version); break; + case 3: fullKeyRoute.skip(in, version); break; + case 4: ranges.skip(in, version); break; + case 5: partialRangeRoute.skip(in, version); break; + case 6: fullRangeRoute.skip(in, version); break; + } + } + @Override public long serializedSize(RS t, int version) { @@ -362,6 +379,13 @@ public class KeySerializers abstract KS deserialize(DataInputPlus in, int version, K[] keys) throws IOException; + public void skip(DataInputPlus in, int version) throws IOException + { + int count = in.readUnsignedVInt32(); + for (int i = 0; i < count ; i++) + keySerializer.deserialize(in, version); + } + @Override public KS deserialize(DataInputPlus in, int version) throws IOException { @@ -391,6 +415,13 @@ public class KeySerializers TokenRange.serializer.serialize((TokenRange) ranges.get(i), out, version); } + public void skip(DataInputPlus in, int version) throws IOException + { + int count = in.readUnsignedVInt32(); + for (int i = 0; i < count ; i++) + TokenRange.serializer.deserialize(in, version); + } + @Override public RS deserialize(DataInputPlus in, int version) throws IOException { diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java index a24cc81e9b..eebfda397a 100644 --- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java @@ -19,9 +19,12 @@ package org.apache.cassandra.service.accord; import java.nio.file.Files; +import java.util.Collections; +import java.util.List; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import org.junit.Assert; import org.junit.Before; @@ -88,9 +91,9 @@ public class AccordJournalCompactionTest RedundantBeforeAccumulator redundantBeforeAccumulator = new RedundantBeforeAccumulator(); DurableBeforeAccumulator durableBeforeAccumulator = new DurableBeforeAccumulator(); - IdentityAccumulator<NavigableMap<TxnId, Ranges>> bootstrapBeganAtAccumulator = new IdentityAccumulator<>(ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY)); - IdentityAccumulator<NavigableMap<Timestamp, Ranges>> safeToReadAccumulator = new IdentityAccumulator<>(ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY)); - IdentityAccumulator<RangesForEpoch.Snapshot> rangesForEpochAccumulator = new IdentityAccumulator<>(null); + NavigableMap<Timestamp, Ranges> safeToReadAtAccumulator = ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY); + NavigableMap<TxnId, Ranges> bootstrapBeganAtAccumulator = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); + RangesForEpoch.Snapshot rangesForEpochAccumulator = null; HistoricalTransactionsAccumulator historicalTransactionsAccumulator = new HistoricalTransactionsAccumulator(); Gen<RedundantBefore> redundantBeforeGen = AccordGenerators.redundantBefore(DatabaseDescriptor.getPartitioner()); @@ -118,7 +121,7 @@ public class AccordJournalCompactionTest journal.start(null); Timestamp timestamp = Timestamp.NONE; - RandomSource rs = new DefaultRandom(); + RandomSource rs = new DefaultRandom(1); int count = 1_000; // RedundantBefore redundantBefore = 
RedundantBefore.EMPTY; @@ -140,9 +143,11 @@ public class AccordJournalCompactionTest redundantBeforeAccumulator.update(updates.newRedundantBefore); durableBeforeAccumulator.update(addDurableBefore); if (updates.newBootstrapBeganAt != null) - bootstrapBeganAtAccumulator.update(updates.newBootstrapBeganAt); - safeToReadAccumulator.update(updates.newSafeToRead); - rangesForEpochAccumulator.update(updates.newRangesForEpoch); + bootstrapBeganAtAccumulator = updates.newBootstrapBeganAt; + if (updates.newSafeToRead != null) + safeToReadAtAccumulator = updates.newSafeToRead; + if (updates.newRangesForEpoch != null) + rangesForEpochAccumulator = updates.newRangesForEpoch; historicalTransactionsAccumulator.update(updates.addHistoricalTransactions); if (i % 100 == 0) @@ -153,10 +158,12 @@ public class AccordJournalCompactionTest // Assert.assertEquals(redundantBeforeAccumulator.get(), journal.loadRedundantBefore(1)); Assert.assertEquals(durableBeforeAccumulator.get(), journal.durableBeforePersister().load()); - Assert.assertEquals(bootstrapBeganAtAccumulator.get(), journal.loadBootstrapBeganAt(1)); - Assert.assertEquals(safeToReadAccumulator.get(), journal.loadSafeToRead(1)); - Assert.assertEquals(rangesForEpochAccumulator.get(), journal.loadRangesForEpoch(1)); - Assert.assertEquals(historicalTransactionsAccumulator.get(), journal.loadHistoricalTransactions(1)); + Assert.assertEquals(bootstrapBeganAtAccumulator, journal.loadBootstrapBeganAt(1)); + Assert.assertEquals(safeToReadAtAccumulator, journal.loadSafeToRead(1)); + Assert.assertEquals(rangesForEpochAccumulator, journal.loadRangesForEpoch(1)); + List<Deps> historical = historicalTransactionsAccumulator.get(); + Collections.reverse(historical); + Assert.assertEquals(historical, journal.loadHistoricalTransactions(1)); } finally { diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java 
index 4c339ebcfe..73bb343489 100644 --- a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java +++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java @@ -105,7 +105,7 @@ public class AccordJournalSimulationTest extends SimulationTestBase for (int i = 0; i < count; i++) { State.logger.debug("Reading {}", i); - Assert.assertEquals(State.journal.readFirst("test" + i), "test" + i); + Assert.assertEquals(State.journal.readLast("test" + i), "test" + i); } } diff --git a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java index fb278581df..bf41340c88 100644 --- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java +++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java @@ -49,11 +49,13 @@ import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.Route; +import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.utils.RandomSource; import org.agrona.collections.Int2ObjectHashMap; import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.collections.ObjectHashSet; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.UntypedResultSet; @@ -297,7 +299,8 @@ public class AccordIndexStressTest extends CQLTester } var startNs = nanoTime(); - Set<TxnId> actual = read(store, start, end); + // TODO (desired): randomise lower bound for reading + Set<TxnId> actual = read(store, start, end, TxnId.NONE, Timestamp.MAX); var durationNs = nanoTime() - startNs; samples[size] = durationNs; counts[size++] = actual.size(); @@ -334,32 +337,36 @@ public class AccordIndexStressTest extends CQLTester return durationNs >= SLOW_NS; } - private Set<TxnId> read(int store, 
AccordRoutingKey start, AccordRoutingKey end) + private Set<TxnId> read(int store, AccordRoutingKey start, AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId) { switch (read) { case INDEX: - return readIndex(store, start, end); + return readIndex(store, start, end, minTxnId, maxTxnId); case CQL: - return readCQL(store, start, end); + return readCQL(store, start, end, minTxnId, maxTxnId); default: throw new AssertionError("Unknown read type: " + read); } } - private Set<TxnId> readIndex(int store, AccordRoutingKey start, AccordRoutingKey end) + private Set<TxnId> readIndex(int store, AccordRoutingKey start, AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId) { - return searcher.intersects(store, start, end); + return searcher.intersects(store, start, end, minTxnId, maxTxnId); } - private Set<TxnId> readCQL(int store, AccordRoutingKey start, AccordRoutingKey end) + private Set<TxnId> readCQL(int store, AccordRoutingKey start, AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId) { - Set<TxnId> actual = new HashSet<>(); + Set<TxnId> actual = new ObjectHashSet<>(); try { UntypedResultSet results = execute("SELECT txn_id FROM system_accord.commands WHERE store_id = ? AND route > ? 
AND route <= ?", store, OrderedRouteSerializer.serializeRoutingKey(start), OrderedRouteSerializer.serializeRoutingKey(end)); for (var row : results) - actual.add(AccordKeyspace.deserializeTxnId(row)); + { + TxnId txnId = AccordKeyspace.deserializeTxnId(row); + if (txnId.compareTo(minTxnId) >= 0 && txnId.compareTo(maxTxnId) < 0) + actual.add(txnId); + } } catch (ReadSizeAbortException e) { diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index bd5bc5eb93..3716236756 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -44,6 +44,7 @@ import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; import accord.primitives.Route; +import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.utils.Gen; @@ -345,7 +346,7 @@ public class RouteIndexTest extends CQLTester.InMemory @Override public Set<TxnId> run(ColumnFamilyStore sut) throws Throwable { - return ROUTES_SEARCHER.intersects(storeId, range); + return ROUTES_SEARCHER.intersects(storeId, range, TxnId.NONE, Timestamp.MAX); } @Override diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java b/test/unit/org/apache/cassandra/journal/IndexTest.java index 8f1046f2c3..ce5ed00ebe 100644 --- a/test/unit/org/apache/cassandra/journal/IndexTest.java +++ b/test/unit/org/apache/cassandra/journal/IndexTest.java @@ -32,6 +32,7 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Test; +import accord.utils.Invariants; import org.agrona.collections.IntHashSet; import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.Generators; @@ -87,8 +88,8 @@ public class IndexTest assertArrayEquals(EMPTY, index.lookUp(key0)); assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, 
index.lookUp(key1)); - assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), composeOffsetAndSize(val22, 3) }, index.lookUp(key2)); - assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, index.lookUp(key3)); + assertArrayEquals(new long[] { composeOffsetAndSize(val22, 3), composeOffsetAndSize(val21, 2) }, index.lookUp(key2)); + assertArrayEquals(new long[] { composeOffsetAndSize(val33, 6), composeOffsetAndSize(val32, 5), composeOffsetAndSize(val31, 4) }, index.lookUp(key3)); assertArrayEquals(EMPTY, index.lookUp(key4)); assertEquals(key1, index.firstId()); @@ -160,13 +161,23 @@ public class IndexTest Gen<long[]> valueGen = rs -> { long[] array = new long[(int) rs.next(valueSizeConstraint)]; IntHashSet uniq = new IntHashSet(); - for (int i = 0; i < array.length; i++) + for (int i = 0 ; i < array.length ; ++i) { int offset = (int) rs.next(positionConstraint); while (!uniq.add(offset)) offset = (int) rs.next(positionConstraint); array[i] = Index.composeOffsetAndSize(offset, (int) rs.next(positionConstraint)); } + + Arrays.sort(array); + for (int i = 0 ; i < array.length / 2 ; ++i) + { + int back = array.length - (1 + i); + long v = array[i]; + array[i] = array[back]; + array[back] = v; + } + return array; }; Gen<Map<TimeUUID, long[]>> gen = rs -> { @@ -190,7 +201,7 @@ public class IndexTest }); File directory = new File(Files.createTempDirectory(null)); directory.deleteOnExit(); - qt().forAll(gen).checkAssert(map -> test(directory, map)); + qt().withFixedSeed(185124544959375L).forAll(gen).checkAssert(map -> test(directory, map)); } private static void test(File directory, Map<TimeUUID, long[]> map) @@ -206,7 +217,8 @@ public class IndexTest continue; for (long i : value) inMemory.update(key, Index.readOffset(i), Index.readSize(i)); - Arrays.sort(value); + for (int i = 1 ; i < value.length ; ++i) + Invariants.checkState(value[i - 1] > value[i]); } assertIndex(map, inMemory); @@ -262,6 
+274,9 @@ public class IndexTest long[] value = e.getValue(); long[] read = actual.lookUp(key); + if (!Arrays.equals(value, read)) + actual.lookUp(key); + if (value.length == 0) { assertThat(read).describedAs("Index %s returned wrong values for %s", actual, key).isEmpty(); diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java b/test/unit/org/apache/cassandra/journal/JournalTest.java index 30952a96d8..7dd1c948a8 100644 --- a/test/unit/org/apache/cassandra/journal/JournalTest.java +++ b/test/unit/org/apache/cassandra/journal/JournalTest.java @@ -65,20 +65,20 @@ public class JournalTest journal.blockingWrite(id3, 3L, Collections.singleton(1)); journal.blockingWrite(id4, 4L, Collections.singleton(1)); - assertEquals(1L, (long) journal.readFirst(id1)); - assertEquals(2L, (long) journal.readFirst(id2)); - assertEquals(3L, (long) journal.readFirst(id3)); - assertEquals(4L, (long) journal.readFirst(id4)); + assertEquals(1L, (long) journal.readLast(id1)); + assertEquals(2L, (long) journal.readLast(id2)); + assertEquals(3L, (long) journal.readLast(id3)); + assertEquals(4L, (long) journal.readLast(id4)); journal.shutdown(); journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); journal.start(); - assertEquals(1L, (long) journal.readFirst(id1)); - assertEquals(2L, (long) journal.readFirst(id2)); - assertEquals(3L, (long) journal.readFirst(id3)); - assertEquals(4L, (long) journal.readFirst(id4)); + assertEquals(1L, (long) journal.readLast(id1)); + assertEquals(2L, (long) journal.readLast(id2)); + assertEquals(3L, (long) journal.readLast(id3)); + assertEquals(4L, (long) journal.readLast(id4)); journal.shutdown(); } diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java index 573ba4c9e0..d78fae8ecf 100644 --- a/test/unit/org/apache/cassandra/journal/SegmentTest.java +++ 
b/test/unit/org/apache/cassandra/journal/SegmentTest.java @@ -76,22 +76,22 @@ public class SegmentTest // read all 4 entries by id and compare with originals EntrySerializer.EntryHolder<TimeUUID> holder = new EntrySerializer.EntryHolder<>(); - segment.readFirst(id1, holder); + segment.readLast(id1, holder); assertEquals(id1, holder.key); assertEquals(hosts1, holder.hosts); assertEquals(record1, holder.value); - segment.readFirst(id2, holder); + segment.readLast(id2, holder); assertEquals(id2, holder.key); assertEquals(hosts2, holder.hosts); assertEquals(record2, holder.value); - segment.readFirst(id3, holder); + segment.readLast(id3, holder); assertEquals(id3, holder.key); assertEquals(hosts3, holder.hosts); assertEquals(record3, holder.value); - segment.readFirst(id4, holder); + segment.readLast(id4, holder); assertEquals(id4, holder.key); assertEquals(hosts4, holder.hosts); assertEquals(record4, holder.value); @@ -143,22 +143,22 @@ public class SegmentTest // read all 4 entries by id and compare with originals EntrySerializer.EntryHolder<TimeUUID> holder = new EntrySerializer.EntryHolder<>(); - staticSegment.readFirst(id1, holder); + staticSegment.readLast(id1, holder); assertEquals(id1, holder.key); assertEquals(hosts1, holder.hosts); assertEquals(record1, holder.value); - staticSegment.readFirst(id2, holder); + staticSegment.readLast(id2, holder); assertEquals(id2, holder.key); assertEquals(hosts2, holder.hosts); assertEquals(record2, holder.value); - staticSegment.readFirst(id3, holder); + staticSegment.readLast(id3, holder); assertEquals(id3, holder.key); assertEquals(hosts3, holder.hosts); assertEquals(record3, holder.value); - staticSegment.readFirst(id4, holder); + staticSegment.readLast(id4, holder); assertEquals(id4, holder.key); assertEquals(hosts4, holder.hosts); assertEquals(record4, holder.value); diff --git a/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java 
b/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java index 93e35bc92c..115a231d5f 100644 --- a/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java +++ b/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java @@ -51,7 +51,7 @@ public class CommandsForRangesTest for (int i = 0; i < numTxn; i++) { TxnId id = TXN_ID_GEN.next(rs); - map.put(id, new CommandsForRangesLoader.Summary(id, id, SaveStatus.ReadyToExecute, ranges, Collections.emptyList())); + map.put(id, new CommandsForRangesLoader.Summary(id, id, SaveStatus.ReadyToExecute, ranges, null, false)); } return CommandsForRanges.create(ranges, map); }; diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index 78af413da8..680777bde8 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -123,7 +123,7 @@ public class AsyncLoaderTest // everything is cached, so the loader should return immediately commandStore.executeBlocking(() -> { Context context = new Context(); - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertEquals(safeCommandGlobal, context.commands.get(txnId).global()); Assert.assertEquals(safeTimestampsGlobal, context.timestampsForKey.get(key).global()); Assert.assertTrue(result); @@ -162,7 +162,7 @@ public class AsyncLoaderTest AsyncPromise<Void> cbFired = new AsyncPromise<>(); Context context = new Context(); commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> { + boolean result = loader.load(txnId, context, (o, t) -> { Assert.assertNull(t); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertTrue(context.timestampsForKey.containsKey(key)); @@ -175,7 +175,7 @@ public class 
AsyncLoaderTest // then return immediately after the callback has fired commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertTrue(context.timestampsForKey.containsKey(key)); Assert.assertTrue(result); @@ -210,7 +210,7 @@ public class AsyncLoaderTest AsyncPromise<Void> cbFired = new AsyncPromise<>(); Context context = new Context(); commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> { + boolean result = loader.load(txnId, context, (o, t) -> { Assert.assertNull(t); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertTrue(context.timestampsForKey.containsKey(key)); @@ -225,7 +225,7 @@ public class AsyncLoaderTest // then return immediately after the callback has fired commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertTrue(context.timestampsForKey.containsKey(key)); Assert.assertTrue(result); @@ -260,7 +260,7 @@ public class AsyncLoaderTest AsyncPromise<Void> cbFired = new AsyncPromise<>(); Context context = new Context(); commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> { + boolean result = loader.load(txnId, context, (o, t) -> { Assert.assertNull(t); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertFalse(context.timestampsForKey.containsKey(key)); @@ -277,7 +277,7 @@ public class AsyncLoaderTest // then return immediately after the callback has fired commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertTrue(context.commands.containsKey(txnId)); 
Assert.assertFalse(context.timestampsForKey.containsKey(key)); Assert.assertTrue(result); @@ -322,7 +322,7 @@ public class AsyncLoaderTest AsyncLoader loader = new AsyncLoader(commandStore, ImmutableList.of(txnId1, txnId2), RoutingKeys.EMPTY, KeyHistory.COMMANDS); - boolean result = loader.load(new Context(), (u, t) -> { + boolean result = loader.load(txnId1, new Context(), (u, t) -> { Assert.assertFalse(callback.isDone()); Assert.assertNull(u); Assert.assertEquals(failure, t); @@ -369,7 +369,7 @@ public class AsyncLoaderTest AsyncPromise<Void> cbFired = new AsyncPromise<>(); Context context = new Context(); commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> { + boolean result = loader.load(txnId, context, (o, t) -> { Assert.assertNull(t); Assert.assertTrue(context.commands.containsKey(txnId)); cbFired.setSuccess(null); @@ -384,7 +384,7 @@ public class AsyncLoaderTest // then return immediately after the callback has fired commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertTrue(context.commands.containsKey(txnId)); Assert.assertTrue(result); }); @@ -432,7 +432,7 @@ public class AsyncLoaderTest AsyncPromise<Void> cbFired = new AsyncPromise<>(); Context context = new Context(); commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> { + boolean result = loader.load(txnId, context, (o, t) -> { Assert.assertNull(t); Assert.assertEquals(context.timestampsForKey.containsKey(key), inContext.apply(context) == context.timestampsForKey); Assert.assertEquals(context.commandsForKey.containsKey(key), inContext.apply(context) == context.commandsForKey); @@ -446,7 +446,7 @@ public class AsyncLoaderTest // then return immediately after the callback has fired commandStore.executeBlocking(() -> { - boolean result = loader.load(context, (o, t) -> Assert.fail()); + boolean result = 
loader.load(txnId, context, (o, t) -> Assert.fail()); Assert.assertEquals(context.timestampsForKey.containsKey(key), inContext.apply(context) == context.timestampsForKey); Assert.assertEquals(context.commandsForKey.containsKey(key), inContext.apply(context) == context.commandsForKey); Assert.assertTrue(result); diff --git a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java index 39b12a862a..124749025d 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java @@ -235,7 +235,7 @@ public class SimulatedAsyncOperationTest extends SimulatedAccordCommandStoreTest } @Override - public boolean load(AsyncOperation.Context context, BiConsumer<Object, Throwable> callback) + public boolean load(TxnId primaryTxnId, AsyncOperation.Context context, BiConsumer<Object, Throwable> callback) { if (delay) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
