This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 3be0d93e9f Implement Nemesis compactor, improve partial compaction 3be0d93e9f is described below commit 3be0d93e9fffdfa0bbd839c82dbc6e08f5022343 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Thu Mar 6 16:59:17 2025 +0100 Implement Nemesis compactor, improve partial compaction Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20424. --- modules/accord | 2 +- ...or.java => AbstractAccordSegmentCompactor.java} | 139 +++++++++++-------- .../cassandra/service/accord/AccordJournal.java | 52 ++++--- .../service/accord/AccordSegmentCompactor.java | 150 +++----------------- .../accord/serializers/WaitingOnSerializer.java | 1 + .../service/accord/AccordJournalBurnTest.java | 154 ++++++++++++++++----- .../accord/NemesisAccordSegmentCompactor.java | 105 ++++++++++++++ .../accord/SimulatedAccordCommandStore.java | 7 +- 8 files changed, 361 insertions(+), 249 deletions(-) diff --git a/modules/accord b/modules/accord index fe306fc553..2025bcf1a6 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit fe306fc5539b40d1c9d49f9afd0ca45bb74c49d3 +Subproject commit 2025bcf1a6ec9f572177616fc5b6ee8714cf8f77 diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java similarity index 51% copy from src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java copy to src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java index 995ba7f9e9..0dd1ef5160 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AbstractAccordSegmentCompactor.java @@ -28,11 +28,8 @@ import org.slf4j.LoggerFactory; import accord.utils.Invariants; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder; -import org.apache.cassandra.db.rows.EncodingStats; -import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableTxnWriter; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -44,18 +41,30 @@ import org.apache.cassandra.service.accord.AccordJournalValueSerializers.Flyweig /** * Segment compactor: takes static segments and compacts them into a single SSTable. 
*/ -public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V> +public abstract class AbstractAccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V> { - private static final Logger logger = LoggerFactory.getLogger(AccordSegmentCompactor.class); - private final int userVersion; - private final ColumnFamilyStore cfs; + protected static final Logger logger = LoggerFactory.getLogger(AbstractAccordSegmentCompactor.class); + protected final int userVersion; + protected final ColumnFamilyStore cfs; - public AccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs) + public AbstractAccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs) { this.userVersion = userVersion; this.cfs = cfs; } + void switchPartitions() {} + + boolean considerWritingKey() + { + return false; + } + + abstract void initializeWriter(); + abstract SSTableTxnWriter writer(); + abstract void finishAndAddWriter(); + abstract Throwable cleanupWriter(Throwable t); + @Override public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment<JournalKey, V>> segments) { @@ -77,78 +86,86 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V if (readers.isEmpty()) return Collections.emptyList(); - Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()); - SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); + initializeWriter(); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header)) + JournalKey key = null; + Object builder = null; + FlyweightSerializer<Object, Object> serializer = null; + long firstDescriptor = -1, lastDescriptor = -1; + int firstOffset = -1, lastOffset = -1; + try { - JournalKey key = null; - Object builder = null; - FlyweightSerializer<Object, Object> serializer = null; - long firstDescriptor = -1, lastDescriptor = -1; - int firstOffset = -1, lastOffset = -1; - try + KeyOrderReader<JournalKey> reader; + while ((reader = readers.poll()) != null) { - KeyOrderReader<JournalKey> reader; - while ((reader = readers.poll()) != null) + if (key == null || !reader.key().equals(key)) { - if (key == null || !reader.key().equals(key)) - { - maybeWritePartition(writer, key, builder, serializer, firstDescriptor, firstOffset); + maybeWritePartition(key, builder, serializer, firstDescriptor, firstOffset); + switchPartitions(); + key = reader.key(); + serializer = (FlyweightSerializer<Object, Object>) key.type.serializer; + builder = serializer.mergerFor(key); + firstDescriptor = lastDescriptor = -1; + firstOffset = lastOffset = -1; + } - key = reader.key(); - serializer = (FlyweightSerializer<Object, Object>) key.type.serializer; + boolean advanced; + do + { + if (builder == null) builder = serializer.mergerFor(key); - firstDescriptor = lastDescriptor = -1; - firstOffset = lastOffset = -1; - } - boolean advanced; - do + try (DataInputBuffer in = new DataInputBuffer(reader.record(), false)) { - try (DataInputBuffer in = new DataInputBuffer(reader.record(), false)) + if (lastDescriptor != -1) { - if (lastDescriptor != -1) - { - Invariants.require(reader.descriptor.timestamp <= lastDescriptor, - "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor); - Invariants.require(reader.descriptor.timestamp != lastDescriptor || - reader.offset() < lastOffset, - "Offsets were accessed out of order: %d was accessed 
after %s", reader.offset(), lastOffset); - } - serializer.deserialize(key, builder, in, reader.descriptor.userVersion); - lastDescriptor = reader.descriptor.timestamp; - lastOffset = reader.offset(); - if (firstDescriptor == -1) - { - firstDescriptor = lastDescriptor; - firstOffset = lastOffset; - } + Invariants.require(reader.descriptor.timestamp <= lastDescriptor, + "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor); + Invariants.require(reader.descriptor.timestamp != lastDescriptor || + reader.offset() < lastOffset, + "Offsets were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset); + } + serializer.deserialize(key, builder, in, reader.descriptor.userVersion); + lastDescriptor = reader.descriptor.timestamp; + lastOffset = reader.offset(); + if (firstDescriptor == -1) + { + firstDescriptor = lastDescriptor; + firstOffset = lastOffset; } } - while ((advanced = reader.advance()) && reader.key().equals(key)); - if (advanced) readers.offer(reader); // there is more to this reader, but not with this key - else reader.close(); + if (considerWritingKey()) + { + maybeWritePartition(key, builder, serializer, firstDescriptor, firstOffset); + builder = null; + firstDescriptor = lastDescriptor = -1; + firstOffset = lastOffset = -1; + } } + while ((advanced = reader.advance()) && reader.key().equals(key)); - maybeWritePartition(writer, key, builder, serializer, firstDescriptor, firstOffset); - } - catch (Throwable t) - { - Throwable accumulate = writer.abort(t); - throw new RuntimeException(String.format("Caught exception while serializing. Last seen key: %s", key), accumulate); + if (advanced) readers.offer(reader); // there is more to this reader, but not with this key + else reader.close(); } - cfs.addSSTables(writer.finish(true)); - return Collections.emptyList(); + maybeWritePartition(key, builder, serializer, firstDescriptor, firstOffset); + switchPartitions(); } + catch (Throwable t) + { + t = cleanupWriter(t); + throw new RuntimeException(String.format("Caught exception while serializing. Last seen key: %s", key), t); + } + + finishAndAddWriter(); + return Collections.emptyList(); } private JournalKey prevKey; private DecoratedKey prevDecoratedKey; - private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, Object builder, FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) throws IOException + private void maybeWritePartition(JournalKey key, Object builder, FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) throws IOException { if (builder != null) { @@ -157,8 +174,8 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V if (prevKey != null) { Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 
1 : -1), - String.format("Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)", - key, decoratedKey, prevKey, prevDecoratedKey)); + String.format("Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)", + key, decoratedKey, prevKey, prevDecoratedKey)); } prevKey = key; prevDecoratedKey = decoratedKey; @@ -171,7 +188,7 @@ public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V .add("record", out.asNewBuffer()) .add("user_version", userVersion); } - writer.append(partitionBuilder.build().unfilteredIterator()); + writer().append(partitionBuilder.build().unfilteredIterator()); } } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index c8f5387e31..f856e99ec3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -63,6 +63,7 @@ import org.apache.cassandra.journal.Compactor; import org.apache.cassandra.journal.Journal; import org.apache.cassandra.journal.Params; import org.apache.cassandra.journal.RecordPointer; +import org.apache.cassandra.journal.SegmentCompactor; import org.apache.cassandra.journal.StaticSegment; import org.apache.cassandra.journal.ValueSerializer; import org.apache.cassandra.net.MessagingService; @@ -78,7 +79,9 @@ import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.ExecutorUtils; +import static accord.impl.CommandChange.Field.CLEANUP; import static accord.impl.CommandChange.anyFieldChanged; +import static accord.impl.CommandChange.describeFlags; import static accord.impl.CommandChange.getFlags; import static accord.impl.CommandChange.isChanged; import static accord.impl.CommandChange.isNull; @@ -100,8 +103,10 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(() -> new byte[JournalKeySupport.TOTAL_SIZE]); - private final Journal<JournalKey, Object> journal; - private final AccordJournalTable<JournalKey, Object> journalTable; + @VisibleForTesting + protected final Journal<JournalKey, Object> journal; + @VisibleForTesting + protected final AccordJournalTable<JournalKey, Object> journalTable; private final Params params; private final AccordAgent agent; Node node; @@ -118,18 +123,6 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier public AccordJournal(Params params, AccordAgent agent, File directory, ColumnFamilyStore cfs) { this.agent = agent; - AccordSegmentCompactor<Object> compactor = new AccordSegmentCompactor<>(params.userVersion(), cfs) { - @Nullable - @Override - public Collection<StaticSegment<JournalKey, Object>> compact(Collection<StaticSegment<JournalKey, Object>> staticSegments) - { - if (journalTable == null) - throw new IllegalStateException("Unsafe access to AccordJournal during <init>; journalTable was touched before it was published"); - Collection<StaticSegment<JournalKey, Object>> result = super.compact(staticSegments); - journalTable.safeNotify(index -> index.remove(staticSegments)); - return result; - } - }; this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT, // In Accord, we are using streaming serialization, i.e. 
Reader/Writer interfaces instead of materializing objects new ValueSerializer<>() @@ -146,11 +139,27 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier throw new UnsupportedOperationException(); } }, - compactor); + compactor(cfs, params)); this.journalTable = new AccordJournalTable<>(journal, JournalKey.SUPPORT, cfs, params.userVersion()); this.params = params; } + protected SegmentCompactor<JournalKey, Object> compactor(ColumnFamilyStore cfs, Params params) + { + return new AccordSegmentCompactor<>(params.userVersion(), cfs) { + @Nullable + @Override + public Collection<StaticSegment<JournalKey, Object>> compact(Collection<StaticSegment<JournalKey, Object>> staticSegments) + { + if (journalTable == null) + throw new IllegalStateException("Unsafe access to AccordJournal during <init>; journalTable was touched before it was published"); + Collection<StaticSegment<JournalKey, Object>> result = super.compact(staticSegments); + journalTable.safeNotify(index -> index.remove(staticSegments)); + return result; + } + }; + } + @VisibleForTesting public int inMemorySize() { @@ -594,6 +603,12 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier { return hasField(Field.PARTICIPANTS); } + + @Override + public String toString() + { + return after.saveStatus() + " " + describeFlags(flags); + } } public static class Builder extends CommandChange.Builder @@ -641,7 +656,9 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier switch (field) { default: throw new UnhandledEnum(field); - case CLEANUP: throw UnhandledEnum.invalid(field); + case CLEANUP: + out.writeByte(cleanup.ordinal()); + break; case EXECUTE_AT: Invariants.require(txnId != null); Invariants.require(executeAt != null); @@ -715,7 +732,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier { // Since we are iterating in reverse order, we skip the fields that were // set by entries writter later (i.e. already read ones). 
- if (isChanged(field, flags)) + if (isChanged(field, flags) && field != CLEANUP) skip(txnId, field, in, userVersion); else deserialize(field, in, userVersion); @@ -780,6 +797,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier { switch (field) { + default: throw new UnhandledEnum(field); case EXECUTE_AT: ExecuteAtSerializer.skip(txnId, in); break; diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java index 995ba7f9e9..68a41489e3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java @@ -17,162 +17,50 @@ */ package org.apache.cassandra.service.accord; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.PriorityQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import accord.utils.Invariants; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.partitions.PartitionUpdate.SimpleBuilder; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableTxnWriter; -import org.apache.cassandra.io.util.DataInputBuffer; -import org.apache.cassandra.io.util.DataOutputBuffer; -import org.apache.cassandra.journal.SegmentCompactor; -import org.apache.cassandra.journal.StaticSegment; -import org.apache.cassandra.journal.StaticSegment.KeyOrderReader; -import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer; /** * Segment compactor: takes static segments and compacts them into a single SSTable. */ -public class AccordSegmentCompactor<V> implements SegmentCompactor<JournalKey, V> +public class AccordSegmentCompactor<V> extends AbstractAccordSegmentCompactor<V> { - private static final Logger logger = LoggerFactory.getLogger(AccordSegmentCompactor.class); - private final int userVersion; - private final ColumnFamilyStore cfs; + private SSTableTxnWriter writer; public AccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs) { - this.userVersion = userVersion; - this.cfs = cfs; + super(userVersion, cfs); } @Override - public Collection<StaticSegment<JournalKey, V>> compact(Collection<StaticSegment<JournalKey, V>> segments) + void initializeWriter() { - Invariants.require(segments.size() >= 2, () -> String.format("Can only compact 2 or more segments, but got %d", segments.size())); - logger.info("Compacting {} static segments: {}", segments.size(), segments); - - PriorityQueue<KeyOrderReader<JournalKey>> readers = new PriorityQueue<>(); - for (StaticSegment<JournalKey, V> segment : segments) - { - KeyOrderReader<JournalKey> reader = segment.keyOrderReader(); - if (reader.advance()) - readers.add(reader); - else - reader.close(); - } - - // nothing to compact (all segments empty, should never happen, but it is theoretically possible?) 
- exit early - // TODO: investigate how this comes to be, check if there is a cleanup issue - if (readers.isEmpty()) - return Collections.emptyList(); - Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()); SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header)) - { - JournalKey key = null; - Object builder = null; - FlyweightSerializer<Object, Object> serializer = null; - long firstDescriptor = -1, lastDescriptor = -1; - int firstOffset = -1, lastOffset = -1; - try - { - KeyOrderReader<JournalKey> reader; - while ((reader = readers.poll()) != null) - { - if (key == null || !reader.key().equals(key)) - { - maybeWritePartition(writer, key, builder, serializer, firstDescriptor, firstOffset); - - key = reader.key(); - serializer = (FlyweightSerializer<Object, Object>) key.type.serializer; - builder = serializer.mergerFor(key); - firstDescriptor = lastDescriptor = -1; - firstOffset = lastOffset = -1; - } - - boolean advanced; - do - { - try (DataInputBuffer in = new DataInputBuffer(reader.record(), false)) - { - if (lastDescriptor != -1) - { - Invariants.require(reader.descriptor.timestamp <= lastDescriptor, - "Descriptors were accessed out of order: %d was accessed after %d", reader.descriptor.timestamp, lastDescriptor); - Invariants.require(reader.descriptor.timestamp != lastDescriptor || - reader.offset() < lastOffset, - "Offsets were accessed out of order: %d was accessed after %s", reader.offset(), lastOffset); - } - serializer.deserialize(key, builder, in, reader.descriptor.userVersion); - lastDescriptor = reader.descriptor.timestamp; - lastOffset = reader.offset(); - if (firstDescriptor == -1) - { - firstDescriptor = lastDescriptor; - firstOffset = lastOffset; - } - } - } - while ((advanced = reader.advance()) && reader.key().equals(key)); - - if (advanced) readers.offer(reader); // there is more to this reader, but not with this key - else reader.close(); - } - - maybeWritePartition(writer, key, builder, serializer, firstDescriptor, firstOffset); - } - catch (Throwable t) - { - Throwable accumulate = writer.abort(t); - throw new RuntimeException(String.format("Caught exception while serializing. Last seen key: %s", key), accumulate); - } - - cfs.addSSTables(writer.finish(true)); - return Collections.emptyList(); - } + this.writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header); } - private JournalKey prevKey; - private DecoratedKey prevDecoratedKey; - - private void maybeWritePartition(SSTableTxnWriter writer, JournalKey key, Object builder, FlyweightSerializer<Object, Object> serializer, long descriptor, int offset) throws IOException + @Override + SSTableTxnWriter writer() { - if (builder != null) - { - DecoratedKey decoratedKey = AccordKeyspace.JournalColumns.decorate(key); + return writer; + } - if (prevKey != null) - { - Invariants.requireArgument((decoratedKey.compareTo(prevDecoratedKey) >= 0 ? 1 : -1) == (JournalKey.SUPPORT.compare(key, prevKey) >= 0 ? 
1 : -1), - String.format("Partition key and JournalKey didn't have matching order, which may imply a serialization issue.\n%s (%s)\n%s (%s)", - key, decoratedKey, prevKey, prevDecoratedKey)); - } - prevKey = key; - prevDecoratedKey = decoratedKey; + @Override + void finishAndAddWriter() + { + cfs.addSSTables(writer.finish(true)); + writer.close(); + } - SimpleBuilder partitionBuilder = PartitionUpdate.simpleBuilder(cfs.metadata(), decoratedKey); - try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get()) - { - serializer.reserialize(key, builder, out, userVersion); - partitionBuilder.row(descriptor, offset) - .add("record", out.asNewBuffer()) - .add("user_version", userVersion); - } - writer.append(partitionBuilder.build().unfilteredIterator()); - } + @Override + Throwable cleanupWriter(Throwable t) + { + return writer.abort(t); } } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java index 4236e7fbc6..2af8fd3b39 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java @@ -73,6 +73,7 @@ public class WaitingOnSerializer @Override public WaitingOn provide(TxnId txnId, PartialDeps deps, Timestamp executeAtLeast, long uniqueHlc) { + Invariants.nonNull(deps); RoutingKeys keys = deps.keyDeps.keys(); RangeDeps directRangeDeps = deps.rangeDeps; int txnIdCount = directRangeDeps.txnIdCount(); diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index ee963915f2..6c983eea0c 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -18,11 +18,15 @@ package org.apache.cassandra.service.accord; +import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.junit.Before; @@ -37,13 +41,27 @@ import accord.impl.basic.Cluster; import accord.impl.basic.RandomDelayQueue; import accord.local.CommandStores; import accord.local.Node; +import accord.primitives.EpochSupplier; import accord.utils.DefaultRandom; import accord.utils.RandomSource; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compaction.CompactionController; +import org.apache.cassandra.db.compaction.CompactionIterator; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.File; +import 
org.apache.cassandra.journal.Params; +import org.apache.cassandra.journal.SegmentCompactor; +import org.apache.cassandra.journal.StaticSegment; import org.apache.cassandra.journal.TestParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -56,9 +74,9 @@ import org.apache.cassandra.service.accord.serializers.KeySerializers; import org.apache.cassandra.service.accord.serializers.ResultSerializers; import org.apache.cassandra.service.accord.serializers.TopologySerializers; import org.apache.cassandra.tools.FieldUtil; -import org.apache.cassandra.utils.concurrent.Condition; import static accord.impl.PrefixedIntHashKey.ranges; +import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; public class AccordJournalBurnTest extends BurnTestBase { @@ -116,11 +134,11 @@ public class AccordJournalBurnTest extends BurnTestBase List<Node.Id> clients = generateIds(true, 1 + random.nextInt(4)); int rf; float chance = random.nextFloat(); - if (chance < 0.2f) { rf = random.nextInt(2, 9); } - else if (chance < 0.4f) { rf = 3; } - else if (chance < 0.7f) { rf = 5; } - else if (chance < 0.8f) { rf = 7; } - else { rf = 9; } + if (chance < 0.2f) rf = random.nextInt(2, 9); + else if (chance < 0.4f) rf = 3; + else if (chance < 0.7f) rf = 5; + else if (chance < 0.8f) rf = 7; + else rf = 9; List<Node.Id> nodes = generateIds(false, random.nextInt(rf, rf * 3)); @@ -135,6 +153,7 @@ public class AccordJournalBurnTest extends BurnTestBase AccordKeyspace.TABLES = Tables.of(metadatas); setUp(); } + Keyspace ks = Schema.instance.getKeyspaceInstance("system_accord"); burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 3))), @@ -145,7 +164,7 @@ public class AccordJournalBurnTest extends BurnTestBase operations, 10 + random.nextInt(30), new RandomDelayQueue.Factory(random).get(), - (node, agent) -> { + (node, agent, randomSource) -> { try { File directory = new File(Files.createTempDirectory(Integer.toString(counter.incrementAndGet()))); @@ -154,37 +173,18 @@ public class AccordJournalBurnTest extends BurnTestBase cfs.disableAutoCompaction(); AccordJournal journal = new AccordJournal(new TestParams() { - @Override - public FlushMode flushMode() - { - return FlushMode.PERIODIC; - } - - @Override - public long flushPeriod(TimeUnit units) - { - return 1; - } - @Override public int segmentSize() { return 32 * 1024 * 1024; } - - @Override - public boolean enableCompaction() - { - return false; - } }, new AccordAgent(), directory, cfs) { @Override public void saveCommand(int store, CommandUpdate update, @Nullable Runnable onFlush) { - Condition condition = Condition.newOneTimeCondition(); - super.saveCommand(store, update, condition::signal); - condition.awaitUninterruptibly(); + // For the purpose of this test, we do not have to wait for flush, since we do not test durability and are using mmap + super.saveCommand(store, update, () -> {}); if (onFlush != null) onFlush.run(); } @@ -192,21 +192,105 @@ public class AccordJournalBurnTest extends BurnTestBase @Override public void saveStoreState(int store, FieldUpdates fieldUpdates, @Nullable Runnable onFlush) { - Condition condition = Condition.newOneTimeCondition(); - super.saveStoreState(store, fieldUpdates, condition::signal); + super.saveStoreState(store, fieldUpdates, () -> {}); if (onFlush != null) onFlush.run(); } @Override - public void saveTopology(TopologyUpdate topologyUpdate, @Nullable Runnable onFlush) + public void 
saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { - Condition condition = Condition.newOneTimeCondition(); - super.saveTopology(topologyUpdate, condition::signal); + super.saveTopology(topologyUpdate, () -> {}); if (onFlush != null) onFlush.run(); } + @Override + protected SegmentCompactor<JournalKey, Object> compactor(ColumnFamilyStore cfs, Params params) + { + return new NemesisAccordSegmentCompactor<>(params.userVersion(), cfs, randomSource.fork()) + { + @Nullable + @Override + public Collection<StaticSegment<JournalKey, Object>> compact(Collection<StaticSegment<JournalKey, Object>> staticSegments) + { + if (journalTable == null) + throw new IllegalStateException("Unsafe access to AccordJournal during <init>; journalTable was touched before it was published"); + Collection<StaticSegment<JournalKey, Object>> result = super.compact(staticSegments); + journalTable.safeNotify(index -> index.remove(staticSegments)); + return result; + } + }; + } + + private CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, + LifecycleTransaction transaction, + Set<SSTableReader> nonExpiredSSTables) + { + return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, false, 0); + } + + @Override + public void purge(CommandStores commandStores, EpochSupplier minEpoch) + { + this.journal.closeCurrentSegmentForTestingIfNonEmpty(); + this.journal.runCompactorForTesting(); + + List<SSTableReader> all = new ArrayList<>(cfs.getLiveSSTables()); + if (all.size() <= 1) + return; + + Set<SSTableReader> sstables = new HashSet<>(); + + int min, max; + while (true) + { + int tmp1 = randomSource.nextInt(0, all.size()); + int tmp2 = randomSource.nextInt(0, all.size()); + if (tmp1 != tmp2 && Math.abs(tmp1 - tmp2) >= 1) + { + min = Math.min(tmp1, tmp2); + max = Math.max(tmp1, tmp2); + break; + } + } + // Random subset + for (int i = min; i < max; i++) + sstables.add(all.get(i)); + + List<ISSTableScanner> scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList()); + + Collection<SSTableReader> newSStables; + + try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION); + CompactionController controller = new CompactionController(cfs, sstables, 0); + CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners, controller, 0, nextTimeUUID())) + { + CompactionManager.instance.active.beginCompaction(ci); + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, cfs.getDirectories(), txn, sstables)) + { + while (ci.hasNext()) + { + writer.append(ci.next()); + ci.setTargetDirectory(writer.getSStableDirectory().path()); + } + + // point of no return + newSStables = writer.finish(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + finally + { + CompactionManager.instance.active.finishCompaction(ci); + } + } + } + + @Override public void replay(CommandStores commandStores) { diff --git a/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java b/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java new file mode 100644 index 0000000000..12f8554df7 --- /dev/null +++ b/test/distributed/org/apache/cassandra/service/accord/NemesisAccordSegmentCompactor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import accord.utils.RandomSource; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableTxnWriter; + +/** + * Nemesis compactor: a compactor that will distribute your keys over a large(r) number of SSTables. + * + * For testing purposes only. + */ +public class NemesisAccordSegmentCompactor<V> extends AbstractAccordSegmentCompactor<V> +{ + private final RandomSource randomSource; + private final SSTableTxnWriter[] writers; + private final Set<SSTableTxnWriter> written = new HashSet<>(); + + public NemesisAccordSegmentCompactor(int userVersion, ColumnFamilyStore cfs, RandomSource randomSource) + { + super(userVersion, cfs); + this.randomSource = randomSource; + this.writers = new SSTableTxnWriter[randomSource.nextInt(2, 10)]; + } + + @Override + boolean considerWritingKey() + { + if (written.size() == writers.length - 1) + return false; + return randomSource.nextBoolean(); + } + + @Override + void switchPartitions() + { + written.clear(); + } + + @Override + void initializeWriter() + { + for (int i = 0; i < writers.length; i++) + { + Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()); + SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); + writers[i] = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header); + } + } + + @Override + SSTableTxnWriter writer() + { + for (int i = 0; i < 10_000; i++) + { + SSTableTxnWriter writer = writers[randomSource.nextInt(writers.length)]; + if (written.add(writer)) + return writer; + } + throw new IllegalStateException(String.format("Could not pick an sstable from %s. 
Written: %s", Arrays.asList(writers), written)); + } + + @Override + void finishAndAddWriter() + { + for (SSTableTxnWriter writer : writers) + { + cfs.addSSTables(writer.finish(true)); + writer.close(); + } + Arrays.fill(writers, null); + } + + @Override + Throwable cleanupWriter(Throwable t) + { + for (SSTableTxnWriter writer : writers) + t = writer.abort(t); + return t; + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index d8c09544db..9a68d127ce 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -29,7 +29,6 @@ import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.ToLongFunction; - import javax.annotation.Nullable; import accord.api.Agent; @@ -235,7 +234,7 @@ public class SimulatedAccordCommandStore implements AutoCloseable } }; - this.journal = new DefaultJournal(nodeId, agent); + this.journal = new DefaultJournal(nodeId, agent, rs.fork()); this.commandStore = new AccordCommandStore(0, storeService, agent, @@ -466,9 +465,9 @@ public class SimulatedAccordCommandStore implements AutoCloseable private static class DefaultJournal extends InMemoryJournal implements RangeSearcher.Supplier { private final RouteInMemoryIndex<?> index = new RouteInMemoryIndex<>(); - private DefaultJournal(Node.Id id, Agent agent) + private DefaultJournal(Node.Id id, Agent agent, RandomSource rs) { - super(id, agent); + super(id, agent, rs); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org