This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 1bf039709e Truncate mutation journal as logs get reconciled
1bf039709e is described below

commit 1bf039709ec72fa6b08a31effb0972fe63757f0f
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Tue Sep 23 16:44:45 2025 +0100

    Truncate mutation journal as logs get reconciled
    
    patch by Aleksey Yeschenko; reviewed by Alex Petrov for CASSANDRA-20710
---
 .../apache/cassandra/journal/ActiveSegment.java    |  41 ++-
 .../org/apache/cassandra/journal/Component.java    |  10 +-
 .../org/apache/cassandra/journal/Descriptor.java   |   9 +-
 .../org/apache/cassandra/journal/DumpUtil.java     |   2 +-
 .../apache/cassandra/journal/EntrySerializer.java  |  11 +-
 src/java/org/apache/cassandra/journal/Journal.java | 198 ++++++++----
 .../apache/cassandra/journal/JournalReadError.java |   4 +-
 .../cassandra/journal/JournalWriteError.java       |   4 +-
 .../org/apache/cassandra/journal/KeyStats.java     |  97 ++++++
 .../org/apache/cassandra/journal/KeySupport.java   |   4 +-
 .../org/apache/cassandra/journal/Metadata.java     |   3 +-
 .../org/apache/cassandra/journal/OnDiskIndex.java  |   5 +-
 src/java/org/apache/cassandra/journal/Segment.java |  10 +
 .../org/apache/cassandra/journal/Segments.java     |  34 +-
 .../apache/cassandra/journal/StaticSegment.java    |  47 ++-
 .../apache/cassandra/journal/ValueSerializer.java  |  21 ++
 .../cassandra/replication/CoordinatorLog.java      |   5 +
 .../cassandra/replication/CoordinatorLogId.java    |   2 +-
 .../cassandra/replication/Log2OffsetsMap.java      |  10 +-
 .../cassandra/replication/MutationJournal.java     | 351 ++++++++++++++++++---
 .../replication/MutationTrackingService.java       |  38 ++-
 .../org/apache/cassandra/replication/Offsets.java  |  23 +-
 .../org/apache/cassandra/replication/Shard.java    |   5 +
 .../cassandra/service/accord/AccordJournal.java    |  22 +-
 .../cassandra/service/accord/JournalKey.java       |   4 +-
 .../service/reads/tracked/ReadReconciliations.java |   8 +-
 .../service/reads/tracked/TrackedLocalReads.java   |  17 +-
 .../service/reads/tracked/TrackedRead.java         |  10 +-
 src/java/org/apache/cassandra/utils/Crc.java       |  20 ++
 .../org/apache/cassandra/utils/FBUtilities.java    |   2 +
 .../tracking/MutationTrackingLogPersisterTest.java | 100 ++++++
 .../test/AccordJournalSimulationTest.java          |  17 +-
 .../org/apache/cassandra/journal/JournalTest.java  |  18 +-
 .../org/apache/cassandra/journal/SegmentTest.java  |  12 +-
 .../org/apache/cassandra/journal/SegmentsTest.java |   9 +-
 .../cassandra/journal/TimeUUIDKeySupport.java      |  11 +-
 .../cassandra/replication/MutationJournalTest.java | 181 +++++++++--
 .../apache/cassandra/replication/OffsetsTest.java  |  42 +++
 38 files changed, 1159 insertions(+), 248 deletions(-)

diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 140371f980..e84ca40bcf 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -79,12 +79,19 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
     private final Ref<Segment<K, V>> selfRef;
 
     final InMemoryIndex<K> index;
+    final KeyStats.Active<K> keyStats;
 
     private ActiveSegment(
-        Descriptor descriptor, Params params, InMemoryIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
+        Descriptor descriptor,
+        Params params,
+        InMemoryIndex<K> index,
+        Metadata metadata,
+        KeyStats.Active<K> keyStats,
+        KeySupport<K> keySupport)
     {
         super(descriptor, metadata, keySupport);
         this.index = index;
+        this.keyStats = keyStats;
         try
         {
             channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
@@ -98,16 +105,12 @@ public final class ActiveSegment<K, V>
         }
     }
 
-    public CommitLogPosition currentPosition()
-    {
-        return new CommitLogPosition(id(), (int) allocateOffset);
-    }
-
-    static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
+    static <K, V> ActiveSegment<K, V> create(
+        Descriptor descriptor, Params params, KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
     {
         InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
         Metadata metadata = Metadata.empty();
-        return new ActiveSegment<>(descriptor, params, index, metadata, keySupport);
+        return new ActiveSegment<>(descriptor, params, index, metadata, keyStatsFactory.create(), keySupport);
     }
 
     @Override
@@ -116,6 +119,16 @@ public final class ActiveSegment<K, V>
         return index;
     }
 
+    public KeyStats.Active<K> keyStats()
+    {
+        return keyStats;
+    }
+
+    public CommitLogPosition currentPosition()
+    {
+        return new CommitLogPosition(id(), (int) allocateOffset);
+    }
+
     boolean isEmpty()
     {
         return allocateOffset == 0;
@@ -225,6 +238,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
     {
         index.persist(descriptor);
         metadata.persist(descriptor);
+        keyStats.persist(descriptor);
         SyncUtil.trySyncDir(descriptor.directory);
     }
 
@@ -236,6 +250,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         descriptor.fileFor(Component.DATA).deleteIfExists();
         descriptor.fileFor(Component.INDEX).deleteIfExists();
         descriptor.fileFor(Component.METADATA).deleteIfExists();
+        descriptor.fileFor(Component.KEYSTATS).deleteIfExists();
     }
 
     @Override
@@ -290,6 +305,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         }
     }
 
+    @Override
     public boolean isFlushed(long position)
     {
         return writtenTo >= position;
@@ -465,18 +481,14 @@ public final class ActiveSegment<K, V>
             this.buffer = buffer;
         }
 
-        Segment<K, V> segment()
-        {
-            return ActiveSegment.this;
-        }
-
         void write(K id, ByteBuffer record)
         {
             try
             {
                 EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
-                metadata.update();
                 index.update(id, position, length);
+                keyStats.update(id);
+                metadata.update();
             }
             catch (IOException e)
             {
@@ -508,6 +520,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
             {
                 EntrySerializer.write(id, record, keySupport, buffer, descriptor.userVersion);
                 index.update(id, position, length);
+                keyStats.update(id);
                 metadata.update();
             }
             catch (IOException e)
diff --git a/src/java/org/apache/cassandra/journal/Component.java b/src/java/org/apache/cassandra/journal/Component.java
index 07da71536a..8e99708de0 100644
--- a/src/java/org/apache/cassandra/journal/Component.java
+++ b/src/java/org/apache/cassandra/journal/Component.java
@@ -24,15 +24,17 @@ import org.apache.cassandra.io.util.File;
 
 import static accord.utils.SortedArrays.SortedArrayList.ofSorted;
 
-enum Component
+public enum Component
 {
-    DATA           ("data"),
-    INDEX          ("indx"),
-    METADATA       ("meta");
+    DATA     ("data"),
+    INDEX    ("indx"),
+    METADATA ("meta"),
+    KEYSTATS ("keys");
     //OFFSET_MAP     (".offs"),
     //INVLALIDATIONS (".invl");
 
     public static final List<Component> VALUES = ofSorted(values());
+
     final String extension;
 
     Component(String extension)
diff --git a/src/java/org/apache/cassandra/journal/Descriptor.java b/src/java/org/apache/cassandra/journal/Descriptor.java
index bac6c7029f..e8e8556b56 100644
--- a/src/java/org/apache/cassandra/journal/Descriptor.java
+++ b/src/java/org/apache/cassandra/journal/Descriptor.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.util.File;
 
 import static java.lang.String.format;
@@ -90,7 +92,8 @@ public final class Descriptor implements Comparable<Descriptor>
         this.userVersion = userVersion;
     }
 
-    static Descriptor create(File directory, long timestamp, int userVersion)
+    @VisibleForTesting
+    public static Descriptor create(File directory, long timestamp, int userVersion)
     {
         return new Descriptor(directory, timestamp, 1, CURRENT_JOURNAL_VERSION, userVersion);
     }
@@ -114,12 +117,12 @@ public final class Descriptor implements Comparable<Descriptor>
         return fromName(file.parent(), file.name());
     }
 
-    File fileFor(Component component)
+    public File fileFor(Component component)
     {
         return new File(directory, formatFileName(component));
     }
 
-    File tmpFileFor(Component component)
+    public File tmpFileFor(Component component)
     {
         return new File(directory, formatFileName(component) + '.' + TMP_SUFFIX);
     }
diff --git a/src/java/org/apache/cassandra/journal/DumpUtil.java b/src/java/org/apache/cassandra/journal/DumpUtil.java
index d98e758384..404a688a38 100644
--- a/src/java/org/apache/cassandra/journal/DumpUtil.java
+++ b/src/java/org/apache/cassandra/journal/DumpUtil.java
@@ -37,6 +37,6 @@ public class DumpUtil
 
     public static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport)
     {
-        return StaticSegment.open(descriptor, keySupport);
+        return StaticSegment.open(descriptor, keySupport, KeyStats.Factory.noop());
     }
 }
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index 454234454e..6861cdae47 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -27,11 +27,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
-import static org.apache.cassandra.journal.Journal.validateCRC;
-
 /**
  * Entry format:
- *
  *   [Total Size (4 bytes)]
  *   [Header (variable size)]
  *   [Header CRC (4 bytes)]
@@ -95,10 +92,10 @@ public final class EntrySerializer
             CRC32 crc = Crc.crc32();
             int headerSize = EntrySerializer.headerSize(keySupport, userVersion);
             int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
-            validateCRC(crc, headerCrc);
+            Crc.validate(crc, headerCrc);
 
             int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
-            validateCRC(crc, recordCrc);
+            Crc.validate(crc, recordCrc);
         }
 
         readValidated(into, from, start, keySupport, userVersion);
@@ -142,7 +139,7 @@ public final class EntrySerializer
                 int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
                 try
                 {
-                    validateCRC(crc, headerCrc);
+                    Crc.validate(crc, headerCrc);
                 }
                 catch (IOException e)
                 {
@@ -152,7 +149,7 @@ public final class EntrySerializer
                 int recordCrc = readAndUpdateRecordCrc(crc, from, start + totalSize);
                 try
                 {
-                    validateCRC(crc, recordCrc);
+                    Crc.validate(crc, recordCrc);
                 }
                 catch (IOException e)
                 {
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index cf3135d77f..136d70a9dc 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -24,14 +24,15 @@ import java.nio.file.FileStore;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 import java.util.function.*;
-import java.util.zip.CRC32;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -56,7 +57,6 @@ import org.apache.cassandra.journal.Segments.ReferencedSegments;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Simulate;
@@ -77,10 +77,12 @@ import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 
 /**
  * A generic append-only journal with some special features:
- * <p><ul>
+ * <p>
+ * <ul>
  * <li>Records can be looked up by key
  * <li>Invalidated records get purged during segment compaction
- * </ul><p>
+ * </ul>
+ * </p>
  *
  * Type parameters:
  * @param <V> the type of records stored in the journal
@@ -98,6 +100,7 @@ public class Journal<K, V> implements Shutdownable
 
     final KeySupport<K> keySupport;
     final ValueSerializer<K, V> valueSerializer;
+    final KeyStats.Factory<K> keyStatsFactory;
 
     final Metrics<K, V> metrics;
 
@@ -170,6 +173,7 @@ public class Journal<K, V> implements Shutdownable
             this.onFlush = onFlush;
         }
 
+        @Override
         public void run()
         {
             onFlush.run();
@@ -206,11 +210,60 @@ public class Journal<K, V> implements Shutdownable
         }
     }
 
+    public static class Builder<K, V>
+    {
+        private final String name;
+        private final File directory;
+        private final Params params;
+        private final KeySupport<K> keySupport;
+
+        private ValueSerializer<K, V> valueSerializer = ValueSerializer.none();
+        private KeyStats.Factory<K> keyStatsFactory = KeyStats.Factory.noop();
+        private SegmentCompactor<K, V> segmentCompactor = SegmentCompactor.noop();
+
+        public Builder(String name, File directory, Params params, KeySupport<K> keySupport)
+        {
+            this.name = name;
+            this.directory = directory;
+            this.params = params;
+            this.keySupport = keySupport;
+        }
+
+        public Journal<K, V> build()
+        {
+            return new Journal<>(name, directory, params, keySupport, valueSerializer, keyStatsFactory, segmentCompactor);
+        }
+
+        public Builder<K, V> valueSerializer(ValueSerializer<K, V> valueSerializer)
+        {
+            this.valueSerializer = valueSerializer;
+            return this;
+        }
+
+        public Builder<K, V> keyStatsFactory(KeyStats.Factory<K> keyStatsFactory)
+        {
+            this.keyStatsFactory = keyStatsFactory;
+            return this;
+        }
+
+        public Builder<K, V> segmentCompactor(SegmentCompactor<K, V> segmentCompactor)
+        {
+            this.segmentCompactor = segmentCompactor;
+            return this;
+        }
+    }
+
+    public static <K, V> Builder<K, V> builder(String name, File directory, Params params, KeySupport<K> keySupport)
+    {
+        return new Builder<>(name, directory, params, keySupport);
+    }
+
     public Journal(String name,
                    File directory,
                    Params params,
                    KeySupport<K> keySupport,
                    ValueSerializer<K, V> valueSerializer,
+                   KeyStats.Factory<K> keyStatsFactory,
                    SegmentCompactor<K, V> segmentCompactor)
     {
         this.name = name;
@@ -219,6 +272,7 @@ public class Journal<K, V> implements Shutdownable
 
         this.keySupport = keySupport;
         this.valueSerializer = valueSerializer;
+        this.keyStatsFactory = keyStatsFactory;
 
         this.metrics = new Metrics<>(name);
         this.flusherCallbacks = new FlusherCallbacks();
@@ -250,7 +304,7 @@ public class Journal<K, V> implements Shutdownable
                           : descriptors.get(descriptors.size() - 1).timestamp;
         nextSegmentId.set(replayLimit = Math.max(currentTimeMillis(), maxTimestamp + 1));
 
-        segments.set(Segments.of(StaticSegment.open(descriptors, keySupport)));
+        segments.set(Segments.of(StaticSegment.open(descriptors, keySupport, keyStatsFactory)));
         closer = executorFactory().sequential(name + "-closer");
         releaser = executorFactory().sequential(name + "-releaser");
         allocator = executorFactory().infiniteLoop(name + "-allocator", new AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED);
@@ -261,12 +315,6 @@ public class Journal<K, V> implements Shutdownable
         compactor.start();
     }
 
-    @VisibleForTesting
-    public void runCompactorForTesting()
-    {
-        compactor.run();
-    }
-
     public Compactor<K, V> compactor()
     {
         return compactor;
@@ -287,6 +335,7 @@ public class Journal<K, V> implements Shutdownable
         return state.get() == State.TERMINATED;
     }
 
+    @Override
     public void shutdown()
     {
         try
@@ -606,13 +655,6 @@ public class Journal<K, V> implements Shutdownable
         }
     }
 
-    // TODO (require): Find a better way to test unwritten allocations and/or corruption
-    @VisibleForTesting
-    public void unsafeConsumeBytesForTesting(int entrySize, Consumer<ByteBuffer> corrupt)
-    {
-        allocate(entrySize).consumeBufferUnsafe(corrupt);
-    }
-
     private ActiveSegment<K, V>.Allocation allocate(int entrySize)
     {
         ActiveSegment<K, V> segment = currentSegment;
@@ -793,7 +835,7 @@ public class Journal<K, V> implements Shutdownable
     private ActiveSegment<K, V> createSegment()
     {
         Descriptor descriptor = Descriptor.create(directory, nextSegmentId.getAndIncrement(), params.userVersion());
-        return ActiveSegment.create(descriptor, params, keySupport);
+        return ActiveSegment.create(descriptor, params, keySupport, keyStatsFactory);
     }
 
     private void closeAllSegments()
@@ -810,7 +852,7 @@ public class Journal<K, V> implements Shutdownable
     }
 
     @SuppressWarnings("unused")
-    public ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>> selector)
+    ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>> selector)
     {
         while (true)
         {
@@ -847,6 +889,11 @@ public class Journal<K, V> implements Shutdownable
         swapSegments(current -> current.withoutEmptySegment(activeSegment));
     }
 
+    private void removeStaticSegments(Collection<StaticSegment<K, V>> staticSegments)
+    {
+        swapSegments(current -> current.withoutStaticSegments(staticSegments));
+    }
+
     private void replaceCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment)
     {
         swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment));
@@ -948,7 +995,7 @@ public class Journal<K, V> implements Shutdownable
             activeSegment.updateWrittenTo();
             activeSegment.fsync();
             activeSegment.persistComponents();
-            replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport));
+            replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport, keyStatsFactory));
             activeSegment.release(Journal.this);
             if (onDone != null) onDone.run();
         }
@@ -971,27 +1018,16 @@ public class Journal<K, V> implements Shutdownable
         closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone));
     }
 
-    @VisibleForTesting
-    public void closeCurrentSegmentForTestingIfNonEmpty()
+    public int dropStaticSegments(Predicate<StaticSegment<K, V>> dropIf)
     {
-        ActiveSegment<K, V> segment = currentSegment;
-        if (segment.isEmpty())
-            return;
-        advanceSegment(segment);
-        while (!segments().isSwitched(segment))
-        {
-            LockSupport.parkNanos(1000);
-        }
-    }
-
-    /*
-     * Static helper methods used by journal components
-     */
-
-    static void validateCRC(CRC32 crc, int readCRC) throws Crc.InvalidCrc
-    {
-        if (readCRC != (int) crc.getValue())
-            throw new Crc.InvalidCrc(readCRC, (int) crc.getValue());
+        Set<StaticSegment<K, V>> toDrop = new HashSet<>();
+        segments().selectStatic(dropIf, toDrop);
+        if (toDrop.isEmpty())
+            return 0;
+        removeStaticSegments(toDrop);
+        for (StaticSegment<K, V> segment : toDrop)
+            segment.discard(this);
+        return toDrop.size();
     }
 
     /*
@@ -1045,24 +1081,6 @@ public class Journal<K, V> implements Shutdownable
                       message, segmentSize, availableDiskSpace, directory);
     }
 
-    @VisibleForTesting
-    public void truncateForTesting()
-    {
-        ActiveSegment<?, ?> discarding = currentSegment;
-        if (!discarding.isEmpty()) // if there is no data in the segment then ignore it
-        {
-            closeCurrentSegmentForTestingIfNonEmpty();
-            //TODO (desired): wait for the ActiveSegment to get released, else can see weird race conditions;
-            // this thread will see the static segment and will release it (which will delete the file),
-            // and the sync thread will then try to release and will fail as the file no longer exists...
-            while (discarding.selfRef().globalCount() > 0) {}
-        }
-
-        Segments<K, V> statics = swapSegments(s -> s.select(Segment::isActive)).select(Segment::isStatic);
-        for (Segment<K, V> segment : statics.all())
-            ((StaticSegment) segment).discard(this);
-    }
-
     public interface Writer
     {
         void write(DataOutputPlus out, int userVersion) throws IOException;
@@ -1279,4 +1297,66 @@ public class Journal<K, V> implements Shutdownable
         SHUTDOWN,
         TERMINATED
     }
+
+    /*
+     * Test helpers
+     */
+
+    @VisibleForTesting
+    public void unsafeConsumeBytesForTesting(int entrySize, Consumer<ByteBuffer> corrupt)
+    {
+        // TODO (require): Find a better way to test unwritten allocations and/or corruption
+        allocate(entrySize).consumeBufferUnsafe(corrupt);
+    }
+
+    @VisibleForTesting
+    public void truncateForTesting()
+    {
+        ActiveSegment<?, ?> discarding = currentSegment;
+        if (!discarding.isEmpty()) // if there is no data in the segment then ignore it
+        {
+            closeCurrentSegmentForTestingIfNonEmpty();
+            //TODO (desired): wait for the ActiveSegment to get released, else can see weird race conditions;
+            // this thread will see the static segment and will release it (which will delete the file),
+            // and the sync thread will then try to release and will fail as the file no longer exists...
+            while (discarding.selfRef().globalCount() > 0) {}
+        }
+
+        Segments<K, V> statics = swapSegments(s -> s.select(Segment::isActive)).select(Segment::isStatic);
+        for (Segment<K, V> segment : statics.all())
+            ((StaticSegment) segment).discard(this);
+    }
+
+    @VisibleForTesting
+    public void runCompactorForTesting()
+    {
+        compactor.run();
+    }
+
+    @VisibleForTesting
+    public void closeCurrentSegmentForTestingIfNonEmpty()
+    {
+        ActiveSegment<K, V> segment = currentSegment;
+        if (segment.isEmpty())
+            return;
+        advanceSegment(segment);
+        while (!segments().isSwitched(segment))
+        {
+            LockSupport.parkNanos(1000);
+        }
+    }
+
+    @VisibleForTesting
+    public void clearNeedsReplayForTesting()
+    {
+        Set<StaticSegment<K, V>> toReset = new HashSet<>();
+        segments().selectStatic(toReset);
+        toReset.forEach(s -> s.metadata().clearNeedsReplay());
+    }
+
+    @VisibleForTesting
+    public int countStaticSegmentsForTesting()
+    {
+        return segments.get().count(Segment::isStatic);
+    }
 }
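
For context: the Builder above replaces the ever-growing Journal constructor; valueSerializer, keyStatsFactory and segmentCompactor all fall back to their no-op defaults when omitted. A minimal usage sketch (the journal name and the directory/params/keySupport/serializer values here are illustrative, not taken from this commit):

    Journal<K, V> journal =
        Journal.builder("MyJournal", directory, params, keySupport)
               .valueSerializer(valueSerializer)   // defaults to ValueSerializer.none()
               .keyStatsFactory(keyStatsFactory)   // defaults to KeyStats.Factory.noop()
               .segmentCompactor(compactor)        // defaults to SegmentCompactor.noop()
               .build();
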
diff --git a/src/java/org/apache/cassandra/journal/JournalReadError.java b/src/java/org/apache/cassandra/journal/JournalReadError.java
index 87366c8d7c..70ae829b03 100644
--- a/src/java/org/apache/cassandra/journal/JournalReadError.java
+++ b/src/java/org/apache/cassandra/journal/JournalReadError.java
@@ -24,13 +24,13 @@ public class JournalReadError extends FSReadError
 {
     public final Descriptor descriptor;
 
-    JournalReadError(Descriptor descriptor, File file, Throwable throwable)
+    public JournalReadError(Descriptor descriptor, File file, Throwable throwable)
     {
         super(throwable, file);
         this.descriptor = descriptor;
     }
 
-    JournalReadError(Descriptor descriptor, Component component, Throwable throwable)
+    public JournalReadError(Descriptor descriptor, Component component, Throwable throwable)
     {
         super(throwable, descriptor.fileFor(component));
         this.descriptor = descriptor;
diff --git a/src/java/org/apache/cassandra/journal/JournalWriteError.java b/src/java/org/apache/cassandra/journal/JournalWriteError.java
index 03193af545..ef3f278345 100644
--- a/src/java/org/apache/cassandra/journal/JournalWriteError.java
+++ b/src/java/org/apache/cassandra/journal/JournalWriteError.java
@@ -24,13 +24,13 @@ public class JournalWriteError extends FSWriteError
 {
     public final Descriptor descriptor;
 
-    JournalWriteError(Descriptor descriptor, File file, Throwable throwable)
+    public JournalWriteError(Descriptor descriptor, File file, Throwable throwable)
     {
         super(throwable, file);
         this.descriptor = descriptor;
     }
 
-    JournalWriteError(Descriptor descriptor, Component component, Throwable throwable)
+    public JournalWriteError(Descriptor descriptor, Component component, Throwable throwable)
     {
         super(throwable, descriptor.fileFor(component));
         this.descriptor = descriptor;
diff --git a/src/java/org/apache/cassandra/journal/KeyStats.java b/src/java/org/apache/cassandra/journal/KeyStats.java
new file mode 100644
index 0000000000..86f5811040
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/KeyStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+public interface KeyStats<K>
+{
+    KeyStats<?> NOOP = (KeyStats<Object>) o -> true;
+
+    static <K> KeyStats<K> noop()
+    {
+        //noinspection unchecked
+        return (KeyStats<K>) NOOP;
+    }
+
+    boolean mayContain(K k);
+
+    interface Active<K> extends KeyStats<K>
+    {
+        Active<Object> NOOP = new Active<>()
+        {
+            @Override
+            public void update(Object key)
+            {
+                // no-op
+            }
+
+            @Override
+            public boolean mayContain(Object key)
+            {
+                return true;
+            }
+
+            @Override
+            public void persist(Descriptor descriptor)
+            {
+                // no-op
+            }
+        };
+
+        void update(K key);
+        void persist(Descriptor descriptor);
+    }
+
+    interface Static<K> extends KeyStats<K>
+    {
+        Static<Object> NOOP = key -> true;
+    }
+
+    interface Factory<K>
+    {
+        Factory<?> NOOP = new Factory<>()
+        {
+            @Override
+            public Active<Object> create()
+            {
+                return Active.NOOP;
+            }
+
+            @Override
+            public Static<Object> load(Descriptor descriptor)
+            {
+                return Static.NOOP;
+            }
+
+            @Override
+            public Active<Object> rebuild(Descriptor descriptor, KeySupport<Object> keySupport, int fsyncedLimit)
+            {
+                return Active.NOOP;
+            }
+        };
+
+        static <K> Factory<K> noop()
+        {
+            //noinspection unchecked
+            return (Factory<K>) NOOP;
+        }
+
+        Active<K> create();
+        Static<K> load(Descriptor descriptor);
+        Active<K> rebuild(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit);
+    }
+}
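
To make the KeyStats contract concrete, here is a minimal sketch of an exact (set-based) Active implementation; it is illustrative only and not part of this commit, which ships the min/max offset-range implementation in MutationJournal further below. The contract is that mayContain() may return false positives but never false negatives, which is what lets a segment be skipped safely:

    // Illustrative sketch: remembers every key exactly, so mayContain()
    // happens to have no false positives either.
    final class ExactKeyStats<K> implements KeyStats.Active<K>
    {
        private final java.util.Set<K> seen = java.util.concurrent.ConcurrentHashMap.newKeySet();

        @Override
        public void update(K key)
        {
            seen.add(key); // invoked for every entry appended to the active segment
        }

        @Override
        public boolean mayContain(K key)
        {
            return seen.contains(key);
        }

        @Override
        public void persist(Descriptor descriptor)
        {
            // a real implementation would write to descriptor.tmpFileFor(Component.KEYSTATS),
            // fsync, then move the file into place (see ActiveOffsetRanges.persist below)
        }
    }
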
diff --git a/src/java/org/apache/cassandra/journal/KeySupport.java b/src/java/org/apache/cassandra/journal/KeySupport.java
index efc41aa6c8..5e1a7845f2 100644
--- a/src/java/org/apache/cassandra/journal/KeySupport.java
+++ b/src/java/org/apache/cassandra/journal/KeySupport.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.journal;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -43,7 +43,7 @@ public interface KeySupport<K> extends Comparator<K>
     K deserialize(ByteBuffer buffer, int position, int userVersion);
     K deserialize(ByteBuffer buffer, int userVersion);
 
-    void updateChecksum(Checksum crc, K key, int userVersion);
+    void updateChecksum(CRC32 crc, K key, int userVersion);
 
     int compareWithKeyAt(K key, ByteBuffer buffer, int position, int userVersion);
 }
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java
index 6f9552a229..a411713c16 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -25,7 +25,6 @@ import java.util.zip.CRC32;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.utils.Crc;
 
-import static org.apache.cassandra.journal.Journal.validateCRC;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
 /**
@@ -110,7 +109,7 @@ public final class Metadata
         updateChecksumInt(crc, recordsCount);
         updateChecksumInt(crc, fsyncLimit);
         updateChecksumInt(crc, needsReplay ? 1 : 0);
-        validateCRC(crc, in.readInt());
+        Crc.validate(crc, in.readInt());
         return new Metadata(recordsCount, fsyncLimit, needsReplay);
     }
 
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index 0a2ca8cc95..abe94ff1f2 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 
-import static org.apache.cassandra.journal.Journal.validateCRC;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong;
 
@@ -136,11 +135,11 @@ final class OnDiskIndex<K> extends Index<K>
         {
             int entryCount = in.readInt();
             updateChecksumInt(crc, entryCount);
-            validateCRC(crc, in.readInt());
+            Crc.validate(crc, in.readInt());
 
             Crc.updateCrc32(crc, buffer, FILE_PREFIX_SIZE, FILE_PREFIX_SIZE + entryCount * ENTRY_SIZE);
             in.skipBytesFully(entryCount * ENTRY_SIZE);
-            validateCRC(crc, in.readInt());
+            Crc.validate(crc, in.readInt());
 
             if (in.available() != 0)
                 throw new IOException("Trailing data encountered in segment index " + descriptor.fileFor(Component.INDEX));
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index 6a9e80432f..9bc394c26f 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -63,6 +63,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
     }
 
     abstract Index<K> index();
+    public abstract KeyStats<K> keyStats();
 
     abstract boolean isActive();
     abstract boolean isFlushed(long position);
@@ -90,6 +91,9 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
 
     boolean readLast(K id, RecordConsumer<K> consumer)
     {
+        if (!keyStats().mayContain(id))
+            return false;
+
         long offsetAndSize = index().lookUpLast(id);
         if (offsetAndSize == -1)
             return false;
@@ -108,6 +112,9 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
 
     boolean readLast(K id, EntrySerializer.EntryHolder<K> into)
     {
+        if (!keyStats().mayContain(id))
+            return false;
+
         long offsetAndSize = index().lookUpLast(id);
         if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), 
Index.readSize(offsetAndSize), into))
             return false;
@@ -128,6 +135,9 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
 
     void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntry)
     {
+        if (!keyStats().mayContain(id))
+            return;
+
         long[] all = index().lookUpAll(id);
         int prevOffset = Integer.MAX_VALUE;
         for (int i = 0; i < all.length; i++)
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 4e01bd47b4..dae1bbeaac 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -67,11 +67,27 @@ class Segments<K, V>
     Segments<K, V> withoutEmptySegment(ActiveSegment<K, V> activeSegment)
     {
         Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        Segment<K, V> oldValue = segments.remove(activeSegment.descriptor.timestamp);
+        Segment<K, V> oldValue = newSegments.remove(activeSegment.descriptor.timestamp);
+        Invariants.nonNull(oldValue);
         Invariants.require(oldValue.asActive().isEmpty());
         return new Segments<>(newSegments);
     }
 
+    Segments<K, V> withoutStaticSegments(Collection<StaticSegment<K, V>> removeSegments)
+    {
+        if (removeSegments.isEmpty())
+            return this;
+
+        Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
+        for (StaticSegment<K, V> segment : removeSegments)
+        {
+            Segment<K, V> oldValue = newSegments.remove(segment.descriptor.timestamp);
+            if (oldValue != null)
+                Invariants.require(oldValue.isStatic());
+        }
+        return new Segments<>(newSegments);
+    }
+
     Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSegment<K, V> staticSegment)
     {
         Invariants.requireArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
@@ -192,6 +208,13 @@ class Segments<K, V>
                 into.add(segment.asStatic());
     }
 
+    void selectStatic(Predicate<StaticSegment<K, V>> filter, Collection<StaticSegment<K, V>> into)
+    {
+        for (Segment<K, V> segment : segments.values())
+            if (segment.isStatic() && filter.test(segment.asStatic()))
+                into.add(segment.asStatic());
+    }
+
     /**
      * Select segments that could potentially have an entry with the specified 
ids and
      * attempt to grab references to them all.
@@ -236,6 +259,15 @@ class Segments<K, V>
         return new Segments<>(selectedSegments);
     }
 
+    int count(Predicate<Segment<K, V>> filter)
+    {
+        int count = 0;
+        for (Segment<K, V> segment : segments.values())
+            if (filter.test(segment))
+                count++;
+        return count;
+    }
+
     static class ReferencedSegments<K, V> extends Segments<K, V> implements AutoCloseable
     {
         private final Refs<Segment<K, V>> refs;
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 0dfbaaf838..efc2c796aa 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -52,16 +52,19 @@ public final class StaticSegment<K, V> extends Segment<K, V>
     private final Ref<Segment<K, V>> selfRef;
 
     private final OnDiskIndex<K> index;
+    private final KeyStats.Static<K> keyStats;
 
     private StaticSegment(Descriptor descriptor,
                           FileChannel channel,
                           MappedByteBuffer buffer,
                           OnDiskIndex<K> index,
                           Metadata metadata,
+                          KeyStats.Static<K> keyStats,
                           KeySupport<K> keySupport)
     {
         super(descriptor, metadata, keySupport);
         this.index = index;
+        this.keyStats = keyStats;
 
         this.channel = channel;
         this.fsyncLimit = metadata.fsyncLimit();
@@ -76,12 +79,12 @@ public final class StaticSegment<K, V> extends Segment<K, V>
      * @param descriptors descriptors of the segments to load
      * @return list of the loaded segments
      */
-    static <K, V> List<Segment<K, V>> open(Collection<Descriptor> descriptors, KeySupport<K> keySupport)
+    static <K, V> List<Segment<K, V>> open(Collection<Descriptor> descriptors, KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
     {
         List<Segment<K, V>> segments = new ArrayList<>(descriptors.size());
         for (Descriptor descriptor : descriptors)
         {
-            StaticSegment<K, V> segment = open(descriptor, keySupport);
+            StaticSegment<K, V> segment = open(descriptor, keySupport, keyStatsFactory);
             segments.add(segment);
         }
 
@@ -95,7 +98,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
      * @return the loaded segment
      */
     @SuppressWarnings({ "resource", "RedundantSuppression" })
-    static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport)
+    static <K, V> StaticSegment<K, V> open(Descriptor descriptor, KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
     {
         if (!Component.DATA.existsFor(descriptor))
             throw new IllegalArgumentException("Data file for segment " + descriptor + " doesn't exist");
@@ -135,9 +138,31 @@ public final class StaticSegment<K, V>
         if (index == null)
             index = OnDiskIndex.rebuildAndPersist(descriptor, keySupport, metadata.fsyncLimit());
 
+        KeyStats.Static<K> keyStats = null;
+
+        if (Component.KEYSTATS.existsFor(descriptor))
+        {
+            try
+            {
+                keyStats = keyStatsFactory.load(descriptor);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Could not load keystats component for {}; rebuilding", descriptor, t);
+                Component.KEYSTATS.markCorrupted(descriptor);
+            }
+        }
+
+        if (keyStats == null)
+        {
+            KeyStats.Active<K> active = keyStatsFactory.rebuild(descriptor, keySupport, metadata.fsyncLimit());
+            active.persist(descriptor);
+            keyStats = keyStatsFactory.load(descriptor);
+        }
+
         try
         {
-            return internalOpen(descriptor, index, metadata, keySupport);
+            return internalOpen(descriptor, index, metadata, keyStats, keySupport);
         }
         catch (IOException e)
         {
@@ -146,13 +171,13 @@ public final class StaticSegment<K, V>
     }
 
     private static <K, V> StaticSegment<K, V> internalOpen(
-        Descriptor descriptor, OnDiskIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
+        Descriptor descriptor, OnDiskIndex<K> index, Metadata metadata, KeyStats.Static<K> keyStats, KeySupport<K> keySupport)
     throws IOException
     {
         File file = descriptor.fileFor(Component.DATA);
         FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
         MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
-        return new StaticSegment<>(descriptor, channel, buffer, index, metadata, keySupport);
+        return new StaticSegment<>(descriptor, channel, buffer, index, metadata, keyStats, keySupport);
     }
 
     public void close(Journal<K, V> journal)
@@ -242,6 +267,12 @@ public final class StaticSegment<K, V>
         return index;
     }
 
+    @Override
+    public KeyStats.Static<K> keyStats()
+    {
+        return keyStats;
+    }
+
     public int entryCount()
     {
         return index.entryCount();
@@ -402,7 +433,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
         }
     }
 
-    static <K> SequentialReader<K> sequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+    public static <K> SequentialReader<K> sequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
     {
         return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
     }
@@ -415,7 +446,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
     * strictly, throwing {@link JournalReadError}. Errors encountered in unsynced portions
      * of segments are treated as segment EOF.
      */
-    static final class SequentialReader<K> extends Reader<K>
+    public static final class SequentialReader<K> extends Reader<K>
     {
         private final int fsyncedLimit; // exclusive
 
diff --git a/src/java/org/apache/cassandra/journal/ValueSerializer.java b/src/java/org/apache/cassandra/journal/ValueSerializer.java
index 69690d39b2..b031ffff14 100644
--- a/src/java/org/apache/cassandra/journal/ValueSerializer.java
+++ b/src/java/org/apache/cassandra/journal/ValueSerializer.java
@@ -24,6 +24,27 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 public interface ValueSerializer<K, V>
 {
+    ValueSerializer<?, ?> NONE = new ValueSerializer<>()
+    {
+        @Override
+        public void serialize(Object key, Object value, DataOutputPlus out, int userVersion)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object deserialize(Object key, DataInputPlus in, int userVersion)
+        {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    static <K, V> ValueSerializer<K, V> none()
+    {
+        //noinspection unchecked
+        return (ValueSerializer<K, V>) NONE;
+    }
+
     void serialize(K key, V value, DataOutputPlus out, int userVersion) throws IOException;
 
     /**
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index cebfcb3ee2..6f4a4aed4c 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -401,6 +401,11 @@ public abstract class CoordinatorLog
         }
     }
 
+    void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+    {
+        into.add(reconciledPersistedOffsets);
+    }
+
     boolean isDurablyReconciled(CoordinatorLogOffsets<?> logOffsets)
     {
         lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 101d95db9e..2f87f0ecf4 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -71,7 +71,7 @@ public class CoordinatorLogId implements Serializable
     @VisibleForTesting
     public static long asLong(int hostId, int hostLogId)
     {
-        return ((long) hostId << 32) | hostLogId;
+        return ((long) hostId << 32) | (hostLogId & 0xFFFFFFFFL);
     }
 
     static int hostId(long coordinatorLogId)
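
The asLong() change above fixes a sign-extension bug: an int operand of a bitwise OR is widened to long with sign extension, so a hostLogId with its top bit set would smear one-bits across the hostId half. A small demonstration (values illustrative):

    int hostId = 7;
    int hostLogId = -1; // raw bits 0xFFFFFFFF

    long bad = ((long) hostId << 32) | hostLogId;
    // hostLogId sign-extends to 0xFFFFFFFF_FFFFFFFFL, so bad == -1L
    // and the hostId in the upper 32 bits is wiped out

    long good = ((long) hostId << 32) | (hostLogId & 0xFFFFFFFFL);
    // good == 0x00000007_FFFFFFFFL: both halves survive intact

The same masking idiom appears in OffsetRanges.range(int, int) in MutationJournal below.
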
diff --git a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
index 8e3b2e1bc1..cd59e62050 100644
--- a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
@@ -56,10 +56,15 @@ public abstract class Log2OffsetsMap<T extends Offsets> implements Iterable<Shor
         return count;
     }
 
+    public boolean contains(ShortMutationId id)
+    {
+        Offsets offsets = asMap().get(id.logId());
+        return offsets != null && offsets.contains(id.offset());
+    }
+
     @Override
     public String toString()
     {
-
         StringBuilder builder = new StringBuilder("Log2OffsetsMap{");
         boolean isFirst = true;
         for (Map.Entry<Long, T> entry : asMap().entrySet())
@@ -110,10 +115,11 @@ public abstract class Log2OffsetsMap<T extends Offsets> implements Iterable<Shor
             return create(new CoordinatorLogId(logId));
         }
 
-        public void add(ShortMutationId id)
+        public AbstractMutable<T> add(ShortMutationId id)
         {
             T offsets = offsetMap.computeIfAbsent(id.logId(), this::create);
             offsets.add(id.offset());
+            return this;
         }
 
         public void add(Offsets offsets)
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index e56cc6c9de..e2a2a82e64 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -24,19 +24,16 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import org.apache.cassandra.journal.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import accord.utils.Invariants;
+import org.agrona.collections.Long2LongHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
@@ -47,11 +44,15 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.journal.*;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.jctools.maps.NonBlockingHashMapLong;
 
 import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
 
@@ -59,7 +60,6 @@ import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
 public class MutationJournal
 {
     public static final MutationJournal instance = new MutationJournal();
-    private static final Logger log = LoggerFactory.getLogger(MutationJournal.class);
 
     private final Journal<ShortMutationId, Mutation> journal;
     private final Map<Long, SegmentStateTracker> segmentStateTrackers;
@@ -98,17 +98,25 @@ public class MutationJournal
     @VisibleForTesting
     MutationJournal(File directory, Params params)
     {
-        journal = new Journal<>("MutationJournal", directory, params, MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE, SegmentCompactor.noop()) {
-            @Override
-            protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation> activeSegment, Runnable onDone)
-            {
-                super.closeActiveSegmentAndOpenAsStatic(activeSegment,
-                                                        () -> {
-                                                            maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
-                                                            if (onDone != null) onDone.run();
-                                                        });
-            }
-        };
+        journal =
+            new Journal<>("MutationJournal",
+                          directory,
+                          params,
+                          MutationIdSupport.INSTANCE,
+                          MutationSerializer.INSTANCE,
+                          OffsetRangesFactory.INSTANCE,
+                          SegmentCompactor.noop())
+                          {
+                              // TODO (expected): a cleaner way to override it; pass a Callbacks object with sanctioned callbacks?
+                              @Override
+                              protected void closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation> activeSegment, Runnable onDone)
+                              {
+                                  super.closeActiveSegmentAndOpenAsStatic(activeSegment, () -> {
+                                      maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
+                                      if (onDone != null) onDone.run();
+                                  });
+                              }
+                          };
         segmentStateTrackers = new NonBlockingHashMapLong<>();
     }
 
@@ -171,7 +179,12 @@ public class MutationJournal
     {
         for (Segment<ShortMutationId, Mutation> segment : journal.getSegments(lowerBound.segmentId, upperBound.segmentId))
         {
-            Invariants.nonNull(segmentStateTrackers.get(segment.id())).markClean(tableId, lowerBound, upperBound);
+            SegmentStateTracker tracker = segmentStateTrackers.get(segment.id());
+            // upper flush bound can be first position in an empty active segment
+            if (tracker == null)
+                continue;
+
+            segmentStateTrackers.get(segment.id()).markClean(tableId, lowerBound, upperBound);
 
             // We can only safely mark static segments as non-replayable. Active segment can still be written to,
             // so we only persist this metadata on flush.
@@ -291,16 +304,12 @@ public class MutationJournal
                 // TODO: respect SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
                 replayParallelism.acquireThrowUncheckedOnInterrupt(1);
                 Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment, replayOne))
-                              .addCallback(new BiConsumer<Object, Throwable>()
-                {
-                    @Override
-                    public void accept(Object o, Throwable fail)
-                    {
-                        if (fail != null && !journal.handleError("Could not replay mutation " + key, fail))
-                            abort.set(true);
-                        replayParallelism.release(1);
-                    }
-                });
+                              .addCallback((BiConsumer<Object, Throwable>) (o, fail) ->
+                              {
+                                  if (fail != null && !journal.handleError("Could not replay mutation " + key, fail))
+                                      abort.set(true);
+                                  replayParallelism.release(1);
+                              });
             }
 
             // Wait for all mutations to be applied before returning
@@ -309,9 +318,12 @@ public class MutationJournal
     }
 
     @VisibleForTesting
-    public void closeCurrentSegmentForTestingIfNonEmpty()
+    public int dropReconciledSegments(Log2OffsetsMap<?> reconciledOffsets)
     {
-        journal.closeCurrentSegmentForTestingIfNonEmpty();
+        return journal.dropStaticSegments((segment) -> {
+            StaticOffsetRanges ranges = (StaticOffsetRanges) segment.keyStats();
+            return ranges.isFullyCovered(reconciledOffsets) && !segment.metadata().needsReplay();
+        });
     }
 
     public void readAll(RecordConsumer<ShortMutationId> consumer)
@@ -429,10 +441,10 @@ public class MutationJournal
         }
 
         @Override
-        public void updateChecksum(Checksum crc, ShortMutationId id, int userVersion)
+        public void updateChecksum(CRC32 crc, ShortMutationId id, int userVersion)
         {
-            FBUtilities.updateChecksumLong(crc, id.logId());
-            FBUtilities.updateChecksumInt(crc, id.offset());
+            Crc.updateWithLong(crc, id.logId());
+            Crc.updateWithInt(crc, id.offset());
         }
 
         @Override
@@ -453,6 +465,7 @@ public class MutationJournal
     public static class MutationSerializer implements ValueSerializer<ShortMutationId, Mutation>
     {
         public static MutationSerializer INSTANCE = new MutationSerializer();
+
         @Override
         public void serialize(ShortMutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException
         {
@@ -467,4 +480,270 @@ public class MutationJournal
             return Mutation.serializer.deserialize(in, userVersion);
         }
     }
+
+    /*
+     * KeyStats component to track per-log min and max offsets in a segment
+     */
+
+    static abstract class OffsetRanges implements KeyStats<ShortMutationId>
+    {
+        @Override
+        public abstract boolean mayContain(ShortMutationId id);
+
+        protected static boolean mayContain(long range, ShortMutationId id)
+        {
+            return id.offset() >= minOffset(range) && id.offset() <= maxOffset(range);
+        }
+
+        protected static int minOffset(long range)
+        {
+            return (int) (range >>> 32);
+        }
+
+        protected static int maxOffset(long range)
+        {
+            return (int) range;
+        }
+
+        protected static long range(int minOffset, int maxOffset)
+        {
+            return ((long) minOffset << 32) | (maxOffset & 0xFFFFFFFFL);
+        }
+
+        abstract Map<Long, Long> asMap();
+
+        @Override
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder(getClass().getSimpleName());
+            builder.append('{');
+            for (Map.Entry<Long, Long> entry : asMap().entrySet())
+            {
+                CoordinatorLogId logId = new CoordinatorLogId(entry.getKey());
+                long range = entry.getValue();
+                int min = minOffset(range);
+                int max = maxOffset(range);
+                builder.append(logId)
+                       .append("->")
+                       .append('[')
+                       .append(min)
+                       .append(", ")
+                       .append(max)
+                       .append(']')
+                       .append(',');
+            }
+            return builder.append('}').toString();
+        }
+    }
+
+    // TODO (consider): an off-heap version
+    static class ActiveOffsetRanges extends OffsetRanges implements KeyStats.Active<ShortMutationId>
+    {
+        private final NonBlockingHashMapLong<Long> ranges;
+
+        ActiveOffsetRanges()
+        {
+            ranges = new NonBlockingHashMapLong<>();
+        }
+
+        @Override
+        protected Map<Long, Long> asMap()
+        {
+            return ranges;
+        }
+
+        @Override
+        public boolean mayContain(ShortMutationId id)
+        {
+            Long range = ranges.get(id.logId());
+            return range != null && mayContain(range, id);
+        }
+
+        @Override
+        @SuppressWarnings("WrapperTypeMayBePrimitive")
+        public void update(ShortMutationId id)
+        {
+            Long prev, next;
+            do
+            {
+                prev = ranges.get(id.logId());
+                int min = prev == null ? id.offset() : Math.min(minOffset(prev), id.offset());
+                int max = prev == null ? id.offset() : Math.max(maxOffset(prev), id.offset());
+                next = range(min, max);
+            }
+            while (!compareAndSet(id.logId(), prev, next));
+        }
+
+        // NonBlockingHashMapLong doesn't expose putIfMatch() directly, so emulate compare-and-set with putIfAbsent()/replace()
+        private boolean compareAndSet(long logId, Long prevValue, Long nextValue)
+        {
+            return prevValue == null
+                 ? ranges.putIfAbsent(logId, nextValue) == null
+                 : ranges.replace(logId, prevValue, nextValue);
+        }
+
+        @Override
+        public void persist(Descriptor descriptor)
+        {
+            File tmpFile = descriptor.tmpFileFor(Component.KEYSTATS);
+            try (FileOutputStreamPlus out = new FileOutputStreamPlus(tmpFile))
+            {
+                write(out);
+
+                out.flush();
+                out.sync();
+            }
+            catch (IOException e)
+            {
+                throw new JournalWriteError(descriptor, tmpFile, e);
+            }
+            tmpFile.move(descriptor.fileFor(Component.KEYSTATS));
+        }
+
+        private void write(DataOutputPlus out) throws IOException
+        {
+            CRC32 crc = Crc.crc32();
+            int count = ranges.size();
+            out.writeInt(count);
+            Crc.updateWithInt(crc, count);
+            out.writeInt((int) crc.getValue());
+            for (Map.Entry<Long, Long> entry : ranges.entrySet())
+            {
+                long logId = entry.getKey();
+                long range = entry.getValue();
+                out.writeLong(logId);
+                out.writeLong(range);
+                Crc.updateWithLong(crc, logId);
+                Crc.updateWithLong(crc, range);
+            }
+            out.writeInt((int) crc.getValue());
+        }
+    }
+
+    static class StaticOffsetRanges extends OffsetRanges implements KeyStats.Static<ShortMutationId>
+    {
+        private static final long NO_VALUE = Long.MIN_VALUE;
+
+        private final Long2LongHashMap ranges;
+
+        StaticOffsetRanges(Long2LongHashMap ranges)
+        {
+            this.ranges = ranges;
+        }
+
+        @Override
+        protected Map<Long, Long> asMap()
+        {
+            return ranges;
+        }
+
+        @Override
+        public boolean mayContain(ShortMutationId id)
+        {
+            long range = ranges.get(id.logId());
+            return range != NO_VALUE && mayContain(range, id);
+        }
+
+        static StaticOffsetRanges read(DataInputPlus in) throws IOException
+        {
+            CRC32 crc = Crc.crc32();
+            int count = in.readInt();
+            Crc.updateWithInt(crc, count);
+            Crc.validate(crc, in.readInt());
+            Long2LongHashMap ranges = new Long2LongHashMap(count, 0.65f, NO_VALUE, false);
+            for (int i = 0; i < count; i++)
+            {
+                long logId = in.readLong();
+                long range = in.readLong();
+                Crc.updateWithLong(crc, logId);
+                Crc.updateWithLong(crc, range);
+                ranges.put(logId, range);
+            }
+            Crc.validate(crc, in.readInt());
+            return new StaticOffsetRanges(ranges);
+        }
+
+        /**
+         * @return whether all keys in the segment are fully covered by the specified (durably reconciled) offsets map
+         */
+        boolean isFullyCovered(Log2OffsetsMap<?> durablyReconciled)
+        {
+            Long2ObjectHashMap<Offsets> reconciledMap = ((Log2OffsetsMap<Offsets>) durablyReconciled).asMap();
+            for (Long2LongHashMap.EntryIterator iter = ranges.entrySet().iterator(); iter.hasNext();)
+            {
+                iter.next();
+
+                long logId = iter.getLongKey();
+                long range = iter.getLongValue();
+                int min = minOffset(range);
+                int max = maxOffset(range);
+
+                Offsets offsets = reconciledMap.get(logId);
+                if (offsets == null)
+                    return false;
+                if (!offsets.containsRange(min, max))
+                    return false;
+            }
+            return true;
+        }
+    }
+
+    static final class OffsetRangesFactory implements KeyStats.Factory<ShortMutationId>
+    {
+        static final OffsetRangesFactory INSTANCE = new OffsetRangesFactory();
+
+        @Override
+        public ActiveOffsetRanges create()
+        {
+            return new ActiveOffsetRanges();
+        }
+
+        @Override
+        public StaticOffsetRanges load(Descriptor descriptor)
+        {
+            File file = descriptor.fileFor(Component.KEYSTATS);
+            try (FileInputStreamPlus in = new FileInputStreamPlus(file))
+            {
+                return StaticOffsetRanges.read(in);
+            }
+            catch (IOException e)
+            {
+                throw new JournalReadError(descriptor, file, e);
+            }
+        }
+
+        @Override
+        public ActiveOffsetRanges rebuild(Descriptor descriptor, KeySupport<ShortMutationId> keySupport, int fsyncedLimit)
+        {
+            ActiveOffsetRanges active = create();
+            try (StaticSegment.SequentialReader<ShortMutationId> reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
+            {
+                while (reader.advance())
+                    active.update(reader.key());
+            }
+            return active;
+        }
+    }
+
+    /*
+     * Test helpers
+     */
+
+    @VisibleForTesting
+    public void closeCurrentSegmentForTestingIfNonEmpty()
+    {
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+    }
+
+    @VisibleForTesting
+    void clearNeedsReplayForTesting()
+    {
+        journal.clearNeedsReplayForTesting();
+    }
+
+    @VisibleForTesting
+    public int countStaticSegmentsForTesting()
+    {
+        return journal.countStaticSegmentsForTesting();
+    }
 }
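
For illustration, a minimal standalone sketch of the two mechanics introduced above: the single-long min/max offset packing shared by ActiveOffsetRanges and StaticOffsetRanges, and the putIfAbsent()/replace() retry loop that substitutes for a missing putIfMatch(). ConcurrentHashMap stands in for jctools' NonBlockingHashMapLong purely to keep the sketch self-contained; the class name is hypothetical.

    import java.util.concurrent.ConcurrentHashMap;

    public class OffsetRangesSketch
    {
        static final ConcurrentHashMap<Long, Long> ranges = new ConcurrentHashMap<>();

        // min offset lives in the high 32 bits, max offset in the low 32 bits
        static long range(int minOffset, int maxOffset) { return ((long) minOffset << 32) | (maxOffset & 0xFFFFFFFFL); }
        static int minOffset(long range) { return (int) (range >>> 32); }
        static int maxOffset(long range) { return (int) range; }

        static void update(long logId, int offset)
        {
            Long prev, next;
            do
            {
                prev = ranges.get(logId);
                int min = prev == null ? offset : Math.min(minOffset(prev), offset);
                int max = prev == null ? offset : Math.max(maxOffset(prev), offset);
                next = range(min, max);
            }
            // retry until the value we read is still the one we replace
            while (!(prev == null ? ranges.putIfAbsent(logId, next) == null
                                  : ranges.replace(logId, prev, next)));
        }

        public static void main(String[] args)
        {
            update(100L, 7);
            update(100L, 3);
            long packed = ranges.get(100L);
            System.out.println("[" + minOffset(packed) + ", " + maxOffset(packed) + "]"); // prints [3, 7]
        }
    }
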
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 535a4100f2..4009dbd39a 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -87,7 +87,7 @@ public class MutationTrackingService
 
     /**
      * Split ranges into this many shards.
-     *
+     * <p>
      * TODO (expected): ability to rebalance / change this constant
      */
     private static final int SHARD_MULTIPLIER = 8;
@@ -547,7 +547,6 @@ public class MutationTrackingService
         return false;
     }
 
-
     private static ConcurrentHashMap<String, KeyspaceShards> applyUpdatedMetadata(Map<String, KeyspaceShards> keyspaceShardsMap, @Nullable ClusterMetadata prev, ClusterMetadata next, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog> onNewLog)
     {
         Preconditions.checkNotNull(next);
@@ -619,6 +618,23 @@ public class MutationTrackingService
         }
     }
 
+    private void truncateMutationJournal()
+    {
+        Log2OffsetsMap.Mutable reconciledOffsets = new Log2OffsetsMap.Mutable();
+        collectDurablyReconciledOffsets(reconciledOffsets);
+        MutationJournal.instance.dropReconciledSegments(reconciledOffsets);
+    }
+
+    /**
+     * Collect every log's durably reconciled offsets. Every mutation covered
+     * by these offsets can be compacted away by the journal, assuming that all
+     * relevant memtables have been flushed to disk.
+     */
+    private void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+    {
+        forEachKeyspace(keyspace -> keyspace.collectDurablyReconciledOffsets(into));
+    }
+
     public static class KeyspaceShards
     {
         private enum UpdateDecision
@@ -847,6 +863,11 @@ public class MutationTrackingService
             });
         }
 
+        void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+        {
+            forEachShard(shard -> shard.collectDurablyReconciledOffsets(into));
+        }
+
         void forEachShard(Consumer<Shard> consumer)
         {
             for (Shard shard : shards.values())
@@ -979,8 +1000,15 @@ public class MutationTrackingService
 
         @Override
         public void run()
+        {
+            run(true);
+        }
+
+        private void run(boolean dropSegments)
         {
             MutationTrackingService.instance.forEachKeyspace(this::run);
+            if (dropSegments)
+                MutationTrackingService.instance.truncateMutationJournal();
         }
 
         private void run(KeyspaceShards shards)
@@ -1000,6 +1028,12 @@ public class MutationTrackingService
         offsetsPersister.run();
     }
 
+    @VisibleForTesting
+    public void persistLogStateForTesting(boolean dropSegments)
+    {
+        offsetsPersister.run(dropSegments);
+    }
+
     @VisibleForTesting
     public void broadcastOffsetsForTesting()
     {
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java b/src/java/org/apache/cassandra/replication/Offsets.java
index dfb4f3f404..8724c02d4d 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -186,12 +186,29 @@ public abstract class Offsets implements Iterable<ShortMutationId>
     {
         if (size == 0)
             return false;
-
         int pos = Arrays.binarySearch(bounds, 0, size, offset);
         if (pos >= 0) return true; // matches one of the bounds
+        return (-pos - 1) % 2 != 0;
+    }
+
+    /**
+     * @return whether the entire [minOffset, maxOffset] range is contained in these Offsets
+     */
+    public boolean containsRange(int minOffset, int maxOffset)
+    {
+        if (size == 0)
+            return false;
+
+        // find the range the min offset falls under
+        int pos = Arrays.binarySearch(bounds, 0, size, minOffset);
+        if (pos < 0 && (-pos - 1) % 2 == 0) // min offset is outside any existing range
+            return false;
+
+        int end = pos >= 0
+                ? (pos % 2 == 0 ? pos + 1 : pos)
+                : -pos - 1;
 
-        pos = -pos - 1;
-        return (pos - 1) % 2 == 0; // offset falls within bounds of an existing range if the bound to the left is an open one
+        return maxOffset <= bounds[end];
     }
 
     public void digest(Digest digest)
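
To make the binary-search parity logic in containsRange() concrete: bounds holds alternating inclusive [start, end] pairs, so a missed lookup whose insertion point is even lands in a gap between ranges. Below is a self-contained sketch of just that logic; the Offsets class itself carries more state, and the names here are illustrative.

    import java.util.Arrays;

    public class ContainsRangeSketch
    {
        // alternating inclusive [start, end] bounds: ranges [2, 4] and [6, 8]
        static final int[] bounds = { 2, 4, 6, 8 };

        static boolean containsRange(int minOffset, int maxOffset)
        {
            if (bounds.length == 0)
                return false;
            int pos = Arrays.binarySearch(bounds, 0, bounds.length, minOffset);
            if (pos < 0 && (-pos - 1) % 2 == 0) // even insertion point: minOffset is in a gap
                return false;
            int end = pos >= 0
                    ? (pos % 2 == 0 ? pos + 1 : pos) // hit a start bound or an end bound
                    : -pos - 1;                      // miss inside a range: closing bound is to the right
            return maxOffset <= bounds[end];
        }

        public static void main(String[] args)
        {
            assert containsRange(2, 4) && containsRange(3, 4) && containsRange(6, 8);
            assert !containsRange(1, 4) && !containsRange(3, 5) && !containsRange(4, 6);
        }
    }
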
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 0717c22817..7274127fcb 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -276,6 +276,11 @@ public class Shard
         return new BroadcastLogOffsets(keyspace, range, offsets, durable);
     }
 
+    void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+    {
+        logs.values().forEach(log -> log.collectDurablyReconciledOffsets(into));
+    }
+
     private CoordinatorLog getOrCreate(Mutation mutation)
     {
         return getOrCreate(mutation.id());
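
Pieced together, Shard, MutationTrackingService, and MutationJournal give the following truncation flow. The condensed driver below is a sketch only: the class name, the single-shard wiring, and the package placement (which makes the package-private collector reachable) are assumptions for illustration, not the service's actual iteration.

    // Hypothetical condensed driver; assumes it lives in the
    // org.apache.cassandra.replication package alongside Shard.
    final class TruncationFlowSketch
    {
        static int truncateFor(Shard shard)
        {
            // 1. gather the shard's durably reconciled offsets, per log
            Log2OffsetsMap.Mutable reconciled = new Log2OffsetsMap.Mutable();
            shard.collectDurablyReconciledOffsets(reconciled);
            // 2. drop static segments fully covered by them and no longer needing replay
            return MutationJournal.instance.dropReconciledSegments(reconciled);
        }
    }
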
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index aa568104a2..8669586642 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -78,7 +78,6 @@ import org.apache.cassandra.journal.Params;
 import org.apache.cassandra.journal.RecordPointer;
 import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
-import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
 import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
 import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
@@ -134,23 +133,10 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
     public AccordJournal(Params params, File directory, ColumnFamilyStore cfs)
     {
         Version userVersion = Version.fromVersion(params.userVersion());
-        this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT,
-                                     // In Accord, we are using streaming serialization, i.e. Reader/Writer interfaces instead of materializing objects
-                                     new ValueSerializer<>()
-                                     {
-                                         @Override
-                                         public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion)
-                                         {
-                                             throw new UnsupportedOperationException();
-                                         }
-
-                                         @Override
-                                         public Object deserialize(JournalKey key, DataInputPlus in, int userVersion)
-                                         {
-                                             throw new UnsupportedOperationException();
-                                         }
-                                     },
-                                     compactor(cfs, userVersion));
+        this.journal =
+            Journal.builder("AccordJournal", directory, params, JournalKey.SUPPORT)
+                   .segmentCompactor(compactor(cfs, userVersion))
+                   .build();
         this.journalTable = new AccordJournalTable<>(journal, JournalKey.SUPPORT, cfs, userVersion);
         this.params = params;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 9bd42ebaf1..4bb3bf19e2 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 import accord.local.Node.Id;
 import accord.primitives.Timestamp;
@@ -177,7 +177,7 @@ public final class JournalKey
         }
 
         @Override
-        public void updateChecksum(Checksum crc, JournalKey key, int userVersion)
+        public void updateChecksum(CRC32 crc, JournalKey key, int userVersion)
         {
             byte[] out = AccordJournal.keyCRCBytes.get();
             serialize(key, out);
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
index 1c57781c00..13e1e69d10 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
@@ -263,19 +263,19 @@ public class ReadReconciliations implements ExpiredStatePurger.Expireable
 
         /**
          * Remote summaries minus data node summary offsets
-         * 
+         * <p>
          * This calculation combines BOTH reconciled and unreconciled mutations reported by other nodes, and
          * then subtracts mutations reported locally for correctness
-         *
+         * <p>
          * If we subtracted reconciled ids from the unreconciled ids, we could violate read monotonicity in this scenario:
          * 1. Read starts locally and doesn't see mutation M.
          * 2. During reconciliation, mutation M arrives and is marked reconciled, other replicas report mutation M as reconciled
          * 3. If we filtered out reconciled mutations, this read wouldn't augment with M
          * 4. A concurrent read could see M in its initial data
          * 5. This read returns without M
-         * 
+         * <p>
          * Instead, we include all mutations and rely on token range filtering during actual mutation
-         * retrieval (in PartialTrackedRead.augment) to ensure we only augment with mutations 
+         * retrieval (in PartialTrackedRead.augment()) to ensure we only augment with mutations
          * relevant to this read's range/key
          */
         private Log2OffsetsMap.Mutable augmentingOffsets()
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 066ebc22e0..fa6c3193b3 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.service.reads.ReadCoordinator;
@@ -36,16 +35,14 @@ import org.apache.cassandra.replication.MutationSummary;
 import org.apache.cassandra.replication.ShortMutationId;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.jctools.maps.NonBlockingHashMap;
 
-import org.apache.cassandra.transport.Dispatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.function.Consumer;
-
 /**
  * Since the read reconciliations don't use 2-way callbacks, maps of active reads and reconciliations
  * are maintained and expired here.
@@ -69,8 +66,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         ReadCommand command,
         ConsistencyLevel consistencyLevel,
         int[] summaryNodes,
-        Dispatcher.RequestTime requestTime,
-        Consumer<PartialTrackedRead> partialReadConsumer)
+        Dispatcher.RequestTime requestTime)
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
@@ -100,7 +96,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         }
         // TODO: confirm all summaryNodes are present in the replica plan
         AsyncPromise<TrackedDataResponse> promise = new AsyncPromise<>();
-        beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, partialReadConsumer, promise);
+        beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, promise);
         return promise;
     }
 
@@ -111,7 +107,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         ReplicaPlan.AbstractForRead<?, ?> replicaPlan,
         int[] summaryNodes,
         Dispatcher.RequestTime requestTime,
-        Consumer<PartialTrackedRead> partialReadConsumer,
         AsyncPromise<TrackedDataResponse> promise)
     {
         PartialTrackedRead read = null;
@@ -137,7 +132,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         }
 
         Coordinator coordinator =
-                new Coordinator(readId, promise, command.columnFilter(), read, replicaPlan.consistencyLevel(), requestTime);
+                new Coordinator(readId, promise, read, replicaPlan.consistencyLevel(), requestTime);
         coordinators.put(readId, coordinator);
 
         // TODO (expected): reconsider the approach to tracked mutation metrics
@@ -187,7 +182,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
     {
         private final TrackedRead.Id readId;
         private final AsyncPromise<TrackedDataResponse> promise;
-        private final ColumnFilter selection;
         private final PartialTrackedRead read;
         private final ConsistencyLevel consistencyLevel;
         private final Dispatcher.RequestTime requestTime;
@@ -195,14 +189,12 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
         Coordinator(
                 TrackedRead.Id readId,
                 AsyncPromise<TrackedDataResponse> promise,
-                ColumnFilter selection,
                 PartialTrackedRead read,
                 ConsistencyLevel consistencyLevel,
                 Dispatcher.RequestTime requestTime)
         {
             this.readId = readId;
             this.promise = promise;
-            this.selection = selection;
             this.read = Preconditions.checkNotNull(read);
             this.consistencyLevel = consistencyLevel;
             this.requestTime = requestTime;
@@ -246,7 +238,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
 
                 if (followUp != null)
                 {
-                    ReadCommand command = read.command();
                     followUp.addCallback((newResponse, error) -> {
                         if (error != null)
                         {
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
index 267670a63e..975c2742fc 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
-import org.apache.cassandra.replication.MutationSummary;
 import org.apache.cassandra.replication.MutationTrackingService;
 import org.apache.cassandra.service.reads.ReadCoordinator;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
@@ -280,7 +279,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
             logger.trace("Locally coordinating {}", readId);
             Stage.READ.submit(() -> {
                 AsyncPromise<TrackedDataResponse> promise =
-                    MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, partialReadConsumer);
+                    MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime);
                 promise.addCallback((response, error) -> {
                     if (error != null)
                     {
@@ -451,7 +450,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
             AsyncPromise<TrackedDataResponse> promise =
                 MutationTrackingService.instance
                                        .localReads()
-                                       .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime, null);
+                                       .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime);
             promise.addCallback((response, error) -> {
                 if (error != null)
                 {
@@ -516,11 +515,6 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
             ReadReconciliations.instance.handleSummaryRequest((SummaryRequest) message.payload);
         }
 
-        public TrackedSummaryResponse makeResponse(MutationSummary summary)
-        {
-            return new TrackedSummaryResponse(readId, summary, dataNode, summaryNodes);
-        }
-
         public static final IVersionedSerializer<SummaryRequest> serializer = new IVersionedSerializer<>()
         {
             @Override
diff --git a/src/java/org/apache/cassandra/utils/Crc.java b/src/java/org/apache/cassandra/utils/Crc.java
index f1a31584f3..63b0055f2b 100644
--- a/src/java/org/apache/cassandra/utils/Crc.java
+++ b/src/java/org/apache/cassandra/utils/Crc.java
@@ -83,6 +83,26 @@ public class Crc
         buffer.position(savePosition);
     }
 
+    public static void updateWithInt(CRC32 crc, int v)
+    {
+        crc.update((v >>> 24) & 0xFF);
+        crc.update((v >>> 16) & 0xFF);
+        crc.update((v >>> 8) & 0xFF);
+        crc.update((v >>> 0) & 0xFF);
+    }
+
+    public static void updateWithLong(CRC32 crc, long v)
+    {
+        updateWithInt(crc, (int) (v >>> 32));
+        updateWithInt(crc, (int) (v & 0xFFFFFFFFL));
+    }
+
+    public static void validate(CRC32 crc, int expectedCRC) throws InvalidCrc
+    {
+        if (expectedCRC != (int) crc.getValue())
+            throw new InvalidCrc(expectedCRC, (int) crc.getValue());
+    }
+
     private static final int CRC24_INIT = 0x875060;
     /**
      * Polynomial chosen from https://users.ece.cmu.edu/~koopman/crc/index.html, by Philip Koopman
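
A round-trip of these helpers in the shape of the KEYSTATS component that ActiveOffsetRanges.write() produces and StaticOffsetRanges.read() consumes: a CRC32 runs alongside the payload and is compared against the stored value. The harness below is a standalone sketch using in-memory streams instead of journal files; the helper bodies mirror the patch, the class name is hypothetical.

    import java.io.*;
    import java.util.zip.CRC32;

    public class KeyStatsCrcSketch
    {
        static void updateWithInt(CRC32 crc, int v) // same byte order as Crc.updateWithInt()
        {
            crc.update((v >>> 24) & 0xFF);
            crc.update((v >>> 16) & 0xFF);
            crc.update((v >>> 8) & 0xFF);
            crc.update(v & 0xFF);
        }

        static void updateWithLong(CRC32 crc, long v)
        {
            updateWithInt(crc, (int) (v >>> 32));
            updateWithInt(crc, (int) v);
        }

        public static void main(String[] args) throws IOException
        {
            // write: count, crc(count), one (logId, range) entry, crc(all)
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            CRC32 crc = new CRC32();
            out.writeInt(1);
            updateWithInt(crc, 1);
            out.writeInt((int) crc.getValue());
            out.writeLong(100L);
            updateWithLong(crc, 100L);
            out.writeLong((3L << 32) | 7L); // min offset 3, max offset 7
            updateWithLong(crc, (3L << 32) | 7L);
            out.writeInt((int) crc.getValue());

            // read: recompute over the same bytes and compare, as Crc.validate() does
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            CRC32 check = new CRC32();
            int count = in.readInt();
            updateWithInt(check, count);
            if (in.readInt() != (int) check.getValue()) throw new IOException("corrupt header");
            for (int i = 0; i < count; i++)
            {
                long logId = in.readLong();
                updateWithLong(check, logId);
                long range = in.readLong();
                updateWithLong(check, range);
            }
            if (in.readInt() != (int) check.getValue()) throw new IOException("corrupt body");
        }
    }
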
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a4493f30a2..3a3a96285d 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -1184,6 +1184,7 @@ public class FBUtilities
         checksum.update((v >>> 0) & 0xFF);
     }
 
+    // TODO: migrate users to Crc class
     public static void updateChecksumInt(Checksum checksum, int v)
     {
         checksum.update((v >>> 24) & 0xFF);
@@ -1192,6 +1193,7 @@ public class FBUtilities
         checksum.update((v >>> 0) & 0xFF);
     }
 
+    // TODO: migrate users to Crc class
     public static void updateChecksumLong(Checksum checksum, long v)
     {
         updateChecksumInt(checksum, (int) (v >>> 32));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java
new file mode 100644
index 0000000000..5d0785454e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class MutationTrackingLogPersisterTest extends FuzzTestBase
+{
+    private static final int POPULATION = 1000;
+
+    @Test
+    public void testLogPersisterClearsStaticSegments() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                              .with(Feature.GOSSIP)
+                                                              .set("mutation_tracking_enabled", "true"))
+                                        .start())
+        {
+            int tables = 3;
+            int writesPerKey = 10;
+            int pks = 100;
+
+            withRandom(rng -> {
+                cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                                                   "AND replication_type='tracked'",
+                                                   KEYSPACE));
+
+                List<HistoryBuilder> builders = new ArrayList<>();
+                for (int i = 0; i < tables; i++)
+                {
+                    Generator<SchemaSpec> schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "log_persister_test_" + (builders.size() + 1), POPULATION,
+                                                                                     SchemaSpec.optionsBuilder());
+
+                    SchemaSpec schema = schemaGen.generate(rng);
+                    cluster.schemaChange(schema.compile());
+                    builders.add(new ReplayingHistoryBuilder(schema.valueGenerators,
+                                                             hb -> InJvmDTestVisitExecutor.builder()
+                                                                                          .consistencyLevel(ConsistencyLevel.QUORUM)
+                                                                                          .build(schema, hb, cluster)));
+                }
+
+                int counter = 0;
+                for (int pk = 0; pk < pks; pk++)
+                {
+                    for (HistoryBuilder history : builders)
+                        for (int i = 0; i < writesPerKey; i++)
+                            history.insert(pk);
+
+                    if (++counter % 10 == 0)
+                        cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+                }
+
+                cluster.forEach(i -> i.nodetoolResult("flush", KEYSPACE).asserts().success());
+                cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.persistLogStateForTesting()));
+                cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()));
+                cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.persistLogStateForTesting()));
+
+                cluster.forEach(i -> i.runOnInstance(() -> {
+                    int staticSegments = MutationJournal.instance.countStaticSegmentsForTesting();
+                    Assert.assertEquals("Expected no static segments after log persister runs", 0, staticSegments);
+                }));
+            });
+        }
+    }
+}
+
diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 36040e115c..3ff5f8e8b0 100644
--- a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.jimfs.Jimfs;
@@ -47,7 +47,6 @@ import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.journal.Journal;
 import org.apache.cassandra.journal.KeySupport;
 import org.apache.cassandra.journal.RecordPointer;
-import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.utils.Isolated;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
@@ -75,14 +74,12 @@ public class AccordJournalSimulationTest extends SimulationTestBase
                      AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec();
                      spec.flushPeriod = new DurationSpec.IntSecondsBound(1);
 
-                     State.journal = new Journal<>("AccordJournal",
-                                                   new File("/journal"),
-                                                   spec,
-                                                   new IdentityKeySerializer(),
-                                                   new IdentityValueSerializer(),
-                                                   SegmentCompactor.noop());
+                     State.journal =
+                         Journal.<String, String>builder("AccordJournal", new File("/journal"), spec, new IdentityKeySerializer())
+                                .valueSerializer(new IdentityValueSerializer())
+                                .build();
                  }),
-                 () -> check());
+                 AccordJournalSimulationTest::check);
     }
 
     public static void check()
@@ -237,7 +234,7 @@ public class AccordJournalSimulationTest extends SimulationTestBase
         }
 
         @Override
-        public void updateChecksum(Checksum crc, String key, int userVersion)
+        public void updateChecksum(CRC32 crc, String key, int userVersion)
         {
             crc.update(key.getBytes());
         }
diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java b/test/unit/org/apache/cassandra/journal/JournalTest.java
index 90b394518c..7cdb7edabb 100644
--- a/test/unit/org/apache/cassandra/journal/JournalTest.java
+++ b/test/unit/org/apache/cassandra/journal/JournalTest.java
@@ -52,7 +52,9 @@ public class JournalTest
         directory.deleteRecursiveOnExit();
 
         Journal<TimeUUID, Long> journal =
-            new Journal<>("TestJournal", directory, TestParams.ACCORD, 
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+            Journal.<TimeUUID, Long>builder("TestJournal", directory, 
TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+                   .valueSerializer(LongSerializer.INSTANCE)
+                   .build();
 
         journal.start();
 
@@ -73,7 +75,10 @@ public class JournalTest
 
         journal.shutdown();
 
-        journal = new Journal<>("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+        journal =
+            Journal.<TimeUUID, Long>builder("TestJournal", directory, TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+                   .valueSerializer(LongSerializer.INSTANCE)
+                   .build();
         journal.start();
 
         assertEquals(1L, (long) journal.readLast(id1));
@@ -91,7 +96,9 @@ public class JournalTest
         directory.deleteRecursiveOnExit();
 
         Journal<TimeUUID, Long> journal =
-            new Journal<>("TestJournalReadAll", directory, TestParams.ACCORD, 
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+            Journal.<TimeUUID, Long>builder("TestJournalReadAll", directory, 
TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+                   .valueSerializer(LongSerializer.INSTANCE)
+                   .build();
 
         journal.start();
 
@@ -127,11 +134,6 @@ public class JournalTest
     {
         static final LongSerializer INSTANCE = new LongSerializer();
 
-        public int serializedSize(TimeUUID key, Long value, int userVersion)
-        {
-            return Long.BYTES;
-        }
-
         public void serialize(TimeUUID key, Long value, DataOutputPlus out, int userVersion) throws IOException
         {
             out.writeLong(value);
diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java
index 55c54870d8..05a0f43f54 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java
@@ -54,7 +54,8 @@ public class SegmentTest
 
         Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
 
-        ActiveSegment<TimeUUID, ByteBuffer> segment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+        ActiveSegment<TimeUUID, ByteBuffer> segment =
+            ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
 
         segment.allocate(record1.remaining()).write(id1, record1);
         segment.allocate(record2.remaining()).write(id2, record2);
@@ -101,7 +102,8 @@ public class SegmentTest
 
         Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
 
-        ActiveSegment<TimeUUID, ByteBuffer> activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+        ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
+            ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
 
         activeSegment.allocate(record1.remaining()).write(id1, record1);
         activeSegment.allocate(record2.remaining()).write(id2, record2);
@@ -110,7 +112,8 @@ public class SegmentTest
 
         activeSegment.close(null);
 
-        StaticSegment<TimeUUID, ByteBuffer> staticSegment = StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE);
+        StaticSegment<TimeUUID, ByteBuffer> staticSegment =
+            StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
 
         // read all 4 entries by id and compare with originals
         EntrySerializer.EntryHolder<TimeUUID> holder = new EntrySerializer.EntryHolder<>();
@@ -150,7 +153,8 @@ public class SegmentTest
 
         Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
 
-        ActiveSegment<TimeUUID, ByteBuffer> activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+        ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
+            ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
 
         activeSegment.allocate(record1.remaining()).write(id1, record1);
         activeSegment.allocate(record2.remaining()).write(id2, record2);
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
index ecacb5e197..2e029f5edd 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -35,7 +35,7 @@ public class SegmentsTest
     @Test
     public void testSelect()
     {
-        withRandom(0l, rng -> {
+        withRandom(0L, rng -> {
             // Create mock segments with different timestamps
             java.io.File file = File.createTempFile("segments", "test");
             List<Segment<String, String>> segmentList = new ArrayList<>();
@@ -108,6 +108,13 @@ public class SegmentsTest
         }
 
         @Override Index<K> index() { throw new UnsupportedOperationException(); }
+
+        @Override
+        public KeyStats<K> keyStats()
+        {
+            return KeyStats.noop();
+        }
+
         @Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); }
         @Override public void persistMetadata() { throw new UnsupportedOperationException(); }
         @Override boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
index 5694a29e7a..c75197511e 100644
--- a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
+++ b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
@@ -19,14 +19,13 @@ package org.apache.cassandra.journal;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.TimeUUID;
 
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong;
-
 class TimeUUIDKeySupport implements KeySupport<TimeUUID>
 {
     static final TimeUUIDKeySupport INSTANCE = new TimeUUIDKeySupport();
@@ -76,10 +75,10 @@ class TimeUUIDKeySupport implements KeySupport<TimeUUID>
     }
 
     @Override
-    public void updateChecksum(Checksum crc, TimeUUID key, int userVersion)
+    public void updateChecksum(CRC32 crc, TimeUUID key, int userVersion)
     {
-        updateChecksumLong(crc, key.uuidTimestamp());
-        updateChecksumLong(crc, key.lsb());
+        Crc.updateWithLong(crc, key.uuidTimestamp());
+        Crc.updateWithLong(crc, key.lsb());
     }
 
     @Override
diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
index ba741799aa..72017bc8a1 100644
--- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
@@ -33,13 +33,19 @@ import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Descriptor;
 import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.replication.MutationJournal.ActiveOffsetRanges;
+import org.apache.cassandra.replication.MutationJournal.StaticOffsetRanges;
+import org.apache.cassandra.replication.MutationJournal.OffsetRangesFactory;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests to sanity-check the integration points with Journal
@@ -51,6 +57,7 @@ public class MutationJournalTest
     private static final String TABLE = "mjtt";
 
     private static MutationJournal journal;
+    private static File directory;
 
     @BeforeClass
     public static void setUp() throws IOException
@@ -63,7 +70,7 @@ public class MutationJournalTest
                                                  .addRegularColumn("value", 
UTF8Type.instance)
                                                  .build());
 
-        File directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
+        directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
         directory.deleteRecursiveOnExit();
 
         journal = new MutationJournal(directory, TestParams.MUTATION_JOURNAL);
@@ -79,13 +86,8 @@ public class MutationJournalTest
     @Test
     public void testWriteOneReadOne()
     {
-        Mutation expected =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key")
-               .clustering("ck")
-               .add("value", "value")
-               .build();
-
-        ShortMutationId id = new ShortMutationId(100L, 0);
+        ShortMutationId id = id(100L, 0);
+        Mutation expected = mutation("key", "ck", "value");
         journal.write(id, expected);
 
         // regular read
@@ -103,22 +105,14 @@ public class MutationJournalTest
     @Test
     public void testWriteManyReadMany()
     {
-        Mutation expected1 =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key1")
-               .clustering("ck1")
-               .add("value", "value1")
-               .build();
-        Mutation expected2 =
-            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key2")
-               .clustering("ck2")
-               .add("value", "value2")
-               .build();
-        List<Mutation> expected = List.of(expected1, expected2);
-
-        ShortMutationId id1 = new ShortMutationId(100L, 1);
-        ShortMutationId id2 = new ShortMutationId(100L, 2);
+        ShortMutationId id1 = id(100L, 1);
+        ShortMutationId id2 = id(100L, 2);
         List<ShortMutationId> ids = List.of(id1, id2);
 
+        Mutation expected1 = mutation("key1", "ck1", "value1");
+        Mutation expected2 = mutation("key2", "ck2", "value2");
+        List<Mutation> expected = List.of(expected1, expected2);
+
         journal.write(id1, expected1);
         journal.write(id2, expected2);
 
@@ -127,6 +121,149 @@ public class MutationJournalTest
         assertMutationsEqual(expected, actual);
     }
 
+    @Test
+    public void testActiveOffsetRanges()
+    {
+        {
+            ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+            assertFalse(ranges.mayContain(id(0, 0)));
+        }
+
+        {
+            ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+            ranges.update(id(0, 1));
+            ranges.update(id(0, 9));
+            assertFalse(ranges.mayContain(id(0, 0)));
+            assertFalse(ranges.mayContain(id(0, 10)));
+            for (int i = 1; i < 10; i++)
+                assertTrue(ranges.mayContain(id(0, i)));
+        }
+    }
+
+    @Test
+    public void testStaticOffsetRanges()
+    {
+        Descriptor descriptor = Descriptor.create(directory, 0, 1);
+
+        ActiveOffsetRanges active = new ActiveOffsetRanges();
+        for (int l = 1; l < 11; l++)
+        {
+            for (int o = 5; o > 0; o--) active.update(id(l, o));
+            for (int o = 6; o < 11; o++) active.update(id(l, o));
+        }
+
+        active.persist(descriptor);
+        StaticOffsetRanges loaded = OffsetRangesFactory.INSTANCE.load(descriptor);
+        assertEquals(active.asMap(), loaded.asMap());
+
+        // absent log ids
+        for (int o = 0; o < 11; o++)
+        {
+            assertFalse(active.mayContain(id(0, o)));
+            assertFalse(loaded.mayContain(id(0, o)));
+            assertFalse(active.mayContain(id(11, o)));
+            assertFalse(loaded.mayContain(id(11, o)));
+        }
+
+        // present log ids
+        for (int l = 1; l < 11; l++)
+        {
+            assertFalse(active.mayContain(id(l, 0)));
+            assertFalse(loaded.mayContain(id(l, 0)));
+            assertFalse(active.mayContain(id(l, 11)));
+            assertFalse(loaded.mayContain(id(l, 11)));
+            for (int o = 1; o < 11; o++)
+            {
+                assertTrue(active.mayContain(id(l, o)));
+                assertTrue(loaded.mayContain(id(l, o)));
+            }
+        }
+    }
+
+    @Test
+    public void testDropReconcileSegments()
+    {
+        ShortMutationId id1 = id(100L, 0);
+        ShortMutationId id2 = id(100L, 1);
+        ShortMutationId id3 = id(200L, 2);
+        ShortMutationId id4 = id(200L, 3);
+
+        Mutation mutation1 = mutation("key1", "ck1", "value1");
+        Mutation mutation2 = mutation("key2", "ck2", "value2");
+        Mutation mutation3 = mutation("key3", "ck3", "value3");
+        Mutation mutation4 = mutation("key4", "ck4", "value4");
+
+        // write two mutations to the first segment and flush it to make static
+        journal.write(id1, mutation1);
+        journal.write(id2, mutation2);
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+        // write two mutations to the second segment and flush it to make static
+        journal.write(id3, mutation3);
+        journal.write(id4, mutation4);
+        journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers both segments fully;
+            // *BUT* with the segments still marked as needing replay, nothing should be dropped
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            builder.add(id3);
+            builder.add(id4);
+            assertEquals(0, journal.dropReconciledSegments(builder.build()));
+            // confirm that no static segments have been dropped
+            assertEquals(2, journal.countStaticSegmentsForTesting());
+        }
+
+        // mark both segments as not needing replay
+        journal.clearNeedsReplayForTesting();
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that doesn't cover any segments fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            assertEquals(0, journal.dropReconciledSegments(builder.build()));
+            // confirm that no static segments got dropped
+            assertEquals(2, journal.countStaticSegmentsForTesting());
+        }
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers only the first segment fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            assertEquals(1, journal.dropReconciledSegments(builder.build()));
+            // confirm that only one static segment got dropped
+            assertEquals(1, journal.countStaticSegmentsForTesting());
+        }
+
+        {
+            // call dropReconciledSegments() with a log2offsets map that covers both segments fully
+            Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+            builder.add(id1);
+            builder.add(id2);
+            builder.add(id3);
+            builder.add(id4);
+            assertEquals(1, journal.dropReconciledSegments(builder.build()));
+            // confirm that all static segments have now been dropped
+            assertEquals(0, journal.countStaticSegmentsForTesting());
+        }
+    }
+
+    private ShortMutationId id(long logId, int offset)
+    {
+        return new ShortMutationId(logId, offset);
+    }
+
+    private Mutation mutation(String pk, String ck, String column)
+    {
+        return new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, 
TABLE), 0, pk)
+               .clustering(ck)
+               .add("value", column)
+               .build();
+    }
+
     public static void assertMutationEquals(Mutation expected, Mutation actual)
     {
         if (!serialize(expected).equals(serialize(actual)))
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index b0d5edda0f..1b39ff000e 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -861,9 +861,51 @@ public class OffsetsTest
         }
     }
 
+    @Test
     public void asListFromListRoundTripTest()
     {
         for (Offsets.Mutable offsets : new Offsets.Mutable[] { offsets(), offsets(1, 2), offsets(1, 3, 7, 9) })
             assertOffsetsEqual(offsets, Offsets.fromList(LOG_ID, offsets.asList()));
     }
+
+    @Test
+    public void testContainsRange()
+    {
+        {
+            Offsets.Mutable offsets = offsets();
+            assertFalse(offsets.containsRange(0, 1));
+        }
+
+        {
+            Offsets.Mutable offsets = offsets(2, 4);
+
+            assertTrue(offsets.containsRange(2, 4));
+            assertTrue(offsets.containsRange(3, 4));
+            assertTrue(offsets.containsRange(2, 3));
+
+            assertFalse(offsets.containsRange(1, 1));
+            assertFalse(offsets.containsRange(1, 2));
+            assertFalse(offsets.containsRange(1, 3));
+            assertFalse(offsets.containsRange(1, 4));
+
+            assertFalse(offsets.containsRange(2, 5));
+            assertFalse(offsets.containsRange(3, 5));
+            assertFalse(offsets.containsRange(4, 5));
+            assertFalse(offsets.containsRange(5, 5));
+        }
+
+        {
+            Offsets.Mutable offsets = offsets(2, 4, 6, 8);
+
+            assertTrue(offsets.containsRange(2, 4));
+            assertTrue(offsets.containsRange(6, 8));
+
+            assertFalse(offsets.containsRange(0, 2));
+            assertFalse(offsets.containsRange(3, 5));
+            assertFalse(offsets.containsRange(4, 6));
+            assertFalse(offsets.containsRange(5, 7));
+            assertFalse(offsets.containsRange(7, 9));
+            assertFalse(offsets.containsRange(9, 9));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
