This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d4648ac969b6399ee96129015dd00e2c8c085568
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Oct 4 10:25:14 2024 +0100

    visit journal backwards to save time parsing
    don't load range commands that are redundant, and load least possible
    use MISC verb handler for maintenance tasks
---
 .gitmodules                                        |   4 +-
 modules/accord                                     |   2 +-
 .../db/compaction/CompactionIterator.java          |  20 +-
 src/java/org/apache/cassandra/dht/Token.java       |   8 +
 .../cassandra/index/accord/RoutesSearcher.java     |  20 +-
 .../apache/cassandra/journal/InMemoryIndex.java    |  27 +-
 src/java/org/apache/cassandra/journal/Index.java   |   2 +-
 src/java/org/apache/cassandra/journal/Journal.java |  15 +-
 .../org/apache/cassandra/journal/OnDiskIndex.java  |  69 +---
 .../apache/cassandra/journal/RecordConsumer.java   |   1 -
 src/java/org/apache/cassandra/journal/Segment.java |  10 +-
 .../org/apache/cassandra/journal/Segments.java     |   5 +-
 .../apache/cassandra/journal/StaticSegment.java    |   4 +-
 src/java/org/apache/cassandra/net/Verb.java        |   8 +-
 src/java/org/apache/cassandra/schema/TableId.java  |   7 +-
 .../cassandra/service/accord/AccordJournal.java    |   1 +
 .../service/accord/AccordJournalTable.java         |  13 +-
 .../accord/AccordJournalValueSerializers.java      |   4 +
 .../cassandra/service/accord/AccordKeyspace.java   |   3 +-
 .../service/accord/AccordSafeCommandStore.java     |   3 +-
 .../service/accord/AccordSegmentCompactor.java     |  23 +-
 .../service/accord/CommandsForRanges.java          |   7 +-
 .../service/accord/CommandsForRangesLoader.java    | 105 +++---
 .../cassandra/service/accord/SavedCommand.java     | 372 ++++++++++++---------
 .../cassandra/service/accord/TokenRange.java       |  11 +-
 .../service/accord/api/AccordRoutableKey.java      |   8 +
 .../service/accord/api/AccordRoutingKey.java       |  38 ++-
 .../cassandra/service/accord/api/PartitionKey.java |  10 +-
 .../service/accord/async/AsyncLoader.java          |  16 +-
 .../service/accord/async/AsyncOperation.java       |   2 +-
 .../accord/serializers/CommandSerializers.java     |  12 +
 .../service/accord/serializers/KeySerializers.java |  49 ++-
 .../accord/AccordJournalCompactionTest.java        |  29 +-
 .../test/AccordJournalSimulationTest.java          |   2 +-
 .../index/accord/AccordIndexStressTest.java        |  25 +-
 .../cassandra/index/accord/RouteIndexTest.java     |   3 +-
 .../org/apache/cassandra/journal/IndexTest.java    |  25 +-
 .../org/apache/cassandra/journal/JournalTest.java  |  16 +-
 .../org/apache/cassandra/journal/SegmentTest.java  |  16 +-
 .../service/accord/CommandsForRangesTest.java      |   2 +-
 .../service/accord/async/AsyncLoaderTest.java      |  24 +-
 .../accord/async/SimulatedAsyncOperationTest.java  |   2 +-
 42 files changed, 611 insertions(+), 412 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 616dacf610..1e61f63e19 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/apache/cassandra-accord.git
-       branch = trunk
+       url = https://github.com/belliottsmith/cassandra-accord.git
+       branch = followup3
diff --git a/modules/accord b/modules/accord
index 3846a378bf..17314e15d4 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 3846a378bfec8d28b312b40cd8541fad2a76e840
+Subproject commit 17314e15d45f46a1572cd10867efc0660169de13
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index fc993e98da..3a83394495 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -150,7 +150,6 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CompactionIterator.class);
     private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
-    private static Object[] TRUNCATE_CLUSTERING_VALUE = new Object[] { 
Long.MAX_VALUE, Integer.MAX_VALUE };
 
     private final OperationType type;
     private final AbstractCompactionController controller;
@@ -1027,7 +1026,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
         JournalKey key = null;
         Object builder = null;
         FlyweightSerializer<Object, Object> serializer = null;
-        Object[] lastClustering = null;
+        Object[] firstClustering = null;
         long maxSeenTimestamp = -1;
         final int userVersion;
         long lastDescriptor = -1;
@@ -1058,6 +1057,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             maxSeenTimestamp = -1;
             lastDescriptor = -1;
             lastOffset = -1;
+            firstClustering = null;
         }
 
         @Override
@@ -1084,7 +1084,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                     try (DataOutputBuffer out = 
DataOutputBuffer.scratchBuffer.get())
                     {
                         serializer.reserialize(key, builder, out, userVersion);
-                        newVersion.row(lastClustering)
+                        newVersion.row(firstClustering)
                                   .add("record", out.asNewBuffer())
                                   .add("user_version", userVersion);
                     }
@@ -1117,12 +1117,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
 
                     PartitionUpdate.SimpleBuilder newVersion = 
PartitionUpdate.simpleBuilder(AccordKeyspace.Journal, partition.partitionKey());
 
-                    Row.SimpleBuilder rowBuilder;
-                    if (cleanup == TRUNCATE || cleanup == 
TRUNCATE_WITH_OUTCOME)
-                        rowBuilder = newVersion.row(TRUNCATE_CLUSTERING_VALUE);
-                    else
-                        rowBuilder = newVersion.row(lastClustering);
-
+                    Row.SimpleBuilder rowBuilder = 
newVersion.row(firstClustering);
                     rowBuilder.add("record", 
commandBuilder.asByteBuffer(userVersion))
                               .add("user_version", userVersion);
 
