This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c831042910fce4b2ce1a92a0d86c2003d44b853f
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Thu Aug 22 18:48:03 2024 +0200

    Journal segment compaction

    Patch by Alex Petrov and Aleksey Yeschenko, reviewed by Aleksey Yeschenko
    and Alex Petrov for CASSANDRA-19876
---
 .../org/apache/cassandra/config/AccordSpec.java    |  14 ++
 .../apache/cassandra/journal/ActiveSegment.java    |   1 +
 .../org/apache/cassandra/journal/Compactor.java    | 102 +++++++++
 .../org/apache/cassandra/journal/Descriptor.java   |  13 +-
 .../apache/cassandra/journal/EntrySerializer.java  |  49 +++--
 .../apache/cassandra/journal/InMemoryIndex.java    |   4 +-
 src/java/org/apache/cassandra/journal/Index.java   |  13 +-
 src/java/org/apache/cassandra/journal/Journal.java | 164 +++++++--------
 .../org/apache/cassandra/journal/Metadata.java     |   2 +-
 .../org/apache/cassandra/journal/OnDiskIndex.java  |  63 +++---
 src/java/org/apache/cassandra/journal/Params.java  |   4 +
 .../apache/cassandra/journal/RecordConsumer.java   |   1 +
 src/java/org/apache/cassandra/journal/Segment.java |  11 +-
 .../{RecordConsumer.java => SegmentCompactor.java} |  18 +-
 .../org/apache/cassandra/journal/Segments.java     |  42 ++--
 .../apache/cassandra/journal/StaticSegment.java    | 204 +++++++++++++-----
 .../cassandra/service/accord/AccordJournal.java    |  29 +--
 .../service/accord/AccordJournalTable.java         | 227 +++++++++++++++++++++
 .../cassandra/service/accord/AccordKeyspace.java   |  18 +-
 .../service/accord/AccordSegmentCompactor.java     | 119 +++++++++++
 .../cassandra/service/accord/IAccordService.java   |   3 +
 .../cassandra/service/accord/JournalKey.java       |   6 +-
 .../cassandra/service/accord/SavedCommand.java     | 132 ++++++++++--
 ...Test.java => AccordJournalIntegrationTest.java} |   2 +-
 .../journal/AccordJournalCompactionTest.java       | 137 +++++++++++++
 .../test/AccordJournalSimulationTest.java          |   4 +-
 .../org/apache/cassandra/journal/IndexTest.java    |  12 +-
 .../org/apache/cassandra/journal/JournalTest.java  |  36 +++-
 .../org/apache/cassandra/journal/SegmentTest.java  |  10 +-
 .../org/apache/cassandra/journal/TestParams.java   |  12 ++
 .../cassandra/service/accord/AccordTestUtils.java  |   2 +-
 31 files changed, 1160 insertions(+), 294 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index b4d25d6689..102ae68b67 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -28,6 +28,8 @@ public class AccordSpec
 
     public volatile String journal_directory;
 
+    public volatile boolean enable_journal_compaction = true;
+
     public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED;
 
     public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(1000);
@@ -101,6 +103,18 @@ public class AccordSpec
         return flushMode;
     }
 
+    @Override
+    public boolean enableCompaction()
+    {
+        return DatabaseDescriptor.getAccord().enable_journal_compaction;
+    }
+
+    @Override
+    public int compactionPeriodMillis()
+    {
+        return 60_000;
+    }
+
     @JsonIgnore
     @Override
     public int flushPeriodMillis()
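For illustration only, not part of the patch: a Journal embedder opts into compaction by
implementing the two new Params methods shown above. A minimal sketch with placeholder
values (the 60-second period simply mirrors AccordSpec's hardcoded choice):

    final class ExampleParams implements Params
    {
        @Override
        public boolean enableCompaction()
        {
            return true; // enable the background Compactor
        }

        @Override
        public int compactionPeriodMillis()
        {
            return 60_000; // schedule a compaction pass once a minute
        }

        // remaining Params methods (flushMode(), flushPeriodMillis(), userVersion(), ...) elided
    }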
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 1bee25a96f..1fd9905490 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -197,6 +197,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         descriptor.fileFor(Component.SYNCED_OFFSETS).deleteIfExists();
     }
 
+    @Override
     void release()
     {
         selfRef.release();
diff --git a/src/java/org/apache/cassandra/journal/Compactor.java b/src/java/org/apache/cassandra/journal/Compactor.java
new file mode 100644
index 0000000000..846dd62ba8
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/Compactor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.concurrent.Shutdownable;
+
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+final class Compactor<K, V> implements Runnable, Shutdownable
+{
+    private final Journal<K, V> journal;
+    private final SegmentCompactor<K, V> segmentCompactor;
+    private final ScheduledExecutorPlus executor;
+
+    Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
+    {
+        this.executor = executorFactory().scheduled(false, journal.name + "-compactor");
+        this.journal = journal;
+        this.segmentCompactor = segmentCompactor;
+    }
+
+    void start()
+    {
+        if (journal.params.enableCompaction())
+        {
+            executor.scheduleWithFixedDelay(this,
+                                            journal.params.compactionPeriodMillis(),
+                                            journal.params.compactionPeriodMillis(),
+                                            TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void run()
+    {
+        Set<StaticSegment<K, V>> toCompact = new HashSet<>();
+        journal.segments().selectStatic(toCompact);
+        if (toCompact.size() < 2)
+            return;
+
+        try
+        {
+            Collection<StaticSegment<K, V>> newSegments = segmentCompactor.compact(toCompact, journal.keySupport);
+            for (StaticSegment<K, V> segment : newSegments)
+                toCompact.remove(segment);
+
+            journal.replaceCompactedSegments(toCompact, newSegments);
+            for (StaticSegment<K, V> segment : toCompact)
+                segment.discard();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not compact segments: " + toCompact, e);
+        }
+    }
+
+    @Override
+    public boolean isTerminated()
+    {
+        return executor.isTerminated();
+    }
+
+    @Override
+    public void shutdown()
+    {
+        executor.shutdown();
+    }
+
+    @Override
+    public Object shutdownNow()
+    {
+        return executor.shutdownNow();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
+    {
+        return executor.awaitTermination(timeout, units);
+    }
+}
diff --git a/src/java/org/apache/cassandra/journal/Descriptor.java b/src/java/org/apache/cassandra/journal/Descriptor.java
index 176a12e109..cea68c353e 100644
--- a/src/java/org/apache/cassandra/journal/Descriptor.java
+++ b/src/java/org/apache/cassandra/journal/Descriptor.java
@@ -66,20 +66,20 @@ public final class Descriptor implements
Comparable<Descriptor> static final int CURRENT_JOURNAL_VERSION = JOURNAL_VERSION_1; final File directory; - final long timestamp; - final int generation; + public final long timestamp; + public final int generation; /** * Serialization version for journal components; bumped as journal * implementation evolves over time. */ - final int journalVersion; + public final int journalVersion; /** * Serialization version for user content - specifically journal keys * and journal values; bumped when user logic evolves. */ - final int userVersion; + public final int userVersion; Descriptor(File directory, long timestamp, int generation, int journalVersion, int userVersion) { @@ -114,11 +114,6 @@ public final class Descriptor implements Comparable<Descriptor> return fromName(file.parent(), file.name()); } - Descriptor withIncrementedGeneration() - { - return new Descriptor(directory, timestamp, generation + 1, journalVersion, userVersion); - } - File fileFor(Component component) { return new File(directory, formatFileName(component)); diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java b/src/java/org/apache/cassandra/journal/EntrySerializer.java index a2a61cfce3..2a707e7d73 100644 --- a/src/java/org/apache/cassandra/journal/EntrySerializer.java +++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java @@ -35,7 +35,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; import static org.apache.cassandra.utils.FBUtilities.updateChecksumShort; -final class EntrySerializer +public final class EntrySerializer { static <K> void write(K key, ByteBuffer record, @@ -73,14 +73,14 @@ final class EntrySerializer static <K> void read(EntryHolder<K> into, KeySupport<K> keySupport, - ByteBuffer buffer, + ByteBuffer from, int userVersion) throws IOException { CRC32 crc = Crc.crc32(); into.clear(); - try (DataInputBuffer in = new DataInputBuffer(buffer, false)) + try (DataInputBuffer in = new DataInputBuffer(from, false)) { K key = keySupport.deserialize(in, userVersion); keySupport.updateChecksum(crc, key, userVersion); @@ -101,9 +101,11 @@ final class EntrySerializer into.hosts.add(hostId); } + // TODO: try to avoid allocating another buffer here ByteBuffer entry = ByteBufferUtil.read(in, entrySize); updateChecksum(crc, entry); into.value = entry; + into.userVersion = userVersion; validateCRC(crc, in.readInt()); } @@ -111,7 +113,7 @@ final class EntrySerializer static <K> boolean tryRead(EntryHolder<K> into, KeySupport<K> keySupport, - ByteBuffer buffer, + ByteBuffer from, DataInputBuffer in, int syncedOffset, int userVersion) @@ -121,11 +123,11 @@ final class EntrySerializer into.clear(); int fixedSize = EntrySerializer.fixedEntrySize(keySupport, userVersion); - if (buffer.remaining() < fixedSize) - return handleReadException(new EOFException(), buffer.limit(), syncedOffset); + if (from.remaining() < fixedSize) + return handleReadException(new EOFException(), from.limit(), syncedOffset); - updateChecksum(crc, buffer, buffer.position(), fixedSize - TypeSizes.INT_SIZE); - int fixedCrc = buffer.getInt(buffer.position() + fixedSize - TypeSizes.INT_SIZE); + updateChecksum(crc, from, from.position(), fixedSize - TypeSizes.INT_SIZE); + int fixedCrc = from.getInt(from.position() + fixedSize - TypeSizes.INT_SIZE); try { @@ -133,7 +135,7 @@ final class EntrySerializer } catch (IOException e) { - return handleReadException(e, buffer.position() + fixedSize, syncedOffset); + return handleReadException(e, 
from.position() + fixedSize, syncedOffset); } int hostCount, recordSize; @@ -150,11 +152,11 @@ final class EntrySerializer } int variableSize = EntrySerializer.variableEntrySize(hostCount, recordSize); - if (buffer.remaining() < variableSize) - return handleReadException(new EOFException(), buffer.limit(), syncedOffset); + if (from.remaining() < variableSize) + return handleReadException(new EOFException(), from.limit(), syncedOffset); - updateChecksum(crc, buffer, buffer.position(), variableSize - TypeSizes.INT_SIZE); - int variableCrc = buffer.getInt(buffer.position() + variableSize - TypeSizes.INT_SIZE); + updateChecksum(crc, from, from.position(), variableSize - TypeSizes.INT_SIZE); + int variableCrc = from.getInt(from.position() + variableSize - TypeSizes.INT_SIZE); try { @@ -162,7 +164,7 @@ final class EntrySerializer } catch (IOException e) { - return handleReadException(e, buffer.position() + variableSize, syncedOffset); + return handleReadException(e, from.position() + variableSize, syncedOffset); } for (int i = 0; i < hostCount; i++) @@ -179,9 +181,10 @@ final class EntrySerializer throw new AssertionError(); // can't happen } - into.value = buffer.duplicate() - .position(buffer.position() - recordSize) - .limit(buffer.position()); + into.value = from.duplicate() + .position(from.position() - recordSize) + .limit(from.position()); + into.userVersion = userVersion; in.skipBytesFully(TypeSizes.INT_SIZE); return true; @@ -210,13 +213,15 @@ final class EntrySerializer + TypeSizes.INT_SIZE; // CRC } - static final class EntryHolder<K> + public static final class EntryHolder<K> { - K key; - ByteBuffer value; - IntHashSet hosts = new IntHashSet(); + public K key; + public ByteBuffer value; + public IntHashSet hosts = new IntHashSet(); - void clear() + public int userVersion; + + public void clear() { key = null; value = null; diff --git a/src/java/org/apache/cassandra/journal/InMemoryIndex.java b/src/java/org/apache/cassandra/journal/InMemoryIndex.java index 5417bfea40..2c71d8c4ff 100644 --- a/src/java/org/apache/cassandra/journal/InMemoryIndex.java +++ b/src/java/org/apache/cassandra/journal/InMemoryIndex.java @@ -131,14 +131,14 @@ final class InMemoryIndex<K> extends Index<K> { InMemoryIndex<K> index = new InMemoryIndex<>(keySupport, new TreeMap<>(keySupport)); - try (StaticSegment.SequentialReader<K> reader = StaticSegment.reader(descriptor, keySupport, fsyncedLimit)) + try (StaticSegment.SequentialReader<K> reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit)) { int last = -1; while (reader.advance()) { int current = reader.offset(); if (last >= 0) - index.update(reader.id(), last, current); + index.update(reader.key(), last, current); last = current; } diff --git a/src/java/org/apache/cassandra/journal/Index.java b/src/java/org/apache/cassandra/journal/Index.java index f42a42d5ed..bf6ab5d0c1 100644 --- a/src/java/org/apache/cassandra/journal/Index.java +++ b/src/java/org/apache/cassandra/journal/Index.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import org.apache.cassandra.utils.Closeable; import static com.google.common.collect.Iterables.any; + /** * Mapping of client supplied ids to in-segment offsets */ @@ -85,15 +86,6 @@ abstract class Index<K> implements Closeable return any(ids, this::mayContainId); } - interface IndexIterator<K> - { - boolean hasNext(); - K currentKey(); - int currentOffset(); - int currentSize(); - void next(); - } - /** * Helper methods */ @@ -118,7 +110,7 @@ abstract class Index<K> implements Closeable public static 
long writeSize(long record, int size) { record &= 0xffffffff00000000L; // unset all lower bits - record |= (long) size; + record |= size; return record; } @@ -126,5 +118,4 @@ abstract class Index<K> implements Closeable { return writeSize(writeOffset(0, offset), size); } - } diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index 50a3058ec9..0baf8e5af3 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -18,12 +18,11 @@ package org.apache.cassandra.journal; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.nio.file.FileStore; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -53,7 +52,6 @@ import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.journal.Segments.ReferencedSegment; import org.apache.cassandra.journal.Segments.ReferencedSegments; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.accord.SavedCommand; import org.apache.cassandra.utils.Crc; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Simulate; @@ -100,8 +98,7 @@ public class Journal<K, V> implements Shutdownable final Metrics<K, V> metrics; final Flusher<K, V> flusher; - //final Invalidator<K, V> invalidator; - //final Compactor<K, V> compactor; + final Compactor<K, V> compactor; volatile long replayLimit; final AtomicLong nextSegmentId = new AtomicLong(); @@ -120,7 +117,6 @@ public class Journal<K, V> implements Shutdownable private final FlusherCallbacks flusherCallbacks; SequentialExecutorPlus closer; - //private final Set<Descriptor> invalidations = Collections.newSetFromMap(new ConcurrentHashMap<>()); private class FlusherCallbacks implements Flusher.Callbacks { @@ -180,7 +176,8 @@ public class Journal<K, V> implements Shutdownable File directory, Params params, KeySupport<K> keySupport, - ValueSerializer<K, V> valueSerializer) + ValueSerializer<K, V> valueSerializer, + SegmentCompactor<K, V> segmentCompactor) { this.name = name; this.directory = directory; @@ -192,8 +189,7 @@ public class Journal<K, V> implements Shutdownable this.metrics = new Metrics<>(name); this.flusherCallbacks = new FlusherCallbacks(); this.flusher = new Flusher<>(this, flusherCallbacks); - //this.invalidator = new Invalidator<>(this); - //this.compactor = new Compactor<>(this); + this.compactor = new Compactor<>(this, segmentCompactor); } public boolean isFlushed(RecordPointer recordPointer) @@ -229,8 +225,13 @@ public class Journal<K, V> implements Shutdownable allocator = executorFactory().infiniteLoop(name + "-allocator", new AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED); advanceSegment(null); flusher.start(); - //invalidator.start(); - //compactor.start(); + compactor.start(); + } + + @VisibleForTesting + void runCompactorForTesting() + { + compactor.run(); } /** @@ -254,8 +255,8 @@ public class Journal<K, V> implements Shutdownable { allocator.shutdown(); allocator.awaitTermination(1, TimeUnit.MINUTES); - //compactor.stop(); - //invalidator.stop(); + compactor.shutdown(); + compactor.awaitTermination(1, TimeUnit.MINUTES); flusher.shutdown(); closer.shutdown(); closer.awaitTermination(1, TimeUnit.MINUTES); @@ -284,34 +285,6 @@ public class Journal<K, V> 
implements Shutdownable return r; } - /** - * Read an entry by its address (segment timestamp + offest) - * - * @return deserialized record if present, null otherwise - */ - public V read(long segmentTimestamp, int offset, int size) - { - try (ReferencedSegment<K, V> referenced = selectAndReference(segmentTimestamp)) - { - Segment<K, V> segment = referenced.segment(); - if (null == segment) - return null; - - EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); - segment.read(offset, size, holder); - - try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) - { - return valueSerializer.deserialize(holder.key, in, segment.descriptor.userVersion); - } - catch (IOException e) - { - // can only throw if serializer is buggy - throw new RuntimeException(e); - } - } - } - /** * Looks up a record by the provided id. * <p/> @@ -324,19 +297,20 @@ public class Journal<K, V> implements Shutdownable * @param id user-provided record id, expected to roughly correlate with time and go up * @return deserialized record if found, null otherwise */ + @SuppressWarnings("unused") public V readFirst(K id) { EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); try (ReferencedSegments<K, V> segments = selectAndReference(id)) { - for (Segment<K, V> segment : segments.all()) + for (Segment<K, V> segment : segments.allSorted()) { if (segment.readFirst(id, holder)) { try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) { - return valueSerializer.deserialize(holder.key, in, segment.descriptor.userVersion); + return valueSerializer.deserialize(holder.key, in, holder.userVersion); } catch (IOException e) { @@ -349,36 +323,34 @@ public class Journal<K, V> implements Shutdownable return null; } - public List<V> readAll(K id) - { - List<V> res = new ArrayList<>(2); - readAll(id, (in, userVersion) -> res.add(valueSerializer.deserialize(id, in, userVersion))); - return res; - } - - public void readAll(K id, Reader reader) + public void readAll(K id, RecordConsumer<K> consumer) { EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); try (ReferencedSegments<K, V> segments = selectAndReference(id)) { - for (Segment<K, V> segment : segments.all()) + consumer.init(); + + for (Segment<K, V> segment : segments.allSorted()) + segment.readAll(id, holder, consumer); + } + } + + @SuppressWarnings("unused") + public List<V> readAll(K id) + { + List<V> res = new ArrayList<>(2); + readAll(id, (segment, position, key, buffer, hosts, userVersion) -> { + try (DataInputBuffer in = new DataInputBuffer(buffer, false)) { - segment.readAll(id, holder, () -> { - try (DataInputBuffer in = new DataInputBuffer(holder.value, false)) - { - Invariants.checkState(Objects.equals(holder.key, id), - "%s != %s", holder.key, id); - reader.read(in, segment.descriptor.userVersion); - holder.clear(); - } - catch (IOException e) - { - // can only throw if serializer is buggy - throw new RuntimeException(e); - } - }); + res.add(valueSerializer.deserialize(key, in, userVersion)); } - } + catch (IOException e) + { + // can only throw if serializer is buggy + throw new RuntimeException(e); + } + }); + return res; } /** @@ -394,6 +366,7 @@ public class Journal<K, V> implements Shutdownable * @param condition predicate to test the record against * @return deserialized record if found, null otherwise */ + @SuppressWarnings("unused") public V readFirstMatching(K id, Predicate<V> condition) { EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>(); @@ 
-441,6 +414,7 @@ public class Journal<K, V> implements Shutdownable * @param consumer function to consume the raw record (bytes and invalidation set) if found * @return true if the record was found, false otherwise */ + @SuppressWarnings("unused") public boolean readFirst(K id, RecordConsumer<K> consumer) { try (ReferencedSegments<K, V> segments = selectAndReference(id)) @@ -457,6 +431,7 @@ public class Journal<K, V> implements Shutdownable * * @return subset of ids to test that have been found in the journal */ + @SuppressWarnings("unused") public Set<K> test(Set<K> test) { Set<K> present = new ObjectHashSet<>(test.size() + 1, 0.9f); @@ -515,19 +490,7 @@ public class Journal<K, V> implements Shutdownable */ public RecordPointer asyncWrite(K id, V record, Set<Integer> hosts) { - return asyncWrite(id, new SavedCommand.Writer<>() - { - public void write(DataOutputPlus out, int userVersion) throws IOException - { - valueSerializer.serialize(id, record, out, params.userVersion()); - } - - public K key() - { - return id; - } - }, - hosts); + return asyncWrite(id, (out, userVersion) -> valueSerializer.serialize(id, record, out, userVersion), hosts); } public RecordPointer asyncWrite(K id, Writer writer, Set<Integer> hosts) @@ -548,7 +511,6 @@ public class Journal<K, V> implements Shutdownable return recordPointer; } - private ActiveSegment<K, V>.Allocation allocate(int entrySize, Set<Integer> hosts) { ActiveSegment<K, V> segment = currentSegment; @@ -658,6 +620,11 @@ public class Journal<K, V> implements Shutdownable Thread.yield(); } } + catch (JournalWriteError e) + { + if (!(e.getCause() instanceof ClosedByInterruptException)) + throw e; + } catch (Throwable t) { if (!handleError("Failed allocating journal segments", t)) @@ -727,26 +694,38 @@ public class Journal<K, V> implements Shutdownable } /** - * Select segments that could potentially have any entry with the specified ids and + * Select segments that could potentially have any entry with the specified id and * attempt to grab references to them all. * * @return a subset of segments with references to them */ - ReferencedSegments<K, V> selectAndReference(Iterable<K> ids) + ReferencedSegments<K, V> selectAndReference(K id) { while (true) { - ReferencedSegments<K, V> referenced = segments().selectAndReference(ids); + ReferencedSegments<K, V> referenced = segments().selectAndReference(s -> s.index().mayContainId(id)); if (null != referenced) return referenced; } } - ReferencedSegments<K, V> selectAndReference(K id) + /** + * Select segments that could potentially have any entry with the specified ids and + * attempt to grab references to them all. 
+ * + * @return a subset of segments with references to them + */ + ReferencedSegments<K, V> selectAndReference(Iterable<K> ids) { - return selectAndReference(Collections.singleton(id)); + while (true) + { + ReferencedSegments<K, V> referenced = segments().selectAndReference(s -> s.index().mayContainIds(ids)); + if (null != referenced) + return referenced; + } } + @SuppressWarnings("unused") ReferencedSegment<K, V> selectAndReference(long segmentTimestamp) { while (true) @@ -757,7 +736,7 @@ public class Journal<K, V> implements Shutdownable } } - private Segments<K, V> segments() + Segments<K, V> segments() { return segments.get(); } @@ -784,9 +763,9 @@ public class Journal<K, V> implements Shutdownable swapSegments(current -> current.withCompletedSegment(activeSegment, staticSegment)); } - private void replaceCompactedSegment(StaticSegment<K, V> oldSegment, StaticSegment<K, V> newSegment) + void replaceCompactedSegments(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments) { - swapSegments(current -> current.withCompactedSegment(oldSegment, newSegment)); + swapSegments(current -> current.withCompactedSegments(oldSegments, compactedSegments)); } void selectSegmentToFlush(Collection<ActiveSegment<K, V>> into) @@ -836,7 +815,7 @@ public class Journal<K, V> implements Shutdownable if (segment == null) throw new IllegalArgumentException("Request the active segment " + timestamp + " but this segment does not exist"); if (!segment.isActive()) - throw new IllegalArgumentException("Request the active segment " + timestamp + " but this segment is not active"); + throw new IllegalArgumentException(String.format("Request the active segment %d but this segment is not active: %s", timestamp, segment)); return segment.asActive(); } } @@ -975,9 +954,4 @@ public class Journal<K, V> implements Shutdownable { void write(DataOutputPlus out, int userVersion) throws IOException; } - - public interface Reader - { - void read(DataInputBuffer in, int userVersion) throws IOException; - } } diff --git a/src/java/org/apache/cassandra/journal/Metadata.java b/src/java/org/apache/cassandra/journal/Metadata.java index bc521cc83c..e8224ca64e 100644 --- a/src/java/org/apache/cassandra/journal/Metadata.java +++ b/src/java/org/apache/cassandra/journal/Metadata.java @@ -191,7 +191,7 @@ final class Metadata Int2IntHashMap recordsPerHost = new Int2IntHashMap(Integer.MIN_VALUE); int recordsCount = 0; - try (StaticSegment.SequentialReader<K> reader = StaticSegment.reader(descriptor, keySupport, fsyncedLimit)) + try (StaticSegment.SequentialReader<K> reader = StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit)) { while (reader.advance()) { diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java b/src/java/org/apache/cassandra/journal/OnDiskIndex.java index ba769d6163..fe2c2713b9 100644 --- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java @@ -256,7 +256,7 @@ final class OnDiskIndex<K> extends Index<K> public long[] lookUpAll(K id) { if (!mayContainId(id)) - return new long[0]; + return EMPTY; int start = binarySearch(id); int firstKeyIndex = start; @@ -265,7 +265,7 @@ final class OnDiskIndex<K> extends Index<K> firstKeyIndex = i; if (firstKeyIndex < 0) - return new long[0]; + return EMPTY; int lastKeyIndex = start; @@ -282,56 +282,61 @@ final class OnDiskIndex<K> extends Index<K> return all; } - public IndexIterator<K> iterator() + IndexReader reader() { - return new IndexIteratorImpl(); + 
return new IndexReader();
     }
 
-    private class IndexIteratorImpl implements IndexIterator<K>
+    public class IndexReader
     {
-        int currentIdx;
-        K currentKey;
-        int currentOffset;
-        int currentSize;
+        int idx;
+        K key;
+        int offset;
+        int size;
 
-        IndexIteratorImpl()
+        IndexReader()
         {
-            currentIdx = -1;
+            idx = -1;
         }
 
-        @Override
-        public boolean hasNext()
+        public K key()
         {
-            return currentIdx < (entryCount - 1);
+            ensureAdvanced();
+            return key;
         }
 
-        @Override
-        public K currentKey()
+        public int offset()
         {
-            return currentKey;
+            ensureAdvanced();
+            return offset;
         }
 
-        @Override
-        public int currentOffset()
+        public int recordSize()
         {
-            return currentOffset;
+            ensureAdvanced();
+            return size;
         }
 
-        @Override
-        public int currentSize()
+        public boolean advance()
         {
-            return currentSize;
+            if (idx >= entryCount - 1)
+                return false;
+
+            idx++;
+            key = keyAtIndex(idx);
+            long record = recordAtIndex(idx);
+            offset = Index.readOffset(record);
+            size = Index.readSize(record);
+            return true;
         }
 
-        public void next()
+        private void ensureAdvanced()
         {
-            currentIdx++;
-            currentKey = keyAtIndex(currentIdx);
-            long record = recordAtIndex(currentIdx);
-            currentOffset = Index.readOffset(record);
-            currentSize = Index.readSize(record);
+            if (idx < 0)
+                throw new IllegalStateException("Must call advance() before accessing entry content");
         }
     }
 
+
     private K keyAtIndex(int index)
     {
         return keySupport.deserialize(buffer, FILE_PREFIX_SIZE + index * ENTRY_SIZE, descriptor.userVersion);
diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java
index 17e719ce5d..56bacce1d9 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -38,6 +38,10 @@ public interface Params
      */
     FlushMode flushMode();
 
+    boolean enableCompaction();
+
+    int compactionPeriodMillis();
+
     /**
      * @return milliseconds between journal flushes
      */
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/RecordConsumer.java
index e16194001d..3403cd0f23 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/RecordConsumer.java
@@ -24,5 +24,6 @@ import org.agrona.collections.IntHashSet;
 @FunctionalInterface
 public interface RecordConsumer<K>
 {
+    default void init() {}
     void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion);
 }
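For illustration only, not part of the patch: RecordConsumer's new default init() hook is
invoked once per Journal.readAll() call before any segment is scanned (AccordJournalTable
below overrides it to read table-resident entries first). A hypothetical consumer, assuming
journal and key are in scope:

    journal.readAll(key, new RecordConsumer<>()
    {
        @Override
        public void init()
        {
            // one-time setup, runs before the first segment is visited
        }

        @Override
        public void accept(long segment, int position, JournalKey key,
                           ByteBuffer buffer, IntHashSet hosts, int userVersion)
        {
            // buffer holds the raw record bytes; userVersion selects the serialization version
        }
    });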
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index e548c52128..0da59118b7 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 
-abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
+public abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
 {
     final File file;
     final Descriptor descriptor;
@@ -64,7 +64,7 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
         EntrySerializer.EntryHolder<K> into = new EntrySerializer.EntryHolder<>();
 
         int offset = Index.readOffset(offsetAndSize);
-        int size = Index.readSize(offset);
+        int size = Index.readSize(offsetAndSize);
         if (read(offset, size, into))
         {
             Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key);
@@ -83,18 +83,19 @@ abstract class Segment<K, V> implements Closeable, RefCounted<Segment<K, V>>
         return true;
     }
 
-    void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry)
+    void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntry)
     {
         long[] all = index().lookUpAll(id);
-
         for (int i = 0; i < all.length; i++)
         {
             int offset = Index.readOffset(all[i]);
             int size = Index.readSize(all[i]);
             Invariants.checkState(read(offset, size, into), "Read should always return true");
-            onEntry.run();
+            onEntry.accept(descriptor.timestamp, offset, into.key, into.value, into.hosts, into.userVersion);
         }
     }
 
     abstract boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into);
+
+    abstract void release();
 }
diff --git a/src/java/org/apache/cassandra/journal/RecordConsumer.java b/src/java/org/apache/cassandra/journal/SegmentCompactor.java
similarity index 64%
copy from src/java/org/apache/cassandra/journal/RecordConsumer.java
copy to src/java/org/apache/cassandra/journal/SegmentCompactor.java
index e16194001d..5c95b539fc 100644
--- a/src/java/org/apache/cassandra/journal/RecordConsumer.java
+++ b/src/java/org/apache/cassandra/journal/SegmentCompactor.java
@@ -17,12 +17,18 @@
  */
 package org.apache.cassandra.journal;
 
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.Collection;
 
-import org.agrona.collections.IntHashSet;
-
-@FunctionalInterface
-public interface RecordConsumer<K>
+public interface SegmentCompactor<K, V>
 {
-    void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion);
+    SegmentCompactor<?, ?> NOOP = (SegmentCompactor<Object, Object>) (segments, keySupport) -> segments;
+
+    static <K, V> SegmentCompactor<K, V> noop()
+    {
+        //noinspection unchecked
+        return (SegmentCompactor<K, V>) NOOP;
+    }
+
+    Collection<StaticSegment<K, V>> compact(Collection<StaticSegment<K, V>> segments, KeySupport<K> keySupport) throws IOException;
 }
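For illustration only, not part of the patch: compact() receives two or more static segments
and returns the segments that should remain; the Compactor then swaps them in via
replaceCompactedSegments() and discards every input segment that was not returned. Wiring a
journal with the built-in no-op compactor (which returns its input unchanged) looks roughly
like this, assuming directory, params and valueSerializer are in scope:

    Journal<JournalKey, Object> journal =
        new Journal<>("ExampleJournal", directory, params, JournalKey.SUPPORT,
                      valueSerializer, SegmentCompactor.noop());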
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 18dfc3bdaf..a779aebf23 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.journal;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
 
 import accord.utils.Invariants;
 import org.agrona.collections.Long2ObjectHashMap;
@@ -68,27 +72,37 @@ class Segments<K, V>
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment, StaticSegment<K, V> newSegment)
+    Segments<K, V> withCompactedSegments(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
     {
-        Invariants.checkArgument(oldSegment.descriptor.timestamp == newSegment.descriptor.timestamp);
-        Invariants.checkArgument(oldSegment.descriptor.generation < newSegment.descriptor.generation);
         Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        Segment<K, V> oldValue = newSegments.put(newSegment.descriptor.timestamp, newSegment);
-        Invariants.checkState(oldValue == oldSegment);
+        for (StaticSegment<K, V> oldSegment : oldSegments)
+        {
+            Segment<K, V> oldValue = newSegments.remove(oldSegment.descriptor.timestamp);
+            Invariants.checkState(oldValue == oldSegment);
+        }
+
+        for (StaticSegment<K, V> compactedSegment : compactedSegments)
+        {
+            Segment<K, V> oldValue = newSegments.put(compactedSegment.descriptor.timestamp, compactedSegment);
+            Invariants.checkState(oldValue == null);
+        }
+
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
+    Iterable<Segment<K, V>> all()
     {
-        Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        if (!newSegments.remove(staticSegment.descriptor.timestamp, staticSegment))
-            throw new IllegalStateException();
-        return new Segments<>(newSegments);
+        return this.segments.values();
     }
 
-    Iterable<Segment<K, V>> all()
+    /**
+     * Returns segments in timestamp order. Will allocate and sort the segment collection.
+     */
+    List<Segment<K, V>> allSorted()
     {
-        return segments.values();
+        List<Segment<K, V>> segments = new ArrayList<>(this.segments.values());
+        segments.sort(Comparator.comparing(s -> s.descriptor));
+        return segments;
     }
 
     void selectActive(long maxTimestamp, Collection<ActiveSegment<K, V>> into)
@@ -136,12 +150,12 @@ class Segments<K, V>
      * @return a subset of segments with references to them, or {@code null} if failed to grab the refs
      */
     @SuppressWarnings("resource")
-    ReferencedSegments<K, V> selectAndReference(Iterable<K> ids)
+    ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K, V>> test)
    {
         Long2ObjectHashMap<Segment<K, V>> selectedSegments = null;
         for (Segment<K, V> segment : segments.values())
         {
-            if (segment.index().mayContainIds(ids))
+            if (test.test(segment))
             {
                 if (null == selectedSegments)
                     selectedSegments = newMap(10);
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index f63701771c..c7ac7ce410 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -23,7 +23,10 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.locks.LockSupport;
 
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -38,7 +41,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
 * Can be compacted with input from {@code PersistedInvalidations} into a new smaller segment,
 * with invalidated entries removed.
 */
-final class StaticSegment<K, V> extends Segment<K, V>
+public final class StaticSegment<K, V> extends Segment<K, V>
 {
     final FileChannel channel;
@@ -111,7 +114,6 @@ final class StaticSegment<K, V> extends Segment<K, V>
         }
     }
 
-    @SuppressWarnings("resource")
     private static <K, V> StaticSegment<K, V> internalOpen(
         Descriptor descriptor, SyncedOffsets syncedOffsets, OnDiskIndex<K> index, Metadata metadata, KeySupport<K> keySupport)
     throws IOException
@@ -125,7 +127,43 @@ final class StaticSegment<K, V> extends Segment<K, V>
     @Override
     public void close()
     {
-        selfRef.release();
+        try
+        {
+            channel.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not close static segment " + descriptor, e);
+        }
+
+        release();
+    }
+
+    /**
+     * Waits until this segment is unreferenced, closes it, and deletes all files associated with it.
+ */ + void discard() + { + // TODO: consider moving deletion logic to Tidier instead of busy-looping here + waitUntilUnreferenced(); + close(); + for (Component component : Component.values()) + { + File file = descriptor.fileFor(component); + if (file.exists()) + file.delete(); + } + } + + public void waitUntilUnreferenced() + { + while (true) + { + if (selfRef.globalCount() == 1) + return; + + LockSupport.parkNanos(100); + } } @Override @@ -140,6 +178,18 @@ final class StaticSegment<K, V> extends Segment<K, V> return selfRef.ref(); } + @Override + void release() + { + selfRef.release(); + } + + @Override + public String toString() + { + return "StaticSegment{" + descriptor + '}'; + } + private static final class Tidier<K> implements Tidy { private final Descriptor descriptor; @@ -223,57 +273,38 @@ final class StaticSegment<K, V> extends Segment<K, V> */ void forEachRecord(RecordConsumer<K> consumer) { - try (SequentialReader<K> reader = reader(descriptor, keySupport, syncedOffsets.syncedOffset())) + try (SequentialReader<K> reader = sequentialReader(descriptor, keySupport, syncedOffsets.syncedOffset())) { while (reader.advance()) { - consumer.accept(descriptor.timestamp, reader.offset(), reader.id(), reader.record(), reader.hosts(), descriptor.userVersion); + consumer.accept(descriptor.timestamp, reader.offset(), reader.key(), reader.record(), reader.hosts(), descriptor.userVersion); } } } /* - * Sequential reading (replay and components rebuild) + * Sequential and in-key order reading (replay and components rebuild) */ - static <K> SequentialReader<K> reader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit) - { - return SequentialReader.open(descriptor, keySupport, fsyncedLimit); - } - - /** - * A sequential data segment reader to use for journal replay and rebuilding - * missing auxilirary components (index and metadata). - * </p> - * Unexpected EOF and CRC mismatches in synced portions of segments are treated - * strictly, throwing {@link JournalReadError}. Errors encountered in unsynced portions - * of segments are treated as segment EOF. 
-     */
-    static final class SequentialReader<K> implements Closeable
+    static abstract class Reader<K> implements Closeable
     {
-        private final Descriptor descriptor;
-        private final KeySupport<K> keySupport;
-        private final int fsyncedLimit; // exclusive
+        enum State { RESET, ADVANCED, EOF }
 
-        private final File file;
-        private final FileChannel channel;
-        private final MappedByteBuffer buffer;
-        private final DataInputBuffer in;
+        public final Descriptor descriptor;
+        protected final KeySupport<K> keySupport;
 
-        private int offset = -1;
-        private final EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
-        private State state = State.RESET;
+        protected final File file;
+        protected final FileChannel channel;
+        protected final MappedByteBuffer buffer;
 
-        static <K> SequentialReader<K> open(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
-        {
-            return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
-        }
+        protected final EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
+        protected int offset = -1;
+        protected State state = State.RESET;
 
-        SequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+        Reader(Descriptor descriptor, KeySupport<K> keySupport)
         {
             this.descriptor = descriptor;
             this.keySupport = keySupport;
-            this.fsyncedLimit = fsyncedLimit;
 
             file = descriptor.fileFor(Component.DATA);
             try
@@ -289,7 +320,6 @@ final class StaticSegment<K, V> extends Segment<K, V>
             {
                 throw new JournalReadError(descriptor, file, e);
             }
-            in = new DataInputBuffer(buffer, false);
         }
 
         @Override
@@ -299,37 +329,72 @@ final class StaticSegment<K, V> extends Segment<K, V>
             FileUtils.clean(buffer);
         }
 
-        int offset()
+        public abstract boolean advance();
+
+        public int offset()
         {
             ensureHasAdvanced();
             return offset;
         }
 
-        K id()
+        public K key()
         {
             ensureHasAdvanced();
             return holder.key;
         }
 
-        IntHashSet hosts()
+        public IntHashSet hosts()
         {
             ensureHasAdvanced();
             return holder.hosts;
         }
 
-        ByteBuffer record()
+        public ByteBuffer record()
         {
             ensureHasAdvanced();
             return holder.value;
         }
 
-        private void ensureHasAdvanced()
+        protected void ensureHasAdvanced()
         {
             if (state != State.ADVANCED)
                 throw new IllegalStateException("Must call advance() before accessing entry content");
         }
 
-        boolean advance()
+        protected boolean eof()
+        {
+            state = State.EOF;
+            return false;
+        }
+    }
+
+    static <K> SequentialReader<K> sequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit)
+    {
+        return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
+    }
+
+    /**
+     * A sequential data segment reader to use for journal replay and rebuilding
+     * missing auxiliary components (index and metadata).
+     * </p>
+     * Unexpected EOF and CRC mismatches in synced portions of segments are treated
+     * strictly, throwing {@link JournalReadError}. Errors encountered in unsynced portions
+     * of segments are treated as segment EOF.
+ */ + static final class SequentialReader<K> extends Reader<K> + { + private final int fsyncedLimit; // exclusive + private final DataInputBuffer in; + + SequentialReader(Descriptor descriptor, KeySupport<K> keySupport, int fsyncedLimit) + { + super(descriptor, keySupport); + this.fsyncedLimit = fsyncedLimit; + in = new DataInputBuffer(buffer, false); + } + + @Override + public boolean advance() { if (state == State.EOF) return false; @@ -361,13 +426,56 @@ final class StaticSegment<K, V> extends Segment<K, V> holder.clear(); state = State.RESET; } + } + + public StaticSegment.KeyOrderReader<K> keyOrderReader() + { + return new StaticSegment.KeyOrderReader<>(descriptor, keySupport, index.reader()); + } + + public static final class KeyOrderReader<K> extends Reader<K> implements Comparable<KeyOrderReader<K>> + { + private final OnDiskIndex<K>.IndexReader indexReader; + + KeyOrderReader(Descriptor descriptor, KeySupport<K> keySupport, OnDiskIndex<K>.IndexReader indexReader) + { + super(descriptor, keySupport); + this.indexReader = indexReader; + } - private boolean eof() + @Override + public boolean advance() { - state = State.EOF; - return false; + if (!indexReader.advance()) + return eof(); + + offset = indexReader.offset(); + + buffer.limit(offset + indexReader.recordSize()) + .position(offset); + try + { + EntrySerializer.read(holder, keySupport, buffer, descriptor.userVersion); + } + catch (IOException e) + { + throw new JournalReadError(descriptor, file, e); + } + + state = State.ADVANCED; + return true; } - enum State { RESET, ADVANCED, EOF } + @Override + public int compareTo(KeyOrderReader<K> that) + { + this.ensureHasAdvanced(); + that.ensureHasAdvanced(); + + int cmp = keySupport.compare(this.key(), that.key()); + return cmp != 0 + ? cmp + : this.descriptor.compareTo(that.descriptor); + } } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 932d31356a..80ff9739ce 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -19,7 +19,6 @@ package org.apache.cassandra.service.accord; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; @@ -78,7 +77,8 @@ public class AccordJournal implements IJournal, Shutdownable static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(() -> new byte[22]); - public final Journal<JournalKey, Object> journal; + private final Journal<JournalKey, Object> journal; + private final AccordJournalTable<JournalKey, Object> journalTable; private final AccordEndpointMapper endpointMapper; private final DelayedRequestProcessor delayedRequestProcessor = new DelayedRequestProcessor(); @@ -94,24 +94,26 @@ public class AccordJournal implements IJournal, Shutdownable File directory = new File(DatabaseDescriptor.getAccordJournalDirectory()); this.journal = new Journal<>("AccordJournal", directory, params, JournalKey.SUPPORT, // In Accord, we are using streaming serialization, i.e. 
Reader/Writer interfaces instead of materializing objects - new ValueSerializer<JournalKey, Object>() + new ValueSerializer<>() { public int serializedSize(JournalKey key, Object value, int userVersion) { throw new UnsupportedOperationException(); } - public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion) throws IOException + public void serialize(JournalKey key, Object value, DataOutputPlus out, int userVersion) { throw new UnsupportedOperationException(); } - public Object deserialize(JournalKey key, DataInputPlus in, int userVersion) throws IOException + public Object deserialize(JournalKey key, DataInputPlus in, int userVersion) { throw new UnsupportedOperationException(); } - }); + }, + new AccordSegmentCompactor<>()); this.endpointMapper = endpointMapper; + this.journalTable = new AccordJournalTable<>(journal, JournalKey.SUPPORT, params.userVersion()); } public AccordJournal start(Node node) @@ -153,7 +155,7 @@ public class AccordJournal implements IJournal, Shutdownable { try { - ExecutorUtils.awaitTermination(timeout, units, Arrays.asList(journal)); + ExecutorUtils.awaitTermination(timeout, units, Collections.singletonList(journal)); return true; } catch (TimeoutException e) @@ -191,9 +193,9 @@ public class AccordJournal implements IJournal, Shutdownable @VisibleForTesting public SavedCommand.Builder loadDiffs(int commandStoreId, TxnId txnId) { + JournalKey key = new JournalKey(txnId, commandStoreId); SavedCommand.Builder builder = new SavedCommand.Builder(); - journal.readAll(new JournalKey(txnId, commandStoreId), - builder::deserializeNext); + journalTable.readAll(key, (ignore, in, userVersion) -> builder.deserializeNext(in, userVersion)); return builder; } @@ -240,7 +242,7 @@ public class AccordJournal implements IJournal, Shutdownable // We can only use strict equality if we supply result. 
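// Note, not in the patch, added for clarity: loadDiffs() replays every diff recorded for this (commandStoreId, txnId) pair, and construct() folds those diffs back into a Command, so the equality check below round-trips SavedCommand serialization.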
Command reconstructed = diffs.construct(); Invariants.checkState(orig.equals(reconstructed), - "\n" + + '\n' + "Original: %s\n" + "Reconstructed: %s\n" + "Diffs: %s", orig, reconstructed, diffs); @@ -255,6 +257,7 @@ public class AccordJournal implements IJournal, Shutdownable /* * Context necessary to process log records */ + static abstract class RequestContext implements ReplyContext { final long waitForEpoch; @@ -351,7 +354,7 @@ public class AccordJournal implements IJournal, Shutdownable public void start() { - executor = executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", this::run, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.SYNCHRONIZED); + executor = executorFactory().infiniteLoop("AccordJournal-delayed-request-processor", this, SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.SYNCHRONIZED); } private void delay(RequestContext requestContext) @@ -457,9 +460,9 @@ public class AccordJournal implements IJournal, Shutdownable } } - public boolean isRunnable(Status status) + private boolean isRunnable(Status status) { - return status != Status.TERMINATING && status != status.TERMINATED; + return status != Status.TERMINATING && status != Status.TERMINATED; } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java new file mode 100644 index 0000000000..642f49e417 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.accord; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import accord.utils.Invariants; +import org.agrona.collections.IntHashSet; +import org.agrona.collections.LongHashSet; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.StorageHook; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.journal.EntrySerializer.EntryHolder; +import org.apache.cassandra.journal.Journal; +import org.apache.cassandra.journal.KeySupport; +import org.apache.cassandra.journal.RecordConsumer; +import org.apache.cassandra.schema.ColumnMetadata; + +import static org.apache.cassandra.io.sstable.SSTableReadsListener.NOOP_LISTENER; + +public class AccordJournalTable<K, V> +{ + private static final IntHashSet SENTINEL_HOSTS = new IntHashSet(); + + private final Journal<K, V> journal; + private final ColumnFamilyStore cfs; + + private final ColumnMetadata recordColumn; + private final ColumnMetadata versionColumn; + + private final KeySupport<K> keySupport; + private final int accordJournalVersion; + + public AccordJournalTable(Journal<K, V> journal, KeySupport<K> keySupport, int accordJournalVersion) + { + this.journal = journal; + this.cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL); + this.recordColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("record", false)); + this.versionColumn = cfs.metadata().getColumn(ColumnIdentifier.getInterned("user_version", false)); + this.keySupport = keySupport; + this.accordJournalVersion = accordJournalVersion; + } + + public interface Reader<K> + { + void read(K key, DataInputPlus input, int userVersion) throws IOException; + } + + private abstract class AbstractRecordConsumer implements RecordConsumer<K> + { + protected final Reader<K> reader; + + AbstractRecordConsumer(Reader<K> reader) + { + this.reader = reader; + } + + @Override + public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion) + { + try (DataInputBuffer in = new DataInputBuffer(buffer, false)) + { + reader.read(key, in, userVersion); + } + catch (IOException e) + { + // can only throw if serializer is buggy + throw new RuntimeException(e); + } + } + } + + private class TableRecordConsumer extends AbstractRecordConsumer + { + protected LongHashSet visited = null; + + TableRecordConsumer(Reader<K> reader) + { + super(reader); + } + + void visit(long segment) + { + if (visited == null) + visited = new LongHashSet(); + visited.add(segment); + } + + boolean visited(long 
segment) + { + return visited != null && visited.contains(segment); + } + + @Override + public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion) + { + visit(segment); + super.accept(segment, position, key, buffer, hosts, userVersion); + } + } + + private class JournalAndTableRecordConsumer extends AbstractRecordConsumer + { + private final K key; + private final TableRecordConsumer tableRecordConsumer; + + JournalAndTableRecordConsumer(K key, Reader<K> reader) + { + super(reader); + this.key = key; + this.tableRecordConsumer = new TableRecordConsumer(reader); + } + + @Override + public void init() + { + readAllFromTable(key, tableRecordConsumer); + } + + @Override + public void accept(long segment, int position, K key, ByteBuffer buffer, IntHashSet hosts, int userVersion) + { + if (!tableRecordConsumer.visited(segment)) + super.accept(segment, position, key, buffer, hosts, userVersion); + } + } + + /** + * Perform a read from Journal table, followed by the reads from all journal segments. + * <p> + * When reading from journal segments, skip descriptors that were read from the table. + */ + public void readAll(K key, Reader<K> reader) + { + journal.readAll(key, new JournalAndTableRecordConsumer(key, reader)); + } + + private void readAllFromTable(K key, TableRecordConsumer onEntry) + { + DecoratedKey pk = makePartitionKey(cfs, key, keySupport, accordJournalVersion); + + try (RefViewFragment view = cfs.selectAndReference(View.select(SSTableSet.LIVE, pk))) + { + if (view.sstables.isEmpty()) + return; + + List<UnfilteredRowIterator> iters = new ArrayList<>(view.sstables.size()); + for (SSTableReader sstable : view.sstables) + if (sstable.mayContainAssumingKeyIsInRange(pk)) + iters.add(StorageHook.instance.makeRowIterator(cfs, sstable, pk, Slices.ALL, ColumnFilter.all(cfs.metadata()), false, NOOP_LISTENER)); + + if (!iters.isEmpty()) + { + EntryHolder<K> into = new EntryHolder<>(); + try (UnfilteredRowIterator iter = UnfilteredRowIterators.merge(iters)) + { + while (iter.hasNext()) readRow(key, iter.next(), into, onEntry); + } + } + } + } + + public static <K> DecoratedKey makePartitionKey(ColumnFamilyStore cfs, K key, KeySupport<K> keySupport, int version) + { + try (DataOutputBuffer out = new DataOutputBuffer(keySupport.serializedSize(version))) + { + keySupport.serialize(key, out, version); + return cfs.decorateKey(out.buffer(false)); + } + catch (IOException e) + { + // can only throw if (key) serializer is buggy + throw new RuntimeException("Could not serialize key " + key + ", this shouldn't be possible", e); + } + } + + private void readRow(K key, Unfiltered unfiltered, EntryHolder<K> into, RecordConsumer<K> onEntry) + { + Invariants.checkState(unfiltered.isRow()); + Row row = (Row) unfiltered; + + long descriptor = LongType.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(0))); + int position = Int32Type.instance.compose(ByteBuffer.wrap((byte[]) row.clustering().get(1))); + + into.key = key; + into.value = row.getCell(recordColumn).buffer(); + into.hosts = SENTINEL_HOSTS; + into.userVersion = Int32Type.instance.compose(row.getCell(versionColumn).buffer()); + + onEntry.accept(descriptor, position, into.key, into.value, into.hosts, into.userVersion); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 45a85998a6..d58e7841df 100644 --- 
a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -169,6 +169,7 @@ public class AccordKeyspace { private static final Logger logger = LoggerFactory.getLogger(AccordKeyspace.class); + public static final String JOURNAL = "journal"; public static final String COMMANDS = "commands"; public static final String TIMESTAMPS_FOR_KEY = "timestamps_for_key"; public static final String COMMANDS_FOR_KEY = "commands_for_key"; @@ -223,6 +224,21 @@ public class AccordKeyspace } } + public static final TableMetadata Journal = + parse(JOURNAL, + "accord journal", + "CREATE TABLE %s (" + + "key blob," + + "descriptor bigint," + + "offset int," + + "user_version int," + + "record blob," + + "PRIMARY KEY(key, descriptor, offset)" + + ')') + .partitioner(new LocalPartitioner(BytesType.instance)) + .build(); + + // TODO: store timestamps as blobs (confirm there are no negative numbers, or offset) public static final TableMetadata Commands = parse(COMMANDS, @@ -717,7 +733,7 @@ public class AccordKeyspace public static Tables tables() { - return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, Topologies, EpochMetadata, CommandStoreMetadata); + return Tables.of(Commands, TimestampsForKeys, CommandsForKeys, Topologies, EpochMetadata, CommandStoreMetadata, Journal); } private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> serializer) throws IOException diff --git a/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java new file mode 100644 index 0000000000..e3f10cb644 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordSegmentCompactor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.accord; + +import java.util.Collection; +import java.util.Collections; +import java.util.PriorityQueue; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.utils.Invariants; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableTxnWriter; +import org.apache.cassandra.journal.KeySupport; +import org.apache.cassandra.journal.SegmentCompactor; +import org.apache.cassandra.journal.StaticSegment; +import org.apache.cassandra.journal.StaticSegment.KeyOrderReader; + +/** + * Segment compactor: takes static segments and compacts them into a single SSTable. + */ +public class AccordSegmentCompactor<K, V> implements SegmentCompactor<K, V> +{ + private static final Logger logger = LoggerFactory.getLogger(AccordSegmentCompactor.class); + + @Override + public Collection<StaticSegment<K, V>> compact(Collection<StaticSegment<K, V>> segments, KeySupport<K> keySupport) + { + Invariants.checkState(segments.size() >= 2, () -> String.format("Can only compact 2 or more segments, but got %d", segments.size())); + logger.info("Compacting {} static segments: {}", segments.size(), segments); + + PriorityQueue<KeyOrderReader<K>> readers = new PriorityQueue<>(); + for (StaticSegment<K, V> segment : segments) + { + KeyOrderReader<K> reader = segment.keyOrderReader(); + if (reader.advance()) + readers.add(reader); + } + + // nothing to compact (all segments empty; this should never happen, but seems theoretically possible)
- exit early + // TODO: investigate how this comes to be, check if there is a cleanup issue + if (readers.isEmpty()) + return segments; + + ColumnFamilyStore cfs = Keyspace.open(AccordKeyspace.metadata().name).getColumnFamilyStore(AccordKeyspace.JOURNAL); + Descriptor descriptor = cfs.newSSTableDescriptor(cfs.getDirectories().getDirectoryForNewSSTables()); + SerializationHeader header = new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS); + + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, descriptor, 0, 0, null, false, header)) + { + K key = null; + PartitionUpdate.SimpleBuilder partitionBuilder = null; + + try + { + KeyOrderReader<K> reader; + while ((reader = readers.poll()) != null) + { + if (!reader.key().equals(key)) // first ever - or new - key + { + if (partitionBuilder != null) // append previous partition if any + writer.append(partitionBuilder.build().unfilteredIterator()); + + key = reader.key(); + partitionBuilder = PartitionUpdate.simpleBuilder( + AccordKeyspace.Journal, AccordJournalTable.makePartitionKey(cfs, key, keySupport, reader.descriptor.userVersion) + ); + } + + boolean advanced; + do + { + partitionBuilder.row(reader.descriptor.timestamp, reader.offset()) + .add("record", reader.record()) + .add("user_version", reader.descriptor.userVersion); + } + while ((advanced = reader.advance()) && reader.key().equals(key)); + + if (advanced) readers.offer(reader); // there is more to this reader, but not with this key + } + + //noinspection DataFlowIssue + writer.append(partitionBuilder.build().unfilteredIterator()); // append the last partition + } + catch (Throwable t) + { + Throwable accumulate = writer.abort(t); + Throwables.throwIfUnchecked(accumulate); + throw new RuntimeException(accumulate); + } + + cfs.addSSTables(writer.finish(true)); + return Collections.emptyList(); + } + } +} + diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 9e49cf3c47..05e4e31a0d 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -126,6 +127,8 @@ public interface IAccordService class CompactionInfo { + static final Supplier<CompactionInfo> NO_OP = () -> new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY); + public final Int2ObjectHashMap<RedundantBefore> redundantBefores; public final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges; public final DurableBefore durableBefore; diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java b/src/java/org/apache/cassandra/service/accord/JournalKey.java index aa0c747396..c31c337882 100644 --- a/src/java/org/apache/cassandra/service/accord/JournalKey.java +++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java @@ -36,9 +36,9 @@ import static org.apache.cassandra.db.TypeSizes.SHORT_SIZE; public final class JournalKey { - final Timestamp timestamp; + public final Timestamp timestamp; // TODO: command store id _before_ timestamp - final int commandStoreId; + public final int commandStoreId; JournalKey(Timestamp timestamp) { @@ -60,7 +60,7 @@ public final class 
JournalKey * when ordering timestamps. This is done for more precise elimination of candidate * segments by min/max record key in segment. */ - static final KeySupport<JournalKey> SUPPORT = new KeySupport<>() + public static final KeySupport<JournalKey> SUPPORT = new KeySupport<>() { private static final int HLC_OFFSET = 0; private static final int EPOCH_AND_FLAGS_OFFSET = HLC_OFFSET + LONG_SIZE; diff --git a/src/java/org/apache/cassandra/service/accord/SavedCommand.java b/src/java/org/apache/cassandra/service/accord/SavedCommand.java index 090e4da755..5f5fb9b9c9 100644 --- a/src/java/org/apache/cassandra/service/accord/SavedCommand.java +++ b/src/java/org/apache/cassandra/service/accord/SavedCommand.java @@ -259,30 +259,128 @@ public class SavedCommand return oldFlags | (1 << field.ordinal()); } - public static class Builder { - TxnId txnId = null; + TxnId txnId; + + Timestamp executeAt; + Timestamp executeAtLeast; + SaveStatus saveStatus; + Status.Durability durability; + + Ballot acceptedOrCommitted; + Ballot promised; + + Route<?> route; + PartialTxn partialTxn; + PartialDeps partialDeps; + Seekables<?, ?> additionalKeysOrRanges; + + SavedCommand.WaitingOnProvider waitingOn; + Writes writes; + Result result; + + boolean nextCalled; + int count; + + public Builder() + { + clear(); + } + + public TxnId txnId() + { + return txnId; + } + + public Timestamp executeAt() + { + return executeAt; + } - Timestamp executeAt = null; - Timestamp executeAtLeast = null; - SaveStatus saveStatus = null; - Status.Durability durability = null; + public SaveStatus saveStatus() + { + return saveStatus; + } + + public Status.Durability durability() + { + return durability; + } - Ballot acceptedOrCommitted = Ballot.ZERO; - Ballot promised = null; + public Ballot acceptedOrCommitted() + { + return acceptedOrCommitted; + } - Route<?> route = null; - PartialTxn partialTxn = null; - PartialDeps partialDeps = null; - Seekables<?, ?> additionalKeysOrRanges = null; + public Ballot promised() + { + return promised; + } - SavedCommand.WaitingOnProvider waitingOn = (txn, deps) -> null; - Writes writes = null; - Result result = CommandSerializers.APPLIED; + public Route<?> route() + { + return route; + } - boolean nextCalled = false; - int count = 0; + public PartialTxn partialTxn() + { + return partialTxn; + } + + public PartialDeps partialDeps() + { + return partialDeps; + } + + public Seekables<?, ?> additionalKeysOrRanges() + { + return additionalKeysOrRanges; + } + + public SavedCommand.WaitingOnProvider waitingOn() + { + return waitingOn; + } + + public Writes writes() + { + return writes; + } + + public Result result() + { + return result; + } + + public void clear() + { + txnId = null; + + executeAt = null; + saveStatus = null; + durability = null; + + acceptedOrCommitted = Ballot.ZERO; + promised = null; + + route = null; + partialTxn = null; + partialDeps = null; + additionalKeysOrRanges = null; + + waitingOn = (txn, deps) -> null; + writes = null; + result = CommandSerializers.APPLIED; + + nextCalled = false; + count = 0; + } + + public boolean isEmpty() + { + return !nextCalled; + } public int count() { diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java similarity index 98% rename from test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java rename to 
test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java index 7f7dfb0767..80d48b6091 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java @@ -31,7 +31,7 @@ import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.utils.concurrent.CountDownLatch; -public class AccordJournalTest extends TestBaseImpl +public class AccordJournalIntegrationTest extends TestBaseImpl { @Test public void saveLoadSanityCheck() throws Throwable diff --git a/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java b/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java new file mode 100644 index 0000000000..a2161c4386 --- /dev/null +++ b/test/distributed/org/apache/cassandra/journal/AccordJournalCompactionTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.journal; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.service.accord.AccordJournalTable; +import org.apache.cassandra.service.accord.AccordSegmentCompactor; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.TimeUUID; + +import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; + +public class AccordJournalCompactionTest +{ + private static final Set<Integer> SENTINEL_HOSTS = Collections.singleton(0); + + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + ServerTestUtils.prepareServer(); + } + + @Test + public void segmentMergeTest() throws IOException + { + File directory = new File(Files.createTempDirectory(null)); + directory.deleteOnExit(); + + Journal<TimeUUID, ByteBuffer> journal = journal(directory); + AccordJournalTable<TimeUUID, ByteBuffer> journalTable = new AccordJournalTable<>(journal, journal.keySupport, journal.params.userVersion()); + journal.start(); + + Map<TimeUUID, List<ByteBuffer>> uuids = new HashMap<>(); + + int count = 0; + for (int i = 0; i < 1024 * 5; i++) + { + TimeUUID uuid = nextTimeUUID(); + for (long j = 0; j < 5; j++) + { + ByteBuffer buf = ByteBuffer.allocate(1024); + for (int k = 0; k < 1024; k++) + buf.put((byte) count); + count++; + buf.rewind(); + uuids.computeIfAbsent(uuid, (k) -> new ArrayList<>()) + .add(buf); + journal.asyncWrite(uuid, buf, SENTINEL_HOSTS); + } + } + + journal.closeCurrentSegmentForTesting(); + Runnable checkAll = () -> { + for (Map.Entry<TimeUUID, List<ByteBuffer>> e : uuids.entrySet()) + { + List<ByteBuffer> expected = e.getValue(); + + List<ByteBuffer> actual = new ArrayList<>(); + journalTable.readAll(e.getKey(), (key, in, userVersion) -> actual.add(journal.valueSerializer.deserialize(key, in, userVersion))); + Assert.assertEquals(expected.size(), actual.size()); + for (int i = 0; i < actual.size(); i++) + { + if (!actual.get(i).equals(expected.get(i))) + { + StringBuilder sb = new StringBuilder(); + sb.append("Actual:\n"); + for (ByteBuffer bb : actual) + sb.append(ByteBufferUtil.bytesToHex(bb)).append('\n'); + sb.append("Expected:\n"); + for (ByteBuffer bb : expected) + sb.append(ByteBufferUtil.bytesToHex(bb)).append('\n'); + throw new AssertionError(sb.toString()); + } + } + } + }; + + checkAll.run(); + journal.runCompactorForTesting(); + checkAll.run(); + journal.shutdown(); + } + + private static Journal<TimeUUID, ByteBuffer> journal(File directory) + { + return new Journal<>("TestJournal", directory, + new TestParams() { + @Override + public int segmentSize() + { + return 1024 * 1024; + } + + @Override + public boolean enableCompaction() + { + return false; + } + }, + TimeUUIDKeySupport.INSTANCE, + JournalTest.ByteBufferSerializer.INSTANCE, + new AccordSegmentCompactor<>()); + } +} diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java index 5a3b23076e..8ff8938645 100644 ---
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java +++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java @@ -38,6 +38,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.journal.Journal; import org.apache.cassandra.journal.KeySupport; import org.apache.cassandra.journal.RecordPointer; +import org.apache.cassandra.journal.SegmentCompactor; import org.apache.cassandra.journal.ValueSerializer; import org.junit.Assert; @@ -78,7 +79,8 @@ public class AccordJournalSimulationTest extends SimulationTestBase new File("/journal"), new AccordSpec.JournalSpec(), new IdentityKeySerializer(), - new IdentityValueSerializer()); + new IdentityValueSerializer(), + SegmentCompactor.noop()); }), () -> check()); } diff --git a/test/unit/org/apache/cassandra/journal/IndexTest.java b/test/unit/org/apache/cassandra/journal/IndexTest.java index 5e7314a338..8f1046f2c3 100644 --- a/test/unit/org/apache/cassandra/journal/IndexTest.java +++ b/test/unit/org/apache/cassandra/journal/IndexTest.java @@ -224,20 +224,18 @@ public class IndexTest sortedEntries.add(Pair.create(entry.getKey(), l)); } - Index.IndexIterator<TimeUUID> iter = onDisk.iterator(); + OnDiskIndex<TimeUUID>.IndexReader iter = onDisk.reader(); Iterator<Pair<TimeUUID, Long>> expectedIter = sortedEntries.iterator(); - while (iter.hasNext()) + while (iter.advance()) { - iter.next(); Pair<TimeUUID, Long> expected = expectedIter.next(); - Assert.assertEquals(iter.currentKey(), expected.left); - Assert.assertEquals(iter.currentSize(), Index.readSize(expected.right)); - Assert.assertEquals(iter.currentOffset(), Index.readOffset(expected.right)); + Assert.assertEquals(iter.key(), expected.left); + Assert.assertEquals(iter.recordSize(), Index.readSize(expected.right)); + Assert.assertEquals(iter.offset(), Index.readOffset(expected.right)); } } } - private static void assertIndex(Map<TimeUUID, long[]> expected, Index<TimeUUID> actual) { List<TimeUUID> keys = expected.entrySet() diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java b/test/unit/org/apache/cassandra/journal/JournalTest.java index 241e465ba8..bab37ca150 100644 --- a/test/unit/org/apache/cassandra/journal/JournalTest.java +++ b/test/unit/org/apache/cassandra/journal/JournalTest.java @@ -18,27 +18,33 @@ package org.apache.cassandra.journal; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.Collections; +import java.util.Set; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.TimeUUID; -import static org.junit.Assert.assertEquals; import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID; +import static org.junit.Assert.assertEquals; public class JournalTest { + private static final Set<Integer> SENTINEL_HOSTS = Collections.singleton(0); + @BeforeClass public static void setUp() { DatabaseDescriptor.daemonInitialization(); + ServerTestUtils.prepareServer(); } @Test @@ -48,7 +54,8 @@ public class JournalTest directory.deleteRecursiveOnExit(); Journal<TimeUUID, Long> journal = - new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE); + new Journal<>("TestJournal", directory, 
TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); + journal.start(); @@ -69,7 +76,7 @@ public class JournalTest journal.shutdown(); - journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE); + journal = new Journal<>("TestJournal", directory, TestParams.INSTANCE, TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop()); journal.start(); assertEquals(1L, (long) journal.readFirst(id1)); @@ -80,6 +87,29 @@ public class JournalTest journal.shutdown(); } + static class ByteBufferSerializer implements ValueSerializer<TimeUUID, ByteBuffer> + { + static final ByteBufferSerializer INSTANCE = new ByteBufferSerializer(); + + public int serializedSize(TimeUUID key, ByteBuffer value, int userVersion) + { + return Integer.BYTES + value.capacity(); + } + + public void serialize(TimeUUID key, ByteBuffer value, DataOutputPlus out, int userVersion) throws IOException + { + out.writeInt(value.capacity()); + out.write(value); + } + + public ByteBuffer deserialize(TimeUUID key, DataInputPlus in, int userVersion) throws IOException + { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return ByteBuffer.wrap(bytes); + } + } + static class LongSerializer implements ValueSerializer<TimeUUID, Long> { static final LongSerializer INSTANCE = new LongSerializer(); diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java b/test/unit/org/apache/cassandra/journal/SegmentTest.java index 2e59d701cb..573ba4c9e0 100644 --- a/test/unit/org/apache/cassandra/journal/SegmentTest.java +++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java @@ -203,26 +203,26 @@ public class SegmentTest activeSegment.close(); - StaticSegment.SequentialReader<TimeUUID> reader = StaticSegment.reader(descriptor, TimeUUIDKeySupport.INSTANCE, 0); + StaticSegment.SequentialReader<TimeUUID> reader = StaticSegment.sequentialReader(descriptor, TimeUUIDKeySupport.INSTANCE, 0); // read all 4 entries sequentially and compare with originals assertTrue(reader.advance()); - assertEquals(id1, reader.id()); + assertEquals(id1, reader.key()); assertEquals(hosts1, reader.hosts()); assertEquals(record1, reader.record()); assertTrue(reader.advance()); - assertEquals(id2, reader.id()); + assertEquals(id2, reader.key()); assertEquals(hosts2, reader.hosts()); assertEquals(record2, reader.record()); assertTrue(reader.advance()); - assertEquals(id3, reader.id()); + assertEquals(id3, reader.key()); assertEquals(hosts3, reader.hosts()); assertEquals(record3, reader.record()); assertTrue(reader.advance()); - assertEquals(id4, reader.id()); + assertEquals(id4, reader.key()); assertEquals(hosts4, reader.hosts()); assertEquals(record4, reader.record()); diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java b/test/unit/org/apache/cassandra/journal/TestParams.java index 9a9254ce9b..5773c4763a 100644 --- a/test/unit/org/apache/cassandra/journal/TestParams.java +++ b/test/unit/org/apache/cassandra/journal/TestParams.java @@ -41,6 +41,18 @@ public class TestParams implements Params return FlushMode.GROUP; } + @Override + public boolean enableCompaction() + { + return false; + } + + @Override + public int compactionPeriodMillis() + { + return 60_000; + } + @Override public int flushPeriodMillis() { diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index a2fd50d5f9..80a7b41769 100644 --- 
a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -398,7 +398,7 @@ public class AccordTestUtils if (new File(DatabaseDescriptor.getAccordJournalDirectory()).exists()) ServerTestUtils.cleanupDirectory(DatabaseDescriptor.getAccordJournalDirectory()); - AccordJournal journal = new AccordJournal(null, new AccordSpec.JournalSpec()); + AccordJournal journal = new AccordJournal(SimpleAccordEndpointMapper.INSTANCE, new AccordSpec.JournalSpec()); journal.start(null); SingleEpochRanges holder = new SingleEpochRanges(topology.rangesForNode(node));
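
The heart of AccordSegmentCompactor.compact() is a k-way merge: each static segment contributes a key-ordered reader, the readers are held in a priority queue ordered by their current key, and consecutive entries for one key (possibly spread across several segments) are drained into a single partition before the reader is re-offered to the queue. A condensed, self-contained sketch of that merge loop follows; Reader here is a hypothetical stand-in for StaticSegment.KeyOrderReader with String keys and records, and emit stands for appending a finished partition to the SSTable writer:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.function.BiConsumer;

final class KeyOrderMergeSketch
{
    // Hypothetical stand-in for StaticSegment.KeyOrderReader: yields (key, record) pairs in key order.
    static final class Reader implements Comparable<Reader>
    {
        private final Iterator<Map.Entry<String, String>> it;
        private Map.Entry<String, String> current;

        Reader(SortedMap<String, String> records) { it = records.entrySet().iterator(); }

        boolean advance() { current = it.hasNext() ? it.next() : null; return current != null; }
        String key()      { return current.getKey(); }
        String record()   { return current.getValue(); }

        @Override
        public int compareTo(Reader other) { return key().compareTo(other.key()); }
    }

    static void merge(Collection<Reader> segments, BiConsumer<String, List<String>> emit)
    {
        PriorityQueue<Reader> readers = new PriorityQueue<>();
        for (Reader reader : segments)
            if (reader.advance())          // skip empty segments up front
                readers.add(reader);

        String key = null;
        List<String> partition = null;

        Reader reader;
        while ((reader = readers.poll()) != null)
        {
            if (!reader.key().equals(key))        // first ever, or new, key
            {
                if (partition != null)
                    emit.accept(key, partition);  // flush the previous partition
                key = reader.key();
                partition = new ArrayList<>();
            }

            boolean advanced;
            do
            {
                partition.add(reader.record());   // drain this reader's run for the current key
            }
            while ((advanced = reader.advance()) && reader.key().equals(key));

            if (advanced)
                readers.offer(reader);            // more records, but under a later key
        }

        if (partition != null)
            emit.accept(key, partition);          // flush the last partition
    }
}

Unlike the real compactor, this sketch buffers each partition in memory; the production code streams every completed partition into SSTableTxnWriter as it goes, and aborts the writer on any exception so a failed compaction leaves no partial SSTable behind.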