This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c831042910fce4b2ce1a92a0d86c2003d44b853f
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Aug 22 18:48:03 2024 +0200

    Journal segment compaction
    
    Patch by Alex Petrov and Aleksey Yeschenko, reviewed by Aleksey Yeschenko and Alex Petrov for CASSANDRA-19876
---
 .../org/apache/cassandra/config/AccordSpec.java    |  14 ++
 .../apache/cassandra/journal/ActiveSegment.java    |   1 +
 .../org/apache/cassandra/journal/Compactor.java    | 102 +++++++++
 .../org/apache/cassandra/journal/Descriptor.java   |  13 +-
 .../apache/cassandra/journal/EntrySerializer.java  |  49 +++--
 .../apache/cassandra/journal/InMemoryIndex.java    |   4 +-
 src/java/org/apache/cassandra/journal/Index.java   |  13 +-
 src/java/org/apache/cassandra/journal/Journal.java | 164 +++++++--------
 .../org/apache/cassandra/journal/Metadata.java     |   2 +-
 .../org/apache/cassandra/journal/OnDiskIndex.java  |  63 +++---
 src/java/org/apache/cassandra/journal/Params.java  |   4 +
 .../apache/cassandra/journal/RecordConsumer.java   |   1 +
 src/java/org/apache/cassandra/journal/Segment.java |  11 +-
 .../{RecordConsumer.java => SegmentCompactor.java} |  18 +-
 .../org/apache/cassandra/journal/Segments.java     |  42 ++--
 .../apache/cassandra/journal/StaticSegment.java    | 204 +++++++++++++-----
 .../cassandra/service/accord/AccordJournal.java    |  29 +--
 .../service/accord/AccordJournalTable.java         | 227 +++++++++++++++++++++
 .../cassandra/service/accord/AccordKeyspace.java   |  18 +-
 .../service/accord/AccordSegmentCompactor.java     | 119 +++++++++++
 .../cassandra/service/accord/IAccordService.java   |   3 +
 .../cassandra/service/accord/JournalKey.java       |   6 +-
 .../cassandra/service/accord/SavedCommand.java     | 132 ++++++++++--
 ...Test.java => AccordJournalIntegrationTest.java} |   2 +-
 .../journal/AccordJournalCompactionTest.java       | 137 +++++++++++++
 .../test/AccordJournalSimulationTest.java          |   4 +-
 .../org/apache/cassandra/journal/IndexTest.java    |  12 +-
 .../org/apache/cassandra/journal/JournalTest.java  |  36 +++-
 .../org/apache/cassandra/journal/SegmentTest.java  |  10 +-
 .../org/apache/cassandra/journal/TestParams.java   |  12 ++
 .../cassandra/service/accord/AccordTestUtils.java  |   2 +-
 31 files changed, 1160 insertions(+), 294 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index b4d25d6689..102ae68b67 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -28,6 +28,8 @@ public class AccordSpec
 
     public volatile String journal_directory;
 
+    public volatile boolean enable_journal_compaction = true;
+
     public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED;
 
     public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(1000);
@@ -101,6 +103,18 @@ public class AccordSpec
             return flushMode;
         }
 
+        @Override
+        public boolean enableCompaction()
+        {
+            return DatabaseDescriptor.getAccord().enable_journal_compaction;
+        }
+
+        @Override
+        public int compactionPeriodMillis()
+        {
+            return 60_000;
+        }
+
         @JsonIgnore
         @Override
         public int flushPeriodMillis()
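
Note: the new enable_journal_compaction flag is read through DatabaseDescriptor.getAccord() when the journal's Compactor starts, so it must be set before startup to take effect. A minimal sketch of flipping it from test code, assuming the usual DatabaseDescriptor initialization (hypothetical snippet, not part of this patch):

    // Hypothetical test setup: disable Accord journal segment compaction.
    import org.apache.cassandra.config.DatabaseDescriptor;

    public class DisableJournalCompaction
    {
        public static void main(String[] args)
        {
            DatabaseDescriptor.daemonInitialization();
            // Volatile field, but Compactor.start() reads it only once,
            // so set it before the journal is started.
            DatabaseDescriptor.getAccord().enable_journal_compaction = false;
        }
    }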
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 1bee25a96f..1fd9905490 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -197,6 +197,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         descriptor.fileFor(Component.SYNCED_OFFSETS).deleteIfExists();
     }
 
+    @Override
     void release()
     {
         selfRef.release();
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java b/src/java/org/apache/cassandra/journal/Compactor.java
new file mode 100644
index 0000000000..846dd62ba8
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.Shutdownable;
+
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+final class Compactor<K, V> implements Runnable, Shutdownable
+{
+    private final Journal<K, V> journal;
+    private final SegmentCompactor<K, V> segmentCompactor;
+    private final ScheduledExecutorPlus executor;
+
+    Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
+    {
+        this.executor = executorFactory().scheduled(false, journal.name + "-compactor");
+        this.journal = journal;
+        this.segmentCompactor = segmentCompactor;
+    }
+
+    void start()
+    {
+        if (journal.params.enableCompaction())
+        {
+            executor.scheduleWithFixedDelay(this,
+                                            journal.params.compactionPeriodMillis(),
+                                            journal.params.compactionPeriodMillis(),
+                                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        Set<StaticSegment<K, V>> toCompact = new HashSet<>();
+        journal.segments().selectStatic(toCompact);
+        if (toCompact.size() < 2)
+            return;
+
+        try
+        {
+            Collection<StaticSegment<K, V>> newSegments = segmentCompactor.compact(toCompact, journal.keySupport);
+            for (StaticSegment<K, V> segment : newSegments)
+                toCompact.remove(segment);
+
+            journal.replaceCompactedSegments(toCompact, newSegments);
+            for (StaticSegment<K, V> segment : toCompact)
+                segment.discard();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not compact segments: " + toCompact, e);
+        }
+    }
+
+    @Override
+    public boolean isTerminated()
+    {
+        return executor.isTerminated();
+    }
+
+    @Override
+    public void shutdown()
+    {
+        executor.shutdown();
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        return executor.shutdownNow();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
+    {
+        return executor.awaitTermination(timeout, units);
+    }
+}
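
Note on the wiring: Compactor.run() is a no-op until at least two static segments exist; the pluggable SegmentCompactor decides what survives, and run() discards every input segment that is not part of the return value. A rough construction sketch against the new Journal constructor introduced later in this patch (directory, params, keySupport and valueSerializer are assumed stand-ins, not code from the patch):

    // Sketch: a journal whose compactor keeps everything, via the no-op compactor.
    Journal<JournalKey, Object> journal =
        new Journal<>("ExampleJournal", directory, params, keySupport,
                      valueSerializer, SegmentCompactor.noop());
    journal.start(); // schedules the Compactor only if params.enableCompaction()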
diff --git a/src/java/org/apache/cassandra/journal/Descriptor.java b/src/java/org/apache/cassandra/journal/Descriptor.java
index 176a12e109..cea68c353e 100644
--- a/src/java/org/apache/cassandra/journal/Descriptor.java
+++ b/src/java/org/apache/cassandra/journal/Descriptor.java
@@ -66,20 +66,20 @@ public final class Descriptor implements Comparable<Descriptor>
     static final int CURRENT_JOURNAL_VERSION = JOURNAL_VERSION_1;
 
     final File directory;
-    final long timestamp;
-    final int generation;
+    public final long timestamp;
+    public final int generation;
 
     /**
      * Serialization version for journal components; bumped as journal
      * implementation evolves over time.
      */
-    final int journalVersion;
+    public final int journalVersion;
 
     /**
      * Serialization version for user content - specifically journal keys
      * and journal values; bumped when user logic evolves.
      */
-    final int userVersion;
+    public final int userVersion;
 
     Descriptor(File directory, long timestamp, int generation, int journalVersion, int userVersion)
     {
@@ -114,11 +114,6 @@ public final class Descriptor implements Comparable<Descriptor>
         return fromName(file.parent(), file.name());
     }
 
-    Descriptor withIncrementedGeneration()
-    {
-        return new Descriptor(directory, timestamp, generation + 1, journalVersion, userVersion);
-    }
-
     File fileFor(Component component)
     {
         return new File(directory, formatFileName(component));
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index a2a61cfce3..2a707e7d73 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -35,7 +35,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksumShort;
 
-final class EntrySerializer
+public final class EntrySerializer
 {
     static <K> void write(K key,
                           ByteBuffer record,
@@ -73,14 +73,14 @@ final class EntrySerializer
 
     static <K> void read(EntryHolder<K> into,
                          KeySupport<K> keySupport,
-                         ByteBuffer buffer,
+                         ByteBuffer from,
                          int userVersion)
     throws IOException
     {
         CRC32 crc = Crc.crc32();
         into.clear();
 
-        try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+        try (DataInputBuffer in = new DataInputBuffer(from, false))
         {
             K key = keySupport.deserialize(in, userVersion);
             keySupport.updateChecksum(crc, key, userVersion);
@@ -101,9 +101,11 @@ final class EntrySerializer
                 into.hosts.add(hostId);
             }
 
+            // TODO: try to avoid allocating another buffer here
             ByteBuffer entry = ByteBufferUtil.read(in, entrySize);
             updateChecksum(crc, entry);
             into.value = entry;
+            into.userVersion = userVersion;
 
             validateCRC(crc, in.readInt());
         }
@@ -111,7 +113,7 @@ final class EntrySerializer
 
     static <K> boolean tryRead(EntryHolder<K> into,
                                KeySupport<K> keySupport,
-                               ByteBuffer buffer,
+                               ByteBuffer from,
                                DataInputBuffer in,
                                int syncedOffset,
                                int userVersion)
@@ -121,11 +123,11 @@ final class EntrySerializer
         into.clear();
 
         int fixedSize = EntrySerializer.fixedEntrySize(keySupport, userVersion);
-        if (buffer.remaining() < fixedSize)
-            return handleReadException(new EOFException(), buffer.limit(), syncedOffset);
+        if (from.remaining() < fixedSize)
+            return handleReadException(new EOFException(), from.limit(), syncedOffset);
 
-        updateChecksum(crc, buffer, buffer.position(), fixedSize - TypeSizes.INT_SIZE);
-        int fixedCrc = buffer.getInt(buffer.position() + fixedSize - TypeSizes.INT_SIZE);
+        updateChecksum(crc, from, from.position(), fixedSize - TypeSizes.INT_SIZE);
+        int fixedCrc = from.getInt(from.position() + fixedSize - TypeSizes.INT_SIZE);
 
         try
         {
@@ -133,7 +135,7 @@ final class EntrySerializer
         }
         catch (IOException e)
         {
-            return handleReadException(e, buffer.position() + fixedSize, syncedOffset);
+            return handleReadException(e, from.position() + fixedSize, syncedOffset);
         }
 
         int hostCount, recordSize;
@@ -150,11 +152,11 @@ final class EntrySerializer
         }
 
         int variableSize = EntrySerializer.variableEntrySize(hostCount, recordSize);
-        if (buffer.remaining() < variableSize)
-            return handleReadException(new EOFException(), buffer.limit(), syncedOffset);
+        if (from.remaining() < variableSize)
+            return handleReadException(new EOFException(), from.limit(), syncedOffset);
 
-        updateChecksum(crc, buffer, buffer.position(), variableSize - TypeSizes.INT_SIZE);
-        int variableCrc = buffer.getInt(buffer.position() + variableSize - TypeSizes.INT_SIZE);
+        updateChecksum(crc, from, from.position(), variableSize - TypeSizes.INT_SIZE);
+        int variableCrc = from.getInt(from.position() + variableSize - TypeSizes.INT_SIZE);
 
         try
         {
@@ -162,7 +164,7 @@ final class EntrySerializer
         }
         catch (IOException e)
         {
-            return handleReadException(e, buffer.position() + variableSize, syncedOffset);
+            return handleReadException(e, from.position() + variableSize, syncedOffset);
         }
 
         for (int i = 0; i < hostCount; i++)
@@ -179,9 +181,10 @@ final class EntrySerializer
             throw new AssertionError(); // can't happen
         }
 
-        into.value = buffer.duplicate()
-                           .position(buffer.position() - recordSize)
-                           .limit(buffer.position());
+        into.value = from.duplicate()
+                         .position(from.position() - recordSize)
+                         .limit(from.position());
+        into.userVersion = userVersion;
 
         in.skipBytesFully(TypeSizes.INT_SIZE);
         return true;
@@ -210,13 +213,15 @@ final class EntrySerializer
              + TypeSizes.INT_SIZE;            // CRC
     }
 
-    static final class EntryHolder<K>
+    public static final class EntryHolder<K>
     {
-        K key;
-        ByteBuffer value;
-        IntHashSet hosts = new IntHashSet();
+        public K key;
+        public ByteBuffer value;
+        public IntHashSet hosts = new IntHashSet();
 
-        void clear()
+        public int userVersion;
+
+        public void clear()
         {
             key = null;
             value = null;
diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
index 5417bfea40..2c71d8c4ff 100644
--- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java
+++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java
@@ -131,14 +131,14 @@ final class InMemoryIndex<K> extends Index<K>
     {
         InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new TreeMap<>(keySupport));
 
-        try (StaticSegment.SequentialReader<K> reader = StaticSegment.reader(descriptor, keySupport, fsyncedLimit))
+        try (StaticSegment.SequentialReader<K> reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
         {
             int last = -1;
             while (reader.advance())
             {
                 int current = reader.offset();
                 if (last >= 0)
-                    index.update(reader.id(), last, current);
+                    index.update(reader.key(), last, current);
                 last = current;
             }
 
diff --git a/src/java/org/apache/cassandra/journal/Index.java b/src/java/org/apache/cassandra/journal/Index.java
index f42a42d5ed..bf6ab5d0c1 100644
--- a/src/java/org/apache/cassandra/journal/Index.java
+++ b/src/java/org/apache/cassandra/journal/Index.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 import org.apache.cassandra.utils.Closeable;
 
 import static com.google.common.collect.Iterables.any;
+
 /**
  * Mapping of client supplied ids to in-segment offsets
  */
@@ -85,15 +86,6 @@ abstract class Index<K> implements Closeable
         return any(ids, this::mayContainId);
     }
 
-    interface IndexIterator<K>
-    {
-        boolean hasNext();
-        K currentKey();
-        int currentOffset();
-        int currentSize();
-        void next();
-    }
-
     /**
      * Helper methods
      */
@@ -118,7 +110,7 @@ abstract class Index<K> implements Closeable
     public static long writeSize(long record, int size)
     {
         record &= 0xffffffff00000000L; // unset all lower bits
-        record |= (long) size;
+        record |= size;
         return record;
     }
 
@@ -126,5 +118,4 @@ abstract class Index<K> implements Closeable
     {
         return writeSize(writeOffset(0, offset), size);
     }
-
 }
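
For context, the writeOffset/writeSize helpers pack a segment-local offset and a record size into a single long (offset in the high 32 bits, size in the low 32), which is what readOffset/readSize unpack elsewhere in this patch. A standalone sketch of the layout, written to mirror the semantics implied here rather than copied from the class:

    // Offset/size packing as implied by Index.writeSize/readOffset/readSize.
    public final class OffsetAndSize
    {
        static long writeOffsetAndSize(int offset, int size)
        {
            return ((long) offset << 32) | (size & 0xffffffffL);
        }

        static int readOffset(long record) { return (int) (record >>> 32); }
        static int readSize(long record)   { return (int) record; }

        public static void main(String[] args)
        {
            long record = writeOffsetAndSize(4096, 128);
            System.out.println(readOffset(record) + "/" + readSize(record)); // 4096/128
        }
    }

Dropping the redundant (long) cast in writeSize is safe for the non-negative sizes used here; both forms sign-extend, so a negative size would corrupt the offset bits either way.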
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index 50a3058ec9..0baf8e5af3 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -18,12 +18,11 @@
 package org.apache.cassandra.journal;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.FileStore;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,7 +52,6 @@ import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.journal.Segments.ReferencedSegment;
 import org.apache.cassandra.journal.Segments.ReferencedSegments;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.SavedCommand;
 import org.apache.cassandra.utils.Crc;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Simulate;
@@ -100,8 +98,7 @@ public class Journal<K, V> implements Shutdownable
 
     final Metrics<K, V> metrics;
     final Flusher<K, V> flusher;
-    //final Invalidator<K, V> invalidator;
-    //final Compactor<K, V> compactor;
+    final Compactor<K, V> compactor;
 
     volatile long replayLimit;
     final AtomicLong nextSegmentId = new AtomicLong();
@@ -120,7 +117,6 @@ public class Journal<K, V> implements Shutdownable
     private final FlusherCallbacks flusherCallbacks;
 
     SequentialExecutorPlus closer;
-    //private final Set<Descriptor> invalidations = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     private class FlusherCallbacks implements Flusher.Callbacks
     {
@@ -180,7 +176,8 @@ public class Journal<K, V> implements Shutdownable
                    File directory,
                    Params params,
                    KeySupport<K> keySupport,
-                   ValueSerializer<K, V> valueSerializer)
+                   ValueSerializer<K, V> valueSerializer,
+                   SegmentCompactor<K, V> segmentCompactor)
     {
         this.name = name;
         this.directory = directory;
@@ -192,8 +189,7 @@ public class Journal<K, V> implements Shutdownable
         this.metrics = new Metrics<>(name);
         this.flusherCallbacks = new FlusherCallbacks();
         this.flusher = new Flusher<>(this, flusherCallbacks);
-        //this.invalidator = new Invalidator<>(this);
-        //this.compactor = new Compactor<>(this);
+        this.compactor = new Compactor<>(this, segmentCompactor);
     }
 
     public boolean isFlushed(RecordPointer recordPointer)
@@ -229,8 +225,13 @@ public class Journal<K, V> implements Shutdownable
         allocator = executorFactory().infiniteLoop(name + "-allocator", new AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED);
         advanceSegment(null);
         flusher.start();
-        //invalidator.start();
-        //compactor.start();
+        compactor.start();
+    }
+
+    @VisibleForTesting
+    void runCompactorForTesting()
+    {
+        compactor.run();
     }
 
     /**
@@ -254,8 +255,8 @@ public class Journal<K, V> implements Shutdownable
         {
             allocator.shutdown();
             allocator.awaitTermination(1, TimeUnit.MINUTES);
-            //compactor.stop();
-            //invalidator.stop();
+            compactor.shutdown();
+            compactor.awaitTermination(1, TimeUnit.MINUTES);
             flusher.shutdown();
             closer.shutdown();
             closer.awaitTermination(1, TimeUnit.MINUTES);
@@ -284,34 +285,6 @@ public class Journal<K, V> implements Shutdownable
         return r;
     }
 
-    /**
-     * Read an entry by its address (segment timestamp + offest)
-     *
-     * @return deserialized record if present, null otherwise
-     */
-    public V read(long segmentTimestamp, int offset, int size)
-    {
-        try (ReferencedSegment<K, V> referenced = selectAndReference(segmentTimestamp))
-        {
-            Segment<K, V> segment = referenced.segment();
-            if (null == segment)
-                return null;
-
-            EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
-            segment.read(offset, size, holder);
-
-            try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
-            {
-                return valueSerializer.deserialize(holder.key, in, segment.descriptor.userVersion);
-            }
-            catch (IOException e)
-            {
-                // can only throw if serializer is buggy
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
     /**
      * Looks up a record by the provided id.
      * <p/>
@@ -324,19 +297,20 @@ public class Journal<K, V> implements Shutdownable
      * @param id user-provided record id, expected to roughly correlate with time and go up
      * @return deserialized record if found, null otherwise
      */
+    @SuppressWarnings("unused")
     public V readFirst(K id)
     {
         EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
 
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
         {
-            for (Segment<K, V> segment : segments.all())
+            for (Segment<K, V> segment : segments.allSorted())
             {
                 if (segment.readFirst(id, holder))
                 {
                     try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
                     {
-                        return valueSerializer.deserialize(holder.key, in, segment.descriptor.userVersion);
+                        return valueSerializer.deserialize(holder.key, in, holder.userVersion);
                     }
                     catch (IOException e)
                     {
@@ -349,36 +323,34 @@ public class Journal<K, V> implements Shutdownable
         return null;
     }
 
-    public List<V> readAll(K id)
-    {
-        List<V> res = new ArrayList<>(2);
-        readAll(id, (in, userVersion) -> res.add(valueSerializer.deserialize(id, in, userVersion)));
-        return res;
-    }
-
-    public void readAll(K id, Reader reader)
+    public void readAll(K id, RecordConsumer<K> consumer)
     {
         EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
         {
-            for (Segment<K, V> segment : segments.all())
+            consumer.init();
+
+            for (Segment<K, V> segment : segments.allSorted())
+                segment.readAll(id, holder, consumer);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    public List<V> readAll(K id)
+    {
+        List<V> res = new ArrayList<>(2);
+        readAll(id, (segment, position, key, buffer, hosts, userVersion) -> {
+            try (DataInputBuffer in = new DataInputBuffer(buffer, false))
             {
-                segment.readAll(id, holder, () -> {
-                    try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
-                    {
-                        Invariants.checkState(Objects.equals(holder.key, id),
-                                              "%s != %s", holder.key, id);
-                        reader.read(in, segment.descriptor.userVersion);
-                        holder.clear();
-                    }
-                    catch (IOException e)
-                    {
-                        // can only throw if serializer is buggy
-                        throw new RuntimeException(e);
-                    }
-                });
+                res.add(valueSerializer.deserialize(key, in, userVersion));
             }
-        }
+            catch (IOException e)
+            {
+                // can only throw if serializer is buggy
+                throw new RuntimeException(e);
+            }
+        });
+        return res;
     }
 
     /**
@@ -394,6 +366,7 @@ public class Journal<K, V> implements Shutdownable
      * @param condition predicate to test the record against
      * @return deserialized record if found, null otherwise
      */
+    @SuppressWarnings("unused")
     public V readFirstMatching(K id, Predicate<V> condition)
     {
         EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
@@ -441,6 +414,7 @@ public class Journal<K, V> implements Shutdownable
      * @param consumer function to consume the raw record (bytes and invalidation set) if found
      * @return true if the record was found, false otherwise
      */
+    @SuppressWarnings("unused")
     public boolean readFirst(K id, RecordConsumer<K> consumer)
     {
         try (ReferencedSegments<K, V> segments = selectAndReference(id))
@@ -457,6 +431,7 @@ public class Journal<K, V> implements Shutdownable
      *
      * @return subset of ids to test that have been found in the journal
      */
+    @SuppressWarnings("unused")
     public Set<K> test(Set<K> test)
     {
         Set<K> present = new ObjectHashSet<>(test.size() + 1, 0.9f);
@@ -515,19 +490,7 @@ public class Journal<K, V> implements Shutdownable
      */
     public RecordPointer asyncWrite(K id, V record, Set<Integer> hosts)
     {
-        return asyncWrite(id, new SavedCommand.Writer<>()
-                          {
-                              public void write(DataOutputPlus out, int userVersion) throws IOException
-                              {
-                                  valueSerializer.serialize(id, record, out, params.userVersion());
-                              }
-
-                              public K key()
-                              {
-                                  return id;
-                              }
-                          },
-                          hosts);
+        return asyncWrite(id, (out, userVersion) -> valueSerializer.serialize(id, record, out, userVersion), hosts);
     }
 
     public RecordPointer asyncWrite(K id, Writer writer, Set<Integer> hosts)
@@ -548,7 +511,6 @@ public class Journal<K, V> implements Shutdownable
         return recordPointer;
     }
 
-
     private ActiveSegment<K, V>.Allocation allocate(int entrySize, Set<Integer> hosts)
     {
         ActiveSegment<K, V> segment = currentSegment;
@@ -658,6 +620,11 @@ public class Journal<K, V> implements Shutdownable
                     Thread.yield();
                 }
             }
+            catch (JournalWriteError e)
+            {
+                if (!(e.getCause() instanceof ClosedByInterruptException))
+                    throw e;
+            }
             catch (Throwable t)
             {
                 if (!handleError("Failed allocating journal segments", t))
@@ -727,26 +694,38 @@ public class Journal<K, V> implements Shutdownable
     }
 
     /**
-     * Select segments that could potentially have any entry with the specified ids and
+     * Select segments that could potentially have any entry with the specified id and
      * attempt to grab references to them all.
      *
      * @return a subset of segments with references to them
      */
-    ReferencedSegments<K, V> selectAndReference(Iterable<K> ids)
+    ReferencedSegments<K, V> selectAndReference(K id)
     {
         while (true)
         {
-            ReferencedSegments<K, V> referenced = segments().selectAndReference(ids);
+            ReferencedSegments<K, V> referenced = segments().selectAndReference(s -> s.index().mayContainId(id));
             if (null != referenced)
                 return referenced;
         }
     }
 
-    ReferencedSegments<K, V> selectAndReference(K id)
+    /**
+     * Select segments that could potentially have any entry with the specified ids and
+     * attempt to grab references to them all.
+     *
+     * @return a subset of segments with references to them
+     */
+    ReferencedSegments<K, V> selectAndReference(Iterable<K> ids)
     {
-        return selectAndReference(Collections.singleton(id));
+        while (true)
+        {
+            ReferencedSegments<K, V> referenced = segments().selectAndReference(s -> s.index().mayContainIds(ids));
+            if (null != referenced)
+                return referenced;
+        }
     }
 
+    @SuppressWarnings("unused")
     ReferencedSegment<K, V> selectAndReference(long segmentTimestamp)
     {
         while (true)
@@ -757,7 +736,7 @@ public class Journal<K, V> implements Shutdownable
         }
     }
 
-    private Segments<K, V> segments()
+    Segments<K, V> segments()
     {
         return segments.get();
     }
@@ -784,9 +763,9 @@ public class Journal<K, V> implements Shutdownable
         swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment));
     }
 
-    private void replaceCompactedSegment(StaticSegment<K, V> oldSegment, StaticSegment<K, V> newSegment)
+    void replaceCompactedSegments(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
     {
-        swapSegments(current -> current.withCompactedSegment(oldSegment, newSegment));
+        swapSegments(current -> current.withCompactedSegments(oldSegments, compactedSegments));
     }
 
     void selectSegmentToFlush(Collection<ActiveSegment<K, V>> into)
@@ -836,7 +815,7 @@ public class Journal<K, V> implements Shutdownable
             if (segment == null)
                 throw new IllegalArgumentException("Request the active segment 
" + timestamp + " but this segment does not exist");
             if (!segment.isActive())
-                throw new IllegalArgumentException("Request the active segment " + timestamp + " but this segment is not active");
+                throw new IllegalArgumentException(String.format("Request the active segment %d but this segment is not active: %s", timestamp, segment));
             return segment.asActive();
         }
     }
@@ -975,9 +954,4 @@ public class Journal<K, V> implements Shutdownable
     {
         void write(DataOutputPlus out, int userVersion) throws IOException;
     }
-
-    public interface Reader
-    {
-        void read(DataInputBuffer in, int userVersion) throws IOException;
-    }
 }
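
With the old Reader interface removed, callers now consume raw entries through RecordConsumer. A hedged sketch of reading every payload written under one id (journal and id are assumed stand-ins; the copy-out is an assumption based on segment references being released when readAll returns):

    // Sketch: collect all payloads for an id via the RecordConsumer overload.
    List<ByteBuffer> payloads = new ArrayList<>();
    journal.readAll(id, (segment, position, key, buffer, hosts, userVersion) -> {
        // Copy anything that must outlive the callback; the backing segment
        // may be an mmap'd region that is unreferenced after readAll returns.
        ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
        copy.put(buffer.duplicate());
        copy.flip();
        payloads.add(copy);
    });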
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java
index bc521cc83c..e8224ca64e 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -191,7 +191,7 @@ final class Metadata
         Int2IntHashMap recordsPerHost = new Int2IntHashMap(Integer.MIN_VALUE);
         int recordsCount = 0;
 
-        try (StaticSegment.SequentialReader<K> reader = StaticSegment.reader(descriptor, keySupport, fsyncedLimit))
+        try (StaticSegment.SequentialReader<K> reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
         {
             while (reader.advance())
             {
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index ba769d6163..fe2c2713b9 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -256,7 +256,7 @@ final class OnDiskIndex<K> extends Index<K>
     public long[] lookUpAll(K id)
     {
         if (!mayContainId(id))
-            return new long[0];
+            return EMPTY;
 
         int start = binarySearch(id);
         int firstKeyIndex = start;
@@ -265,7 +265,7 @@ final class OnDiskIndex<K> extends Index<K>
             firstKeyIndex = i;
 
         if (firstKeyIndex < 0)
-            return new long[0];
+            return EMPTY;
 
         int lastKeyIndex = start;
 
@@ -282,56 +282,61 @@ final class OnDiskIndex<K> extends Index<K>
         return all;
     }
 
-    public IndexIterator<K> iterator()
+    IndexReader reader()
     {
-        return new IndexIteratorImpl();
+        return new IndexReader();
     }
 
-    private class IndexIteratorImpl implements IndexIterator<K>
+    public class IndexReader
     {
-        int currentIdx;
-        K currentKey;
-        int currentOffset;
-        int currentSize;
+        int idx;
+        K key;
+        int offset;
+        int size;
 
-        IndexIteratorImpl()
+        IndexReader()
         {
-            currentIdx = -1;
+            idx = -1;
         }
 
-        @Override
-        public boolean hasNext()
+        public K key()
         {
-            return currentIdx < (entryCount - 1);
+            ensureAdvanced();
+            return key;
         }
 
-        @Override
-        public K currentKey()
+        public int offset()
         {
-            return currentKey;
+            ensureAdvanced();
+            return offset;
         }
 
-        @Override
-        public int currentOffset()
+        public int recordSize()
         {
-            return currentOffset;
+            ensureAdvanced();
+            return size;
         }
 
-        @Override
-        public int currentSize()
+        public boolean advance()
         {
-            return currentSize;
+            if (idx >= entryCount - 1)
+                return false;
+
+            idx++;
+            key = keyAtIndex(idx);
+            long record = recordAtIndex(idx);
+            offset = Index.readOffset(record);
+            size = Index.readSize(record);
+            return true;
         }
 
-        public void next()
+        private void ensureAdvanced()
         {
-            currentIdx++;
-            currentKey = keyAtIndex(currentIdx);
-            long record = recordAtIndex(currentIdx);
-            currentOffset = Index.readOffset(record);
-            currentSize = Index.readSize(record);
+            if (idx < 0)
+                throw new IllegalStateException("Must call advance() before accessing entry content");
         }
     }
+
     private K keyAtIndex(int index)
     {
         return keySupport.deserialize(buffer, FILE_PREFIX_SIZE + index * ENTRY_SIZE, descriptor.userVersion);
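
The cursor-style IndexReader replaces the removed IndexIterator and, like the segment readers elsewhere in this patch, is advance-then-access. A usage sketch (K stands for the index's key type):

    // Usage sketch: iterate an on-disk index entry by entry.
    OnDiskIndex<K>.IndexReader reader = index.reader();
    while (reader.advance())
    {
        K key = reader.key();
        int offset = reader.offset();     // where the record starts
        int size = reader.recordSize();   // how many bytes it spans
        // ... read [offset, offset + size) from the segment's data file ...
    }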
diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java
index 17e719ce5d..56bacce1d9 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -38,6 +38,10 @@ public interface Params
      */
     FlushMode flushMode();
 
+    boolean enableCompaction();
+
+    int compactionPeriodMillis();
+
     /**
      * @return milliseconds between journal flushes
      */
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index e16194001d..3403cd0f23 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -24,5 +24,6 @@ import org.agrona.collections.IntHashSet;
 @FunctionalInterface
 public interface RecordConsumer<K>
 {
+    default void init() {}
     void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion);
 }
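
The new default init() hook lets a stateful consumer reset itself before Journal.readAll feeds it entries from multiple segments. A small sketch of a consumer that uses it (hypothetical class, not part of the patch):

    // Sketch: a RecordConsumer that counts entries, resetting via init().
    class CountingConsumer<K> implements RecordConsumer<K>
    {
        int count;

        @Override
        public void init() { count = 0; } // invoked once per readAll call

        @Override
        public void accept(long segment, int position, K key, ByteBuffer buffer,
                           IntHashSet hosts, int userVersion)
        {
            count++;
        }
    }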
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index e548c52128..0da59118b7 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 
-abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
+public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
 {
     final File file;
     final Descriptor descriptor;
@@ -64,7 +64,7 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
 
         EntrySerializer.EntryHolder<K> into = new EntrySerializer.EntryHolder<>();
         int offset = Index.readOffset(offsetAndSize);
-        int size = Index.readSize(offset);
+        int size = Index.readSize(offsetAndSize);
         if (read(offset, size, into))
         {
             Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key);
@@ -83,18 +83,19 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
         return true;
     }
 
-    void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry)
+    void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntry)
     {
         long[] all = index().lookUpAll(id);
-
         for (int i = 0; i < all.length; i++)
         {
             int offset = Index.readOffset(all[i]);
             int size = Index.readSize(all[i]);
             Invariants.checkState(read(offset, size, into), "Read should always return true");
-            onEntry.run();
+            onEntry.accept(descriptor.timestamp, offset, into.key, into.value, into.hosts, into.userVersion);
         }
     }
 
     abstract boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into);
+
+    abstract void release();
 }
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/SegmentCompactor.java
similarity index 64%
copy from src/java/org/apache/cassandra/journal/RecordConsumer.java
copy to src/java/org/apache/cassandra/journal/SegmentCompactor.java
index e16194001d..5c95b539fc 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/SegmentCompactor.java
@@ -17,12 +17,18 @@
  */
 package org.apache.cassandra.journal;
 
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.Collection;
 
-import org.agrona.collections.IntHashSet;
-
-@FunctionalInterface
-public interface RecordConsumer<K>
+public interface SegmentCompactor<K, V>
 {
-    void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion);
+    SegmentCompactor<?, ?> NOOP = (SegmentCompactor<Object, Object>) (segments, keySupport) -> segments;
+
+    static <K, V> SegmentCompactor<K, V> noop()
+    {
+        //noinspection unchecked
+        return (SegmentCompactor<K, V>) NOOP;
+    }
+
+    Collection<StaticSegment<K, V>> compact(Collection<StaticSegment<K, V>> segments, KeySupport<K> keySupport) throws IOException;
 }
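
The contract here: a compactor consumes a set of static segments and returns the segments that should replace them; Compactor.run() discards any input absent from the return value. A hypothetical implementation spelling that out (equivalent in behaviour to NOOP above, shown only to illustrate the contract):

    // Hypothetical sketch of the SegmentCompactor contract: returned segments
    // survive; omitted inputs are discarded by Compactor.run().
    final class DropNothingCompactor<K, V> implements SegmentCompactor<K, V>
    {
        @Override
        public Collection<StaticSegment<K, V>> compact(Collection<StaticSegment<K, V>> segments,
                                                       KeySupport<K> keySupport)
        {
            return segments; // same behaviour as SegmentCompactor.noop()
        }
    }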
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 18dfc3bdaf..a779aebf23 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.journal;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
 
 import accord.utils.Invariants;
 import org.agrona.collections.Long2ObjectHashMap;
@@ -68,27 +72,37 @@ class Segments<K, V>
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment, StaticSegment<K, V> newSegment)
+    Segments<K, V> withCompactedSegments(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
     {
-        Invariants.checkArgument(oldSegment.descriptor.timestamp == newSegment.descriptor.timestamp);
-        Invariants.checkArgument(oldSegment.descriptor.generation < newSegment.descriptor.generation);
         Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        Segment<K, V> oldValue = newSegments.put(newSegment.descriptor.timestamp, newSegment);
-        Invariants.checkState(oldValue == oldSegment);
+        for (StaticSegment<K, V> oldSegment : oldSegments)
+        {
+            Segment<K, V> oldValue = newSegments.remove(oldSegment.descriptor.timestamp);
+            Invariants.checkState(oldValue == oldSegment);
+        }
+
+        for (StaticSegment<K, V> compactedSegment : compactedSegments)
+        {
+            Segment<K, V> oldValue = newSegments.put(compactedSegment.descriptor.timestamp, compactedSegment);
+            Invariants.checkState(oldValue == null);
+        }
+
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
+    Iterable<Segment<K, V>> all()
     {
-        Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        if (!newSegments.remove(staticSegment.descriptor.timestamp, staticSegment))
-            throw new IllegalStateException();
-        return new Segments<>(newSegments);
+        return this.segments.values();
     }
 
-    Iterable<Segment<K, V>> all()
+    /**
+     * Returns segments in timestamp order. Will allocate and sort the segment collection.
+     */
+    List<Segment<K, V>> allSorted()
     {
-        return segments.values();
+        List<Segment<K, V>> segments = new ArrayList<>(this.segments.values());
+        segments.sort(Comparator.comparing(s -> s.descriptor));
+        return segments;
     }
 
     void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into)
@@ -136,12 +150,12 @@ class Segments<K, V>
      * @return a subset of segments with references to them, or {@code null} if failed to grab the refs
      */
     @SuppressWarnings("resource")
-    ReferencedSegments<K, V> selectAndReference(Iterable<K> ids)
+    ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K, V>> test)
     {
         Long2ObjectHashMap<Segment<K, V>> selectedSegments = null;
         for (Segment<K, V> segment : segments.values())
         {
-            if (segment.index().mayContainIds(ids))
+            if (test.test(segment))
             {
                 if (null == selectedSegments)
                     selectedSegments = newMap(10);
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index f63701771c..c7ac7ce410 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -23,7 +23,10 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.locks.LockSupport;
 
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -38,7 +41,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
  * Can be compacted with input from {@code PersistedInvalidations} into a new smaller segment,
  * with invalidated entries removed.
  */
-final class StaticSegment<K, V> extends Segment<K, V>
+public final class StaticSegment<K, V> extends Segment<K, V>
 {
     final FileChannel channel;
 
@@ -111,7 +114,6 @@ final class StaticSegment<K, V> extends Segment<K, V>
         }
     }
 
-    @SuppressWarnings("resource")
     private static <K, V> StaticSegment<K, V> internalOpen(
         Descriptor descriptor, SyncedOffsets syncedOffsets, OnDiskIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
     throws IOException
@@ -125,7 +127,43 @@ final class StaticSegment<K, V> extends Segment<K, V>
     @Override
     public void close()
     {
-        selfRef.release();
+        try
+        {
+            channel.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not close static segment " + descriptor, e);
+        }
+
+        release();
+    }
+
+    /**
+     * Waits until this segment is unreferenced, closes it, and deletes all files associated with it.
+     */
+    void discard()
+    {
+        // TODO: consider moving deletion logic to Tidier instead of busy-looping here
+        waitUntilUnreferenced();
+        close();
+        for (Component component : Component.values())
+        {
+            File file = descriptor.fileFor(component);
+            if (file.exists())
+                file.delete();
+        }
+    }
+
+    public void waitUntilUnreferenced()
+    {
+        while (true)
+        {
+            if (selfRef.globalCount() == 1)
+                return;
+
+            LockSupport.parkNanos(100);
+        }
     }
 
     @Override
@@ -140,6 +178,18 @@ final class StaticSegment<K, V> extends Segment<K, V>
         return selfRef.ref();
     }
 
+    @Override
+    void release()
+    {
+        selfRef.release();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "StaticSegment{" + descriptor + '}';
+    }
+
     private static final class Tidier<K> implements Tidy
     {
         private final Descriptor descriptor;
@@ -223,57 +273,38 @@ final class StaticSegment<K, V> extends Segment<K, V>
      */
     void forEachRecord(RecordConsumer<K> consumer)
     {
-        try (SequentialReader<K> reader = reader(descriptor, keySupport, syncedOffsets.syncedOffset()))
+        try (SequentialReader<K> reader = sequentialReader(descriptor, keySupport, syncedOffsets.syncedOffset()))
         {
             while (reader.advance())
             {
-                consumer.accept(descriptor.timestamp, reader.offset(), reader.id(), reader.record(), reader.hosts(), descriptor.userVersion);
+                consumer.accept(descriptor.timestamp, reader.offset(), reader.key(), reader.record(), reader.hosts(), descriptor.userVersion);
             }
         }
     }
 
     /*
-     * Sequential reading (replay and components rebuild)
+     * Sequential and in-key order reading (replay and components rebuild)
      */
 
-    static <K> SequentialReader<K> reader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
-    {
-        return SequentialReader.open(descriptor, keySupport, fsyncedLimit);
-    }
-
-    /**
-     * A sequential data segment reader to use for journal replay and rebuilding
-     * missing auxilirary components (index and metadata).
-     * </p>
-     * Unexpected EOF and CRC mismatches in synced portions of segments are treated
-     * strictly, throwing {@link JournalReadError}. Errors encountered in unsynced portions
-     * of segments are treated as segment EOF.
-     */
-    static final class SequentialReader<K> implements Closeable
+    static abstract class Reader<K> implements Closeable
     {
-        private final Descriptor descriptor;
-        private final KeySupport<K> keySupport;
-        private final int fsyncedLimit; // exclusive
+        enum State { RESET, ADVANCED, EOF }
 
-        private final File file;
-        private final FileChannel channel;
-        private final MappedByteBuffer buffer;
-        private final DataInputBuffer in;
+        public final Descriptor descriptor;
+        protected final KeySupport<K> keySupport;
 
-        private int offset = -1;
-        private final EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
-        private State state = State.RESET;
+        protected final File file;
+        protected final FileChannel channel;
+        protected final MappedByteBuffer buffer;
 
-        static <K> SequentialReader<K> open(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
-        {
-            return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
-        }
+        protected final EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
+        protected int offset = -1;
+        protected State state = State.RESET;
 
-        SequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+        Reader(Descriptor descriptor, KeySupport<K> keySupport)
         {
             this.descriptor = descriptor;
             this.keySupport = keySupport;
-            this.fsyncedLimit = fsyncedLimit;
 
             file = descriptor.fileFor(Component.DATA);
             try
@@ -289,7 +320,6 @@ final class StaticSegment<K, V> extends Segment<K, V>
             {
                 throw new JournalReadError(descriptor, file, e);
             }
-            in = new DataInputBuffer(buffer, false);
         }
 
         @Override
@@ -299,37 +329,72 @@ final class StaticSegment<K, V> extends Segment<K, V>
             FileUtils.clean(buffer);
         }
 
-        int offset()
+        public abstract boolean advance();
+
+        public int offset()
         {
             ensureHasAdvanced();
             return offset;
         }
 
-        K id()
+        public K key()
         {
             ensureHasAdvanced();
             return holder.key;
         }
 
-        IntHashSet hosts()
+        public IntHashSet hosts()
         {
             ensureHasAdvanced();
             return holder.hosts;
         }
 
-        ByteBuffer record()
+        public ByteBuffer record()
         {
             ensureHasAdvanced();
             return holder.value;
         }
 
-        private void ensureHasAdvanced()
+        protected void ensureHasAdvanced()
         {
             if (state != State.ADVANCED)
                 throw new IllegalStateException("Must call advance() before 
accessing entry content");
         }
 
-        boolean advance()
+        protected boolean eof()
+        {
+            state = State.EOF;
+            return false;
+        }
+    }
+
+    static <K> SequentialReader<K> sequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+    {
+        return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
+    }
+
+    /**
+     * A sequential data segment reader to use for journal replay and rebuilding
+     * missing auxiliary components (index and metadata).
+     * </p>
+     * Unexpected EOF and CRC mismatches in synced portions of segments are treated
+     * strictly, throwing {@link JournalReadError}. Errors encountered in unsynced portions
+     * of segments are treated as segment EOF.
+     */
+    static final class SequentialReader<K> extends Reader<K>
+    {
+        private final int fsyncedLimit; // exclusive
+        private final DataInputBuffer in;
+
+        SequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+        {
+            super(descriptor, keySupport);
+            this.fsyncedLimit = fsyncedLimit;
+            in = new DataInputBuffer(buffer, false);
+        }
+
+        @Override
+        public boolean advance()
         {
             if (state == State.EOF)
                 return false;
@@ -361,13 +426,56 @@ final class StaticSegment<K, V> extends Segment<K, V>
             holder.clear();
             state = State.RESET;
         }
+    }
+
+    public StaticSegment.KeyOrderReader<K> keyOrderReader()
+    {
+        return new StaticSegment.KeyOrderReader<>(descriptor, keySupport, index.reader());
+    }
+
+    public static final class KeyOrderReader<K> extends Reader<K> implements Comparable<KeyOrderReader<K>>
+    {
+        private final OnDiskIndex<K>.IndexReader indexReader;
+
+        KeyOrderReader(Descriptor descriptor, KeySupport<K> keySupport, OnDiskIndex<K>.IndexReader indexReader)
+        {
+            super(descriptor, keySupport);
+            this.indexReader = indexReader;
+        }
 
-        private boolean eof()
+        @Override
+        public boolean advance()
         {
-            state = State.EOF;
-            return false;
+            if (!indexReader.advance())
+                return eof();
+
+            offset = indexReader.offset();
+
+            buffer.limit(offset + indexReader.recordSize())
+                  .position(offset);
+            try
+            {
+                EntrySerializer.read(holder, keySupport, buffer, descriptor.userVersion);
+            }
+            catch (IOException e)
+            {
+                throw new JournalReadError(descriptor, file, e);
+            }
+
+            state = State.ADVANCED;
+            return true;
         }
 
-        enum State { RESET, ADVANCED, EOF }
+        @Override
+        public int compareTo(KeyOrderReader<K> that)
+        {
+            this.ensureHasAdvanced();
+            that.ensureHasAdvanced();
+
+            int cmp = keySupport.compare(this.key(), that.key());
+            return cmp != 0
+                 ? cmp
+                 : this.descriptor.compareTo(that.descriptor);
+        }
     }
 }
\ No newline at end of file
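
KeyOrderReader orders first by key and then by descriptor, which makes a k-way merge across segments natural; presumably this is the access pattern AccordSegmentCompactor builds on. A hedged sketch of such a merge (segments and the output step are placeholders; K/V stand for the journal's key/value types):

    // Sketch: merge several static segments in key order with a priority queue.
    PriorityQueue<StaticSegment.KeyOrderReader<K>> queue = new PriorityQueue<>();
    for (StaticSegment<K, V> segment : segments)
    {
        StaticSegment.KeyOrderReader<K> reader = segment.keyOrderReader();
        if (reader.advance()) // compareTo() requires an advanced reader
            queue.add(reader);
    }

    while (!queue.isEmpty())
    {
        StaticSegment.KeyOrderReader<K> reader = queue.poll();
        // reader.key() / reader.record() / reader.hosts() describe the current entry
        // ... write the entry to the compacted output segment ...
        if (reader.advance())
            queue.add(reader); // re-queue to keep global key order
    }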
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 932d31356a..80ff9739ce 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -78,7 +77,8 @@ public class AccordJournal implements IJournal, Shutdownable
 
     static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(() -> new byte[22]);
 
-    public final Journal<JournalKey, Object> journal;
+    private final Journal<JournalKey, Object> journal;
+    private final AccordJournalTable<JournalKey, Object> journalTable;
     private final AccordEndpointMapper endpointMapper;
 
     private final DelayedRequestProcessor delayedRequestProcessor = new DelayedRequestProcessor();
@@ -94,24 +94,26 @@ public class AccordJournal implements IJournal, Shutdownable
         File directory = new File(DatabaseDescriptor.getAccordJournalDirectory());
         this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT,
                                      // In Accord, we are using streaming serialization, i.e. Reader/Writer interfaces instead of materializing objects
-                                     new ValueSerializer<JournalKey, Object>()
+                                     new ValueSerializer<>()
                                      {
                                          public int serializedSize(JournalKey key, Object value, int userVersion)
                                          {
                                              throw new UnsupportedOperationException();
                                          }
 
-                                         public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion) throws IOException
+                                         public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion)
                                          {
                                              throw new UnsupportedOperationException();
                                          }
 
-                                         public Object deserialize(JournalKey key, DataInputPlus in, int userVersion) throws IOException
+                                         public Object deserialize(JournalKey key, DataInputPlus in, int userVersion)
                                          {
                                              throw new UnsupportedOperationException();
                                          }
-                                     });
+                                     },
+                                     new AccordSegmentCompactor<>());
         this.endpointMapper = endpointMapper;
+        this.journalTable = new AccordJournalTable<>(journal, JournalKey.SUPPORT, params.userVersion());
     }
 
     public AccordJournal start(Node node)
@@ -153,7 +155,7 @@ public class AccordJournal implements IJournal, Shutdownable
     {
         try
         {
-            ExecutorUtils.awaitTermination(timeout, units, Arrays.asList(journal));
+            ExecutorUtils.awaitTermination(timeout, units, Collections.singletonList(journal));
             return true;
         }
         catch (TimeoutException e)
@@ -191,9 +193,9 @@ public class AccordJournal implements IJournal, Shutdownable
     @VisibleForTesting
     public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId)
     {
+        JournalKey key = new JournalKey(txnId, commandStoreId);
         SavedCommand.Builder builder = new SavedCommand.Builder();
-        journal.readAll(new JournalKey(txnId, commandStoreId),
-                        builder::deserializeNext);
+        journalTable.readAll(key, (ignore, in, userVersion) -> builder.deserializeNext(in, userVersion));
         return builder;
     }
 
@@ -240,7 +242,7 @@ public class AccordJournal implements IJournal, Shutdownable
             // We can only use strict equality if we supply result.
             Command reconstructed = diffs.construct();
             Invariants.checkState(orig.equals(reconstructed),
-                                  "\n" +
+                                  '\n' +
                                   "Original:      %s\n" +
                                   "Reconstructed: %s\n" +
                                   "Diffs:         %s", orig, reconstructed, 
diffs);
@@ -255,6 +257,7 @@ public class AccordJournal implements IJournal, Shutdownable
     /*
      * Context necessary to process log records
      */
+
     static abstract class RequestContext implements ReplyContext
     {
         final long waitForEpoch;
@@ -351,7 +354,7 @@ public class AccordJournal implements IJournal, Shutdownable
 
         public void start()
         {
-             executor = executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", this::run, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.SYNCHRONIZED);
+             executor = executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", this, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.SYNCHRONIZED);
         }
 
         private void delay(RequestContext requestContext)
@@ -457,9 +460,9 @@ public class AccordJournal implements IJournal, Shutdownable
         }
     }
 
-    public boolean isRunnable(Status status)
+    private boolean isRunnable(Status status)
     {
-        return status != Status.TERMINATING && status != status.TERMINATED;
+        return status != Status.TERMINATING && status != Status.TERMINATED;
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
new file mode 100644
index 0000000000..642f49e417
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import accord.utils.Invariants;
+import org.agrona.collections.IntHashSet;
+import org.agrona.collections.LongHashSet;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.StorageHook;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.journal.EntrySerializer.EntryHolder;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.RecordConsumer;
+import org.apache.cassandra.schema.ColumnMetadata;
+
+import static org.apache.cassandra.io.sstable.SSTableReadsListener.NOOP_LISTENER;
+
+public class AccordJournalTable<K, V>
+{
+    private static final IntHashSet SENTINEL_HOSTS = new IntHashSet();
+
+    private final Journal<K, V> journal;
+    private final ColumnFamilyStore cfs;
+
+    private final ColumnMetadata recordColumn;
+    private final ColumnMetadata versionColumn;
+
+    private final KeySupport<K> keySupport;
+    private final int accordJournalVersion;
+
+    public AccordJournalTable(Journal<K, V> journal, KeySupport<K> keySupport, int accordJournalVersion)
+    {
+        this.journal = journal;
+        this.cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL);
+        this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false));
+        this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false));
+        this.keySupport = keySupport;
+        this.accordJournalVersion = accordJournalVersion;
+    }
+
+    public interface Reader<K>
+    {
+        void read(K key, DataInputPlus input, int userVersion) throws IOException;
+    }
+
+    private abstract class AbstractRecordConsumer implements RecordConsumer<K>
+    {
+        protected final Reader<K> reader;
+
+        AbstractRecordConsumer(Reader<K> reader)
+        {
+            this.reader = reader;
+        }
+
+        @Override
+        public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion)
+        {
+            try (DataInputBuffer in = new DataInputBuffer(buffer, false))
+            {
+                reader.read(key, in, userVersion);
+            }
+            catch (IOException e)
+            {
+                // can only throw if serializer is buggy
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private class TableRecordConsumer extends AbstractRecordConsumer
+    {
+        protected LongHashSet visited = null;
+
+        TableRecordConsumer(Reader<K> reader)
+        {
+            super(reader);
+        }
+
+        void visit(long segment)
+        {
+            if (visited == null)
+                visited = new LongHashSet();
+            visited.add(segment);
+        }
+
+        boolean visited(long segment)
+        {
+            return visited != null && visited.contains(segment);
+        }
+
+        @Override
+        public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion)
+        {
+            visit(segment);
+            super.accept(segment, position, key, buffer, hosts, userVersion);
+        }
+    }
+
+    private class JournalAndTableRecordConsumer extends AbstractRecordConsumer
+    {
+        private final K key;
+        private final TableRecordConsumer tableRecordConsumer;
+
+        JournalAndTableRecordConsumer(K key, Reader<K> reader)
+        {
+            super(reader);
+            this.key = key;
+            this.tableRecordConsumer = new TableRecordConsumer(reader);
+        }
+
+        @Override
+        public void init()
+        {
+            readAllFromTable(key, tableRecordConsumer);
+        }
+
+        @Override
+        public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion)
+        {
+            if (!tableRecordConsumer.visited(segment))
+                super.accept(segment, position, key, buffer, hosts, userVersion);
+        }
+    }
+
+    /**
+     * Perform a read from the journal table, followed by reads from all journal segments.
+     * <p>
+     * When reading from journal segments, skip descriptors that were already read from the table.
+     */
+    public void readAll(K key, Reader<K> reader)
+    {
+        journal.readAll(key, new JournalAndTableRecordConsumer(key, reader));
+    }
+
+    private void readAllFromTable(K key, TableRecordConsumer onEntry)
+    {
+        DecoratedKey pk = makePartitionKey(cfs, key, keySupport, accordJournalVersion);
+
+        try (RefViewFragment view = cfs.selectAndReference(View.select(SSTableSet.LIVE, pk)))
+        {
+            if (view.sstables.isEmpty())
+                return;
+
+            List<UnfilteredRowIterator> iters = new ArrayList<>(view.sstables.size());
+            for (SSTableReader sstable : view.sstables)
+                if (sstable.mayContainAssumingKeyIsInRange(pk))
+                    iters.add(StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER));
+
+            if (!iters.isEmpty())
+            {
+                EntryHolder<K> into = new EntryHolder<>();
+                try (UnfilteredRowIterator iter = UnfilteredRowIterators.merge(iters))
+                {
+                    while (iter.hasNext()) readRow(key, iter.next(), into, onEntry);
+                }
+            }
+        }
+    }
+
+    public static <K> DecoratedKey makePartitionKey(ColumnFamilyStore cfs, K key, KeySupport<K> keySupport, int version)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer(keySupport.serializedSize(version)))
+        {
+            keySupport.serialize(key, out, version);
+            return cfs.decorateKey(out.buffer(false));
+        }
+        catch (IOException e)
+        {
+            // can only throw if (key) serializer is buggy
+            throw new RuntimeException("Could not serialize key " + key + ", this shouldn't be possible", e);
+        }
+    }
+
+    private void readRow(K key, Unfiltered unfiltered, EntryHolder<K> into, RecordConsumer<K> onEntry)
+    {
+        Invariants.checkState(unfiltered.isRow());
+        Row row = (Row) unfiltered;
+
+        long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0)));
+        int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1)));
+
+        into.key = key;
+        into.value = row.getCell(recordColumn).buffer();
+        into.hosts = SENTINEL_HOSTS;
+        into.userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer());
+
+        onEntry.accept(descriptor, position, into.key, into.value, into.hosts, into.userVersion);
+    }
+}
\ No newline at end of file
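
AccordJournalTable implements a two-tier read: the journal table (SSTables) is consulted first, the segment descriptors it covered are remembered in a visited set, and live journal segments are then replayed with those descriptors skipped, so a record already compacted into the table is never surfaced twice. A minimal sketch of the skip-visited-segments idea, with plain maps standing in for both tiers (all names below are illustrative, not patch APIs):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.BiConsumer;

    public class TwoTierReadSketch
    {
        // segment id -> record payload; both tiers may hold the same segment's data
        final Map<Long, String> table = new HashMap<>();    // records compacted into the table
        final Map<Long, String> segments = new HashMap<>(); // records still in live segments

        // Read the table first, remembering which segments it covered; then read
        // the live segments, skipping any segment already seen in the table.
        void readAll(BiConsumer<Long, String> consumer)
        {
            Set<Long> visited = new HashSet<>();
            table.forEach((segment, record) -> { visited.add(segment); consumer.accept(segment, record); });
            segments.forEach((segment, record) -> {
                if (!visited.contains(segment))
                    consumer.accept(segment, record);
            });
        }

        public static void main(String[] args)
        {
            TwoTierReadSketch sketch = new TwoTierReadSketch();
            sketch.table.put(1L, "r1");    // segment 1 was compacted into the table...
            sketch.segments.put(1L, "r1"); // ...but its file has not been deleted yet
            sketch.segments.put(2L, "r2"); // segment 2 exists only as a live segment
            sketch.readAll((segment, record) -> System.out.println(record + " from segment " + segment));
            // prints r1 once (from the table) and r2 once (from segment 2)
        }
    }
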
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 45a85998a6..d58e7841df 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -169,6 +169,7 @@ public class AccordKeyspace
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordKeyspace.class);
 
+    public static final String JOURNAL = "journal";
     public static final String COMMANDS = "commands";
     public static final String TIMESTAMPS_FOR_KEY = "timestamps_for_key";
     public static final String COMMANDS_FOR_KEY = "commands_for_key";
@@ -223,6 +224,21 @@ public class AccordKeyspace
         }
     }
 
+    public static final TableMetadata Journal =
+        parse(JOURNAL,
+              "accord journal",
+              "CREATE TABLE %s ("
+              + "key blob,"
+              + "descriptor bigint,"
+              + "offset int,"
+              + "user_version int,"
+              + "record blob,"
+              + "PRIMARY KEY(key, descriptor, offset)"
+              + ')')
+        .partitioner(new LocalPartitioner(BytesType.instance))
+        .build();
+
+
     // TODO: store timestamps as blobs (confirm there are no negative numbers, or offset)
     public static final TableMetadata Commands =
         parse(COMMANDS,
@@ -717,7 +733,7 @@ public class AccordKeyspace
 
     public static Tables tables()
     {
-        return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, Topologies, EpochMetadata, CommandStoreMetadata);
+        return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, Topologies, EpochMetadata, CommandStoreMetadata, Journal);
     }
 
     private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> serializer) throws IOException
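
The journal table partitions by the serialized journal key and clusters by (descriptor, offset), so rows within a partition sort by segment first, then by write position within that segment, mirroring replay order. A small sketch of that clustering order, assuming a hypothetical Clustering stand-in (not a Cassandra type):

    import java.util.Map;
    import java.util.TreeMap;

    public class JournalRowOrderSketch
    {
        // Stand-in for the (descriptor, offset) clustering: rows in a partition
        // sort by segment descriptor first, then by offset within the segment.
        record Clustering(long descriptor, int offset) implements Comparable<Clustering>
        {
            @Override
            public int compareTo(Clustering that)
            {
                int cmp = Long.compare(this.descriptor, that.descriptor);
                return cmp != 0 ? cmp : Integer.compare(this.offset, that.offset);
            }
        }

        public static void main(String[] args)
        {
            Map<Clustering, String> partition = new TreeMap<>(); // one partition = one journal key
            partition.put(new Clustering(2, 0), "third write");
            partition.put(new Clustering(1, 512), "second write");
            partition.put(new Clustering(1, 0), "first write");

            // iteration order mirrors replay order: (1,0), (1,512), (2,0)
            partition.forEach((c, record) -> System.out.println(c + " -> " + record));
        }
    }
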
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
new file mode 100644
index 0000000000..e3f10cb644
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.SegmentCompactor;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.journal.StaticSegment.KeyOrderReader;
+
+/**
+ * Segment compactor: takes static segments and compacts them into a single SSTable.
+ */
+public class AccordSegmentCompactor<K, V> implements SegmentCompactor<K, V>
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordSegmentCompactor.class);
+
+    @Override
+    public Collection<StaticSegment<K, V>> compact(Collection<StaticSegment<K, V>> segments, KeySupport<K> keySupport)
+    {
+        Invariants.checkState(segments.size() >= 2, () -> String.format("Can only compact 2 or more segments, but got %d", segments.size()));
+        logger.info("Compacting {} static segments: {}", segments.size(), segments);
+
+        PriorityQueue<KeyOrderReader<K>> readers = new PriorityQueue<>();
+        for (StaticSegment<K, V> segment : segments)
+        {
+            KeyOrderReader<K> reader = segment.keyOrderReader();
+            if (reader.advance())
+                readers.add(reader);
+        }
+
+        // nothing to compact (all segments empty, should never happen, but it is theoretically possible?) - exit early
+        // TODO: investigate how this comes to be, check if there is a cleanup issue
+        if (readers.isEmpty())
+            return segments;
+
+        ColumnFamilyStore cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL);
+        Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables());
+        SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS);
+
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header))
+        {
+            K key = null;
+            PartitionUpdate.SimpleBuilder partitionBuilder = null;
+
+            try
+            {
+                KeyOrderReader<K> reader;
+                while ((reader = readers.poll()) != null)
+                {
+                    if (!reader.key().equals(key)) // first ever - or new - key
+                    {
+                        if (partitionBuilder != null) // append previous partition if any
+                            writer.append(partitionBuilder.build().unfilteredIterator());
+
+                        key = reader.key();
+                        partitionBuilder = PartitionUpdate.simpleBuilder(
+                            AccordKeyspace.Journal, AccordJournalTable.makePartitionKey(cfs, key, keySupport, reader.descriptor.userVersion)
+                        );
+                    }
+
+                    boolean advanced;
+                    do
+                    {
+                        partitionBuilder.row(reader.descriptor.timestamp, reader.offset())
+                                        .add("record", reader.record())
+                                        .add("user_version", reader.descriptor.userVersion);
+                    }
+                    while ((advanced = reader.advance()) && reader.key().equals(key));
+
+                    if (advanced) readers.offer(reader); // there is more to this reader, but not with this key
+                }
+
+                //noinspection DataFlowIssue
+                writer.append(partitionBuilder.build().unfilteredIterator()); // append the last partition
+            }
+            catch (Throwable t)
+            {
+                Throwable accumulate = writer.abort(t);
+                Throwables.throwIfUnchecked(accumulate);
+                throw new RuntimeException(accumulate);
+            }
+
+            cfs.addSSTables(writer.finish(true));
+            return Collections.emptyList();
+        }
+    }
+}
+
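
compact() consumes the key-ordered merge and groups rows into partitions with a flush-on-key-change loop: the current partition builder is appended whenever a new key arrives, and once more after the loop for the final partition. A stripped-down sketch of just that control flow; flush() and the sample rows are illustrative stand-ins for SSTableTxnWriter.append() and journal records:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class GroupByKeyFlushSketch
    {
        public static void main(String[] args)
        {
            // key-ordered input, as produced by the reader merge
            List<Map.Entry<String, Integer>> rows = List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));

            String key = null;
            List<Integer> partition = null; // stands in for PartitionUpdate.SimpleBuilder

            for (Map.Entry<String, Integer> row : rows)
            {
                if (!row.getKey().equals(key)) // first ever - or new - key
                {
                    if (partition != null)
                        flush(key, partition); // append the previous partition, if any
                    key = row.getKey();
                    partition = new ArrayList<>();
                }
                partition.add(row.getValue());
            }

            if (partition != null)
                flush(key, partition); // append the last partition
        }

        static void flush(String key, List<Integer> rows)
        {
            System.out.println(key + " -> " + rows); // prints a -> [1, 2], then b -> [3]
        }
    }
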
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 9e49cf3c47..05e4e31a0d 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -126,6 +127,8 @@ public interface IAccordService
 
     class CompactionInfo
     {
+        static final Supplier<CompactionInfo> NO_OP = () -> new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY);
+
         public final Int2ObjectHashMap<RedundantBefore> redundantBefores;
         public final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges;
         public final DurableBefore durableBefore;
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index aa0c747396..c31c337882 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -36,9 +36,9 @@ import static org.apache.cassandra.db.TypeSizes.SHORT_SIZE;
 
 public final class JournalKey
 {
-    final Timestamp timestamp;
+    public final Timestamp timestamp;
     // TODO: command store id _before_ timestamp
-    final int commandStoreId;
+    public final int commandStoreId;
 
     JournalKey(Timestamp timestamp)
     {
@@ -60,7 +60,7 @@ public final class JournalKey
      * when ordering timestamps. This is done for more precise elimination of candidate
      * segments by min/max record key in segment.
      */
-    static final KeySupport<JournalKey> SUPPORT = new KeySupport<>()
+    public static final KeySupport<JournalKey> SUPPORT = new KeySupport<>()
     {
         private static final int HLC_OFFSET = 0;
         private static final int EPOCH_AND_FLAGS_OFFSET = HLC_OFFSET + LONG_SIZE;
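
The ordering described above matters because segments index their min/max record keys: with keys compared by HLC first, a lookup can discard any segment whose [min, max] key range cannot contain the key it wants. A toy sketch of that pruning, with longs standing in for journal keys (Segment below is illustrative, not the journal's class):

    import java.util.List;

    public class SegmentPruningSketch
    {
        // Stand-in for a static segment's index metadata: the min and max
        // record keys it contains (plain longs in place of HLC-ordered keys).
        record Segment(long minKey, long maxKey)
        {
            boolean mayContain(long key)
            {
                return key >= minKey && key <= maxKey;
            }
        }

        public static void main(String[] args)
        {
            List<Segment> segments = List.of(new Segment(0, 9), new Segment(10, 19), new Segment(20, 29));
            long lookup = 15;
            for (Segment segment : segments)
                if (segment.mayContain(lookup)) // only the middle segment survives pruning
                    System.out.println("candidate: " + segment);
        }
    }
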
diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
index 090e4da755..5f5fb9b9c9 100644
--- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java
@@ -259,30 +259,128 @@ public class SavedCommand
         return oldFlags | (1 << field.ordinal());
     }
 
-
     public static class Builder
     {
-        TxnId txnId = null;
+        TxnId txnId;
+
+        Timestamp executeAt;
+        Timestamp executeAtLeast;
+        SaveStatus saveStatus;
+        Status.Durability durability;
+
+        Ballot acceptedOrCommitted;
+        Ballot promised;
+
+        Route<?> route;
+        PartialTxn partialTxn;
+        PartialDeps partialDeps;
+        Seekables<?, ?> additionalKeysOrRanges;
+
+        SavedCommand.WaitingOnProvider waitingOn;
+        Writes writes;
+        Result result;
+
+        boolean nextCalled;
+        int count;
+
+        public Builder()
+        {
+            clear();
+        }
+
+        public TxnId txnId()
+        {
+            return txnId;
+        }
+
+        public Timestamp executeAt()
+        {
+            return executeAt;
+        }
 
-        Timestamp executeAt = null;
-        Timestamp executeAtLeast = null;
-        SaveStatus saveStatus = null;
-        Status.Durability durability = null;
+        public SaveStatus saveStatus()
+        {
+            return saveStatus;
+        }
+
+        public Status.Durability durability()
+        {
+            return durability;
+        }
 
-        Ballot acceptedOrCommitted = Ballot.ZERO;
-        Ballot promised = null;
+        public Ballot acceptedOrCommitted()
+        {
+            return acceptedOrCommitted;
+        }
 
-        Route<?> route = null;
-        PartialTxn partialTxn = null;
-        PartialDeps partialDeps = null;
-        Seekables<?, ?> additionalKeysOrRanges = null;
+        public Ballot promised()
+        {
+            return promised;
+        }
 
-        SavedCommand.WaitingOnProvider waitingOn = (txn, deps) -> null;
-        Writes writes = null;
-        Result result = CommandSerializers.APPLIED;
+        public Route<?> route()
+        {
+            return route;
+        }
 
-        boolean nextCalled = false;
-        int count = 0;
+        public PartialTxn partialTxn()
+        {
+            return partialTxn;
+        }
+
+        public PartialDeps partialDeps()
+        {
+            return partialDeps;
+        }
+
+        public Seekables<?, ?> additionalKeysOrRanges()
+        {
+            return additionalKeysOrRanges;
+        }
+
+        public SavedCommand.WaitingOnProvider waitingOn()
+        {
+            return waitingOn;
+        }
+
+        public Writes writes()
+        {
+            return writes;
+        }
+
+        public Result result()
+        {
+            return result;
+        }
+
+        public void clear()
+        {
+            txnId = null;
+
+            executeAt = null;
+            saveStatus = null;
+            durability = null;
+
+            acceptedOrCommitted = Ballot.ZERO;
+            promised = null;
+
+            route = null;
+            partialTxn = null;
+            partialDeps = null;
+            additionalKeysOrRanges = null;
+
+            waitingOn = (txn, deps) -> null;
+            writes = null;
+            result = CommandSerializers.APPLIED;
+
+            nextCalled = false;
+            count = 0;
+        }
+
+        public boolean isEmpty()
+        {
+            return !nextCalled;
+        }
 
         public int count()
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
similarity index 98%
rename from test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
index 7f7dfb0767..80d48b6091 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
-public class AccordJournalTest extends TestBaseImpl
+public class AccordJournalIntegrationTest extends TestBaseImpl
 {
     @Test
     public void saveLoadSanityCheck() throws Throwable
diff --git a/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java
new file mode 100644
index 0000000000..a2161c4386
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.journal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.service.accord.AccordJournalTable;
+import org.apache.cassandra.service.accord.AccordSegmentCompactor;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+
+public class AccordJournalCompactionTest
+{
+    private static final Set<Integer> SENTINEL_HOSTS = Collections.singleton(0);
+
+    @BeforeClass
+    public static void setUp()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        ServerTestUtils.prepareServer();
+    }
+
+    @Test
+    public void segmentMergeTest() throws IOException
+    {
+        File directory = new File(Files.createTempDirectory(null));
+        directory.deleteOnExit();
+
+        Journal<TimeUUID, ByteBuffer> journal = journal(directory);
+        AccordJournalTable<TimeUUID, ByteBuffer> journalTable = new AccordJournalTable<>(journal, journal.keySupport, journal.params.userVersion());
+        journal.start();
+
+        Map<TimeUUID, List<ByteBuffer>> uuids = new HashMap<>();
+
+        int count = 0;
+        for (int i = 0; i < 1024 * 5; i++)
+        {
+            TimeUUID uuid = nextTimeUUID();
+            for (long j = 0; j < 5; j++)
+            {
+                ByteBuffer buf = ByteBuffer.allocate(1024);
+                for (int k = 0; k < 1024; k++)
+                    buf.put((byte) count);
+                count++;
+                buf.rewind();
+                uuids.computeIfAbsent(uuid, (k) -> new ArrayList<>())
+                     .add(buf);
+                journal.asyncWrite(uuid, buf, SENTINEL_HOSTS);
+            }
+        }
+
+        journal.closeCurrentSegmentForTesting();
+        Runnable checkAll = () -> {
+            for (Map.Entry<TimeUUID, List<ByteBuffer>> e : uuids.entrySet())
+            {
+                List<ByteBuffer> expected = e.getValue();
+
+                List<ByteBuffer> actual = new ArrayList<>();
+                journalTable.readAll(e.getKey(), (key, in, userVersion) -> actual.add(journal.valueSerializer.deserialize(key, in, userVersion)));
+                Assert.assertEquals(actual.size(), expected.size());
+                for (int i = 0; i < actual.size(); i++)
+                {
+                    if (!actual.get(i).equals(expected.get(i)))
+                    {
+                        StringBuilder sb = new StringBuilder();
+                        sb.append("Actual:\n");
+                        for (ByteBuffer bb : actual)
+                            sb.append(ByteBufferUtil.bytesToHex(bb)).append('\n');
+                        sb.append("Expected:\n");
+                        for (ByteBuffer bb : expected)
+                            sb.append(ByteBufferUtil.bytesToHex(bb)).append('\n');
+                        throw new AssertionError(sb.toString());
+                    }
+                }
+            }
+        };
+
+        checkAll.run();
+        journal.runCompactorForTesting();
+        checkAll.run();
+        journal.shutdown();
+    }
+
+    private static Journal<TimeUUID, ByteBuffer> journal(File directory)
+    {
+        return new Journal<>("TestJournal", directory,
+                             new TestParams() {
+                                 @Override
+                                 public int segmentSize()
+                                 {
+                                     return 1024 * 1024;
+                                 }
+
+                                 @Override
+                                 public boolean enableCompaction()
+                                 {
+                                     return false;
+                                 }
+                             },
+                             TimeUUIDKeySupport.INSTANCE,
+                             JournalTest.ByteBufferSerializer.INSTANCE,
+                             new AccordSegmentCompactor<>());
+    }
+}
diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 5a3b23076e..8ff8938645 100644
--- a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.journal.Journal;
 import org.apache.cassandra.journal.KeySupport;
 import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.ValueSerializer;
 
 import org.junit.Assert;
@@ -78,7 +79,8 @@ public class AccordJournalSimulationTest extends SimulationTestBase
                                                    new File("/journal"),
                                                    new AccordSpec.JournalSpec(),
                                                    new IdentityKeySerializer(),
-                                                   new IdentityValueSerializer());
+                                                   new IdentityValueSerializer(),
+                                                   SegmentCompactor.noop());
                  }),
                  () -> check());
     }
diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java b/test/unit/org/apache/cassandra/journal/IndexTest.java
index 5e7314a338..8f1046f2c3 100644
--- a/test/unit/org/apache/cassandra/journal/IndexTest.java
+++ b/test/unit/org/apache/cassandra/journal/IndexTest.java
@@ -224,20 +224,18 @@ public class IndexTest
                     sortedEntries.add(Pair.create(entry.getKey(), l));
             }
 
-            Index.IndexIterator<TimeUUID> iter = onDisk.iterator();
+            OnDiskIndex<TimeUUID>.IndexReader iter = onDisk.reader();
             Iterator<Pair<TimeUUID, Long>> expectedIter = sortedEntries.iterator();
-            while (iter.hasNext())
+            while (iter.advance())
             {
-                iter.next();
                 Pair<TimeUUID, Long> expected = expectedIter.next();
-                Assert.assertEquals(iter.currentKey(), expected.left);
-                Assert.assertEquals(iter.currentSize(), Index.readSize(expected.right));
-                Assert.assertEquals(iter.currentOffset(), Index.readOffset(expected.right));
+                Assert.assertEquals(iter.key(), expected.left);
+                Assert.assertEquals(iter.recordSize(), Index.readSize(expected.right));
+                Assert.assertEquals(iter.offset(), Index.readOffset(expected.right));
             }
         }
     }
 
-
     private static void assertIndex(Map<TimeUUID, long[]> expected, Index<TimeUUID> actual)
     {
         List<TimeUUID> keys = expected.entrySet()
diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java b/test/unit/org/apache/cassandra/journal/JournalTest.java
index 241e465ba8..bab37ca150 100644
--- a/test/unit/org/apache/cassandra/journal/JournalTest.java
+++ b/test/unit/org/apache/cassandra/journal/JournalTest.java
@@ -18,27 +18,33 @@
 package org.apache.cassandra.journal;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.Set;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.TimeUUID;
 
-import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
+import static org.junit.Assert.assertEquals;
 
 public class JournalTest
 {
+    private static final Set<Integer> SENTINEL_HOSTS = Collections.singleton(0);
+
     @BeforeClass
     public static void setUp()
     {
         DatabaseDescriptor.daemonInitialization();
+        ServerTestUtils.prepareServer();
     }
 
     @Test
@@ -48,7 +54,8 @@ public class JournalTest
         directory.deleteRecursiveOnExit();
 
         Journal<TimeUUID, Long> journal =
-            new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE);
+        new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+
 
         journal.start();
 
@@ -69,7 +76,7 @@ public class JournalTest
 
         journal.shutdown();
 
-        journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE);
+        journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
         journal.start();
 
         assertEquals(1L, (long) journal.readFirst(id1));
@@ -80,6 +87,29 @@ public class JournalTest
         journal.shutdown();
     }
 
+    static class ByteBufferSerializer implements ValueSerializer<TimeUUID, ByteBuffer>
+    {
+        static final ByteBufferSerializer INSTANCE = new ByteBufferSerializer();
+
+        public int serializedSize(TimeUUID key, ByteBuffer value, int userVersion)
+        {
+            return Integer.BYTES + value.capacity();
+        }
+
+        public void serialize(TimeUUID key, ByteBuffer value, DataOutputPlus out, int userVersion) throws IOException
+        {
+            out.writeInt(value.capacity());
+            out.write(value);
+        }
+
+        public ByteBuffer deserialize(TimeUUID key, DataInputPlus in, int userVersion) throws IOException
+        {
+            byte[] bytes = new byte[in.readInt()];
+            in.readFully(bytes);
+            return ByteBuffer.wrap(bytes);
+        }
+    }
+
     static class LongSerializer implements ValueSerializer<TimeUUID, Long>
     {
         static final LongSerializer INSTANCE = new LongSerializer();
diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java
index 2e59d701cb..573ba4c9e0 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java
@@ -203,26 +203,26 @@ public class SegmentTest
 
         activeSegment.close();
 
-        StaticSegment.SequentialReader<TimeUUID> reader = StaticSegment.reader(descriptor, TimeUUIDKeySupport.INSTANCE, 0);
+        StaticSegment.SequentialReader<TimeUUID> reader = StaticSegment.sequentialReader(descriptor, TimeUUIDKeySupport.INSTANCE, 0);
 
         // read all 4 entries sequentially and compare with originals
         assertTrue(reader.advance());
-        assertEquals(id1, reader.id());
+        assertEquals(id1, reader.key());
         assertEquals(hosts1, reader.hosts());
         assertEquals(record1, reader.record());
 
         assertTrue(reader.advance());
-        assertEquals(id2, reader.id());
+        assertEquals(id2, reader.key());
         assertEquals(hosts2, reader.hosts());
         assertEquals(record2, reader.record());
 
         assertTrue(reader.advance());
-        assertEquals(id3, reader.id());
+        assertEquals(id3, reader.key());
         assertEquals(hosts3, reader.hosts());
         assertEquals(record3, reader.record());
 
         assertTrue(reader.advance());
-        assertEquals(id4, reader.id());
+        assertEquals(id4, reader.key());
         assertEquals(hosts4, reader.hosts());
         assertEquals(record4, reader.record());
 
diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java b/test/unit/org/apache/cassandra/journal/TestParams.java
index 9a9254ce9b..5773c4763a 100644
--- a/test/unit/org/apache/cassandra/journal/TestParams.java
+++ b/test/unit/org/apache/cassandra/journal/TestParams.java
@@ -41,6 +41,18 @@ public class TestParams implements Params
         return FlushMode.GROUP;
     }
 
+    @Override
+    public boolean enableCompaction()
+    {
+        return false;
+    }
+
+    @Override
+    public int compactionPeriodMillis()
+    {
+        return 60_000;
+    }
+
     @Override
     public int flushPeriodMillis()
     {
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index a2fd50d5f9..80a7b41769 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -398,7 +398,7 @@ public class AccordTestUtils
 
         if (new File(DatabaseDescriptor.getAccordJournalDirectory()).exists())
             ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory());
-        AccordJournal journal = new AccordJournal(null, new AccordSpec.JournalSpec());
+        AccordJournal journal = new AccordJournal(SimpleAccordEndpointMapper.INSTANCE, new AccordSpec.JournalSpec());
         journal.start(null);
 
         SingleEpochRanges holder = new SingleEpochRanges(topology.rangesForNode(node));