@@ -1154,10 +1149,10 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
 
             if (lastOffset != -1)
             {
-                Invariants.checkState(descriptor >= lastDescriptor,
+                Invariants.checkState(descriptor <= lastDescriptor,
                                       "Descriptors were accessed out of order: 
%d was accessed after %d", descriptor, lastDescriptor);
                 Invariants.checkState(descriptor != lastDescriptor ||
-                                      offset > lastOffset,
+                                      offset < lastOffset,
                                       "Offsets within %s were accessed out of 
order: %d was accessed after %s", offset, lastOffset);
             }
             lastDescriptor = descriptor;
@@ -1167,7 +1162,8 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             {
                 int userVersion = 
Int32Type.instance.compose(row.getCell(versionColumn).buffer());
                 serializer.deserialize(key, builder, in, userVersion);
-                lastClustering = row.clustering().getBufferArray();
+                if (firstClustering == null)
+                    firstClustering = row.clustering().getBufferArray();
             }
             catch (IOException e)
             {
diff --git a/src/java/org/apache/cassandra/dht/Token.java 
b/src/java/org/apache/cassandra/dht/Token.java
index 92481745f4..048d1a0394 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -223,6 +223,14 @@ public abstract class Token implements 
RingPosition<Token>, Serializable
             return p.getTokenFactory().fromByteArray(ByteBuffer.wrap(bytes));
         }
 
+        public void skip(DataInputPlus in, IPartitioner p, int version) throws 
IOException
+        {
+            int size = p.isFixedLength() ? p.getMaxTokenSize() : 
in.readUnsignedVInt32();
+            if (logPartitioner && deserializePartitioners.add(p.getClass()))
+                logger.debug("Deserializing token with partitioner " + p);
+            in.skipBytesFully(size);
+        }
+
         public Token deserialize(DataInputPlus in, IPartitioner p, int 
version) throws IOException
         {
             int size = p.isFixedLength() ? p.getMaxTokenSize() : 
in.readUnsignedVInt32();
diff --git a/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java 
b/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java
index b90e1278c0..268739290a 100644
--- a/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java
+++ b/src/java/org/apache/cassandra/index/accord/RoutesSearcher.java
@@ -19,10 +19,11 @@
 package org.apache.cassandra.index.accord;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
+import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataRange;
@@ -67,10 +68,10 @@ public class RoutesSearcher
                                                    limits,
                                                    dataRange);
         Index.Searcher s = index.searcherFor(cmd);
-        try (var controler = cmd.executionController())
+        try (var controller = cmd.executionController())
         {
-            UnfilteredPartitionIterator partitionIterator = 
s.search(controler);
-            return new CloseableIterator<Entry>()
+            UnfilteredPartitionIterator partitionIterator = 
s.search(controller);
+            return new CloseableIterator<>()
             {
                 private final Entry entry = new Entry();
                 @Override
@@ -98,21 +99,22 @@ public class RoutesSearcher
         }
     }
 
-    public Set<TxnId> intersects(int store, TokenRange range)
+    public Set<TxnId> intersects(int store, TokenRange range, TxnId minTxnId, 
Timestamp maxTxnId)
     {
-        return intersects(store, (AccordRoutingKey) range.start(), 
(AccordRoutingKey) range.end());
+        return intersects(store, range.start(), range.end(), minTxnId, 
maxTxnId);
     }
 
-    public Set<TxnId> intersects(int store, AccordRoutingKey start, 
AccordRoutingKey end)
+    public Set<TxnId> intersects(int store, AccordRoutingKey start, 
AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId)
     {
-        var set = new HashSet<TxnId>();
+        var set = new ObjectHashSet<TxnId>();
         try (var it = searchKeysAccord(store, start, end))
         {
             while (it.hasNext())
             {
                 Entry next = it.next();
                 if (next.store_id != store) continue; // the index should 
filter out, but just in case...
-                set.add(next.txnId);
+                if (next.txnId.compareTo(minTxnId) >= 0 && 
next.txnId.compareTo(maxTxnId) < 0)
+                    set.add(next.txnId);
             }
         }
         return set.isEmpty() ? Collections.emptySet() : set;
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java 
b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 2c71d8c4ff..1f0da7fd28 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.journal;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -61,16 +60,24 @@ final class InMemoryIndex<K> extends Index<K>
         index.merge(id, new long[] { currentOffsetAndSize },
                     (current, value) ->
                     {
-                        int idx = Arrays.binarySearch(current, 
currentOffsetAndSize);
-                        if (idx >= 0) // repeat update() call; shouldn't 
occur, but we might as well allow this NOOP
-                            return current;
+                        long inserting = value[0];
+                        int idx = 0;
+                        while (idx < current.length)
+                        {
+                            long cur = current[idx];
+                            if (cur <= inserting)
+                            {
+                                if (cur == inserting)
+                                    return current; // TODO (expected): throw 
exception?
+                                break;
+                            }
+                            ++idx;
+                        }
 
-                        /* Merge the new offset with existing values */
-                        int pos = -idx - 1;
                         long[] merged = new long[current.length + 1];
-                        System.arraycopy(current, 0, merged, 0, pos);
-                        merged[pos] = currentOffsetAndSize;
-                        System.arraycopy(current, pos, merged, pos + 1, 
current.length - pos);
+                        System.arraycopy(current, 0, merged, 0, idx);
+                        merged[idx] = inserting;
+                        System.arraycopy(current, idx, merged, idx + 1, 
current.length - idx);
                         return merged;
                     });
 
@@ -98,7 +105,7 @@ final class InMemoryIndex<K> extends Index<K>
     }
 
     @Override
-    public long lookUpFirst(K id)
+    public long lookUpLast(K id)
     {
         long[] offsets = lookUp(id);
         return offsets.length == 0 ? -1 : offsets[0];
diff --git a/src/java/org/apache/cassandra/journal/Index.java 
b/src/java/org/apache/cassandra/journal/Index.java
index bf6ab5d0c1..ac1e7c1d91 100644
--- a/src/java/org/apache/cassandra/journal/Index.java
+++ b/src/java/org/apache/cassandra/journal/Index.java
@@ -51,7 +51,7 @@ abstract class Index<K> implements Closeable
      *
      * @return the first offset into the segment, or -1 is none were found
      */
-    abstract long lookUpFirst(K id);
+    abstract long lookUpLast(K id);
 
     abstract long[] lookUpAll(K id);
 
diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
b/src/java/org/apache/cassandra/journal/Journal.java
index 5e91c7d3d3..ba4bf503b2 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -306,15 +306,15 @@ public class Journal<K, V> implements Shutdownable
      * @return deserialized record if found, null otherwise
      */
     @SuppressWarnings("unused")
-    public V readFirst(K id)
+    public V readLast(K id)
     {
         EntrySerializer.EntryHolder<K> holder = new 
EntrySerializer.EntryHolder<>();
 
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
         {
-            for (Segment<K, V> segment : segments.allSorted())
+            for (Segment<K, V> segment : segments.allSorted(true))
             {
-                if (segment.readFirst(id, holder))
+                if (segment.readLast(id, holder))
                 {
                     try (DataInputBuffer in = new 
DataInputBuffer(holder.value, false))
                     {
@@ -336,8 +336,7 @@ public class Journal<K, V> implements Shutdownable
         EntrySerializer.EntryHolder<K> holder = new 
EntrySerializer.EntryHolder<>();
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
         {
-            consumer.init();
-            for (Segment<K, V> segment : segments.allSorted())
+            for (Segment<K, V> segment : segments.allSorted(false))
                 segment.readAll(id, holder, consumer);
         }
     }
@@ -422,12 +421,12 @@ public class Journal<K, V> implements Shutdownable
      * @return true if the record was found, false otherwise
      */
     @SuppressWarnings("unused")
-    public boolean readFirst(K id, RecordConsumer<K> consumer)
+    public boolean readLast(K id, RecordConsumer<K> consumer)
     {
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
         {
             for (Segment<K, V> segment : segments.all())
-                if (segment.readFirst(id, consumer))
+                if (segment.readLast(id, consumer))
                     return true;
         }
         return false;
@@ -448,7 +447,7 @@ public class Journal<K, V> implements Shutdownable
             {
                 for (K id : test)
                 {
-                    if (segment.index().lookUpFirst(id) != -1)
+                    if (segment.index().lookUpLast(id) != -1)
                     {
                         present.add(id);
                         if (test.size() == present.size())
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java 
b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index b2f1487ad0..4994e3a087 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.zip.CRC32;
@@ -172,8 +171,8 @@ final class OnDiskIndex<K> extends Index<K>
                 if (prev != -1)
                 {
                     long tmp = prev;
-                    Invariants.checkState(readOffset(offsetAndSize) > 
readOffset(prev),
-                                          () -> String.format("Offsets should 
be strictly monotonic, but found %d following %d",
+                    Invariants.checkState(readOffset(offsetAndSize) < 
readOffset(prev),
+                                          () -> String.format("Offsets should 
be strictly reverse monotonic, but found %d following %d",
                                                               
readOffset(offsetAndSize), readOffset(tmp)));
                 }
                 out.writeLong(offsetAndSize);
@@ -202,53 +201,16 @@ final class OnDiskIndex<K> extends Index<K>
     @Override
     public long[] lookUp(K id)
     {
-        if (!mayContainId(id))
-            return EMPTY;
-
-        int keyIndex = binarySearch(id);
-        if (keyIndex < 0)
-            return EMPTY;
-
-        long[] records = new long[] { recordAtIndex(keyIndex) };
-
-        /*
-         * Duplicate entries are possible within one segment (but should be 
rare).
-         * Check and add entries before and after the found result (not 
guaranteed to be first).
-         */
-
-        for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--)
-        {
-            int length = records.length;
-            records = Arrays.copyOf(records, length + 1);
-            records[length] = recordAtIndex(i);
-        }
-
-        for (int i = keyIndex + 1; i < entryCount && id.equals(keyAtIndex(i)); 
i++)
-        {
-            int length = records.length;
-            records = Arrays.copyOf(records, length + 1);
-            records[length] = recordAtIndex(i);
-        }
-
-        Arrays.sort(records);
-        return records;
+        return lookUpAll(id);
     }
 
     @Override
-    public long lookUpFirst(K id)
+    public long lookUpLast(K id)
     {
         if (!mayContainId(id))
             return -1L;
 
         int keyIndex = binarySearch(id);
-
-        /*
-         * Duplicate entries are possible within one segment (but should be 
rare).
-         * Check and add entries before until we find the first occurrence of 
key.
-         */
-        for (int i = keyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); i--)
-            keyIndex = i;
-
         return keyIndex < 0 ? -1 : recordAtIndex(keyIndex);
     }
 
@@ -258,27 +220,22 @@ final class OnDiskIndex<K> extends Index<K>
         if (!mayContainId(id))
             return EMPTY;
 
-        int start = binarySearch(id);
-        int firstKeyIndex = start;
-
-        for (int i = firstKeyIndex - 1; i >= 0 && id.equals(keyAtIndex(i)); 
i--)
-            firstKeyIndex = i;
-
-        if (firstKeyIndex < 0)
+        int someIndex = binarySearch(id);
+        if (someIndex < 0)
             return EMPTY;
 
-        int lastKeyIndex = start;
+        int firstKeyIndex = someIndex;
+        while (firstKeyIndex > 0 && id.equals(keyAtIndex(firstKeyIndex - 1)))
+            --firstKeyIndex;
 
-        for (int i = lastKeyIndex + 1; i < entryCount && 
id.equals(keyAtIndex(i)); i++)
-            lastKeyIndex = i;
+        int lastKeyIndex = someIndex;
+        while (lastKeyIndex + 1 < entryCount && 
id.equals(keyAtIndex(lastKeyIndex + 1)))
+            ++lastKeyIndex;
 
         long[] all = new long[lastKeyIndex - firstKeyIndex + 1];
         int idx = firstKeyIndex;
         for (int i = 0; i < all.length; i++)
-        {
-            all[i] = recordAtIndex(idx);
-            idx++;
-        }
+            all[i] = recordAtIndex(idx++);
         return all;
     }
 
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java 
b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index 3403cd0f23..e16194001d 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -24,6 +24,5 @@ import org.agrona.collections.IntHashSet;
 @FunctionalInterface
 public interface RecordConsumer<K>
 {
-    default void init() {}
     void accept(long segment, int position, K key, ByteBuffer buffer, 
IntHashSet hosts, int userVersion);
 }
diff --git a/src/java/org/apache/cassandra/journal/Segment.java 
b/src/java/org/apache/cassandra/journal/Segment.java
index 0da59118b7..7f955669cd 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -56,9 +56,9 @@ public abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K,
      * Reading entries (by id, by offset, iterate)
      */
 
-    boolean readFirst(K id, RecordConsumer<K> consumer)
+    boolean readLast(K id, RecordConsumer<K> consumer)
     {
-        long offsetAndSize = index().lookUpFirst(id);
+        long offsetAndSize = index().lookUpLast(id);
         if (offsetAndSize == -1)
             return false;
 
@@ -74,9 +74,9 @@ public abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K,
         return false;
     }
 
-    boolean readFirst(K id, EntrySerializer.EntryHolder<K> into)
+    boolean readLast(K id, EntrySerializer.EntryHolder<K> into)
     {
-        long offsetAndSize = index().lookUpFirst(id);
+        long offsetAndSize = index().lookUpLast(id);
         if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), 
Index.readSize(offsetAndSize), into))
             return false;
         Invariants.checkState(id.equals(into.key), "Index for %s read 
incorrect key: expected %s but read %s", descriptor, id, into.key);
@@ -86,10 +86,12 @@ public abstract class Segment<K, V> implements Closeable, 
RefCounted<Segment<K,
     void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> 
onEntry)
     {
         long[] all = index().lookUpAll(id);
+        int prevOffset = Integer.MAX_VALUE;
         for (int i = 0; i < all.length; i++)
         {
             int offset = Index.readOffset(all[i]);
             int size = Index.readSize(all[i]);
+            Invariants.checkState(offset < prevOffset);
             Invariants.checkState(read(offset, size, into), "Read should 
always return true");
             onEntry.accept(descriptor.timestamp, offset, into.key, into.value, 
into.hosts, into.userVersion);
         }
diff --git a/src/java/org/apache/cassandra/journal/Segments.java 
b/src/java/org/apache/cassandra/journal/Segments.java
index 94282e9d87..cc98750fc4 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -106,10 +106,11 @@ class Segments<K, V>
     /**
      * Returns segments in timestamp order. Will allocate and sort the segment 
collection.
      */
-    List<Segment<K, V>> allSorted()
+    List<Segment<K, V>> allSorted(boolean asc)
     {
         List<Segment<K, V>> segments = new ArrayList<>(this.segments.values());
-        segments.sort(Comparator.comparing(s -> s.descriptor));
+        if (asc) segments.sort(Comparator.comparing(s -> s.descriptor));
+        else segments.sort((o1, o2) -> 
-o1.descriptor.compareTo(o2.descriptor));
         return segments;
     }
 
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java 
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index f5f15ee13c..3a8c03bb1a 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -475,10 +475,10 @@ public final class StaticSegment<K, V> extends Segment<K, 
V>
             int cmp = keySupport.compare(this.key(), that.key());
             if (cmp != 0)
                 return cmp;
-            cmp = Long.compare(this.descriptor.timestamp, 
that.descriptor.timestamp);
+            cmp = Long.compare(that.descriptor.timestamp, 
this.descriptor.timestamp);
             if (cmp != 0)
                 return cmp;
-            return Integer.compare(this.offset, that.offset);
+            return Integer.compare(that.offset, this.offset);
         }
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 87b366300f..7cc9fe9dfa 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -336,10 +336,10 @@ public enum Verb
     ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE,         
 () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_GET_EPHMRL_READ_DEPS_RSP),
     ACCORD_GET_MAX_CONFLICT_RSP     (163, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.reply,      RESPONSE_HANDLER                   
                                         ),
     ACCORD_GET_MAX_CONFLICT_REQ     (164, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.request,    AccordService::verbHandlerOrNoop, 
ACCORD_GET_MAX_CONFLICT_RSP),
-    ACCORD_FETCH_DATA_RSP           (145, P2, writeTimeout,IMMEDIATE,          
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
-    ACCORD_FETCH_DATA_REQ           (146, P2, writeTimeout,IMMEDIATE,          
 () -> FetchSerializers.request,             AccordService::verbHandlerOrNoop, 
ACCORD_FETCH_DATA_RSP                     ),
-    ACCORD_SET_SHARD_DURABLE_REQ    (147, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.shardDurable,   AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
-    ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
+    ACCORD_FETCH_DATA_RSP           (145, P2, writeTimeout, IMMEDIATE,         
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
+    ACCORD_FETCH_DATA_REQ           (146, P2, writeTimeout, IMMEDIATE,         
 () -> FetchSerializers.request,             AccordService::verbHandlerOrNoop, 
ACCORD_FETCH_DATA_RSP                     ),
+    ACCORD_SET_SHARD_DURABLE_REQ    (147, P2, writeTimeout, MISC,              
 () -> SetDurableSerializers.shardDurable,   AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
+    ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, MISC,              
 () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE,         
 () -> QueryDurableBeforeSerializers.reply,  RESPONSE_HANDLER                   
                                         ),
     ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE,         
 () -> QueryDurableBeforeSerializers.request,AccordService::verbHandlerOrNoop, 
ACCORD_QUERY_DURABLE_BEFORE_RSP           ),
 
diff --git a/src/java/org/apache/cassandra/schema/TableId.java 
b/src/java/org/apache/cassandra/schema/TableId.java
index ac486f71d9..302d7db6bf 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -185,6 +185,11 @@ public class TableId implements Comparable<TableId>
         return 16;
     }
 
+    public static int staticSerializedSize()
+    {
+        return 16;
+    }
+
     public static TableId deserialize(DataInput in) throws IOException
     {
         return new TableId(new UUID(in.readLong(), in.readLong()));
@@ -201,7 +206,7 @@ public class TableId implements Comparable<TableId>
         return id.compareTo(o.id);
     }
 
-    public static final IVersionedSerializer<TableId> serializer = new 
IVersionedSerializer<TableId>()
+    public static final IVersionedSerializer<TableId> serializer = new 
IVersionedSerializer<>()
     {
         @Override
         public void serialize(TableId t, DataOutputPlus out, int version) 
throws IOException
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 310bbeb636..49af8042a8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -323,6 +323,7 @@ public class AccordJournal implements IJournal, Shutdownable
         BUILDER builder = (BUILDER) key.type.serializer.mergerFor(key);
         // TODO: this can be further improved to avoid allocating lambdas
         AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> 
serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) 
key.type.serializer;
+        // TODO (expected): for those where we store an image, read only the 
first entry we find in DESC order
         journalTable.readAll(key, (in, userVersion) -> 
serializer.deserialize(key, builder, in, userVersion));
         return builder;
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index ef3ac9eff3..5935a910b5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -143,8 +143,7 @@ public class AccordJournalTable<K, V>
             this.tableRecordConsumer = new TableRecordConsumer(reader);
         }
 
-        @Override
-        public void init()
+        void readTable()
         {
             readAllFromTable(key, tableRecordConsumer);
         }
@@ -164,7 +163,9 @@ public class AccordJournalTable<K, V>
      */
     public void readAll(K key, Reader reader)
     {
-        journal.readAll(key, new JournalAndTableRecordConsumer(key, reader));
+        JournalAndTableRecordConsumer consumer = new 
JournalAndTableRecordConsumer(key, reader);
+        journal.readAll(key, consumer);
+        consumer.readTable();
     }
 
     private void readAllFromTable(K key, TableRecordConsumer onEntry)
@@ -332,15 +333,15 @@ public class AccordJournalTable<K, V>
         {
             K tableKey = tableIterator.key();
             K journalKey = staticSegmentIterator.key();
-            if (tableKey != null && keySupport.compare(tableKey, key) == 0)
-                tableIterator.readAllForKey(key, reader);
-
             if (journalKey != null && keySupport.compare(journalKey, key) == 0)
                 staticSegmentIterator.readAllForKey(key, (segment, position, 
key1, buffer, hosts, userVersion) -> {
                     if (!tableIterator.visited(segment))
                         reader.accept(segment, position, key1, buffer, hosts, 
userVersion);
                 });
 
+            if (tableKey != null && keySupport.compare(tableKey, key) == 0)
+                tableIterator.readAllForKey(key, reader);
+
             tableIterator.clear();
         }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 289b5c1b98..60a1ef4f58 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -116,6 +116,7 @@ public class AccordJournalValueSerializers
 
     public static class IdentityAccumulator<T> extends Accumulator<T, T>
     {
+        boolean hasRead;
         public IdentityAccumulator(T initial)
         {
             super(initial);
@@ -124,6 +125,9 @@ public class AccordJournalValueSerializers
         @Override
         protected T accumulate(T oldValue, T newValue)
         {
+            if (hasRead)
+                return oldValue;
+            hasRead = true;
             return newValue;
         }
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index ca1326156e..0a8cb085fb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -174,6 +174,7 @@ public class AccordKeyspace
                                                                   TOPOLOGIES, 
EPOCH_METADATA,
                                                                   JOURNAL);
 
+    // TODO (desired): implement a custom type so we can get correct sort order
     private static final TupleType TIMESTAMP_TYPE = new 
TupleType(Lists.newArrayList(LongType.instance, LongType.instance, 
Int32Type.instance));
     private static final String TIMESTAMP_TUPLE = 
TIMESTAMP_TYPE.asCQL3Type().toString();
     private static final TupleType KEY_TYPE = new 
TupleType(Arrays.asList(UUIDType.instance, BytesType.instance));
@@ -235,7 +236,7 @@ public class AccordKeyspace
               + "user_version int,"
               + "record blob,"
               + "PRIMARY KEY(key, descriptor, offset)"
-              + ") WITH compression = {'class':'NoopCompressor'};")
+              + ") WITH CLUSTERING ORDER BY (descriptor DESC, offset DESC) 
AND compression = {'class':'NoopCompressor'};")
         .partitioner(new LocalPartitioner(BytesType.instance))
         .build();
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 0b256574f4..624fcc378b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -48,6 +48,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
+import accord.utils.Invariants;
 
 public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey>
 {
@@ -74,7 +75,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
         this.commandStore = commandStore;
         commandStore.updateRangesForEpoch(this);
         if (this.ranges == null)
-            this.ranges = commandStore.unsafeRangesForEpoch();
+            this.ranges = 
Invariants.nonNull(commandStore.unsafeRangesForEpoch());
     }
 
     public static AccordSafeCommandStore create(PreLoadContext preLoadContext,
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java 
b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
index f94510b8b8..c6fba012a5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -85,8 +85,8 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
             JournalKey key = null;
             Object builder = null;
             FlyweightSerializer<Object, Object> serializer = null;
-            long lastDescriptor = -1;
-            int lastOffset = -1;
+            long firstDescriptor = -1, lastDescriptor = -1;
+            int firstOffset = -1, lastOffset = -1;
             try
             {
                 KeyOrderReader<JournalKey> reader;
@@ -94,13 +94,13 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
                 {
                     if (key == null || !reader.key().equals(key))
                     {
-                        maybeWritePartition(cfs, writer, key, builder, 
serializer, lastDescriptor, lastOffset);
+                        maybeWritePartition(cfs, writer, key, builder, 
serializer, firstDescriptor, firstOffset);
 
                         key = reader.key();
                         serializer = (FlyweightSerializer<Object, Object>) 
key.type.serializer;
                         builder = serializer.mergerFor(key);
-                        lastOffset = -1;
-                        lastDescriptor = -1;
+                        firstDescriptor = lastDescriptor = -1;
+                        firstOffset = lastOffset = -1;
                     }
 
                     boolean advanced;
@@ -110,15 +110,20 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
                         {
                             if (lastDescriptor != -1)
                             {
-                                
Invariants.checkState(reader.descriptor.timestamp >= lastDescriptor,
+                                
Invariants.checkState(reader.descriptor.timestamp <= lastDescriptor,
                                                       "Descriptors were 
accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, 
lastDescriptor);
                                 
Invariants.checkState(reader.descriptor.timestamp != lastDescriptor ||
-                                                      reader.offset() > 
lastOffset,
-                                                      "Offsets within %s were 
accessed out of order: %d was accessed after %s", reader.offset(), lastOffset);
+                                                      reader.offset() < 
lastOffset,
+                                                      "Offsets were accessed 
out of order: %d was accessed after %s", reader.offset(), lastOffset);
                             }
                             serializer.deserialize(key, builder, in, 
reader.descriptor.userVersion);
                             lastDescriptor = reader.descriptor.timestamp;
                             lastOffset = reader.offset();
+                            if (firstDescriptor == -1)
+                            {
+                                firstDescriptor = lastDescriptor;
+                                firstOffset = lastOffset;
+                            }
                         }
                     }
                     while ((advanced = reader.advance()) && 
reader.key().equals(key));
@@ -126,7 +131,7 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
                     if (advanced) readers.offer(reader); // there is more to 
this reader, but not with this key
                 }
 
-                maybeWritePartition(cfs, writer, key, builder, serializer, 
lastDescriptor, lastOffset);
+                maybeWritePartition(cfs, writer, key, builder, serializer, 
firstDescriptor, firstOffset);
             }
             catch (Throwable t)
             {
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index c664ff62c3..edfde0fb1f 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -39,6 +39,7 @@ import accord.primitives.SaveStatus;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
@@ -107,7 +108,7 @@ public class CommandsForRanges implements CommandsSummary
 
             // range specific logic... ranges don't update CommandsForRange 
based off the life cycle and instead
             // merge the cache with the disk state; so exclude states that 
should get removed from CommandsFor*
-            if (summary.saveStatus.compareTo(SaveStatus.Erased) >= 0)
+            if (summary.saveStatus != null && 
summary.saveStatus.compareTo(SaveStatus.Erased) >= 0)
                 return;
 
             switch (testStatus)
@@ -153,8 +154,10 @@ public class CommandsForRanges implements CommandsSummary
                 // and so it is safe to execute, when in fact it is only a 
dependency on a different shard
                 // (and that other shard, perhaps, does not know that it is a 
dependency - and so it is not durably known)
                 // TODO (required): consider this some more
-                if ((testDep == WITH) == !summary.depsIds.contains(testTxnId))
+                if ((testDep == WITH) == !summary.hasAsDep)
                     return;
+
+                Invariants.checkState(testTxnId.equals(summary.findAsDep));
             }
 
             // TODO (required): ensure we are excluding any ranges that are 
now shard-redundant (not sure if this is enforced yet)
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
index 6324735883..5afa153fcd 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRangesLoader.java
@@ -21,9 +21,6 @@ package org.apache.cassandra.service.accord;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
@@ -35,7 +32,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 
 import accord.local.Command;
-import accord.local.DurableBefore;
+import accord.local.KeyHistory;
+import accord.local.RedundantBefore;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Range;
@@ -46,16 +44,19 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
+import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.index.accord.RoutesSearcher;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.utils.Pair;
 
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+
 public class CommandsForRangesLoader
 {
     private final RoutesSearcher searcher = new RoutesSearcher();
     //TODO (now, durability): find solution for this...
-    private final Map<TxnId, Ranges> historicalTransaction = new HashMap<>();
+    private final NavigableMap<TxnId, Ranges> historicalTransaction = new 
TreeMap<>();
     private final AccordCommandStore store;
 
     public CommandsForRangesLoader(AccordCommandStore store)
@@ -63,34 +64,38 @@ public class CommandsForRangesLoader
         this.store = store;
     }
 
-    public AsyncResult<Pair<Watcher, NavigableMap<TxnId, Summary>>> get(Ranges 
ranges)
+    public AsyncResult<Pair<Watcher, NavigableMap<TxnId, Summary>>> 
get(@Nullable TxnId primaryTxnId, KeyHistory keyHistory, Ranges ranges)
     {
-        var watcher = fromCache(ranges);
+        RedundantBefore redundantBefore = store.unsafeGetRedundantBefore();
+        TxnId minTxnId = redundantBefore.minGcBefore(ranges);
+        Timestamp maxTxnId = primaryTxnId == null || keyHistory == 
KeyHistory.RECOVERY || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : 
primaryTxnId;
+        TxnId findAsDep = primaryTxnId != null && keyHistory == 
KeyHistory.RECOVERY ? primaryTxnId : null;
+        var watcher = fromCache(findAsDep, ranges, minTxnId, maxTxnId, 
redundantBefore);
         var before = ImmutableMap.copyOf(watcher.get());
-        return AsyncChains.ofCallable(Stage.READ.executor(), () -> get(ranges, 
before))
+        return AsyncChains.ofCallable(Stage.READ.executor(), () -> get(ranges, 
before, findAsDep, minTxnId, maxTxnId, redundantBefore))
                           .map(map -> Pair.create(watcher, map), store)
                .beginAsResult();
     }
 
-    private NavigableMap<TxnId, Summary> get(Ranges ranges, Map<TxnId, 
Summary> cacheHits)
+    private NavigableMap<TxnId, Summary> get(Ranges ranges, Map<TxnId, 
Summary> cacheHits, @Nullable TxnId findAsDep, TxnId minTxnId, Timestamp 
maxTxnId, RedundantBefore redundantBefore)
     {
-        Set<TxnId> matches = new HashSet<>();
+        Set<TxnId> matches = new ObjectHashSet<>();
         for (Range range : ranges)
-            matches.addAll(intersects(range));
+            matches.addAll(intersects(range, minTxnId, maxTxnId));
         if (matches.isEmpty())
             return new TreeMap<>();
-        return load(ranges, cacheHits, matches);
+        return load(ranges, cacheHits, matches, findAsDep, redundantBefore);
     }
 
-    private Collection<TxnId> intersects(Range range)
+    private Collection<TxnId> intersects(Range range, TxnId minTxnId, 
Timestamp maxTxnId)
     {
         assert range instanceof TokenRange : "Require TokenRange but given " + 
range.getClass();
-        Set<TxnId> intersects = searcher.intersects(store.id(), (TokenRange) 
range);
+        Set<TxnId> intersects = searcher.intersects(store.id(), (TokenRange) 
range, minTxnId, maxTxnId);
         if (!historicalTransaction.isEmpty())
         {
             if (intersects.isEmpty())
-                intersects = new HashSet<>();
-            for (var e : historicalTransaction.entrySet())
+                intersects = new ObjectHashSet<>();
+            for (var e : historicalTransaction.tailMap(minTxnId, 
true).entrySet())
             {
                 if (e.getValue().intersects(range))
                     intersects.add(e.getKey());
@@ -104,13 +109,21 @@ public class CommandsForRangesLoader
     public class Watcher implements AccordStateCache.Listener<TxnId, Command>, 
AutoCloseable
     {
         private final Ranges ranges;
+        private final @Nullable TxnId findAsDep;
+        private final TxnId minTxnId;
+        private final Timestamp maxTxnId;
+        private final RedundantBefore redundantBefore;
 
         private NavigableMap<TxnId, Summary> summaries = null;
-        private List<AccordCachingState<TxnId, Command>> needToDoubleCheck = 
null;
+        private Set<AccordCachingState<TxnId, Command>> needToDoubleCheck = 
null;
 
-        public Watcher(Ranges ranges)
+        public Watcher(Ranges ranges, @Nullable TxnId findAsDep, TxnId 
minTxnId, Timestamp maxTxnId, RedundantBefore redundantBefore)
         {
             this.ranges = ranges;
+            this.findAsDep = findAsDep;
+            this.minTxnId = minTxnId;
+            this.maxTxnId = maxTxnId;
+            this.redundantBefore = redundantBefore;
         }
 
         public NavigableMap<TxnId, Summary> get()
@@ -123,15 +136,18 @@ public class CommandsForRangesLoader
         {
             if (n.key().domain() != Routable.Domain.Range)
                 return;
+            if (n.key().compareTo(minTxnId) < 0 || n.key().compareTo(maxTxnId) 
>= 0)
+                return;
+
             var state = n.state();
             if (state instanceof AccordCachingState.Loading)
             {
                 if (needToDoubleCheck == null)
-                    needToDoubleCheck = new ArrayList<>();
+                    needToDoubleCheck = new ObjectHashSet<>();
                 needToDoubleCheck.add(n);
                 return;
             }
-            //TODO (now): include FailedToSave?  Most likely need to, but need 
to improve test coverage to have failed writes
+            //TODO (required): include FailedToSave?  Most likely need to, but 
need to improve test coverage to have failed writes
             if (!(state instanceof AccordCachingState.Loaded
                   || state instanceof AccordCachingState.Modified
                   || state instanceof AccordCachingState.Saving))
@@ -140,7 +156,7 @@ public class CommandsForRangesLoader
             var cmd = state.get();
             if (cmd == null)
                 return;
-            Summary summary = create(cmd, ranges, null);
+            Summary summary = create(cmd, ranges, findAsDep, redundantBefore);
             if (summary != null)
             {
                 if (summaries == null)
@@ -175,19 +191,18 @@ public class CommandsForRangesLoader
         }
     }
 
-    private Watcher fromCache(Ranges ranges)
+    private Watcher fromCache(@Nullable TxnId findAsDep, Ranges ranges, TxnId 
minTxnId, Timestamp maxTxnId, RedundantBefore redundantBefore)
     {
-        Watcher watcher = new Watcher(ranges);
+        Watcher watcher = new Watcher(ranges, findAsDep, minTxnId, maxTxnId, 
redundantBefore);
         store.commandCache().stream().forEach(watcher::onAdd);
         store.commandCache().register(watcher);
         return watcher;
     }
 
-    private NavigableMap<TxnId, Summary> load(Ranges ranges, Map<TxnId, 
Summary> cacheHits, Collection<TxnId> possibleTxns)
+    private NavigableMap<TxnId, Summary> load(Ranges ranges, Map<TxnId, 
Summary> cacheHits, Collection<TxnId> possibleTxns, @Nullable TxnId findAsDep, 
RedundantBefore redundantBefore)
     {
-        //TODO (now): this logic is kinda duplicate of 
org.apache.cassandra.service.accord.CommandsForRange.mapReduce
+        //TODO (required): this logic is kinda duplicate of 
org.apache.cassandra.service.accord.CommandsForRange.mapReduce
         // should figure out if this can be improved... also what is correct?
-        var durableBefore = store.durableBefore();
         NavigableMap<TxnId, Summary> map = new TreeMap<>();
         for (TxnId txnId : possibleTxns)
         {
@@ -196,7 +211,7 @@ public class CommandsForRangesLoader
             var cmd = store.loadCommand(txnId);
             if (cmd == null)
                 continue; // unknown command
-            var summary = create(cmd, ranges, durableBefore);
+            var summary = create(cmd, ranges, findAsDep, redundantBefore);
             if (summary == null)
                 continue;
             map.put(txnId, summary);
@@ -204,9 +219,9 @@ public class CommandsForRangesLoader
         return map;
     }
 
-    private static Summary create(Command cmd, Ranges cacheRanges, @Nullable 
DurableBefore durableBefore)
+    private static Summary create(Command cmd, Ranges cacheRanges, @Nullable 
TxnId findAsDep, @Nullable RedundantBefore redundantBefore)
     {
-        //TODO (now, correctness): C* did Invalidated, accord-core did 
Erased... what is correct?
+        //TODO (required, correctness): C* did Invalidated, accord-core did 
Erased... what is correct?
         SaveStatus saveStatus = cmd.saveStatus();
         if (saveStatus == SaveStatus.Invalidated
             || saveStatus == SaveStatus.Erased
@@ -223,10 +238,10 @@ public class CommandsForRangesLoader
         if (!ranges.intersects(cacheRanges))
             return null;
 
-        if (durableBefore != null)
+        if (redundantBefore != null)
         {
-            Ranges durableAlready = 
Ranges.of(durableBefore.foldlWithBounds(ranges, (e, accum, start, end) -> {
-                if (e.universalBefore.compareTo(cmd.txnId()) < 0)
+            Ranges durableAlready = 
Ranges.of(redundantBefore.foldlWithBounds(ranges, (e, accum, start, end) -> {
+                if (e.gcBefore.compareTo(cmd.txnId()) < 0)
                     return accum;
                 accum.add(new TokenRange((AccordRoutingKey) start, 
(AccordRoutingKey) end));
                 return accum;
@@ -238,8 +253,8 @@ public class CommandsForRangesLoader
         }
 
         var partialDeps = cmd.partialDeps();
-        List<TxnId> deps = partialDeps == null ? Collections.emptyList() : 
partialDeps.txnIds();
-        return new Summary(cmd.txnId(), cmd.executeAt(), saveStatus, ranges, 
deps);
+        boolean hasAsDep = findAsDep != null && partialDeps != null && 
partialDeps.rangeDeps.intersects(findAsDep, ranges);
+        return new Summary(cmd.txnId(), cmd.executeAt(), saveStatus, ranges, 
findAsDep, hasAsDep);
     }
 
     public void mergeHistoricalTransaction(TxnId txnId, Ranges ranges, 
BiFunction<? super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
@@ -250,25 +265,28 @@ public class CommandsForRangesLoader
     public static class Summary
     {
         public final TxnId txnId;
-        @Nullable
-        public final Timestamp executeAt;
-        public final SaveStatus saveStatus;
-        public final Ranges ranges;
-        public final List<TxnId> depsIds;
+        @Nullable public final Timestamp executeAt;
+        @Nullable public final SaveStatus saveStatus;
+        @Nullable public final Ranges ranges;
+
+        // TODO (required): this logic is still broken (was already): needs to 
consider exact range matches
+        public final TxnId findAsDep;
+        public final boolean hasAsDep;
 
         @VisibleForTesting
-        Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus 
saveStatus, Ranges ranges, List<TxnId> depsIds)
+        Summary(TxnId txnId, @Nullable Timestamp executeAt, SaveStatus 
saveStatus, Ranges ranges, TxnId findAsDep, boolean hasAsDep)
         {
             this.txnId = txnId;
             this.executeAt = executeAt;
             this.saveStatus = saveStatus;
             this.ranges = ranges;
-            this.depsIds = depsIds;
+            this.findAsDep = findAsDep;
+            this.hasAsDep = hasAsDep;
         }
 
         public Summary slice(Ranges slice)
         {
-            return new Summary(txnId, executeAt, saveStatus, 
ranges.slice(slice, Routables.Slice.Minimal), depsIds);
+            return new Summary(txnId, executeAt, saveStatus, 
ranges.slice(slice, Routables.Slice.Minimal), findAsDep, hasAsDep);
         }
 
         @Override
@@ -279,7 +297,8 @@ public class CommandsForRangesLoader
                    ", executeAt=" + executeAt +
                    ", saveStatus=" + saveStatus +
                    ", ranges=" + ranges +
-                   ", depsIds=" + depsIds +
+                   ", findAsDep=" + findAsDep +
+                   ", hasAsDep=" + hasAsDep +
                    '}';
         }
     }
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java 
b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
index 209208989f..a0cd86bb5b 100644
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
@@ -161,43 +161,63 @@ public class SavedCommand
     public static void serialize(Command before, Command after, DataOutputPlus 
out, int userVersion) throws IOException
     {
         int flags = getFlags(before, after);
-
         out.writeInt(flags);
 
-        // We encode all changed fields unless their value is null
-        if (getFieldChanged(Fields.EXECUTE_AT, flags) && after.executeAt() != 
null)
-            CommandSerializers.timestamp.serialize(after.executeAt(), out, 
userVersion);
-        // TODO (desired): check if this can fold into executeAt
-        if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags) && 
after.executesAtLeast() != null)
-            CommandSerializers.timestamp.serialize(after.executesAtLeast(), 
out, userVersion);
-        if (getFieldChanged(Fields.SAVE_STATUS, flags))
-            out.writeInt(after.saveStatus().ordinal());
-        if (getFieldChanged(Fields.DURABILITY, flags) && after.durability() != 
null)
-            out.writeInt(after.durability().ordinal());
-
-        if (getFieldChanged(Fields.ACCEPTED, flags) && 
after.acceptedOrCommitted() != null)
-            CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), 
out, userVersion);
-        if (getFieldChanged(Fields.PROMISED, flags) && after.promised() != 
null)
-            CommandSerializers.ballot.serialize(after.promised(), out, 
userVersion);
-
-        if (getFieldChanged(Fields.PARTICIPANTS, flags) && 
after.participants() != null)
-            CommandSerializers.participants.serialize(after.participants(), 
out, userVersion);
-        if (getFieldChanged(Fields.PARTIAL_TXN, flags) && after.partialTxn() 
!= null)
-            CommandSerializers.partialTxn.serialize(after.partialTxn(), out, 
userVersion);
-        if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && after.partialDeps() 
!= null)
-            DepsSerializer.partialDeps.serialize(after.partialDeps(), out, 
userVersion);
-
-        Command.WaitingOn waitingOn = getWaitingOn(after);
-        if (getFieldChanged(Fields.WAITING_ON, flags) && waitingOn != null)
-        {
-            long size = WaitingOnSerializer.serializedSize(waitingOn);
-            ByteBuffer serialized = 
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
-            out.writeInt((int) size);
-            out.write(serialized);
-        }
-
-        if (getFieldChanged(Fields.WRITES, flags) && after.writes() != null)
-            CommandSerializers.writes.serialize(after.writes(), out, 
userVersion);
+        int iterable = toIterableSetFields(flags);
+        while (iterable != 0)
+        {
+            Fields field = nextSetField(iterable);
+            if (getFieldIsNull(field, flags))
+            {
+                iterable = unsetIterableFields(field, iterable);
+                continue;
+            }
+
+            switch (field)
+            {
+                case EXECUTE_AT:
+                    CommandSerializers.timestamp.serialize(after.executeAt(), 
out, userVersion);
+                    break;
+                case EXECUTES_AT_LEAST:
+                    
CommandSerializers.timestamp.serialize(after.executesAtLeast(), out, 
userVersion);
+                    break;
+                case SAVE_STATUS:
+                    out.writeShort(after.saveStatus().ordinal());
+                    break;
+                case DURABILITY:
+                    out.writeByte(after.durability().ordinal());
+                    break;
+                case ACCEPTED:
+                    
CommandSerializers.ballot.serialize(after.acceptedOrCommitted(), out, 
userVersion);
+                    break;
+                case PROMISED:
+                    CommandSerializers.ballot.serialize(after.promised(), out, 
userVersion);
+                    break;
+                case PARTICIPANTS:
+                    
CommandSerializers.participants.serialize(after.participants(), out, 
userVersion);
+                    break;
+                case PARTIAL_TXN:
+                    
CommandSerializers.partialTxn.serialize(after.partialTxn(), out, userVersion);
+                    break;
+                case PARTIAL_DEPS:
+                    DepsSerializer.partialDeps.serialize(after.partialDeps(), 
out, userVersion);
+                    break;
+                case WAITING_ON:
+                    Command.WaitingOn waitingOn = getWaitingOn(after);
+                    long size = WaitingOnSerializer.serializedSize(waitingOn);
+                    ByteBuffer serialized = 
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
+                    out.writeInt((int) size);
+                    out.write(serialized);
+                    break;
+                case WRITES:
+                    CommandSerializers.writes.serialize(after.writes(), out, 
userVersion);
+                    break;
+                case CLEANUP:
+                    throw new IllegalStateException();
+            }
+
+            iterable = unsetIterableFields(field, iterable);
+        }
     }
 
     @VisibleForTesting
@@ -258,13 +278,29 @@ public class SavedCommand
 
     private static int setFieldChanged(Fields field, int oldFlags)
     {
-        return oldFlags | (1 << (field.ordinal() + Short.SIZE));
+        return oldFlags | (0x10000 << field.ordinal());
     }
 
     @VisibleForTesting
     static boolean getFieldChanged(Fields field, int oldFlags)
     {
-        return (oldFlags & (1 << (field.ordinal() + Short.SIZE))) != 0;
+        return (oldFlags & (0x10000 << field.ordinal())) != 0;
+    }
+
+    static int toIterableSetFields(int flags)
+    {
+        return flags >>> 16;
+    }
+
+    static Fields nextSetField(int iterable)
+    {
+        int i = Integer.numberOfTrailingZeros(Integer.lowestOneBit(iterable));
+        return i == 32 ? null : Fields.FIELDS[i];
+    }
+
+    static int unsetIterableFields(Fields field, int iterable)
+    {
+        return iterable & ~(1 << field.ordinal());
     }
 
     @VisibleForTesting
@@ -547,43 +583,61 @@ public class SavedCommand
         {
             out.writeInt(flags);
 
-            // We encode all changed fields unless their value is null
-            if (getFieldChanged(Fields.EXECUTE_AT, flags) && 
!getFieldIsNull(Fields.EXECUTE_AT, flags))
-                CommandSerializers.timestamp.serialize(executeAt(), out, 
userVersion);
-            // TODO (desired): check if this can fold into executeAt
-            if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags) && 
!getFieldIsNull(Fields.EXECUTES_AT_LEAST, flags))
-                CommandSerializers.timestamp.serialize(executeAtLeast(), out, 
userVersion);
-            if (getFieldChanged(Fields.SAVE_STATUS, flags) && 
!getFieldIsNull(Fields.SAVE_STATUS, flags))
-                out.writeInt(saveStatus().ordinal());
-            if (getFieldChanged(Fields.DURABILITY, flags) && 
!getFieldIsNull(Fields.DURABILITY, flags))
-                out.writeInt(durability().ordinal());
-
-            if (getFieldChanged(Fields.ACCEPTED, flags) && 
!getFieldIsNull(Fields.ACCEPTED, flags))
-                CommandSerializers.ballot.serialize(acceptedOrCommitted(), 
out, userVersion);
-            if (getFieldChanged(Fields.PROMISED, flags) && 
!getFieldIsNull(Fields.PROMISED, flags))
-                CommandSerializers.ballot.serialize(promised(), out, 
userVersion);
-
-            if (getFieldChanged(Fields.PARTICIPANTS, flags) && 
!getFieldIsNull(Fields.PARTICIPANTS, flags))
-                CommandSerializers.participants.serialize(participants(), out, 
userVersion);
-            if (getFieldChanged(Fields.PARTIAL_TXN, flags) && 
!getFieldIsNull(Fields.PARTIAL_TXN, flags))
-                CommandSerializers.partialTxn.serialize(partialTxn(), out, 
userVersion);
-            if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && 
!getFieldIsNull(Fields.PARTIAL_DEPS, flags))
-                DepsSerializer.partialDeps.serialize(partialDeps(), out, 
userVersion);
-
-            if (getFieldChanged(Fields.WAITING_ON, flags) && 
!getFieldIsNull(Fields.WAITING_ON, flags))
+            int iterable = toIterableSetFields(flags);
+            while (iterable != 0)
             {
-                out.writeInt(waitingOnBytes.length);
-                out.write(waitingOnBytes);
-            }
+                Fields field = nextSetField(iterable);
+                if (getFieldIsNull(field, flags))
+                {
+                    iterable = unsetIterableFields(field, iterable);
+                    continue;
+                }
 
-            if (getFieldChanged(Fields.WRITES, flags) && 
!getFieldIsNull(Fields.WRITES, flags))
-                CommandSerializers.writes.serialize(writes(), out, 
userVersion);
+                switch (field)
+                {
+                    case EXECUTE_AT:
+                        CommandSerializers.timestamp.serialize(executeAt(), 
out, userVersion);
+                        break;
+                    case EXECUTES_AT_LEAST:
+                        
CommandSerializers.timestamp.serialize(executeAtLeast(), out, userVersion);
+                        break;
+                    case SAVE_STATUS:
+                        out.writeShort(saveStatus().ordinal());
+                        break;
+                    case DURABILITY:
+                        out.writeByte(durability().ordinal());
+                        break;
+                    case ACCEPTED:
+                        
CommandSerializers.ballot.serialize(acceptedOrCommitted(), out, userVersion);
+                        break;
+                    case PROMISED:
+                        CommandSerializers.ballot.serialize(promised(), out, 
userVersion);
+                        break;
+                    case PARTICIPANTS:
+                        
CommandSerializers.participants.serialize(participants(), out, userVersion);
+                        break;
+                    case PARTIAL_TXN:
+                        CommandSerializers.partialTxn.serialize(partialTxn(), 
out, userVersion);
+                        break;
+                    case PARTIAL_DEPS:
+                        DepsSerializer.partialDeps.serialize(partialDeps(), 
out, userVersion);
+                        break;
+                    case WAITING_ON:
+                        out.writeInt(waitingOnBytes.length);
+                        out.write(waitingOnBytes);
+                        break;
+                    case WRITES:
+                        CommandSerializers.writes.serialize(writes(), out, 
userVersion);
+                        break;
+                    case CLEANUP:
+                        out.writeByte(cleanup.ordinal());
+                        break;
+                }
 
-            if (getFieldChanged(Fields.CLEANUP, flags))
-                out.writeByte(cleanup.ordinal());
+                iterable = unsetIterableFields(field, iterable);
+            }
         }
 
-
         // TODO: we seem to be writing some form of empty transaction
         @SuppressWarnings({ "rawtypes", "unchecked" })
         public void deserializeNext(DataInputPlus in, int userVersion) throws 
IOException
@@ -593,97 +647,65 @@ public class SavedCommand
             nextCalled = true;
             count++;
 
-            for (Fields field : Fields.FIELDS)
+            int iterable = toIterableSetFields(flags);
+            while (iterable != 0)
             {
-                if (getFieldChanged(field, flags))
+                Fields field = nextSetField(iterable);
+                if (getFieldChanged(field, this.flags))
                 {
-                    this.flags = setFieldChanged(field, this.flags);
-                    if (getFieldIsNull(field, flags))
-                        this.flags = setFieldIsNull(field, this.flags);
-                    else
-                        this.flags = unsetFieldIsNull(field, this.flags);
-                }
-            }
+                    if (!getFieldIsNull(field, flags))
+                        skip(field, in, userVersion);
 
-            if (getFieldChanged(Fields.EXECUTE_AT, flags))
-            {
-                if (getFieldIsNull(Fields.EXECUTE_AT, flags))
-                    executeAt = null;
-                else
-                    executeAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
-            }
+                    iterable = unsetIterableFields(field, iterable);
+                    continue;
+                }
+                this.flags = setFieldChanged(field, this.flags);
 
-            if (getFieldChanged(Fields.EXECUTES_AT_LEAST, flags))
-            {
-                if (getFieldIsNull(Fields.EXECUTES_AT_LEAST, flags))
-                    executeAtLeast = null;
+                if (getFieldIsNull(field, flags))
+                {
+                    this.flags = setFieldIsNull(field, this.flags);
+                }
                 else
-                    executeAtLeast = 
CommandSerializers.timestamp.deserialize(in, userVersion);
-            }
+                {
+                    deserialize(field, in, userVersion);
+                }
 
-            if (getFieldChanged(Fields.SAVE_STATUS, flags))
-            {
-                if (getFieldIsNull(Fields.SAVE_STATUS, flags))
-                    saveStatus = null;
-                else
-                    saveStatus = SaveStatus.values()[in.readInt()];
-            }
-            if (getFieldChanged(Fields.DURABILITY, flags))
-            {
-                if (getFieldIsNull(Fields.DURABILITY, flags))
-                    durability = null;
-                else
-                    durability = Status.Durability.values()[in.readInt()];
+                iterable = unsetIterableFields(field, iterable);
             }
+        }
 
-            if (getFieldChanged(Fields.ACCEPTED, flags))
+        private void deserialize(Fields field, DataInputPlus in, int 
userVersion) throws IOException
+        {
+            switch (field)
             {
-                if (getFieldIsNull(Fields.ACCEPTED, flags))
-                    acceptedOrCommitted = null;
-                else
+                case EXECUTE_AT:
+                    executeAt = CommandSerializers.timestamp.deserialize(in, 
userVersion);
+                    break;
+                case EXECUTES_AT_LEAST:
+                    executeAtLeast = 
CommandSerializers.timestamp.deserialize(in, userVersion);
+                    break;
+                case SAVE_STATUS:
+                    saveStatus = SaveStatus.values()[in.readShort()];
+                    break;
+                case DURABILITY:
+                    durability = Status.Durability.values()[in.readByte()];
+                    break;
+                case ACCEPTED:
                     acceptedOrCommitted = 
CommandSerializers.ballot.deserialize(in, userVersion);
-            }
-
-            if (getFieldChanged(Fields.PROMISED, flags))
-            {
-                if (getFieldIsNull(Fields.PROMISED, flags))
-                    promised = null;
-                else
+                    break;
+                case PROMISED:
                     promised = CommandSerializers.ballot.deserialize(in, 
userVersion);
-            }
-
-            if (getFieldChanged(Fields.PARTICIPANTS, flags))
-            {
-                if (getFieldIsNull(Fields.PARTICIPANTS, flags))
-                    participants = null;
-                else
+                    break;
+                case PARTICIPANTS:
                     participants = 
CommandSerializers.participants.deserialize(in, userVersion);
-            }
-
-            if (getFieldChanged(Fields.PARTIAL_TXN, flags))
-            {
-                if (getFieldIsNull(Fields.PARTIAL_TXN, flags))
-                    partialTxn = null;
-                else
+                    break;
+                case PARTIAL_TXN:
                     partialTxn = CommandSerializers.partialTxn.deserialize(in, 
userVersion);
-            }
-
-            if (getFieldChanged(Fields.PARTIAL_DEPS, flags))
-            {
-                if (getFieldIsNull(Fields.PARTIAL_DEPS, flags))
-                    partialDeps = null;
-                else
+                    break;
+                case PARTIAL_DEPS:
                     partialDeps = DepsSerializer.partialDeps.deserialize(in, 
userVersion);
-            }
-
-            if (getFieldChanged(Fields.WAITING_ON, flags))
-            {
-                if (getFieldIsNull(Fields.WAITING_ON, flags))
-                {
-                    waitingOn = null;
-                }
-                else
-                {
+                    break;
+                case WAITING_ON:
                     int size = in.readInt();
                     waitingOnBytes = new byte[size];
                     in.readFully(waitingOnBytes);
@@ -699,22 +721,56 @@ public class SavedCommand
                             throw Throwables.unchecked(e);
                         }
                     };
-                }
-            }
-
-            if (getFieldChanged(Fields.WRITES, flags))
-            {
-                if (getFieldIsNull(Fields.WRITES, flags))
-                    writes = null;
-                else
+                    break;
+                case WRITES:
                     writes = CommandSerializers.writes.deserialize(in, 
userVersion);
+                    break;
+                case CLEANUP:
+                    Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
+                    if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
+                        cleanup = newCleanup;
+                    break;
             }
+        }
 
-            if (getFieldChanged(Fields.CLEANUP, flags))
+        private void skip(Fields field, DataInputPlus in, int userVersion) 
throws IOException
+        {
+            switch (field)
             {
-                Cleanup newCleanup = Cleanup.forOrdinal(in.readByte());
-                if (cleanup == null || newCleanup.compareTo(cleanup) > 0)
-                    cleanup = newCleanup;
+                case EXECUTE_AT:
+                case EXECUTES_AT_LEAST:
+                    CommandSerializers.timestamp.skip(in, userVersion);
+                    break;
+                case SAVE_STATUS:
+                    in.readShort();
+                    break;
+                case DURABILITY:
+                    in.readByte();
+                    break;
+                case ACCEPTED:
+                case PROMISED:
+                    CommandSerializers.ballot.skip(in, userVersion);
+                    break;
+                case PARTICIPANTS:
+                    CommandSerializers.participants.deserialize(in, 
userVersion);
+                    break;
+                case PARTIAL_TXN:
+                    CommandSerializers.partialTxn.deserialize(in, userVersion);
+                    break;
+                case PARTIAL_DEPS:
+                    DepsSerializer.partialDeps.deserialize(in, userVersion);
+                    break;
+                case WAITING_ON:
+                    int size = in.readInt();
+                    in.skipBytesFully(size);
+                    break;
+                case WRITES:
+                    // TODO (expected): skip
+                    CommandSerializers.writes.deserialize(in, userVersion);
+                    break;
+                case CLEANUP:
+                    in.readByte();
+                    break;
             }
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java 
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index aed0270979..70135b7a36 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -104,7 +104,10 @@ public class TokenRange extends Range.EndInclusive
         return new org.apache.cassandra.dht.Range<>(left, right);
     }
 
-    public static final IVersionedSerializer<TokenRange> serializer = new 
IVersionedSerializer<>()
+
+    public static final Serializer serializer = new Serializer();
+
+    public static final class Serializer implements 
IVersionedSerializer<TokenRange>
     {
         @Override
         public void serialize(TokenRange range, DataOutputPlus out, int 
version) throws IOException
@@ -113,6 +116,12 @@ public class TokenRange extends Range.EndInclusive
             AccordRoutingKey.serializer.serialize(range.end(), out, version);
         }
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            AccordRoutingKey.serializer.skip(in, version);
+            AccordRoutingKey.serializer.skip(in, version);
+        }
+
         @Override
         public TokenRange deserialize(DataInputPlus in, int version) throws 
IOException
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
index 1869310f5b..18d6926bc5 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
@@ -18,16 +18,24 @@
 
 package org.apache.cassandra.service.accord.api;
 
+import java.io.IOException;
 import java.util.Objects;
 
 import accord.primitives.RoutableKey;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 
 public abstract class AccordRoutableKey implements RoutableKey
 {
+    public interface AccordKeySerializer<T> extends IVersionedSerializer<T>
+    {
+        void skip(DataInputPlus in, int version) throws IOException;
+    }
+
     final TableId table; // TODO (desired): use an id (TrM)
 
     protected AccordRoutableKey(TableId table)
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 1f29c16d96..6d8d2b8184 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -168,7 +167,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             return isMin ? "-Inf" : "+Inf";
         }
 
-        public static final IVersionedSerializer<SentinelKey> serializer = new 
IVersionedSerializer<SentinelKey>()
+        public static final AccordKeySerializer<SentinelKey> serializer = new 
AccordKeySerializer<SentinelKey>()
         {
             @Override
             public void serialize(SentinelKey key, DataOutputPlus out, int 
version) throws IOException
@@ -177,6 +176,12 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
                 out.writeBoolean(key.isMin);
             }
 
+            @Override
+            public void skip(DataInputPlus in, int version) throws IOException
+            {
+                in.skipBytesFully(TableId.staticSerializedSize() + 1);
+            }
+
             @Override
             public SentinelKey deserialize(DataInputPlus in, int version) 
throws IOException
             {
@@ -260,7 +265,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         }
 
         public static final Serializer serializer = new Serializer();
-        public static class Serializer implements 
IVersionedSerializer<TokenKey>
+        public static class Serializer implements AccordKeySerializer<TokenKey>
         {
             private Serializer() {}
 
@@ -271,6 +276,14 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
                 Token.compactSerializer.serialize(key.token, out, version);
             }
 
+            @Override
+            public void skip(DataInputPlus in, int version) throws IOException
+            {
+                in.skipBytesFully(TableId.staticSerializedSize());
+                // TODO (expected): should we be using the TableId partitioner 
here?
+                Token.compactSerializer.skip(in, getPartitioner(), version);
+            }
+
             @Override
             public TokenKey deserialize(DataInputPlus in, int version) throws 
IOException
             {
@@ -306,7 +319,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         }
     }
 
-    public static class Serializer implements 
IVersionedSerializer<AccordRoutingKey>
+    public static class Serializer implements 
AccordKeySerializer<AccordRoutingKey>
     {
         static final RoutingKeyKind[] kinds = RoutingKeyKind.values();
 
@@ -358,6 +371,23 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             }
         }
 
+        @Override
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            RoutingKeyKind kind = kinds[in.readByte()];
+            switch (kind)
+            {
+                case TOKEN:
+                    TokenKey.serializer.skip(in, version);
+                    break;
+                case SENTINEL:
+                    SentinelKey.serializer.skip(in, version);
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
         @Override
         public AccordRoutingKey deserialize(DataInputPlus in, int version) 
throws IOException
         {
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java 
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index 71feb7d88e..28a180d7f1 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -119,7 +119,7 @@ public final class PartitionKey extends AccordRoutableKey 
implements Key
     }
 
     public static final Serializer serializer = new Serializer();
-    public static class Serializer implements 
IVersionedSerializer<PartitionKey>
+    public static class Serializer implements AccordKeySerializer<PartitionKey>
     {
         // TODO: add vint to value accessor and use vints
         private Serializer() {}
@@ -144,6 +144,14 @@ public final class PartitionKey extends AccordRoutableKey 
implements Key
 
         }
 
+        @Override
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            TableId tableId = TableId.deserialize(in);
+            IPartitioner partitioner = 
Schema.instance.getExistingTablePartitioner(tableId);
+            ByteBufferUtil.skipShortLength(in);
+        }
+
         @Override
         public PartitionKey deserialize(DataInputPlus in, int version) throws 
IOException
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index d33805a091..5c622ee680 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -42,6 +42,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 public class AsyncLoader
 {
@@ -126,6 +127,7 @@ public class AsyncLoader
                 referenceAndAssembleReadsForKey(key, context.timestampsForKey, 
commandStore.timestampsForKeyCache(), listenChains);
                 break;
             case COMMANDS:
+            case RECOVERY:
                 referenceAndAssembleReadsForKey(key, context.commandsForKey, 
commandStore.commandsForKeyCache(), listenChains);
             case NONE:
                 break;
@@ -141,7 +143,7 @@ public class AsyncLoader
         keys.forEach(key -> referenceAndAssembleReadsForKey(key, context, 
cache, listenChains));
     }
 
-    private AsyncResult<?> referenceAndDispatchReads(AsyncOperation.Context 
context)
+    private AsyncResult<?> referenceAndDispatchReads(@Nullable TxnId 
primaryTxnId, AsyncOperation.Context context)
     {
         List<AsyncChain<?>> chains = new ArrayList<>();
 
@@ -154,7 +156,7 @@ public class AsyncLoader
                 keys.forEach(key -> referenceAndAssembleReadsForKey(key, 
context, chains));
                 break;
             case Range:
-                chains.add(referenceAndDispatchReadsForRange(context));
+                chains.add(referenceAndDispatchReadsForRange(primaryTxnId, 
context));
                 break;
             default:
                 throw new UnsupportedOperationException("Unable to process 
keys of " + keysOrRanges.domain());
@@ -163,7 +165,7 @@ public class AsyncLoader
         return !chains.isEmpty() ? AsyncChains.reduce(chains, (a, b) -> 
null).beginAsResult() : null;
     }
 
-    private AsyncChain<?> 
referenceAndDispatchReadsForRange(AsyncOperation.Context context)
+    private AsyncChain<?> referenceAndDispatchReadsForRange(@Nullable TxnId 
primaryTxnId, AsyncOperation.Context context)
     {
         Ranges ranges = ((AbstractRanges) keysOrRanges).toRanges();
 
@@ -183,6 +185,8 @@ public class AsyncLoader
                     cached.add(pk);
             }
         }
+
+        // TODO (required): this needs to be optimised (e.g. to not load 
redundant commands, but maybe to be avoided altogether with async evaluation)
         Watcher watcher = new Watcher();
         commandStore.commandsForKeyCache().register(watcher);
         root.add(findOverlappingKeys(ranges).flatMap(keys -> {
@@ -195,7 +199,7 @@ public class AsyncLoader
             return chains.isEmpty() ? AsyncChains.success(null) : 
AsyncChains.reduce(chains, (a, b) -> null);
         }, commandStore));
 
-        var chain = commandStore.diskCommandsForRanges().get(ranges);
+        var chain = commandStore.diskCommandsForRanges().get(primaryTxnId, 
keyHistory, ranges);
         root.add(chain);
         context.commandsForRanges = new AccordSafeCommandsForRanges(ranges, 
chain);
 
@@ -234,7 +238,7 @@ public class AsyncLoader
         this.state = state;
     }
 
-    public boolean load(AsyncOperation.Context context, BiConsumer<Object, 
Throwable> callback)
+    public boolean load(@Nullable TxnId primaryTxnId, AsyncOperation.Context 
context, BiConsumer<Object, Throwable> callback)
     {
         logger.trace("Running load for {} with state {}: {} {}", callback, 
state, txnIds, keysOrRanges);
         commandStore.checkInStoreThread();
@@ -244,7 +248,7 @@ public class AsyncLoader
                 state(State.SETUP);
 
             case SETUP:
-                readResult = referenceAndDispatchReads(context);
+                readResult = referenceAndDispatchReads(primaryTxnId, context);
                 state(State.LOADING);
 
             case LOADING:
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 2c2867aa65..cc5bee6d32 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -240,7 +240,7 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
             case INITIALIZED:
                 state(LOADING);
             case LOADING:
-                if (!loader.load(context, this::onLoaded))
+                if (!loader.load(preLoadContext.primaryTxnId(), context, 
this::onLoaded))
                     return false;
                 state(PREPARING);
                 if (loadOnly)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 16acd2ab0b..a1205d74a2 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -111,6 +111,13 @@ public class CommandSerializers
             if (!ownsEqualsTouches) 
KeySerializers.participants.serialize(t.owns(), out, version);
         }
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            int flags = in.readByte();
+            if (0 != (flags & HAS_ROUTE))
+                KeySerializers.route.deserialize(in, version);
+        }
+
         @Override
         public StoreParticipants deserialize(DataInputPlus in, int version) 
throws IOException
         {
@@ -194,6 +201,11 @@ public class CommandSerializers
             TopologySerializers.nodeId.serialize(ts.node, out);
         }
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            in.skipBytesFully(serializedSize());
+        }
+
         @Override
         public T deserialize(DataInputPlus in, int version) throws IOException
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index f2f9f8ac1a..fa1cfe1697 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -50,6 +50,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutableKey;
+import 
org.apache.cassandra.service.accord.api.AccordRoutableKey.AccordKeySerializer;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.NullableSerializer;
@@ -58,11 +60,11 @@ public class KeySerializers
 {
     private KeySerializers() {}
 
-    public static final IVersionedSerializer<Key> key = 
(IVersionedSerializer<Key>) (IVersionedSerializer<?>) PartitionKey.serializer;
-    public static final IVersionedSerializer<RoutingKey> routingKey = 
(IVersionedSerializer<RoutingKey>) (IVersionedSerializer<?>) 
AccordRoutingKey.serializer;
+    public static final AccordKeySerializer<Key> key = 
(AccordKeySerializer<Key>) (AccordKeySerializer<?>) PartitionKey.serializer;
+    public static final IVersionedSerializer<RoutingKey> routingKey = 
(AccordKeySerializer<RoutingKey>) (AccordKeySerializer<?>) 
AccordRoutingKey.serializer;
     public static final IVersionedSerializer<RoutingKey> nullableRoutingKey = 
NullableSerializer.wrap(routingKey);
 
-    public static final IVersionedSerializer<RoutingKeys> routingKeys = new 
AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new)
+    public static final AbstractKeysSerializer<RoutingKey, RoutingKeys> 
routingKeys = new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new)
     {
         @Override RoutingKeys deserialize(DataInputPlus in, int version, 
RoutingKey[] keys)
         {
@@ -78,7 +80,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<Ranges> ranges = new 
AbstractRangesSerializer<Ranges>()
+    public static final AbstractRangesSerializer<Ranges> ranges = new 
AbstractRangesSerializer<Ranges>()
     {
         @Override
         public Ranges deserialize(DataInputPlus in, int version, Range[] 
ranges)
@@ -87,7 +89,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<PartialKeyRoute> partialKeyRoute 
= new AbstractKeysSerializer<RoutingKey, PartialKeyRoute>(routingKey, 
RoutingKey[]::new)
+    public static final AbstractKeysSerializer<?, PartialKeyRoute> 
partialKeyRoute = new AbstractKeysSerializer<RoutingKey, 
PartialKeyRoute>(routingKey, RoutingKey[]::new)
     {
         @Override PartialKeyRoute deserialize(DataInputPlus in, int version, 
RoutingKey[] keys) throws IOException
         {
@@ -110,7 +112,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new 
AbstractKeysSerializer<>(routingKey, RoutingKey[]::new)
+    public static final AbstractKeysSerializer<?, FullKeyRoute> fullKeyRoute = 
new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new)
     {
         @Override FullKeyRoute deserialize(DataInputPlus in, int version, 
RoutingKey[] keys) throws IOException
         {
@@ -133,7 +135,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<PartialRangeRoute> 
partialRangeRoute = new AbstractRangesSerializer<PartialRangeRoute>()
+    public static final AbstractRangesSerializer<PartialRangeRoute> 
partialRangeRoute = new AbstractRangesSerializer<>()
     {
         @Override PartialRangeRoute deserialize(DataInputPlus in, int version, 
Range[] rs) throws IOException
         {
@@ -157,7 +159,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<FullRangeRoute> fullRangeRoute = 
new AbstractRangesSerializer<FullRangeRoute>()
+    public static final AbstractRangesSerializer<FullRangeRoute> 
fullRangeRoute = new AbstractRangesSerializer<>()
     {
         @Override FullRangeRoute deserialize(DataInputPlus in, int version, 
Range[] Ranges) throws IOException
         {
@@ -180,7 +182,7 @@ public class KeySerializers
         }
     };
 
-    public static final IVersionedSerializer<Route<?>> route = new 
AbstractRoutablesSerializer<>(
+    public static final AbstractRoutablesSerializer<Route<?>> route = new 
AbstractRoutablesSerializer<>(
         EnumSet.of(UnseekablesKind.PartialKeyRoute, 
UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute, 
UnseekablesKind.FullRangeRoute)
     );
     public static final IVersionedSerializer<Route<?>> nullableRoute = 
NullableSerializer.wrap(route);
@@ -271,6 +273,21 @@ public class KeySerializers
             return result;
         }
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            byte b = in.readByte();
+            switch (b)
+            {
                default: throw new IOException("Corrupted input: expected byte 
1, 2, 3, 4, 5 or 6; received " + b);
+                case 1: routingKeys.skip(in, version); break;
+                case 2: partialKeyRoute.skip(in, version); break;
+                case 3: fullKeyRoute.skip(in, version); break;
+                case 4: ranges.skip(in, version); break;
+                case 5: partialRangeRoute.skip(in, version); break;
+                case 6: fullRangeRoute.skip(in, version); break;
+            }
+        }
+
         @Override
         public long serializedSize(RS t, int version)
         {
@@ -362,6 +379,13 @@ public class KeySerializers
 
         abstract KS deserialize(DataInputPlus in, int version, K[] keys) 
throws IOException;
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            int count = in.readUnsignedVInt32();
+            for (int i = 0; i < count ; i++)
+                keySerializer.deserialize(in, version);
+        }
+
         @Override
         public KS deserialize(DataInputPlus in, int version) throws IOException
         {
@@ -391,6 +415,13 @@ public class KeySerializers
                 TokenRange.serializer.serialize((TokenRange) ranges.get(i), 
out, version);
         }
 
+        public void skip(DataInputPlus in, int version) throws IOException
+        {
+            int count = in.readUnsignedVInt32();
+            for (int i = 0; i < count ; i++)
+                TokenRange.serializer.deserialize(in, version);
+        }
+
         @Override
         public RS deserialize(DataInputPlus in, int version) throws IOException
         {
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
index a24cc81e9b..eebfda397a 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalCompactionTest.java
@@ -19,9 +19,12 @@
 package org.apache.cassandra.service.accord;
 
 import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
 import org.junit.Assert;
 import org.junit.Before;
@@ -88,9 +91,9 @@ public class AccordJournalCompactionTest
 
         RedundantBeforeAccumulator redundantBeforeAccumulator = new 
RedundantBeforeAccumulator();
         DurableBeforeAccumulator durableBeforeAccumulator = new 
DurableBeforeAccumulator();
-        IdentityAccumulator<NavigableMap<TxnId, Ranges>> 
bootstrapBeganAtAccumulator = new 
IdentityAccumulator<>(ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY));
-        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> 
safeToReadAccumulator = new 
IdentityAccumulator<>(ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY));
-        IdentityAccumulator<RangesForEpoch.Snapshot> rangesForEpochAccumulator 
= new IdentityAccumulator<>(null);
+        NavigableMap<Timestamp, Ranges> safeToReadAtAccumulator = 
ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
+        NavigableMap<TxnId, Ranges> bootstrapBeganAtAccumulator = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
+        RangesForEpoch.Snapshot rangesForEpochAccumulator = null;
         HistoricalTransactionsAccumulator historicalTransactionsAccumulator = 
new HistoricalTransactionsAccumulator();
 
         Gen<RedundantBefore> redundantBeforeGen = 
AccordGenerators.redundantBefore(DatabaseDescriptor.getPartitioner());
@@ -118,7 +121,7 @@ public class AccordJournalCompactionTest
             journal.start(null);
             Timestamp timestamp = Timestamp.NONE;
 
-            RandomSource rs = new DefaultRandom();
+            RandomSource rs = new DefaultRandom(1);
 
             int count = 1_000;
 //            RedundantBefore redundantBefore = RedundantBefore.EMPTY;
@@ -140,9 +143,11 @@ public class AccordJournalCompactionTest
                 redundantBeforeAccumulator.update(updates.newRedundantBefore);
                 durableBeforeAccumulator.update(addDurableBefore);
                 if (updates.newBootstrapBeganAt != null)
-                    
bootstrapBeganAtAccumulator.update(updates.newBootstrapBeganAt);
-                safeToReadAccumulator.update(updates.newSafeToRead);
-                rangesForEpochAccumulator.update(updates.newRangesForEpoch);
+                    bootstrapBeganAtAccumulator = updates.newBootstrapBeganAt;
+                if (updates.newSafeToRead != null)
+                    safeToReadAtAccumulator = updates.newSafeToRead;
+                if (updates.newRangesForEpoch != null)
+                    rangesForEpochAccumulator = updates.newRangesForEpoch;
                 
historicalTransactionsAccumulator.update(updates.addHistoricalTransactions);
 
                 if (i % 100 == 0)
@@ -153,10 +158,12 @@ public class AccordJournalCompactionTest
 
 //            Assert.assertEquals(redundantBeforeAccumulator.get(), 
journal.loadRedundantBefore(1));
             Assert.assertEquals(durableBeforeAccumulator.get(), 
journal.durableBeforePersister().load());
-            Assert.assertEquals(bootstrapBeganAtAccumulator.get(), 
journal.loadBootstrapBeganAt(1));
-            Assert.assertEquals(safeToReadAccumulator.get(), 
journal.loadSafeToRead(1));
-            Assert.assertEquals(rangesForEpochAccumulator.get(), 
journal.loadRangesForEpoch(1));
-            Assert.assertEquals(historicalTransactionsAccumulator.get(), 
journal.loadHistoricalTransactions(1));
+            Assert.assertEquals(bootstrapBeganAtAccumulator, 
journal.loadBootstrapBeganAt(1));
+            Assert.assertEquals(safeToReadAtAccumulator, 
journal.loadSafeToRead(1));
+            Assert.assertEquals(rangesForEpochAccumulator, 
journal.loadRangesForEpoch(1));
+            List<Deps> historical = historicalTransactionsAccumulator.get();
+            Collections.reverse(historical);
+            Assert.assertEquals(historical, 
journal.loadHistoricalTransactions(1));
         }
         finally
         {
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 4c339ebcfe..73bb343489 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -105,7 +105,7 @@ public class AccordJournalSimulationTest extends 
SimulationTestBase
             for (int i = 0; i < count; i++)
             {
                 State.logger.debug("Reading {}", i);
-                Assert.assertEquals(State.journal.readFirst("test" + i), 
"test" + i);
+                Assert.assertEquals(State.journal.readLast("test" + i), "test" 
+ i);
             }
         }
 
diff --git 
a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java 
b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
index fb278581df..bf41340c88 100644
--- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
@@ -49,11 +49,13 @@ import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.Route;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.utils.RandomSource;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -297,7 +299,8 @@ public class AccordIndexStressTest extends CQLTester
                 }
 
                 var startNs = nanoTime();
-                Set<TxnId> actual = read(store, start, end);
+                // TODO (desired): randomise lower bound for reading
+                Set<TxnId> actual = read(store, start, end, TxnId.NONE, 
Timestamp.MAX);
                 var durationNs = nanoTime() - startNs;
                 samples[size] = durationNs;
                 counts[size++] = actual.size();
@@ -334,32 +337,36 @@ public class AccordIndexStressTest extends CQLTester
         return durationNs >= SLOW_NS;
     }
 
-    private Set<TxnId> read(int store, AccordRoutingKey start, 
AccordRoutingKey end)
+    private Set<TxnId> read(int store, AccordRoutingKey start, 
AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId)
     {
         switch (read)
         {
             case INDEX:
-                return readIndex(store, start, end);
+                return readIndex(store, start, end, minTxnId, maxTxnId);
             case CQL:
-                return readCQL(store, start, end);
+                return readCQL(store, start, end, minTxnId, maxTxnId);
             default:
                 throw new AssertionError("Unknown read type: " + read);
         }
     }
 
-    private Set<TxnId> readIndex(int store, AccordRoutingKey start, 
AccordRoutingKey end)
+    private Set<TxnId> readIndex(int store, AccordRoutingKey start, 
AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId)
     {
-        return searcher.intersects(store, start, end);
+        return searcher.intersects(store, start, end, minTxnId, maxTxnId);
     }
 
-    private Set<TxnId> readCQL(int store, AccordRoutingKey start, 
AccordRoutingKey end)
+    private Set<TxnId> readCQL(int store, AccordRoutingKey start, 
AccordRoutingKey end, TxnId minTxnId, Timestamp maxTxnId)
     {
-        Set<TxnId> actual = new HashSet<>();
+        Set<TxnId> actual = new ObjectHashSet<>();
         try
         {
             UntypedResultSet results = execute("SELECT txn_id FROM 
system_accord.commands WHERE store_id = ? AND route > ? AND route <= ?", store, 
OrderedRouteSerializer.serializeRoutingKey(start), 
OrderedRouteSerializer.serializeRoutingKey(end));
             for (var row : results)
-                actual.add(AccordKeyspace.deserializeTxnId(row));
+            {
+                TxnId txnId = AccordKeyspace.deserializeTxnId(row);
+                if (txnId.compareTo(minTxnId) >= 0 && 
txnId.compareTo(maxTxnId) < 0)
+                    actual.add(txnId);
+            }
         }
         catch (ReadSizeAbortException e)
         {
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java 
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index bd5bc5eb93..3716236756 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -44,6 +44,7 @@ import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Route;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.utils.Gen;
@@ -345,7 +346,7 @@ public class RouteIndexTest extends CQLTester.InMemory
         @Override
         public Set<TxnId> run(ColumnFamilyStore sut) throws Throwable
         {
-            return ROUTES_SEARCHER.intersects(storeId, range);
+            return ROUTES_SEARCHER.intersects(storeId, range, TxnId.NONE, 
Timestamp.MAX);
         }
 
         @Override
diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java 
b/test/unit/org/apache/cassandra/journal/IndexTest.java
index 8f1046f2c3..ce5ed00ebe 100644
--- a/test/unit/org/apache/cassandra/journal/IndexTest.java
+++ b/test/unit/org/apache/cassandra/journal/IndexTest.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
 
+import accord.utils.Invariants;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.Generators;
@@ -87,8 +88,8 @@ public class IndexTest
         assertArrayEquals(EMPTY, index.lookUp(key0));
 
         assertArrayEquals(new long[] { composeOffsetAndSize(val11, 1) }, 
index.lookUp(key1));
-        assertArrayEquals(new long[] { composeOffsetAndSize(val21, 2), 
composeOffsetAndSize(val22, 3) }, index.lookUp(key2));
-        assertArrayEquals(new long[] { composeOffsetAndSize(val31, 4), 
composeOffsetAndSize(val32, 5), composeOffsetAndSize(val33, 6) }, 
index.lookUp(key3));
+        assertArrayEquals(new long[] { composeOffsetAndSize(val22, 3), 
composeOffsetAndSize(val21, 2) }, index.lookUp(key2));
+        assertArrayEquals(new long[] { composeOffsetAndSize(val33, 6), 
composeOffsetAndSize(val32, 5), composeOffsetAndSize(val31, 4) }, 
index.lookUp(key3));
         assertArrayEquals(EMPTY, index.lookUp(key4));
 
         assertEquals(key1, index.firstId());
@@ -160,13 +161,23 @@ public class IndexTest
         Gen<long[]> valueGen = rs -> {
             long[] array = new long[(int) rs.next(valueSizeConstraint)];
             IntHashSet uniq = new IntHashSet();
-            for (int i = 0; i < array.length; i++)
+            for (int i = 0 ; i < array.length ; ++i)
             {
                 int offset = (int) rs.next(positionConstraint);
                 while (!uniq.add(offset))
                     offset = (int) rs.next(positionConstraint);
                 array[i] = Index.composeOffsetAndSize(offset, (int) 
rs.next(positionConstraint));
             }
+
+            Arrays.sort(array);
+            for (int i = 0 ; i < array.length / 2 ; ++i)
+            {
+                int back = array.length - (1 + i);
+                long v = array[i];
+                array[i] = array[back];
+                array[back] = v;
+            }
+
             return array;
         };
         Gen<Map<TimeUUID, long[]>> gen = rs -> {
@@ -190,7 +201,7 @@ public class IndexTest
         });
         File directory = new File(Files.createTempDirectory(null));
         directory.deleteOnExit();
-        qt().forAll(gen).checkAssert(map -> test(directory, map));
+        qt().withFixedSeed(185124544959375L).forAll(gen).checkAssert(map -> 
test(directory, map));
     }
 
     private static void test(File directory, Map<TimeUUID, long[]> map)
@@ -206,7 +217,8 @@ public class IndexTest
                 continue;
             for (long i : value)
                 inMemory.update(key, Index.readOffset(i), Index.readSize(i));
-            Arrays.sort(value);
+            for (int i = 1 ; i < value.length ; ++i)
+                Invariants.checkState(value[i - 1] > value[i]);
         }
         assertIndex(map, inMemory);
 
@@ -262,6 +274,9 @@ public class IndexTest
             long[] value = e.getValue();
             long[] read = actual.lookUp(key);
 
+            if (!Arrays.equals(value, read))
+                actual.lookUp(key);
+
             if (value.length == 0)
             {
                 assertThat(read).describedAs("Index %s returned wrong values 
for %s", actual, key).isEmpty();
diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java 
b/test/unit/org/apache/cassandra/journal/JournalTest.java
index 30952a96d8..7dd1c948a8 100644
--- a/test/unit/org/apache/cassandra/journal/JournalTest.java
+++ b/test/unit/org/apache/cassandra/journal/JournalTest.java
@@ -65,20 +65,20 @@ public class JournalTest
         journal.blockingWrite(id3, 3L, Collections.singleton(1));
         journal.blockingWrite(id4, 4L, Collections.singleton(1));
 
-        assertEquals(1L, (long) journal.readFirst(id1));
-        assertEquals(2L, (long) journal.readFirst(id2));
-        assertEquals(3L, (long) journal.readFirst(id3));
-        assertEquals(4L, (long) journal.readFirst(id4));
+        assertEquals(1L, (long) journal.readLast(id1));
+        assertEquals(2L, (long) journal.readLast(id2));
+        assertEquals(3L, (long) journal.readLast(id3));
+        assertEquals(4L, (long) journal.readLast(id4));
 
         journal.shutdown();
 
         journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, 
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
         journal.start();
 
-        assertEquals(1L, (long) journal.readFirst(id1));
-        assertEquals(2L, (long) journal.readFirst(id2));
-        assertEquals(3L, (long) journal.readFirst(id3));
-        assertEquals(4L, (long) journal.readFirst(id4));
+        assertEquals(1L, (long) journal.readLast(id1));
+        assertEquals(2L, (long) journal.readLast(id2));
+        assertEquals(3L, (long) journal.readLast(id3));
+        assertEquals(4L, (long) journal.readLast(id4));
 
         journal.shutdown();
     }
diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java 
b/test/unit/org/apache/cassandra/journal/SegmentTest.java
index 573ba4c9e0..d78fae8ecf 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java
@@ -76,22 +76,22 @@ public class SegmentTest
         // read all 4 entries by id and compare with originals
         EntrySerializer.EntryHolder<TimeUUID> holder = new 
EntrySerializer.EntryHolder<>();
 
-        segment.readFirst(id1, holder);
+        segment.readLast(id1, holder);
         assertEquals(id1, holder.key);
         assertEquals(hosts1, holder.hosts);
         assertEquals(record1, holder.value);
 
-        segment.readFirst(id2, holder);
+        segment.readLast(id2, holder);
         assertEquals(id2, holder.key);
         assertEquals(hosts2, holder.hosts);
         assertEquals(record2, holder.value);
 
-        segment.readFirst(id3, holder);
+        segment.readLast(id3, holder);
         assertEquals(id3, holder.key);
         assertEquals(hosts3, holder.hosts);
         assertEquals(record3, holder.value);
 
-        segment.readFirst(id4, holder);
+        segment.readLast(id4, holder);
         assertEquals(id4, holder.key);
         assertEquals(hosts4, holder.hosts);
         assertEquals(record4, holder.value);
@@ -143,22 +143,22 @@ public class SegmentTest
         // read all 4 entries by id and compare with originals
         EntrySerializer.EntryHolder<TimeUUID> holder = new 
EntrySerializer.EntryHolder<>();
 
-        staticSegment.readFirst(id1, holder);
+        staticSegment.readLast(id1, holder);
         assertEquals(id1, holder.key);
         assertEquals(hosts1, holder.hosts);
         assertEquals(record1, holder.value);
 
-        staticSegment.readFirst(id2, holder);
+        staticSegment.readLast(id2, holder);
         assertEquals(id2, holder.key);
         assertEquals(hosts2, holder.hosts);
         assertEquals(record2, holder.value);
 
-        staticSegment.readFirst(id3, holder);
+        staticSegment.readLast(id3, holder);
         assertEquals(id3, holder.key);
         assertEquals(hosts3, holder.hosts);
         assertEquals(record3, holder.value);
 
-        staticSegment.readFirst(id4, holder);
+        staticSegment.readLast(id4, holder);
         assertEquals(id4, holder.key);
         assertEquals(hosts4, holder.hosts);
         assertEquals(record4, holder.value);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java 
b/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java
index 93e35bc92c..115a231d5f 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandsForRangesTest.java
@@ -51,7 +51,7 @@ public class CommandsForRangesTest
         for (int i = 0; i < numTxn; i++)
         {
             TxnId id = TXN_ID_GEN.next(rs);
-            map.put(id, new CommandsForRangesLoader.Summary(id, id, 
SaveStatus.ReadyToExecute, ranges, Collections.emptyList()));
+            map.put(id, new CommandsForRangesLoader.Summary(id, id, 
SaveStatus.ReadyToExecute, ranges, null, false));
         }
         return CommandsForRanges.create(ranges, map);
     };
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 78af413da8..680777bde8 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -123,7 +123,7 @@ public class AsyncLoaderTest
         // everything is cached, so the loader should return immediately
         commandStore.executeBlocking(() -> {
             Context context = new Context();
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertEquals(safeCommandGlobal, 
context.commands.get(txnId).global());
             Assert.assertEquals(safeTimestampsGlobal, 
context.timestampsForKey.get(key).global());
             Assert.assertTrue(result);
@@ -162,7 +162,7 @@ public class AsyncLoaderTest
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> {
+            boolean result = loader.load(txnId, context, (o, t) -> {
                 Assert.assertNull(t);
                 Assert.assertTrue(context.commands.containsKey(txnId));
                 Assert.assertTrue(context.timestampsForKey.containsKey(key));
@@ -175,7 +175,7 @@ public class AsyncLoaderTest
 
         // then return immediately after the callback has fired
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertTrue(context.commands.containsKey(txnId));
             Assert.assertTrue(context.timestampsForKey.containsKey(key));
             Assert.assertTrue(result);
@@ -210,7 +210,7 @@ public class AsyncLoaderTest
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> {
+            boolean result = loader.load(txnId, context, (o, t) -> {
                 Assert.assertNull(t);
                 Assert.assertTrue(context.commands.containsKey(txnId));
                 Assert.assertTrue(context.timestampsForKey.containsKey(key));
@@ -225,7 +225,7 @@ public class AsyncLoaderTest
         // then return immediately after the callback has fired
         commandStore.executeBlocking(() -> {
 
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertTrue(context.commands.containsKey(txnId));
             Assert.assertTrue(context.timestampsForKey.containsKey(key));
             Assert.assertTrue(result);
@@ -260,7 +260,7 @@ public class AsyncLoaderTest
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> {
+            boolean result = loader.load(txnId, context, (o, t) -> {
                 Assert.assertNull(t);
                 Assert.assertTrue(context.commands.containsKey(txnId));
                 Assert.assertFalse(context.timestampsForKey.containsKey(key));
@@ -277,7 +277,7 @@ public class AsyncLoaderTest
 
         // then return immediately after the callback has fired
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertTrue(context.commands.containsKey(txnId));
             Assert.assertFalse(context.timestampsForKey.containsKey(key));
             Assert.assertTrue(result);
@@ -322,7 +322,7 @@ public class AsyncLoaderTest
 
             AsyncLoader loader = new AsyncLoader(commandStore, 
ImmutableList.of(txnId1, txnId2), RoutingKeys.EMPTY, KeyHistory.COMMANDS);
 
-            boolean result =  loader.load(new Context(), (u, t) -> {
+            boolean result =  loader.load(txnId1, new Context(), (u, t) -> {
                 Assert.assertFalse(callback.isDone());
                 Assert.assertNull(u);
                 Assert.assertEquals(failure, t);
@@ -369,7 +369,7 @@ public class AsyncLoaderTest
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> {
+            boolean result = loader.load(txnId, context, (o, t) -> {
                 Assert.assertNull(t);
                 Assert.assertTrue(context.commands.containsKey(txnId));
                 cbFired.setSuccess(null);
@@ -384,7 +384,7 @@ public class AsyncLoaderTest
 
         // then return immediately after the callback has fired
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertTrue(context.commands.containsKey(txnId));
             Assert.assertTrue(result);
         });
@@ -432,7 +432,7 @@ public class AsyncLoaderTest
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
         Context context = new Context();
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> {
+            boolean result = loader.load(txnId, context, (o, t) -> {
                 Assert.assertNull(t);
                 Assert.assertEquals(context.timestampsForKey.containsKey(key), 
inContext.apply(context) == context.timestampsForKey);
                 Assert.assertEquals(context.commandsForKey.containsKey(key), 
inContext.apply(context) == context.commandsForKey);
@@ -446,7 +446,7 @@ public class AsyncLoaderTest
 
         // then return immediately after the callback has fired
         commandStore.executeBlocking(() -> {
-            boolean result = loader.load(context, (o, t) -> Assert.fail());
+            boolean result = loader.load(txnId, context, (o, t) -> 
Assert.fail());
             Assert.assertEquals(context.timestampsForKey.containsKey(key), 
inContext.apply(context) == context.timestampsForKey);
             Assert.assertEquals(context.commandsForKey.containsKey(key), 
inContext.apply(context) == context.commandsForKey);
             Assert.assertTrue(result);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
index 39b12a862a..124749025d 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java
@@ -235,7 +235,7 @@ public class SimulatedAsyncOperationTest extends 
SimulatedAccordCommandStoreTest
         }
 
         @Override
-        public boolean load(AsyncOperation.Context context, BiConsumer<Object, 
Throwable> callback)
+        public boolean load(TxnId primaryTxnId, AsyncOperation.Context 
context, BiConsumer<Object, Throwable> callback)
         {
             if (delay)
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to