This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 3be0d93e9f Implement Nemesis compactor, improve partial compaction
3be0d93e9f is described below

commit 3be0d93e9fffdfa0bbd839c82dbc6e08f5022343
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Mar 6 16:59:17 2025 +0100

    Implement Nemesis compactor, improve partial compaction
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20424.
---
 modules/accord                                     |   2 +-
 ...or.java => AbstractAccordSegmentCompactor.java} | 139 +++++++++++--------
 .../cassandra/service/accord/AccordJournal.java    |  52 ++++---
 .../service/accord/AccordSegmentCompactor.java     | 150 +++-----------------
 .../accord/serializers/WaitingOnSerializer.java    |   1 +
 .../service/accord/AccordJournalBurnTest.java      | 154 ++++++++++++++++-----
 .../accord/NemesisAccordSegmentCompactor.java      | 105 ++++++++++++++
 .../accord/SimulatedAccordCommandStore.java        |   7 +-
 8 files changed, 361 insertions(+), 249 deletions(-)
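
The core of the patch is a template-method split: the existing AccordSegmentCompactor is copied into an abstract base, AbstractAccordSegmentCompactor, which owns the merge loop and delegates the SSTable writer lifecycle to a small set of hooks. The production compactor keeps writing a single SSTable, while the test-only NemesisAccordSegmentCompactor scatters partitions across several writers to exercise reads that span sstables. A rough sketch of the hook surface introduced below (method bodies elided, generics simplified, not the exact committed signatures):

    abstract class CompactorHooksSketch
    {
        // one-time setup: open one (or several) SSTableTxnWriter instances
        abstract void initializeWriter();
        // pick the writer that receives the next partition (or partition chunk)
        abstract SSTableTxnWriter writer();
        // finish all writers and register the produced SSTables with the CFS
        abstract void finishAndAddWriter();
        // abort all writers on failure, accumulating suppressed exceptions
        abstract Throwable cleanupWriter(Throwable t);

        // called when the merge loop moves on to a new JournalKey
        void switchPartitions() {}
        // asked per record; returning true flushes what has been merged so far for the key
        boolean considerWritingKey() { return false; }
    }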

diff --git a/modules/accord b/modules/accord
index fe306fc553..2025bcf1a6 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit fe306fc5539b40d1c9d49f9afd0ca45bb74c49d3
+Subproject commit 2025bcf1a6ec9f572177616fc5b6ee8714cf8f77
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
similarity index 51%
copy from src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
copy to src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
index 995ba7f9e9..0dd1ef5160 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java
@@ -28,11 +28,8 @@ import org.slf4j.LoggerFactory;
 import accord.utils.Invariants;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -44,18 +41,30 @@ import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.Flyweig
 /**
  * Segment compactor: takes static segments and compacts them into a single 
SSTable.
  */
-public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, 
V>
+public abstract class AbstractAccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V>
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordSegmentCompactor.class);
-    private final int userVersion;
-    private final ColumnFamilyStore cfs;
+    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractAccordSegmentCompactor.class);
+    protected final int userVersion;
+    protected final ColumnFamilyStore cfs;
 
-    public AccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs)
+    public AbstractAccordSegmentCompactor(int userVersion, ColumnFamilyStore 
cfs)
     {
         this.userVersion = userVersion;
         this.cfs = cfs;
     }
 
+    void switchPartitions() {}
+
+    boolean considerWritingKey()
+    {
+        return false;
+    }
+
+    abstract void initializeWriter();
+    abstract SSTableTxnWriter writer();
+    abstract void finishAndAddWriter();
+    abstract Throwable cleanupWriter(Throwable t);
+
     @Override
     public Collection<StaticSegment<JournalKey, V>> 
compact(Collection<StaticSegment<JournalKey, V>> segments)
     {
@@ -77,78 +86,86 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
         if (readers.isEmpty())
             return Collections.emptyList();
 
-        Descriptor descriptor = 
cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables());
-        SerializationHeader header = new SerializationHeader(true, 
cfs.metadata(), cfs.metadata().regularAndStaticColumns(), 
EncodingStats.NO_STATS);
+        initializeWriter();
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, 
descriptor, 0, 0, null, false, header))
+        JournalKey key = null;
+        Object builder = null;
+        FlyweightSerializer<Object, Object> serializer = null;
+        long firstDescriptor = -1, lastDescriptor = -1;
+        int firstOffset = -1, lastOffset = -1;
+        try
         {
-            JournalKey key = null;
-            Object builder = null;
-            FlyweightSerializer<Object, Object> serializer = null;
-            long firstDescriptor = -1, lastDescriptor = -1;
-            int firstOffset = -1, lastOffset = -1;
-            try
+            KeyOrderReader<JournalKey> reader;
+            while ((reader = readers.poll()) != null)
             {
-                KeyOrderReader<JournalKey> reader;
-                while ((reader = readers.poll()) != null)
+                if (key == null || !reader.key().equals(key))
                 {
-                    if (key == null || !reader.key().equals(key))
-                    {
-                        maybeWritePartition(writer, key, builder, serializer, 
firstDescriptor, firstOffset);
+                    maybeWritePartition(key, builder, serializer, 
firstDescriptor, firstOffset);
+                    switchPartitions();
+                    key = reader.key();
+                    serializer = (FlyweightSerializer<Object, Object>) 
key.type.serializer;
+                    builder = serializer.mergerFor(key);
+                    firstDescriptor = lastDescriptor = -1;
+                    firstOffset = lastOffset = -1;
+                }
 
-                        key = reader.key();
-                        serializer = (FlyweightSerializer<Object, Object>) 
key.type.serializer;
+                boolean advanced;
+                do
+                {
+                    if (builder == null)
                         builder = serializer.mergerFor(key);
-                        firstDescriptor = lastDescriptor = -1;
-                        firstOffset = lastOffset = -1;
-                    }
 
-                    boolean advanced;
-                    do
+                    try (DataInputBuffer in = new 
DataInputBuffer(reader.record(), false))
                     {
-                        try (DataInputBuffer in = new 
DataInputBuffer(reader.record(), false))
+                        if (lastDescriptor != -1)
                         {
-                            if (lastDescriptor != -1)
-                            {
-                                Invariants.require(reader.descriptor.timestamp 
<= lastDescriptor,
-                                                      "Descriptors were 
accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, 
lastDescriptor);
-                                Invariants.require(reader.descriptor.timestamp 
!= lastDescriptor ||
-                                                      reader.offset() < 
lastOffset,
-                                                      "Offsets were accessed 
out of order: %d was accessed after %s", reader.offset(), lastOffset);
-                            }
-                            serializer.deserialize(key, builder, in, 
reader.descriptor.userVersion);
-                            lastDescriptor = reader.descriptor.timestamp;
-                            lastOffset = reader.offset();
-                            if (firstDescriptor == -1)
-                            {
-                                firstDescriptor = lastDescriptor;
-                                firstOffset = lastOffset;
-                            }
+                            Invariants.require(reader.descriptor.timestamp <= 
lastDescriptor,
+                                               "Descriptors were accessed out 
of order: %d was accessed after %d", reader.descriptor.timestamp, 
lastDescriptor);
+                            Invariants.require(reader.descriptor.timestamp != 
lastDescriptor ||
+                                               reader.offset() < lastOffset,
+                                               "Offsets were accessed out of 
order: %d was accessed after %s", reader.offset(), lastOffset);
+                        }
+                        serializer.deserialize(key, builder, in, 
reader.descriptor.userVersion);
+                        lastDescriptor = reader.descriptor.timestamp;
+                        lastOffset = reader.offset();
+                        if (firstDescriptor == -1)
+                        {
+                            firstDescriptor = lastDescriptor;
+                            firstOffset = lastOffset;
                         }
                     }
-                    while ((advanced = reader.advance()) && 
reader.key().equals(key));
 
-                    if (advanced) readers.offer(reader); // there is more to 
this reader, but not with this key
-                    else reader.close();
+                    if (considerWritingKey())
+                    {
+                        maybeWritePartition(key, builder, serializer, 
firstDescriptor, firstOffset);
+                        builder = null;
+                        firstDescriptor = lastDescriptor = -1;
+                        firstOffset = lastOffset = -1;
+                    }
                 }
+                while ((advanced = reader.advance()) && 
reader.key().equals(key));
 
-                maybeWritePartition(writer, key, builder, serializer, 
firstDescriptor, firstOffset);
-            }
-            catch (Throwable t)
-            {
-                Throwable accumulate = writer.abort(t);
-                throw new RuntimeException(String.format("Caught exception 
while serializing. Last seen key: %s", key), accumulate);
+                if (advanced) readers.offer(reader); // there is more to this 
reader, but not with this key
+                else reader.close();
             }
 
-            cfs.addSSTables(writer.finish(true));
-            return Collections.emptyList();
+            maybeWritePartition(key, builder, serializer, firstDescriptor, 
firstOffset);
+            switchPartitions();
         }
+        catch (Throwable t)
+        {
+            t = cleanupWriter(t);
+            throw new RuntimeException(String.format("Caught exception while 
serializing. Last seen key: %s", key), t);
+        }
+
+        finishAndAddWriter();
+        return Collections.emptyList();
     }
 
     private JournalKey prevKey;
     private DecoratedKey prevDecoratedKey;
 
-    private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, 
Object builder, FlyweightSerializer<Object, Object> serializer, long 
descriptor, int offset) throws IOException
+    private void maybeWritePartition(JournalKey key, Object builder, 
FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) 
throws IOException
     {
         if (builder != null)
         {
@@ -157,8 +174,8 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
             if (prevKey != null)
             {
                 
Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : 
-1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1),
-                                         String.format("Partition key and 
JournalKey didn't have matching order, which may imply a serialization 
issue.\n%s (%s)\n%s (%s)",
-                                                       key, decoratedKey, 
prevKey, prevDecoratedKey));
+                                           String.format("Partition key and 
JournalKey didn't have matching order, which may imply a serialization 
issue.\n%s (%s)\n%s (%s)",
+                                                         key, decoratedKey, 
prevKey, prevDecoratedKey));
             }
             prevKey = key;
             prevDecoratedKey = decoratedKey;
@@ -171,7 +188,7 @@ public class AccordSegmentCompactor<V> implements 
SegmentCompactor<JournalKey, V
                                 .add("record", out.asNewBuffer())
                                 .add("user_version", userVersion);
             }
-            writer.append(partitionBuilder.build().unfilteredIterator());
+            writer().append(partitionBuilder.build().unfilteredIterator());
         }
     }
 }
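
The compact(...) loop above is a k-way merge: each static segment contributes a KeyOrderReader, the readers sit in a PriorityQueue ordered by their current key, and every record for one JournalKey is folded into a flyweight builder before the next key is started. A stand-alone illustration of that merge shape, using a hypothetical SegmentReader interface rather than the Cassandra types:

    import java.util.PriorityQueue;

    // Hypothetical stand-in for StaticSegment.KeyOrderReader; illustration only.
    interface SegmentReader extends Comparable<SegmentReader>
    {
        String key();      // key of the current record
        boolean advance(); // move to the next record; false when the segment is exhausted
    }

    final class KeyOrderMergeSketch
    {
        static void merge(PriorityQueue<SegmentReader> readers)
        {
            String current = null;
            SegmentReader reader;
            while ((reader = readers.poll()) != null)
            {
                if (current == null || !reader.key().equals(current))
                    current = reader.key();        // a new partition begins here

                boolean advanced;
                do
                {
                    // fold the reader's current record into the builder for `current`
                }
                while ((advanced = reader.advance()) && reader.key().equals(current));

                if (advanced)
                    readers.offer(reader);         // more records, but for a later key
                // else: the reader is exhausted and would be closed
            }
        }
    }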
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index c8f5387e31..f856e99ec3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.journal.Compactor;
 import org.apache.cassandra.journal.Journal;
 import org.apache.cassandra.journal.Params;
 import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.SegmentCompactor;
 import org.apache.cassandra.journal.StaticSegment;
 import org.apache.cassandra.journal.ValueSerializer;
 import org.apache.cassandra.net.MessagingService;
@@ -78,7 +79,9 @@ import 
org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.ExecutorUtils;
 
+import static accord.impl.CommandChange.Field.CLEANUP;
 import static accord.impl.CommandChange.anyFieldChanged;
+import static accord.impl.CommandChange.describeFlags;
 import static accord.impl.CommandChange.getFlags;
 import static accord.impl.CommandChange.isChanged;
 import static accord.impl.CommandChange.isNull;
@@ -100,8 +103,10 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
 
     static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(() 
-> new byte[JournalKeySupport.TOTAL_SIZE]);
 
-    private final Journal<JournalKey, Object> journal;
-    private final AccordJournalTable<JournalKey, Object> journalTable;
+    @VisibleForTesting
+    protected final Journal<JournalKey, Object> journal;
+    @VisibleForTesting
+    protected final AccordJournalTable<JournalKey, Object> journalTable;
     private final Params params;
     private final AccordAgent agent;
     Node node;
@@ -118,18 +123,6 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
     public AccordJournal(Params params, AccordAgent agent, File directory, 
ColumnFamilyStore cfs)
     {
         this.agent = agent;
-        AccordSegmentCompactor<Object> compactor = new 
AccordSegmentCompactor<>(params.userVersion(), cfs) {
-            @Nullable
-            @Override
-            public Collection<StaticSegment<JournalKey, Object>> 
compact(Collection<StaticSegment<JournalKey, Object>> staticSegments)
-            {
-                if (journalTable == null)
-                    throw new IllegalStateException("Unsafe access to 
AccordJournal during <init>; journalTable was touched before it was published");
-                Collection<StaticSegment<JournalKey, Object>> result = 
super.compact(staticSegments);
-                journalTable.safeNotify(index -> index.remove(staticSegments));
-                return result;
-            }
-        };
         this.journal = new Journal<>("AccordJournal", directory, params, 
JournalKey.SUPPORT,
                                      // In Accord, we are using streaming 
serialization, i.e. Reader/Writer interfaces instead of materializing objects
                                      new ValueSerializer<>()
@@ -146,11 +139,27 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                                              throw new 
UnsupportedOperationException();
                                          }
                                      },
-                                     compactor);
+                                     compactor(cfs, params));
         this.journalTable = new AccordJournalTable<>(journal, 
JournalKey.SUPPORT, cfs, params.userVersion());
         this.params = params;
     }
 
+    protected SegmentCompactor<JournalKey, Object> compactor(ColumnFamilyStore 
cfs, Params params)
+    {
+        return new AccordSegmentCompactor<>(params.userVersion(), cfs) {
+            @Nullable
+            @Override
+            public Collection<StaticSegment<JournalKey, Object>> 
compact(Collection<StaticSegment<JournalKey, Object>> staticSegments)
+            {
+                if (journalTable == null)
+                    throw new IllegalStateException("Unsafe access to 
AccordJournal during <init>; journalTable was touched before it was published");
+                Collection<StaticSegment<JournalKey, Object>> result = 
super.compact(staticSegments);
+                journalTable.safeNotify(index -> index.remove(staticSegments));
+                return result;
+            }
+        };
+    }
+
     @VisibleForTesting
     public int inMemorySize()
     {
@@ -594,6 +603,12 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         {
             return hasField(Field.PARTICIPANTS);
         }
+
+        @Override
+        public String toString()
+        {
+            return after.saveStatus() + " " + describeFlags(flags);
+        }
     }
 
     public static class Builder extends CommandChange.Builder
@@ -641,7 +656,9 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
                 switch (field)
                 {
                     default: throw new UnhandledEnum(field);
-                    case CLEANUP: throw UnhandledEnum.invalid(field);
+                    case CLEANUP:
+                        out.writeByte(cleanup.ordinal());
+                        break;
                     case EXECUTE_AT:
                         Invariants.require(txnId != null);
                         Invariants.require(executeAt != null);
@@ -715,7 +732,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
             {
                 // Since we are iterating in reverse order, we skip the fields that were
                 // set by entries written later (i.e. already read ones).
-                if (isChanged(field, flags))
+                if (isChanged(field, flags) && field != CLEANUP)
                     skip(txnId, field, in, userVersion);
                 else
                     deserialize(field, in, userVersion);
@@ -780,6 +797,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
         {
             switch (field)
             {
+                default: throw new UnhandledEnum(field);
                 case EXECUTE_AT:
                     ExecuteAtSerializer.skip(txnId, in);
                     break;
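
AccordJournal previously built its compactor inline in the constructor; it now goes through a protected compactor(cfs, params) factory, which is what allows the burn test further down to substitute the nemesis compactor. A hedged sketch of such an override (the surrounding variables params, agent, directory, cfs and a RandomSource named random are assumed to be in scope; this is not the committed test code):

    AccordJournal journal = new AccordJournal(params, agent, directory, cfs)
    {
        @Override
        protected SegmentCompactor<JournalKey, Object> compactor(ColumnFamilyStore cfs, Params params)
        {
            // swap the default single-SSTable compactor for the nemesis one
            return new NemesisAccordSegmentCompactor<>(params.userVersion(), cfs, random.fork());
        }
    };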
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
index 995ba7f9e9..68a41489e3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java
@@ -17,162 +17,50 @@
  */
 package org.apache.cassandra.service.accord;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.PriorityQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.utils.Invariants;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.journal.SegmentCompactor;
-import org.apache.cassandra.journal.StaticSegment;
-import org.apache.cassandra.journal.StaticSegment.KeyOrderReader;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
 
 /**
  * Segment compactor: takes static segments and compacts them into a single 
SSTable.
  */
-public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, 
V>
+public class AccordSegmentCompactor<V> extends 
AbstractAccordSegmentCompactor<V>
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordSegmentCompactor.class);
-    private final int userVersion;
-    private final ColumnFamilyStore cfs;
+    private SSTableTxnWriter writer;
 
     public AccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs)
     {
-        this.userVersion = userVersion;
-        this.cfs = cfs;
+        super(userVersion, cfs);
     }
 
     @Override
-    public Collection<StaticSegment<JournalKey, V>> 
compact(Collection<StaticSegment<JournalKey, V>> segments)
+    void initializeWriter()
     {
-        Invariants.require(segments.size() >= 2, () -> String.format("Can only 
compact 2 or more segments, but got %d", segments.size()));
-        logger.info("Compacting {} static segments: {}", segments.size(), 
segments);
-
-        PriorityQueue<KeyOrderReader<JournalKey>> readers = new 
PriorityQueue<>();
-        for (StaticSegment<JournalKey, V> segment : segments)
-        {
-            KeyOrderReader<JournalKey> reader = segment.keyOrderReader();
-            if (reader.advance())
-                readers.add(reader);
-            else
-                reader.close();
-        }
-
-        // nothing to compact (all segments empty, should never happen, but it 
is theoretically possible?) - exit early
-        // TODO: investigate how this comes to be, check if there is a cleanup 
issue
-        if (readers.isEmpty())
-            return Collections.emptyList();
-
         Descriptor descriptor = 
cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables());
         SerializationHeader header = new SerializationHeader(true, 
cfs.metadata(), cfs.metadata().regularAndStaticColumns(), 
EncodingStats.NO_STATS);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, 
descriptor, 0, 0, null, false, header))
-        {
-            JournalKey key = null;
-            Object builder = null;
-            FlyweightSerializer<Object, Object> serializer = null;
-            long firstDescriptor = -1, lastDescriptor = -1;
-            int firstOffset = -1, lastOffset = -1;
-            try
-            {
-                KeyOrderReader<JournalKey> reader;
-                while ((reader = readers.poll()) != null)
-                {
-                    if (key == null || !reader.key().equals(key))
-                    {
-                        maybeWritePartition(writer, key, builder, serializer, 
firstDescriptor, firstOffset);
-
-                        key = reader.key();
-                        serializer = (FlyweightSerializer<Object, Object>) 
key.type.serializer;
-                        builder = serializer.mergerFor(key);
-                        firstDescriptor = lastDescriptor = -1;
-                        firstOffset = lastOffset = -1;
-                    }
-
-                    boolean advanced;
-                    do
-                    {
-                        try (DataInputBuffer in = new 
DataInputBuffer(reader.record(), false))
-                        {
-                            if (lastDescriptor != -1)
-                            {
-                                Invariants.require(reader.descriptor.timestamp 
<= lastDescriptor,
-                                                      "Descriptors were 
accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, 
lastDescriptor);
-                                Invariants.require(reader.descriptor.timestamp 
!= lastDescriptor ||
-                                                      reader.offset() < 
lastOffset,
-                                                      "Offsets were accessed 
out of order: %d was accessed after %s", reader.offset(), lastOffset);
-                            }
-                            serializer.deserialize(key, builder, in, 
reader.descriptor.userVersion);
-                            lastDescriptor = reader.descriptor.timestamp;
-                            lastOffset = reader.offset();
-                            if (firstDescriptor == -1)
-                            {
-                                firstDescriptor = lastDescriptor;
-                                firstOffset = lastOffset;
-                            }
-                        }
-                    }
-                    while ((advanced = reader.advance()) && 
reader.key().equals(key));
-
-                    if (advanced) readers.offer(reader); // there is more to 
this reader, but not with this key
-                    else reader.close();
-                }
-
-                maybeWritePartition(writer, key, builder, serializer, 
firstDescriptor, firstOffset);
-            }
-            catch (Throwable t)
-            {
-                Throwable accumulate = writer.abort(t);
-                throw new RuntimeException(String.format("Caught exception 
while serializing. Last seen key: %s", key), accumulate);
-            }
-
-            cfs.addSSTables(writer.finish(true));
-            return Collections.emptyList();
-        }
+        this.writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, 
false, header);
     }
 
-    private JournalKey prevKey;
-    private DecoratedKey prevDecoratedKey;
-
-    private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, 
Object builder, FlyweightSerializer<Object, Object> serializer, long 
descriptor, int offset) throws IOException
+    @Override
+    SSTableTxnWriter writer()
     {
-        if (builder != null)
-        {
-            DecoratedKey decoratedKey = 
AccordKeyspace.JournalColumns.decorate(key);
+        return writer;
+    }
 
-            if (prevKey != null)
-            {
-                
Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : 
-1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 1 : -1),
-                                         String.format("Partition key and 
JournalKey didn't have matching order, which may imply a serialization 
issue.\n%s (%s)\n%s (%s)",
-                                                       key, decoratedKey, 
prevKey, prevDecoratedKey));
-            }
-            prevKey = key;
-            prevDecoratedKey = decoratedKey;
+    @Override
+    void finishAndAddWriter()
+    {
+        cfs.addSSTables(writer.finish(true));
+        writer.close();
+    }
 
-            SimpleBuilder partitionBuilder = 
PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey);
-            try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
-            {
-                serializer.reserialize(key, builder, out, userVersion);
-                partitionBuilder.row(descriptor, offset)
-                                .add("record", out.asNewBuffer())
-                                .add("user_version", userVersion);
-            }
-            writer.append(partitionBuilder.build().unfilteredIterator());
-        }
+    @Override
+    Throwable cleanupWriter(Throwable t)
+    {
+        return writer.abort(t);
     }
 }
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
index 4236e7fbc6..2af8fd3b39 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
@@ -73,6 +73,7 @@ public class WaitingOnSerializer
         @Override
         public WaitingOn provide(TxnId txnId, PartialDeps deps, Timestamp 
executeAtLeast, long uniqueHlc)
         {
+            Invariants.nonNull(deps);
             RoutingKeys keys = deps.keyDeps.keys();
             RangeDeps directRangeDeps = deps.rangeDeps;
             int txnIdCount = directRangeDeps.txnIdCount();
diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index ee963915f2..6c983eea0c 100644
--- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 import org.junit.Before;
@@ -37,13 +41,27 @@ import accord.impl.basic.Cluster;
 import accord.impl.basic.RandomDelayQueue;
 import accord.local.CommandStores;
 import accord.local.Node;
+import accord.primitives.EpochSupplier;
 import accord.utils.DefaultRandom;
 import accord.utils.RandomSource;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.SegmentCompactor;
+import org.apache.cassandra.journal.StaticSegment;
 import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -56,9 +74,9 @@ import 
org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.serializers.ResultSerializers;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
 import org.apache.cassandra.tools.FieldUtil;
-import org.apache.cassandra.utils.concurrent.Condition;
 
 import static accord.impl.PrefixedIntHashKey.ranges;
+import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 
 public class AccordJournalBurnTest extends BurnTestBase
 {
@@ -116,11 +134,11 @@ public class AccordJournalBurnTest extends BurnTestBase
             List<Node.Id> clients = generateIds(true, 1 + random.nextInt(4));
             int rf;
             float chance = random.nextFloat();
-            if (chance < 0.2f)      { rf = random.nextInt(2, 9); }
-            else if (chance < 0.4f) { rf = 3; }
-            else if (chance < 0.7f) { rf = 5; }
-            else if (chance < 0.8f) { rf = 7; }
-            else                    { rf = 9; }
+            if (chance < 0.2f) rf = random.nextInt(2, 9);
+            else if (chance < 0.4f) rf = 3;
+            else if (chance < 0.7f) rf = 5;
+            else if (chance < 0.8f) rf = 7;
+            else rf = 9;
 
             List<Node.Id> nodes = generateIds(false, random.nextInt(rf, rf * 
3));
 
@@ -135,6 +153,7 @@ public class AccordJournalBurnTest extends BurnTestBase
                 AccordKeyspace.TABLES = Tables.of(metadatas);
                 setUp();
             }
+
             Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord");
 
             burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, 
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 
3))),
@@ -145,7 +164,7 @@ public class AccordJournalBurnTest extends BurnTestBase
                  operations,
                  10 + random.nextInt(30),
                  new RandomDelayQueue.Factory(random).get(),
-                 (node, agent) -> {
+                 (node, agent, randomSource) -> {
                      try
                      {
                          File directory = new 
File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet())));
@@ -154,37 +173,18 @@ public class AccordJournalBurnTest extends BurnTestBase
                          cfs.disableAutoCompaction();
                          AccordJournal journal = new AccordJournal(new 
TestParams()
                          {
-                             @Override
-                             public FlushMode flushMode()
-                             {
-                                 return FlushMode.PERIODIC;
-                             }
-
-                             @Override
-                             public long flushPeriod(TimeUnit units)
-                             {
-                                 return 1;
-                             }
-
                              @Override
                              public int segmentSize()
                              {
                                  return 32 * 1024 * 1024;
                              }
-
-                             @Override
-                             public boolean enableCompaction()
-                             {
-                                 return false;
-                             }
                          }, new AccordAgent(), directory, cfs)
                          {
                              @Override
                              public void saveCommand(int store, CommandUpdate 
update, @Nullable Runnable onFlush)
                              {
-                                 Condition condition = 
Condition.newOneTimeCondition();
-                                 super.saveCommand(store, update, 
condition::signal);
-                                 condition.awaitUninterruptibly();
+                                 // For the purpose of this test, we do not 
have to wait for flush, since we do not test durability and are using mmap
+                                 super.saveCommand(store, update, () -> {});
                                  if (onFlush != null)
                                      onFlush.run();
                              }
@@ -192,21 +192,105 @@ public class AccordJournalBurnTest extends BurnTestBase
                              @Override
                              public void saveStoreState(int store, 
FieldUpdates fieldUpdates, @Nullable Runnable onFlush)
                              {
-                                 Condition condition = 
Condition.newOneTimeCondition();
-                                 super.saveStoreState(store, fieldUpdates, 
condition::signal);
+                                 super.saveStoreState(store, fieldUpdates, () 
-> {});
                                  if (onFlush != null)
                                      onFlush.run();
                              }
 
                              @Override
-                             public void saveTopology(TopologyUpdate 
topologyUpdate, @Nullable Runnable onFlush)
+                             public void saveTopology(TopologyUpdate 
topologyUpdate, Runnable onFlush)
                              {
-                                 Condition condition = 
Condition.newOneTimeCondition();
-                                 super.saveTopology(topologyUpdate, 
condition::signal);
+                                 super.saveTopology(topologyUpdate, () -> {});
                                  if (onFlush != null)
                                      onFlush.run();
                              }
 
+                             @Override
+                             protected SegmentCompactor<JournalKey, Object> 
compactor(ColumnFamilyStore cfs, Params params)
+                             {
+                                 return new 
NemesisAccordSegmentCompactor<>(params.userVersion(), cfs, randomSource.fork())
+                                 {
+                                     @Nullable
+                                     @Override
+                                     public 
Collection<StaticSegment<JournalKey, Object>> 
compact(Collection<StaticSegment<JournalKey, Object>> staticSegments)
+                                     {
+                                         if (journalTable == null)
+                                             throw new 
IllegalStateException("Unsafe access to AccordJournal during <init>; 
journalTable was touched before it was published");
+                                         Collection<StaticSegment<JournalKey, 
Object>> result = super.compact(staticSegments);
+                                         journalTable.safeNotify(index -> 
index.remove(staticSegments));
+                                         return result;
+                                     }
+                                 };
+                             }
+
+                             private CompactionAwareWriter 
getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                                               
     Directories directories,
+                                                                               
     LifecycleTransaction transaction,
+                                                                               
     Set<SSTableReader> nonExpiredSSTables)
+                             {
+                                 return new DefaultCompactionWriter(cfs, 
directories, transaction, nonExpiredSSTables, false, 0);
+                             }
+
+                             @Override
+                             public void purge(CommandStores commandStores, 
EpochSupplier minEpoch)
+                             {
+                                 
this.journal.closeCurrentSegmentForTestingIfNonEmpty();
+                                 this.journal.runCompactorForTesting();
+
+                                 List<SSTableReader> all = new 
ArrayList<>(cfs.getLiveSSTables());
+                                 if (all.size() <= 1)
+                                     return;
+
+                                 Set<SSTableReader> sstables = new HashSet<>();
+
+                                 int min, max;
+                                 while (true)
+                                 {
+                                     int tmp1 = randomSource.nextInt(0, 
all.size());
+                                     int tmp2 = randomSource.nextInt(0, 
all.size());
+                                     if (tmp1 != tmp2 && Math.abs(tmp1 - tmp2) 
>= 1)
+                                     {
+                                         min = Math.min(tmp1, tmp2);
+                                         max = Math.max(tmp1, tmp2);
+                                         break;
+                                     }
+                                 }
+                                 // Random subset
+                                 for (int i = min; i < max; i++)
+                                     sstables.add(all.get(i));
+
+                                 List<ISSTableScanner> scanners = 
sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+
+                                 Collection<SSTableReader> newSStables;
+
+                                 try (LifecycleTransaction txn = 
cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+                                      CompactionController controller = new 
CompactionController(cfs, sstables, 0);
+                                      CompactionIterator ci = new 
CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, 
nextTimeUUID()))
+                                 {
+                                     
CompactionManager.instance.active.beginCompaction(ci);
+                                     try (CompactionAwareWriter writer = 
getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, sstables))
+                                     {
+                                         while (ci.hasNext())
+                                         {
+                                             writer.append(ci.next());
+                                             
ci.setTargetDirectory(writer.getSStableDirectory().path());
+                                         }
+
+                                         // point of no return
+                                         newSStables = writer.finish();
+                                     }
+                                     catch (IOException e)
+                                     {
+                                         throw new RuntimeException(e);
+                                     }
+                                     finally
+                                     {
+                                         
CompactionManager.instance.active.finishCompaction(ci);
+                                     }
+                                 }
+                             }
+
+
                              @Override
                              public void replay(CommandStores commandStores)
                              {
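
The purge(...) override above implements the "partial compaction" nemesis: it closes the current segment, runs the journal compactor, then picks a random contiguous slice of the live sstables and compacts only that slice through a CompactionIterator and DefaultCompactionWriter. The slice selection reduces to choosing two distinct indices; a minimal stand-alone sketch of that selection (java.util.Random standing in for the test's RandomSource):

    import java.util.List;
    import java.util.Random;

    final class PartialCompactionSelection
    {
        // Pick a random, non-empty contiguous slice of the live sstables.
        static <T> List<T> randomContiguousSubset(List<T> all, Random random)
        {
            if (all.size() <= 1)
                return List.of();

            int a = random.nextInt(all.size());
            int b = random.nextInt(all.size());
            while (a == b)
                b = random.nextInt(all.size());

            int min = Math.min(a, b);
            int max = Math.max(a, b);
            return all.subList(min, max); // [min, max), at least one element
        }
    }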
diff --git a/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java b/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java
new file mode 100644
index 0000000000..12f8554df7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import accord.utils.RandomSource;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
+
+/**
+ * Nemesis compactor: a compactor that will distribute your keys over a 
large(r) number of SSTables.
+ *
+ * For testing purposes only.
+ */
+public class NemesisAccordSegmentCompactor<V> extends 
AbstractAccordSegmentCompactor<V>
+{
+    private final RandomSource randomSource;
+    private final SSTableTxnWriter[] writers;
+    private final Set<SSTableTxnWriter> written = new HashSet<>();
+
+    public NemesisAccordSegmentCompactor(int userVersion, ColumnFamilyStore 
cfs, RandomSource randomSource)
+    {
+        super(userVersion, cfs);
+        this.randomSource = randomSource;
+        this.writers = new SSTableTxnWriter[randomSource.nextInt(2, 10)];
+    }
+
+    @Override
+    boolean considerWritingKey()
+    {
+        if (written.size() == writers.length - 1)
+            return false;
+        return randomSource.nextBoolean();
+    }
+
+    @Override
+    void switchPartitions()
+    {
+        written.clear();
+    }
+
+    @Override
+    void initializeWriter()
+    {
+        for (int i = 0; i < writers.length; i++)
+        {
+            Descriptor descriptor = 
cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables());
+            SerializationHeader header = new SerializationHeader(true, 
cfs.metadata(), cfs.metadata().regularAndStaticColumns(), 
EncodingStats.NO_STATS);
+            writers[i] = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, 
false, header);
+        }
+    }
+
+    @Override
+    SSTableTxnWriter writer()
+    {
+        for (int i = 0; i < 10_000; i++)
+        {
+            SSTableTxnWriter writer = 
writers[randomSource.nextInt(writers.length)];
+            if (written.add(writer))
+                return writer;
+        }
+        throw new IllegalStateException(String.format("Could not pick an 
sstable from %s. Written: %s", Arrays.asList(writers), written));
+    }
+
+    @Override
+    void finishAndAddWriter()
+    {
+        for (SSTableTxnWriter writer : writers)
+        {
+            cfs.addSSTables(writer.finish(true));
+            writer.close();
+        }
+        Arrays.fill(writers, null);
+    }
+
+    @Override
+    Throwable cleanupWriter(Throwable t)
+    {
+        for (SSTableTxnWriter writer : writers)
+            t = writer.abort(t);
+        return t;
+    }
+}
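
NemesisAccordSegmentCompactor deliberately spreads one partition's data over several writers: considerWritingKey() may flush the partition early, writer() never hands out the same writer twice within one partition (tracked by the written set), switchPartitions() resets that bookkeeping, and considerWritingKey() stops splitting once only one unused writer remains so the final flush always has a destination. The same invariant, isolated into a small stand-alone sketch (plain Random and integer slots instead of SSTableTxnWriters; not part of the patch):

    import java.util.HashSet;
    import java.util.Random;
    import java.util.Set;

    final class NemesisSelectionSketch
    {
        private final Random random = new Random();
        private final int slots = 4;                       // stands in for writers.length
        private final Set<Integer> written = new HashSet<>();

        boolean considerWritingKey()
        {
            if (written.size() == slots - 1)
                return false;                              // keep the last slot for the final flush
            return random.nextBoolean();
        }

        int pickSlot()
        {
            while (true)
            {
                int slot = random.nextInt(slots);
                if (written.add(slot))
                    return slot;                           // never reuse a slot within a partition
            }
        }

        void switchPartitions()
        {
            written.clear();                               // all slots become available again
        }
    }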
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index d8c09544db..9a68d127ce 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -29,7 +29,6 @@ import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
-
 import javax.annotation.Nullable;
 
 import accord.api.Agent;
@@ -235,7 +234,7 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
             }
         };
 
-        this.journal = new DefaultJournal(nodeId, agent);
+        this.journal = new DefaultJournal(nodeId, agent, rs.fork());
         this.commandStore = new AccordCommandStore(0,
                                                    storeService,
                                                    agent,
@@ -466,9 +465,9 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
     private static class DefaultJournal extends InMemoryJournal implements 
RangeSearcher.Supplier
     {
         private final RouteInMemoryIndex<?> index = new RouteInMemoryIndex<>();
-        private DefaultJournal(Node.Id id, Agent agent)
+        private DefaultJournal(Node.Id id, Agent agent, RandomSource rs)
         {
-            super(id, agent);
+            super(id, agent, rs);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

