This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new 1bf039709e Truncate mutation journal as logs get reconciled
1bf039709e is described below
commit 1bf039709ec72fa6b08a31effb0972fe63757f0f
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Tue Sep 23 16:44:45 2025 +0100
Truncate mutation journal as logs get reconciled
patch by Aleksey Yeschenko; reviewed by Alex Petrov for CASSANDRA-20710
---
.../apache/cassandra/journal/ActiveSegment.java | 41 ++-
.../org/apache/cassandra/journal/Component.java | 10 +-
.../org/apache/cassandra/journal/Descriptor.java | 9 +-
.../org/apache/cassandra/journal/DumpUtil.java | 2 +-
.../apache/cassandra/journal/EntrySerializer.java | 11 +-
src/java/org/apache/cassandra/journal/Journal.java | 198 ++++++++----
.../apache/cassandra/journal/JournalReadError.java | 4 +-
.../cassandra/journal/JournalWriteError.java | 4 +-
.../org/apache/cassandra/journal/KeyStats.java | 97 ++++++
.../org/apache/cassandra/journal/KeySupport.java | 4 +-
.../org/apache/cassandra/journal/Metadata.java | 3 +-
.../org/apache/cassandra/journal/OnDiskIndex.java | 5 +-
src/java/org/apache/cassandra/journal/Segment.java | 10 +
.../org/apache/cassandra/journal/Segments.java | 34 +-
.../apache/cassandra/journal/StaticSegment.java | 47 ++-
.../apache/cassandra/journal/ValueSerializer.java | 21 ++
.../cassandra/replication/CoordinatorLog.java | 5 +
.../cassandra/replication/CoordinatorLogId.java | 2 +-
.../cassandra/replication/Log2OffsetsMap.java | 10 +-
.../cassandra/replication/MutationJournal.java | 351 ++++++++++++++++++---
.../replication/MutationTrackingService.java | 38 ++-
.../org/apache/cassandra/replication/Offsets.java | 23 +-
.../org/apache/cassandra/replication/Shard.java | 5 +
.../cassandra/service/accord/AccordJournal.java | 22 +-
.../cassandra/service/accord/JournalKey.java | 4 +-
.../service/reads/tracked/ReadReconciliations.java | 8 +-
.../service/reads/tracked/TrackedLocalReads.java | 17 +-
.../service/reads/tracked/TrackedRead.java | 10 +-
src/java/org/apache/cassandra/utils/Crc.java | 20 ++
.../org/apache/cassandra/utils/FBUtilities.java | 2 +
.../tracking/MutationTrackingLogPersisterTest.java | 100 ++++++
.../test/AccordJournalSimulationTest.java | 17 +-
.../org/apache/cassandra/journal/JournalTest.java | 18 +-
.../org/apache/cassandra/journal/SegmentTest.java | 12 +-
.../org/apache/cassandra/journal/SegmentsTest.java | 9 +-
.../cassandra/journal/TimeUUIDKeySupport.java | 11 +-
.../cassandra/replication/MutationJournalTest.java | 181 +++++++++--
.../apache/cassandra/replication/OffsetsTest.java | 42 +++
38 files changed, 1159 insertions(+), 248 deletions(-)
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index 140371f980..e84ca40bcf 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -79,12 +79,19 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
private final Ref<Segment<K, V>> selfRef;
final InMemoryIndex<K> index;
+ final KeyStats.Active<K> keyStats;
private ActiveSegment(
- Descriptor descriptor, Params params, InMemoryIndex<K> index, Metadata
metadata, KeySupport<K> keySupport)
+ Descriptor descriptor,
+ Params params,
+ InMemoryIndex<K> index,
+ Metadata metadata,
+ KeyStats.Active<K> keyStats,
+ KeySupport<K> keySupport)
{
super(descriptor, metadata, keySupport);
this.index = index;
+ this.keyStats = keyStats;
try
{
channel = FileChannel.open(file.toPath(),
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
@@ -98,16 +105,12 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
}
}
- public CommitLogPosition currentPosition()
- {
- return new CommitLogPosition(id(), (int) allocateOffset);
- }
-
- static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params
params, KeySupport<K> keySupport)
+ static <K, V> ActiveSegment<K, V> create(
+ Descriptor descriptor, Params params, KeySupport<K> keySupport,
KeyStats.Factory<K> keyStatsFactory)
{
InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
Metadata metadata = Metadata.empty();
- return new ActiveSegment<>(descriptor, params, index, metadata,
keySupport);
+ return new ActiveSegment<>(descriptor, params, index, metadata,
keyStatsFactory.create(), keySupport);
}
@Override
@@ -116,6 +119,16 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
return index;
}
+ public KeyStats.Active<K> keyStats()
+ {
+ return keyStats;
+ }
+
+ public CommitLogPosition currentPosition()
+ {
+ return new CommitLogPosition(id(), (int) allocateOffset);
+ }
+
boolean isEmpty()
{
return allocateOffset == 0;
@@ -225,6 +238,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
{
index.persist(descriptor);
metadata.persist(descriptor);
+ keyStats.persist(descriptor);
SyncUtil.trySyncDir(descriptor.directory);
}
@@ -236,6 +250,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
descriptor.fileFor(Component.DATA).deleteIfExists();
descriptor.fileFor(Component.INDEX).deleteIfExists();
descriptor.fileFor(Component.METADATA).deleteIfExists();
+ descriptor.fileFor(Component.KEYSTATS).deleteIfExists();
}
@Override
@@ -290,6 +305,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
}
}
+ @Override
public boolean isFlushed(long position)
{
return writtenTo >= position;
@@ -465,18 +481,14 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
this.buffer = buffer;
}
- Segment<K, V> segment()
- {
- return ActiveSegment.this;
- }
-
void write(K id, ByteBuffer record)
{
try
{
EntrySerializer.write(id, record, keySupport, buffer,
descriptor.userVersion);
- metadata.update();
index.update(id, position, length);
+ keyStats.update(id);
+ metadata.update();
}
catch (IOException e)
{
@@ -508,6 +520,7 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
{
EntrySerializer.write(id, record, keySupport, buffer,
descriptor.userVersion);
index.update(id, position, length);
+ keyStats.update(id);
metadata.update();
}
catch (IOException e)
diff --git a/src/java/org/apache/cassandra/journal/Component.java
b/src/java/org/apache/cassandra/journal/Component.java
index 07da71536a..8e99708de0 100644
--- a/src/java/org/apache/cassandra/journal/Component.java
+++ b/src/java/org/apache/cassandra/journal/Component.java
@@ -24,15 +24,17 @@ import org.apache.cassandra.io.util.File;
import static accord.utils.SortedArrays.SortedArrayList.ofSorted;
-enum Component
+public enum Component
{
- DATA ("data"),
- INDEX ("indx"),
- METADATA ("meta");
+ DATA ("data"),
+ INDEX ("indx"),
+ METADATA ("meta"),
+ KEYSTATS ("keys");
//OFFSET_MAP (".offs"),
//INVLALIDATIONS (".invl");
public static final List<Component> VALUES = ofSorted(values());
+
final String extension;
Component(String extension)
diff --git a/src/java/org/apache/cassandra/journal/Descriptor.java
b/src/java/org/apache/cassandra/journal/Descriptor.java
index bac6c7029f..e8e8556b56 100644
--- a/src/java/org/apache/cassandra/journal/Descriptor.java
+++ b/src/java/org/apache/cassandra/journal/Descriptor.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.io.util.File;
import static java.lang.String.format;
@@ -90,7 +92,8 @@ public final class Descriptor implements
Comparable<Descriptor>
this.userVersion = userVersion;
}
- static Descriptor create(File directory, long timestamp, int userVersion)
+ @VisibleForTesting
+ public static Descriptor create(File directory, long timestamp, int
userVersion)
{
return new Descriptor(directory, timestamp, 1,
CURRENT_JOURNAL_VERSION, userVersion);
}
@@ -114,12 +117,12 @@ public final class Descriptor implements
Comparable<Descriptor>
return fromName(file.parent(), file.name());
}
- File fileFor(Component component)
+ public File fileFor(Component component)
{
return new File(directory, formatFileName(component));
}
- File tmpFileFor(Component component)
+ public File tmpFileFor(Component component)
{
return new File(directory, formatFileName(component) + '.' +
TMP_SUFFIX);
}
diff --git a/src/java/org/apache/cassandra/journal/DumpUtil.java
b/src/java/org/apache/cassandra/journal/DumpUtil.java
index d98e758384..404a688a38 100644
--- a/src/java/org/apache/cassandra/journal/DumpUtil.java
+++ b/src/java/org/apache/cassandra/journal/DumpUtil.java
@@ -37,6 +37,6 @@ public class DumpUtil
public static <K, V> StaticSegment<K, V> open(Descriptor descriptor,
KeySupport<K> keySupport)
{
- return StaticSegment.open(descriptor, keySupport);
+ return StaticSegment.open(descriptor, keySupport,
KeyStats.Factory.noop());
}
}
diff --git a/src/java/org/apache/cassandra/journal/EntrySerializer.java
b/src/java/org/apache/cassandra/journal/EntrySerializer.java
index 454234454e..6861cdae47 100644
--- a/src/java/org/apache/cassandra/journal/EntrySerializer.java
+++ b/src/java/org/apache/cassandra/journal/EntrySerializer.java
@@ -27,11 +27,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
-import static org.apache.cassandra.journal.Journal.validateCRC;
-
/**
* Entry format:
- *
* [Total Size (4 bytes)]
* [Header (variable size)]
* [Header CRC (4 bytes)]
@@ -95,10 +92,10 @@ public final class EntrySerializer
CRC32 crc = Crc.crc32();
int headerSize = EntrySerializer.headerSize(keySupport,
userVersion);
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
- validateCRC(crc, headerCrc);
+ Crc.validate(crc, headerCrc);
int recordCrc = readAndUpdateRecordCrc(crc, from, start +
totalSize);
- validateCRC(crc, recordCrc);
+ Crc.validate(crc, recordCrc);
}
readValidated(into, from, start, keySupport, userVersion);
@@ -142,7 +139,7 @@ public final class EntrySerializer
int headerCrc = readAndUpdateHeaderCrc(crc, from, headerSize);
try
{
- validateCRC(crc, headerCrc);
+ Crc.validate(crc, headerCrc);
}
catch (IOException e)
{
@@ -152,7 +149,7 @@ public final class EntrySerializer
int recordCrc = readAndUpdateRecordCrc(crc, from, start +
totalSize);
try
{
- validateCRC(crc, recordCrc);
+ Crc.validate(crc, recordCrc);
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index cf3135d77f..136d70a9dc 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -24,14 +24,15 @@ import java.nio.file.FileStore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.*;
-import java.util.zip.CRC32;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -56,7 +57,6 @@ import
org.apache.cassandra.journal.Segments.ReferencedSegments;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Crc;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.Simulate;
@@ -77,10 +77,12 @@ import static
org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
/**
* A generic append-only journal with some special features:
- * <p><ul>
+ * <p>
+ * <ul>
* <li>Records can be looked up by key
* <li>Invalidated records get purged during segment compaction
- * </ul><p>
+ * </ul>
+ * </p>
*
* Type parameters:
* @param <V> the type of records stored in the journal
@@ -98,6 +100,7 @@ public class Journal<K, V> implements Shutdownable
final KeySupport<K> keySupport;
final ValueSerializer<K, V> valueSerializer;
+ final KeyStats.Factory<K> keyStatsFactory;
final Metrics<K, V> metrics;
@@ -170,6 +173,7 @@ public class Journal<K, V> implements Shutdownable
this.onFlush = onFlush;
}
+ @Override
public void run()
{
onFlush.run();
@@ -206,11 +210,60 @@ public class Journal<K, V> implements Shutdownable
}
}
+ public static class Builder<K, V>
+ {
+ private final String name;
+ private final File directory;
+ private final Params params;
+ private final KeySupport<K> keySupport;
+
+ private ValueSerializer<K, V> valueSerializer = ValueSerializer.none();
+ private KeyStats.Factory<K> keyStatsFactory = KeyStats.Factory.noop();
+ private SegmentCompactor<K, V> segmentCompactor =
SegmentCompactor.noop();
+
+ public Builder(String name, File directory, Params params,
KeySupport<K> keySupport)
+ {
+ this.name = name;
+ this.directory = directory;
+ this.params = params;
+ this.keySupport = keySupport;
+ }
+
+ public Journal<K, V> build()
+ {
+ return new Journal<>(name, directory, params, keySupport,
valueSerializer, keyStatsFactory, segmentCompactor);
+ }
+
+ public Builder<K, V> valueSerializer(ValueSerializer<K, V>
valueSerializer)
+ {
+ this.valueSerializer = valueSerializer;
+ return this;
+ }
+
+ public Builder<K, V> keyStatsFactory(KeyStats.Factory<K>
keyStatsFactory)
+ {
+ this.keyStatsFactory = keyStatsFactory;
+ return this;
+ }
+
+ public Builder<K, V> segmentCompactor(SegmentCompactor<K, V>
segmentCompactor)
+ {
+ this.segmentCompactor = segmentCompactor;
+ return this;
+ }
+ }
+
+ public static <K, V> Builder<K, V> builder(String name, File directory,
Params params, KeySupport<K> keySupport)
+ {
+ return new Builder<>(name, directory, params, keySupport);
+ }
+
public Journal(String name,
File directory,
Params params,
KeySupport<K> keySupport,
ValueSerializer<K, V> valueSerializer,
+ KeyStats.Factory<K> keyStatsFactory,
SegmentCompactor<K, V> segmentCompactor)
{
this.name = name;
@@ -219,6 +272,7 @@ public class Journal<K, V> implements Shutdownable
this.keySupport = keySupport;
this.valueSerializer = valueSerializer;
+ this.keyStatsFactory = keyStatsFactory;
this.metrics = new Metrics<>(name);
this.flusherCallbacks = new FlusherCallbacks();
@@ -250,7 +304,7 @@ public class Journal<K, V> implements Shutdownable
: descriptors.get(descriptors.size() - 1).timestamp;
nextSegmentId.set(replayLimit = Math.max(currentTimeMillis(),
maxTimestamp + 1));
- segments.set(Segments.of(StaticSegment.open(descriptors, keySupport)));
+ segments.set(Segments.of(StaticSegment.open(descriptors, keySupport,
keyStatsFactory)));
closer = executorFactory().sequential(name + "-closer");
releaser = executorFactory().sequential(name + "-releaser");
allocator = executorFactory().infiniteLoop(name + "-allocator", new
AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED);
@@ -261,12 +315,6 @@ public class Journal<K, V> implements Shutdownable
compactor.start();
}
- @VisibleForTesting
- public void runCompactorForTesting()
- {
- compactor.run();
- }
-
public Compactor<K, V> compactor()
{
return compactor;
@@ -287,6 +335,7 @@ public class Journal<K, V> implements Shutdownable
return state.get() == State.TERMINATED;
}
+ @Override
public void shutdown()
{
try
@@ -606,13 +655,6 @@ public class Journal<K, V> implements Shutdownable
}
}
- // TODO (require): Find a better way to test unwritten allocations and/or
corruption
- @VisibleForTesting
- public void unsafeConsumeBytesForTesting(int entrySize,
Consumer<ByteBuffer> corrupt)
- {
- allocate(entrySize).consumeBufferUnsafe(corrupt);
- }
-
private ActiveSegment<K, V>.Allocation allocate(int entrySize)
{
ActiveSegment<K, V> segment = currentSegment;
@@ -793,7 +835,7 @@ public class Journal<K, V> implements Shutdownable
private ActiveSegment<K, V> createSegment()
{
Descriptor descriptor = Descriptor.create(directory,
nextSegmentId.getAndIncrement(), params.userVersion());
- return ActiveSegment.create(descriptor, params, keySupport);
+ return ActiveSegment.create(descriptor, params, keySupport,
keyStatsFactory);
}
private void closeAllSegments()
@@ -810,7 +852,7 @@ public class Journal<K, V> implements Shutdownable
}
@SuppressWarnings("unused")
- public ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>>
selector)
+ ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>>
selector)
{
while (true)
{
@@ -847,6 +889,11 @@ public class Journal<K, V> implements Shutdownable
swapSegments(current -> current.withoutEmptySegment(activeSegment));
}
+ private void removeStaticSegments(Collection<StaticSegment<K, V>>
staticSegments)
+ {
+ swapSegments(current -> current.withoutStaticSegments(staticSegments));
+ }
+
private void replaceCompletedSegment(ActiveSegment<K, V> activeSegment,
StaticSegment<K, V> staticSegment)
{
swapSegments(current -> current.withCompletedSegment(activeSegment,
staticSegment));
@@ -948,7 +995,7 @@ public class Journal<K, V> implements Shutdownable
activeSegment.updateWrittenTo();
activeSegment.fsync();
activeSegment.persistComponents();
- replaceCompletedSegment(activeSegment,
StaticSegment.open(activeSegment.descriptor, keySupport));
+ replaceCompletedSegment(activeSegment,
StaticSegment.open(activeSegment.descriptor, keySupport, keyStatsFactory));
activeSegment.release(Journal.this);
if (onDone != null) onDone.run();
}
@@ -971,27 +1018,16 @@ public class Journal<K, V> implements Shutdownable
closer.execute(new CloseActiveSegmentRunnable(activeSegment, onDone));
}
- @VisibleForTesting
- public void closeCurrentSegmentForTestingIfNonEmpty()
+ public int dropStaticSegments(Predicate<StaticSegment<K, V>> dropIf)
{
- ActiveSegment<K, V> segment = currentSegment;
- if (segment.isEmpty())
- return;
- advanceSegment(segment);
- while (!segments().isSwitched(segment))
- {
- LockSupport.parkNanos(1000);
- }
- }
-
- /*
- * Static helper methods used by journal components
- */
-
- static void validateCRC(CRC32 crc, int readCRC) throws Crc.InvalidCrc
- {
- if (readCRC != (int) crc.getValue())
- throw new Crc.InvalidCrc(readCRC, (int) crc.getValue());
+ Set<StaticSegment<K, V>> toDrop = new HashSet<>();
+ segments().selectStatic(dropIf, toDrop);
+ if (toDrop.isEmpty())
+ return 0;
+ removeStaticSegments(toDrop);
+ for (StaticSegment<K, V> segment : toDrop)
+ segment.discard(this);
+ return toDrop.size();
}
/*
@@ -1045,24 +1081,6 @@ public class Journal<K, V> implements Shutdownable
message, segmentSize, availableDiskSpace, directory);
}
- @VisibleForTesting
- public void truncateForTesting()
- {
- ActiveSegment<?, ?> discarding = currentSegment;
- if (!discarding.isEmpty()) // if there is no data in the segement then
ignore it
- {
- closeCurrentSegmentForTestingIfNonEmpty();
- //TODO (desired): wait for the ActiveSegment to get released, else
can see weird race conditions;
- // this thread will see the static segmenet and will release it
(which will delete the file),
- // and the sync thread will then try to release and will fail as
the file no longer exists...
- while (discarding.selfRef().globalCount() > 0) {}
- }
-
- Segments<K, V> statics = swapSegments(s ->
s.select(Segment::isActive)).select(Segment::isStatic);
- for (Segment<K, V> segment : statics.all())
- ((StaticSegment) segment).discard(this);
- }
-
public interface Writer
{
void write(DataOutputPlus out, int userVersion) throws IOException;
@@ -1279,4 +1297,66 @@ public class Journal<K, V> implements Shutdownable
SHUTDOWN,
TERMINATED
}
+
+ /*
+ * Test helpers
+ */
+
+ @VisibleForTesting
+ public void unsafeConsumeBytesForTesting(int entrySize,
Consumer<ByteBuffer> corrupt)
+ {
+ // TODO (require): Find a better way to test unwritten allocations
and/or corruption
+ allocate(entrySize).consumeBufferUnsafe(corrupt);
+ }
+
+ @VisibleForTesting
+ public void truncateForTesting()
+ {
+ ActiveSegment<?, ?> discarding = currentSegment;
+ if (!discarding.isEmpty()) // if there is no data in the segment then
ignore it
+ {
+ closeCurrentSegmentForTestingIfNonEmpty();
+ //TODO (desired): wait for the ActiveSegment to get released, else
can see weird race conditions;
+        // this thread will see the static segment and will release it
(which will delete the file),
+ // and the sync thread will then try to release and will fail as
the file no longer exists...
+ while (discarding.selfRef().globalCount() > 0) {}
+ }
+
+ Segments<K, V> statics = swapSegments(s ->
s.select(Segment::isActive)).select(Segment::isStatic);
+ for (Segment<K, V> segment : statics.all())
+ ((StaticSegment) segment).discard(this);
+ }
+
+ @VisibleForTesting
+ public void runCompactorForTesting()
+ {
+ compactor.run();
+ }
+
+ @VisibleForTesting
+ public void closeCurrentSegmentForTestingIfNonEmpty()
+ {
+ ActiveSegment<K, V> segment = currentSegment;
+ if (segment.isEmpty())
+ return;
+ advanceSegment(segment);
+ while (!segments().isSwitched(segment))
+ {
+ LockSupport.parkNanos(1000);
+ }
+ }
+
+ @VisibleForTesting
+ public void clearNeedsReplayForTesting()
+ {
+ Set<StaticSegment<K, V>> toReset = new HashSet<>();
+ segments().selectStatic(toReset);
+ toReset.forEach(s -> s.metadata().clearNeedsReplay());
+ }
+
+ @VisibleForTesting
+ public int countStaticSegmentsForTesting()
+ {
+ return segments.get().count(Segment::isStatic);
+ }
}
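A note on the builder introduced above: it defaults valueSerializer to
ValueSerializer.none() and the keyStatsFactory and segmentCompactor to no-ops,
so callers only set what they use. A minimal usage sketch, wiring together the
mutation-journal classes added elsewhere in this patch (assuming no
segment-close hook is needed, which is why MutationJournal itself still calls
the constructor directly):

    Journal<ShortMutationId, Mutation> journal =
        Journal.<ShortMutationId, Mutation>builder("MutationJournal", directory, params, MutationIdSupport.INSTANCE)
               .valueSerializer(MutationSerializer.INSTANCE)
               .keyStatsFactory(OffsetRangesFactory.INSTANCE)
               .segmentCompactor(SegmentCompactor.noop())
               .build();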
diff --git a/src/java/org/apache/cassandra/journal/JournalReadError.java
b/src/java/org/apache/cassandra/journal/JournalReadError.java
index 87366c8d7c..70ae829b03 100644
--- a/src/java/org/apache/cassandra/journal/JournalReadError.java
+++ b/src/java/org/apache/cassandra/journal/JournalReadError.java
@@ -24,13 +24,13 @@ public class JournalReadError extends FSReadError
{
public final Descriptor descriptor;
- JournalReadError(Descriptor descriptor, File file, Throwable throwable)
+ public JournalReadError(Descriptor descriptor, File file, Throwable
throwable)
{
super(throwable, file);
this.descriptor = descriptor;
}
- JournalReadError(Descriptor descriptor, Component component, Throwable
throwable)
+ public JournalReadError(Descriptor descriptor, Component component,
Throwable throwable)
{
super(throwable, descriptor.fileFor(component));
this.descriptor = descriptor;
diff --git a/src/java/org/apache/cassandra/journal/JournalWriteError.java
b/src/java/org/apache/cassandra/journal/JournalWriteError.java
index 03193af545..ef3f278345 100644
--- a/src/java/org/apache/cassandra/journal/JournalWriteError.java
+++ b/src/java/org/apache/cassandra/journal/JournalWriteError.java
@@ -24,13 +24,13 @@ public class JournalWriteError extends FSWriteError
{
public final Descriptor descriptor;
- JournalWriteError(Descriptor descriptor, File file, Throwable throwable)
+ public JournalWriteError(Descriptor descriptor, File file, Throwable
throwable)
{
super(throwable, file);
this.descriptor = descriptor;
}
- JournalWriteError(Descriptor descriptor, Component component, Throwable
throwable)
+ public JournalWriteError(Descriptor descriptor, Component component,
Throwable throwable)
{
super(throwable, descriptor.fileFor(component));
this.descriptor = descriptor;
diff --git a/src/java/org/apache/cassandra/journal/KeyStats.java
b/src/java/org/apache/cassandra/journal/KeyStats.java
new file mode 100644
index 0000000000..86f5811040
--- /dev/null
+++ b/src/java/org/apache/cassandra/journal/KeyStats.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.journal;
+
+public interface KeyStats<K>
+{
+ KeyStats<?> NOOP = (KeyStats<Object>) o -> true;
+
+ static <K> KeyStats<K> noop()
+ {
+ //noinspection unchecked
+ return (KeyStats<K>) NOOP;
+ }
+
+ boolean mayContain(K k);
+
+ interface Active<K> extends KeyStats<K>
+ {
+ Active<Object> NOOP = new Active<>()
+ {
+ @Override
+ public void update(Object key)
+ {
+ // no-op
+ }
+
+ @Override
+ public boolean mayContain(Object key)
+ {
+ return true;
+ }
+
+ @Override
+ public void persist(Descriptor descriptor)
+ {
+ // no-op
+ }
+ };
+
+ void update(K key);
+ void persist(Descriptor descriptor);
+ }
+
+ interface Static<K> extends KeyStats<K>
+ {
+ Static<Object> NOOP = key -> true;
+ }
+
+ interface Factory<K>
+ {
+ Factory<?> NOOP = new Factory<>()
+ {
+ @Override
+ public Active<Object> create()
+ {
+ return Active.NOOP;
+ }
+
+ @Override
+ public Static<Object> load(Descriptor descriptor)
+ {
+ return Static.NOOP;
+ }
+
+ @Override
+ public Active<Object> rebuild(Descriptor descriptor,
KeySupport<Object> keySupport, int fsyncedLimit)
+ {
+ return Active.NOOP;
+ }
+ };
+
+ static <K> Factory<K> noop()
+ {
+ //noinspection unchecked
+ return (Factory<K>) NOOP;
+ }
+
+ Active<K> create();
+ Static<K> load(Descriptor descriptor);
+ Active<K> rebuild(Descriptor descriptor, KeySupport<K> keySupport, int
fsyncedLimit);
+ }
+}
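The contract here: mayContain() may return false positives but never false
negatives, so segments can use it to skip index lookups; Active accumulates
stats as entries are written and persists them when the segment is sealed;
Factory.load() and rebuild() reconstitute the component when a segment is
opened. A minimal sketch of a custom implementation, assuming an exact
in-memory set (the class name and storage are illustrative, not part of the
patch):

    // Illustrative only: an exact-membership filter, not part of this patch.
    class ExactKeyStats<K> implements KeyStats.Active<K>
    {
        private final java.util.Set<K> keys = java.util.concurrent.ConcurrentHashMap.newKeySet();

        @Override
        public void update(K key)
        {
            keys.add(key); // called for every entry written to the active segment
        }

        @Override
        public boolean mayContain(K key)
        {
            return keys.contains(key); // gates index lookups in Segment.readLast()/readAll()
        }

        @Override
        public void persist(Descriptor descriptor)
        {
            // would serialize 'keys' to descriptor.fileFor(Component.KEYSTATS),
            // CRC-protected, in the style of ActiveOffsetRanges.persist() below
        }
    }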
diff --git a/src/java/org/apache/cassandra/journal/KeySupport.java
b/src/java/org/apache/cassandra/journal/KeySupport.java
index efc41aa6c8..5e1a7845f2 100644
--- a/src/java/org/apache/cassandra/journal/KeySupport.java
+++ b/src/java/org/apache/cassandra/journal/KeySupport.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.journal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -43,7 +43,7 @@ public interface KeySupport<K> extends Comparator<K>
K deserialize(ByteBuffer buffer, int position, int userVersion);
K deserialize(ByteBuffer buffer, int userVersion);
- void updateChecksum(Checksum crc, K key, int userVersion);
+ void updateChecksum(CRC32 crc, K key, int userVersion);
int compareWithKeyAt(K key, ByteBuffer buffer, int position, int
userVersion);
}
diff --git a/src/java/org/apache/cassandra/journal/Metadata.java
b/src/java/org/apache/cassandra/journal/Metadata.java
index 6f9552a229..a411713c16 100644
--- a/src/java/org/apache/cassandra/journal/Metadata.java
+++ b/src/java/org/apache/cassandra/journal/Metadata.java
@@ -25,7 +25,6 @@ import java.util.zip.CRC32;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.Crc;
-import static org.apache.cassandra.journal.Journal.validateCRC;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
/**
@@ -110,7 +109,7 @@ public final class Metadata
updateChecksumInt(crc, recordsCount);
updateChecksumInt(crc, fsyncLimit);
updateChecksumInt(crc, needsReplay ? 1 : 0);
- validateCRC(crc, in.readInt());
+ Crc.validate(crc, in.readInt());
return new Metadata(recordsCount, fsyncLimit, needsReplay);
}
diff --git a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
index 0a2ca8cc95..abe94ff1f2 100644
--- a/src/java/org/apache/cassandra/journal/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/journal/OnDiskIndex.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Crc;
import org.apache.cassandra.utils.memory.MemoryUtil;
-import static org.apache.cassandra.journal.Journal.validateCRC;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong;
@@ -136,11 +135,11 @@ final class OnDiskIndex<K> extends Index<K>
{
int entryCount = in.readInt();
updateChecksumInt(crc, entryCount);
- validateCRC(crc, in.readInt());
+ Crc.validate(crc, in.readInt());
Crc.updateCrc32(crc, buffer, FILE_PREFIX_SIZE, FILE_PREFIX_SIZE +
entryCount * ENTRY_SIZE);
in.skipBytesFully(entryCount * ENTRY_SIZE);
- validateCRC(crc, in.readInt());
+ Crc.validate(crc, in.readInt());
if (in.available() != 0)
throw new IOException("Trailing data encountered in segment
index " + descriptor.fileFor(Component.INDEX));
diff --git a/src/java/org/apache/cassandra/journal/Segment.java
b/src/java/org/apache/cassandra/journal/Segment.java
index 6a9e80432f..9bc394c26f 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -63,6 +63,7 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
}
abstract Index<K> index();
+ public abstract KeyStats<K> keyStats();
abstract boolean isActive();
abstract boolean isFlushed(long position);
@@ -90,6 +91,9 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
boolean readLast(K id, RecordConsumer<K> consumer)
{
+ if (!keyStats().mayContain(id))
+ return false;
+
long offsetAndSize = index().lookUpLast(id);
if (offsetAndSize == -1)
return false;
@@ -108,6 +112,9 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
boolean readLast(K id, EntrySerializer.EntryHolder<K> into)
{
+ if (!keyStats().mayContain(id))
+ return false;
+
long offsetAndSize = index().lookUpLast(id);
if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize),
Index.readSize(offsetAndSize), into))
return false;
@@ -128,6 +135,9 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K>
onEntry)
{
+ if (!keyStats().mayContain(id))
+ return;
+
long[] all = index().lookUpAll(id);
int prevOffset = Integer.MAX_VALUE;
for (int i = 0; i < all.length; i++)
diff --git a/src/java/org/apache/cassandra/journal/Segments.java
b/src/java/org/apache/cassandra/journal/Segments.java
index 4e01bd47b4..dae1bbeaac 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -67,11 +67,27 @@ class Segments<K, V>
Segments<K, V> withoutEmptySegment(ActiveSegment<K, V> activeSegment)
{
Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
- Segment<K, V> oldValue =
segments.remove(activeSegment.descriptor.timestamp);
+ Segment<K, V> oldValue =
newSegments.remove(activeSegment.descriptor.timestamp);
+ Invariants.nonNull(oldValue);
Invariants.require(oldValue.asActive().isEmpty());
return new Segments<>(newSegments);
}
+ Segments<K, V> withoutStaticSegments(Collection<StaticSegment<K, V>>
removeSegments)
+ {
+ if (removeSegments.isEmpty())
+ return this;
+
+ Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
+ for (StaticSegment<K, V> segment : removeSegments)
+ {
+ Segment<K, V> oldValue =
newSegments.remove(segment.descriptor.timestamp);
+ if (oldValue != null)
+ Invariants.require(oldValue.isStatic());
+ }
+ return new Segments<>(newSegments);
+ }
+
Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment,
StaticSegment<K, V> staticSegment)
{
Invariants.requireArgument(activeSegment.descriptor.equals(staticSegment.descriptor));
@@ -192,6 +208,13 @@ class Segments<K, V>
into.add(segment.asStatic());
}
+ void selectStatic(Predicate<StaticSegment<K, V>> filter,
Collection<StaticSegment<K, V>> into)
+ {
+ for (Segment<K, V> segment : segments.values())
+ if (segment.isStatic() && filter.test(segment.asStatic()))
+ into.add(segment.asStatic());
+ }
+
/**
* Select segments that could potentially have an entry with the specified
ids and
* attempt to grab references to them all.
@@ -236,6 +259,15 @@ class Segments<K, V>
return new Segments<>(selectedSegments);
}
+ int count(Predicate<Segment<K, V>> filter)
+ {
+ int count = 0;
+ for (Segment<K, V> segment : segments.values())
+ if (filter.test(segment))
+ count++;
+ return count;
+ }
+
static class ReferencedSegments<K, V> extends Segments<K, V> implements
AutoCloseable
{
private final Refs<Segment<K, V>> refs;
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index 0dfbaaf838..efc2c796aa 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -52,16 +52,19 @@ public final class StaticSegment<K, V> extends Segment<K, V>
private final Ref<Segment<K, V>> selfRef;
private final OnDiskIndex<K> index;
+ private final KeyStats.Static<K> keyStats;
private StaticSegment(Descriptor descriptor,
FileChannel channel,
MappedByteBuffer buffer,
OnDiskIndex<K> index,
Metadata metadata,
+ KeyStats.Static<K> keyStats,
KeySupport<K> keySupport)
{
super(descriptor, metadata, keySupport);
this.index = index;
+ this.keyStats = keyStats;
this.channel = channel;
this.fsyncLimit = metadata.fsyncLimit();
@@ -76,12 +79,12 @@ public final class StaticSegment<K, V> extends Segment<K, V>
* @param descriptors descriptors of the segments to load
* @return list of the loaded segments
*/
- static <K, V> List<Segment<K, V>> open(Collection<Descriptor> descriptors,
KeySupport<K> keySupport)
+ static <K, V> List<Segment<K, V>> open(Collection<Descriptor> descriptors,
KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
{
List<Segment<K, V>> segments = new ArrayList<>(descriptors.size());
for (Descriptor descriptor : descriptors)
{
- StaticSegment<K, V> segment = open(descriptor, keySupport);
+ StaticSegment<K, V> segment = open(descriptor, keySupport,
keyStatsFactory);
segments.add(segment);
}
@@ -95,7 +98,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
* @return the loaded segment
*/
@SuppressWarnings({ "resource", "RedundantSuppression" })
- static <K, V> StaticSegment<K, V> open(Descriptor descriptor,
KeySupport<K> keySupport)
+ static <K, V> StaticSegment<K, V> open(Descriptor descriptor,
KeySupport<K> keySupport, KeyStats.Factory<K> keyStatsFactory)
{
if (!Component.DATA.existsFor(descriptor))
throw new IllegalArgumentException("Data file for segment " +
descriptor + " doesn't exist");
@@ -135,9 +138,31 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
if (index == null)
index = OnDiskIndex.rebuildAndPersist(descriptor, keySupport,
metadata.fsyncLimit());
+ KeyStats.Static<K> keyStats = null;
+
+ if (Component.KEYSTATS.existsFor(descriptor))
+ {
+ try
+ {
+ keyStats = keyStatsFactory.load(descriptor);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Could not load keystats component for {};
rebuilding", descriptor, t);
+ Component.KEYSTATS.markCorrupted(descriptor);
+ }
+ }
+
+ if (keyStats == null)
+ {
+ KeyStats.Active<K> active = keyStatsFactory.rebuild(descriptor,
keySupport, metadata.fsyncLimit());
+ active.persist(descriptor);
+ keyStats = keyStatsFactory.load(descriptor);
+ }
+
try
{
- return internalOpen(descriptor, index, metadata, keySupport);
+ return internalOpen(descriptor, index, metadata, keyStats,
keySupport);
}
catch (IOException e)
{
@@ -146,13 +171,13 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
}
private static <K, V> StaticSegment<K, V> internalOpen(
- Descriptor descriptor, OnDiskIndex<K> index, Metadata metadata,
KeySupport<K> keySupport)
+ Descriptor descriptor, OnDiskIndex<K> index, Metadata metadata,
KeyStats.Static<K> keyStats, KeySupport<K> keySupport)
throws IOException
{
File file = descriptor.fileFor(Component.DATA);
FileChannel channel = FileChannel.open(file.toPath(),
StandardOpenOption.READ);
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY,
0, channel.size());
- return new StaticSegment<>(descriptor, channel, buffer, index,
metadata, keySupport);
+ return new StaticSegment<>(descriptor, channel, buffer, index,
metadata, keyStats, keySupport);
}
public void close(Journal<K, V> journal)
@@ -242,6 +267,12 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
return index;
}
+ @Override
+ public KeyStats.Static<K> keyStats()
+ {
+ return keyStats;
+ }
+
public int entryCount()
{
return index.entryCount();
@@ -402,7 +433,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
}
}
- static <K> SequentialReader<K> sequentialReader(Descriptor descriptor,
KeySupport<K> keySupport, int fsyncedLimit)
+ public static <K> SequentialReader<K> sequentialReader(Descriptor
descriptor, KeySupport<K> keySupport, int fsyncedLimit)
{
return new SequentialReader<>(descriptor, keySupport, fsyncedLimit);
}
@@ -415,7 +446,7 @@ public final class StaticSegment<K, V> extends Segment<K, V>
* strictly, throwing {@link JournalReadError}. Errors encountered in
unsynced portions
* of segments are treated as segment EOF.
*/
- static final class SequentialReader<K> extends Reader<K>
+ public static final class SequentialReader<K> extends Reader<K>
{
private final int fsyncedLimit; // exclusive
diff --git a/src/java/org/apache/cassandra/journal/ValueSerializer.java
b/src/java/org/apache/cassandra/journal/ValueSerializer.java
index 69690d39b2..b031ffff14 100644
--- a/src/java/org/apache/cassandra/journal/ValueSerializer.java
+++ b/src/java/org/apache/cassandra/journal/ValueSerializer.java
@@ -24,6 +24,27 @@ import org.apache.cassandra.io.util.DataOutputPlus;
public interface ValueSerializer<K, V>
{
+ ValueSerializer<?, ?> NONE = new ValueSerializer<>()
+ {
+ @Override
+ public void serialize(Object key, Object value, DataOutputPlus out,
int userVersion)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object deserialize(Object key, DataInputPlus in, int
userVersion)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ static <K, V> ValueSerializer<K, V> none()
+ {
+ //noinspection unchecked
+ return (ValueSerializer<K, V>) NONE;
+ }
+
void serialize(K key, V value, DataOutputPlus out, int userVersion) throws
IOException;
/**
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index cebfcb3ee2..6f4a4aed4c 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -401,6 +401,11 @@ public abstract class CoordinatorLog
}
}
+ void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+ {
+ into.add(reconciledPersistedOffsets);
+ }
+
boolean isDurablyReconciled(CoordinatorLogOffsets<?> logOffsets)
{
lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 101d95db9e..2f87f0ecf4 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -71,7 +71,7 @@ public class CoordinatorLogId implements Serializable
@VisibleForTesting
public static long asLong(int hostId, int hostLogId)
{
- return ((long) hostId << 32) | hostLogId;
+ return ((long) hostId << 32) | (hostLogId & 0xFFFFFFFFL);
}
static int hostId(long coordinatorLogId)
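The mask added above fixes a sign-extension bug: a negative int widens to a
long with the high 32 bits set, and OR-ing it in clobbers the hostId half. A
worked example with illustrative values:

    int hostId = 1, hostLogId = -1;
    long bad  = ((long) hostId << 32) | hostLogId;                  // 0xFFFFFFFFFFFFFFFF -- hostId lost
    long good = ((long) hostId << 32) | (hostLogId & 0xFFFFFFFFL);  // 0x00000001FFFFFFFF -- hostId intact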
diff --git a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
index 8e3b2e1bc1..cd59e62050 100644
--- a/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Log2OffsetsMap.java
@@ -56,10 +56,15 @@ public abstract class Log2OffsetsMap<T extends Offsets>
implements Iterable<Shor
return count;
}
+ public boolean contains(ShortMutationId id)
+ {
+ Offsets offsets = asMap().get(id.logId());
+ return offsets != null && offsets.contains(id.offset());
+ }
+
@Override
public String toString()
{
-
StringBuilder builder = new StringBuilder("Log2OffsetsMap{");
boolean isFirst = true;
for (Map.Entry<Long, T> entry : asMap().entrySet())
@@ -110,10 +115,11 @@ public abstract class Log2OffsetsMap<T extends Offsets>
implements Iterable<Shor
return create(new CoordinatorLogId(logId));
}
- public void add(ShortMutationId id)
+ public AbstractMutable<T> add(ShortMutationId id)
{
T offsets = offsetMap.computeIfAbsent(id.logId(), this::create);
offsets.add(id.offset());
+ return this;
}
public void add(Offsets offsets)
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java
b/src/java/org/apache/cassandra/replication/MutationJournal.java
index e56cc6c9de..e2a2a82e64 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -24,19 +24,16 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.cassandra.journal.*;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import accord.utils.Invariants;
+import org.agrona.collections.Long2LongHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
@@ -47,11 +44,15 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileInputStreamPlus;
+import org.apache.cassandra.io.util.FileOutputStreamPlus;
+import org.apache.cassandra.journal.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Crc;
import org.apache.cassandra.utils.concurrent.Semaphore;
+import org.jctools.maps.NonBlockingHashMapLong;
import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
@@ -59,7 +60,6 @@ import static
org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
public class MutationJournal
{
public static final MutationJournal instance = new MutationJournal();
- private static final Logger log =
LoggerFactory.getLogger(MutationJournal.class);
private final Journal<ShortMutationId, Mutation> journal;
private final Map<Long, SegmentStateTracker> segmentStateTrackers;
@@ -98,17 +98,25 @@ public class MutationJournal
@VisibleForTesting
MutationJournal(File directory, Params params)
{
- journal = new Journal<>("MutationJournal", directory, params,
MutationIdSupport.INSTANCE, MutationSerializer.INSTANCE,
SegmentCompactor.noop()) {
- @Override
- protected void
closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation>
activeSegment, Runnable onDone)
- {
- super.closeActiveSegmentAndOpenAsStatic(activeSegment,
- () -> {
-
maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
- if (onDone !=
null) onDone.run();
- });
- }
- };
+ journal =
+ new Journal<>("MutationJournal",
+ directory,
+ params,
+ MutationIdSupport.INSTANCE,
+ MutationSerializer.INSTANCE,
+ OffsetRangesFactory.INSTANCE,
+ SegmentCompactor.noop())
+ {
+ // TODO (expected): a cleaner way to override
it; pass a Callbacks object with sanctioned callbacks?
+ @Override
+ protected void
closeActiveSegmentAndOpenAsStatic(ActiveSegment<ShortMutationId, Mutation>
activeSegment, Runnable onDone)
+ {
+
super.closeActiveSegmentAndOpenAsStatic(activeSegment, () -> {
+
maybeCleanupStaticSegment(Invariants.nonNull(getSegment(activeSegment.id())));
+ if (onDone != null) onDone.run();
+ });
+ }
+ };
segmentStateTrackers = new NonBlockingHashMapLong<>();
}
@@ -171,7 +179,12 @@ public class MutationJournal
{
for (Segment<ShortMutationId, Mutation> segment :
journal.getSegments(lowerBound.segmentId, upperBound.segmentId))
{
-
Invariants.nonNull(segmentStateTrackers.get(segment.id())).markClean(tableId,
lowerBound, upperBound);
+ SegmentStateTracker tracker =
segmentStateTrackers.get(segment.id());
+ // upper flush bound can be first position in an empty active
segment
+ if (tracker == null)
+ continue;
+
+            tracker.markClean(tableId, lowerBound, upperBound);
// We can only safely mark static segments as non-replayable.
Active segment can still be written to,
// so we only persist this metadata on flush.
@@ -291,16 +304,12 @@ public class MutationJournal
// TODO: respect
SystemKeyspace.getTruncatedPosition(cfs.metadata.id);
replayParallelism.acquireThrowUncheckedOnInterrupt(1);
Stage.MUTATION.submit(() -> journal.readLast(key, lastSegment,
replayOne))
- .addCallback(new BiConsumer<Object, Throwable>()
- {
- @Override
- public void accept(Object o, Throwable fail)
- {
- if (fail != null && !journal.handleError("Could not
replay mutation " + key, fail))
- abort.set(true);
- replayParallelism.release(1);
- }
- });
+ .addCallback((BiConsumer<Object, Throwable>) (o,
fail) ->
+ {
+ if (fail != null &&
!journal.handleError("Could not replay mutation " + key, fail))
+ abort.set(true);
+ replayParallelism.release(1);
+ });
}
// Wait for all mutations to be applied before returning
@@ -309,9 +318,12 @@ public class MutationJournal
}
@VisibleForTesting
- public void closeCurrentSegmentForTestingIfNonEmpty()
+ public int dropReconciledSegments(Log2OffsetsMap<?> reconciledOffsets)
{
- journal.closeCurrentSegmentForTestingIfNonEmpty();
+ return journal.dropStaticSegments((segment) -> {
+ StaticOffsetRanges ranges = (StaticOffsetRanges)
segment.keyStats();
+ return ranges.isFullyCovered(reconciledOffsets) &&
!segment.metadata().needsReplay();
+ });
}
public void readAll(RecordConsumer<ShortMutationId> consumer)
@@ -429,10 +441,10 @@ public class MutationJournal
}
@Override
- public void updateChecksum(Checksum crc, ShortMutationId id, int
userVersion)
+ public void updateChecksum(CRC32 crc, ShortMutationId id, int
userVersion)
{
- FBUtilities.updateChecksumLong(crc, id.logId());
- FBUtilities.updateChecksumInt(crc, id.offset());
+ Crc.updateWithLong(crc, id.logId());
+ Crc.updateWithInt(crc, id.offset());
}
@Override
@@ -453,6 +465,7 @@ public class MutationJournal
public static class MutationSerializer implements
ValueSerializer<ShortMutationId, Mutation>
{
public static MutationSerializer INSTANCE = new MutationSerializer();
+
@Override
public void serialize(ShortMutationId id, Mutation mutation,
DataOutputPlus out, int userVersion) throws IOException
{
@@ -467,4 +480,270 @@ public class MutationJournal
return Mutation.serializer.deserialize(in, userVersion);
}
}
+
+ /*
+     * KeyStats component to track per-log min and max offsets in a segment
+ */
+
+ static abstract class OffsetRanges implements KeyStats<ShortMutationId>
+ {
+ @Override
+ public abstract boolean mayContain(ShortMutationId id);
+
+ protected static boolean mayContain(long range, ShortMutationId id)
+ {
+ return id.offset() >= minOffset(range) && id.offset() <=
maxOffset(range);
+ }
+
+ protected static int minOffset(long range)
+ {
+ return (int) (range >>> 32);
+ }
+
+ protected static int maxOffset(long range)
+ {
+ return (int) range;
+ }
+
+ protected static long range(int minOffset, int maxOffset)
+ {
+ return ((long) minOffset << 32) | (maxOffset & 0xFFFFFFFFL);
+ }
+
+ abstract Map<Long, Long> asMap();
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new
StringBuilder(getClass().getSimpleName());
+ builder.append('{');
+ for (Map.Entry<Long, Long> entry : asMap().entrySet())
+ {
+ CoordinatorLogId logId = new CoordinatorLogId(entry.getKey());
+ long range = entry.getValue();
+ int min = minOffset(range);
+ int max = maxOffset(range);
+ builder.append(logId)
+ .append("->")
+ .append('[')
+ .append(min)
+ .append(", ")
+ .append(max)
+ .append(']')
+ .append(',');
+ }
+ return builder.append('}').toString();
+ }
+ }
+
+ // TODO (consider): an off-heap version
+ static class ActiveOffsetRanges extends OffsetRanges implements
KeyStats.Active<ShortMutationId>
+ {
+ private final NonBlockingHashMapLong<Long> ranges;
+
+ ActiveOffsetRanges()
+ {
+ ranges = new NonBlockingHashMapLong<>();
+ }
+
+ @Override
+ protected Map<Long, Long> asMap()
+ {
+ return ranges;
+ }
+
+ @Override
+ public boolean mayContain(ShortMutationId id)
+ {
+ Long range = ranges.get(id.logId());
+ return range != null && mayContain(range, id);
+ }
+
+ @Override
+ @SuppressWarnings("WrapperTypeMayBePrimitive")
+ public void update(ShortMutationId id)
+ {
+ Long prev, next;
+ do
+ {
+ prev = ranges.get(id.logId());
+ int min = prev == null ? id.offset() :
Math.min(minOffset(prev), id.offset());
+ int max = prev == null ? id.offset() :
Math.max(maxOffset(prev), id.offset());
+ next = range(min, max);
+ }
+ while (!compareAndSet(id.logId(), prev, next));
+ }
+
+        // NonBlockingHashMapLong doesn't expose putIfMatch() directly, so
emulate compare-and-set via putIfAbsent()/replace()
+ private boolean compareAndSet(long logId, Long prevValue, Long
nextValue)
+ {
+ return prevValue == null
+ ? ranges.putIfAbsent(logId, nextValue) == null
+ : ranges.replace(logId, prevValue, nextValue);
+ }
+
+ @Override
+ public void persist(Descriptor descriptor)
+ {
+ File tmpFile = descriptor.tmpFileFor(Component.KEYSTATS);
+ try (FileOutputStreamPlus out = new FileOutputStreamPlus(tmpFile))
+ {
+ write(out);
+
+ out.flush();
+ out.sync();
+ }
+ catch (IOException e)
+ {
+ throw new JournalWriteError(descriptor, tmpFile, e);
+ }
+ tmpFile.move(descriptor.fileFor(Component.KEYSTATS));
+ }
+
+ private void write(DataOutputPlus out) throws IOException
+ {
+ CRC32 crc = Crc.crc32();
+ int count = ranges.size();
+ out.writeInt(count);
+ Crc.updateWithInt(crc, count);
+ out.writeInt((int) crc.getValue());
+ for (Map.Entry<Long, Long> entry : ranges.entrySet())
+ {
+ long logId = entry.getKey();
+ long range = entry.getValue();
+ out.writeLong(logId);
+ out.writeLong(range);
+ Crc.updateWithLong(crc, logId);
+ Crc.updateWithLong(crc, range);
+ }
+ out.writeInt((int) crc.getValue());
+ }
+ }
+
+ static class StaticOffsetRanges extends OffsetRanges implements
KeyStats.Static<ShortMutationId>
+ {
+ private static final long NO_VALUE = Long.MIN_VALUE;
+
+ private final Long2LongHashMap ranges;
+
+ StaticOffsetRanges(Long2LongHashMap ranges)
+ {
+ this.ranges = ranges;
+ }
+
+ @Override
+ protected Map<Long, Long> asMap()
+ {
+ return ranges;
+ }
+
+ @Override
+ public boolean mayContain(ShortMutationId id)
+ {
+ long range = ranges.get(id.logId());
+ return range != NO_VALUE && mayContain(range, id);
+ }
+
+ static StaticOffsetRanges read(DataInputPlus in) throws IOException
+ {
+ CRC32 crc = Crc.crc32();
+ int count = in.readInt();
+ Crc.updateWithInt(crc, count);
+ Crc.validate(crc, in.readInt());
+ Long2LongHashMap ranges = new Long2LongHashMap(count, 0.65f,
NO_VALUE, false);
+ for (int i = 0; i < count; i++)
+ {
+ long logId = in.readLong();
+ long range = in.readLong();
+ Crc.updateWithLong(crc, logId);
+ Crc.updateWithLong(crc, range);
+ ranges.put(logId, range);
+ }
+ Crc.validate(crc, in.readInt());
+ return new StaticOffsetRanges(ranges);
+ }
+
+ /**
+ * @return whether all keys in the segment are fully covered by the
specified (durably reconciled) offsets map
+ */
+ boolean isFullyCovered(Log2OffsetsMap<?> durablyReconciled)
+ {
+ Long2ObjectHashMap<Offsets> reconciledMap =
((Log2OffsetsMap<Offsets>) durablyReconciled).asMap();
+ for (Long2LongHashMap.EntryIterator iter =
ranges.entrySet().iterator(); iter.hasNext();)
+ {
+ iter.next();
+
+ long logId = iter.getLongKey();
+ long range = iter.getLongValue();
+ int min = minOffset(range);
+ int max = maxOffset(range);
+
+ Offsets offsets = reconciledMap.get(logId);
+ if (offsets == null)
+ return false;
+ if (!offsets.containsRange(min, max))
+ return false;
+ }
+ return true;
+ }
+ }
+
+ static final class OffsetRangesFactory implements
KeyStats.Factory<ShortMutationId>
+ {
+ static final OffsetRangesFactory INSTANCE = new OffsetRangesFactory();
+
+ @Override
+ public ActiveOffsetRanges create()
+ {
+ return new ActiveOffsetRanges();
+ }
+
+ @Override
+ public StaticOffsetRanges load(Descriptor descriptor)
+ {
+ File file = descriptor.fileFor(Component.KEYSTATS);
+ try (FileInputStreamPlus in = new FileInputStreamPlus(file))
+ {
+ return StaticOffsetRanges.read(in);
+ }
+ catch (IOException e)
+ {
+ throw new JournalReadError(descriptor, file, e);
+ }
+ }
+
+ @Override
+ public ActiveOffsetRanges rebuild(Descriptor descriptor,
KeySupport<ShortMutationId> keySupport, int fsyncedLimit)
+ {
+ ActiveOffsetRanges active = create();
+ try (StaticSegment.SequentialReader<ShortMutationId> reader =
StaticSegment.sequentialReader(descriptor, keySupport, fsyncedLimit))
+ {
+ while (reader.advance())
+ active.update(reader.key());
+ }
+ return active;
+ }
+ }
+
+ /*
+ * Test helpers
+ */
+
+ @VisibleForTesting
+ public void closeCurrentSegmentForTestingIfNonEmpty()
+ {
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+ }
+
+ @VisibleForTesting
+ void clearNeedsReplayForTesting()
+ {
+ journal.clearNeedsReplayForTesting();
+ }
+
+ @VisibleForTesting
+ public int countStaticSegmentsForTesting()
+ {
+ return journal.countStaticSegmentsForTesting();
+ }
}
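The packed encoding used by OffsetRanges above keeps one long per coordinator
log: the high 32 bits hold the smallest offset seen in the segment, the low 32
the largest. A round-trip sketch of the helpers it defines:

    long range = ((long) minOffset << 32) | (maxOffset & 0xFFFFFFFFL); // range(min, max)
    int min = (int) (range >>> 32);                                    // minOffset(range)
    int max = (int) range;                                             // maxOffset(range)

With one such range per log, dropReconciledSegments() can discard a static
segment once every recorded [min, max] is contained in the durably reconciled
offsets and the segment's needsReplay flag is clear.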
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 535a4100f2..4009dbd39a 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -87,7 +87,7 @@ public class MutationTrackingService
/**
* Split ranges into this many shards.
- *
+ * <p>
* TODO (expected): ability to rebalance / change this constant
*/
private static final int SHARD_MULTIPLIER = 8;
@@ -547,7 +547,6 @@ public class MutationTrackingService
return false;
}
-
private static ConcurrentHashMap<String, KeyspaceShards>
applyUpdatedMetadata(Map<String, KeyspaceShards> keyspaceShardsMap, @Nullable
ClusterMetadata prev, ClusterMetadata next, LongSupplier logIdProvider,
BiConsumer<Shard, CoordinatorLog> onNewLog)
{
Preconditions.checkNotNull(next);
@@ -619,6 +618,23 @@ public class MutationTrackingService
}
}
+ private void truncateMutationJournal()
+ {
+ Log2OffsetsMap.Mutable reconciledOffsets = new
Log2OffsetsMap.Mutable();
+ collectDurablyReconciledOffsets(reconciledOffsets);
+ MutationJournal.instance.dropReconciledSegments(reconciledOffsets);
+ }
+
+ /**
+ * Collect every log's durably reconciled offsets. Every mutation covered
+ * by these offsets can be compacted away by the journal, assuming that all
relevant memtables have been flushed to disk.
+ */
+ private void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+ {
+ forEachKeyspace(keyspace ->
keyspace.collectDurablyReconciledOffsets(into));
+ }
+
public static class KeyspaceShards
{
private enum UpdateDecision
@@ -847,6 +863,11 @@ public class MutationTrackingService
});
}
+ void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+ {
+ forEachShard(shard -> shard.collectDurablyReconciledOffsets(into));
+ }
+
void forEachShard(Consumer<Shard> consumer)
{
for (Shard shard : shards.values())
@@ -979,8 +1000,15 @@ public class MutationTrackingService
@Override
public void run()
+ {
+ run(true);
+ }
+
+ private void run(boolean dropSegments)
{
MutationTrackingService.instance.forEachKeyspace(this::run);
+ if (dropSegments)
+ MutationTrackingService.instance.truncateMutationJournal();
}
private void run(KeyspaceShards shards)
@@ -1000,6 +1028,12 @@ public class MutationTrackingService
offsetsPersister.run();
}
+ @VisibleForTesting
+ public void persistLogStateForTesting(boolean dropSegments)
+ {
+ offsetsPersister.run(dropSegments);
+ }
+
@VisibleForTesting
public void broadcastOffsetsForTesting()
{
diff --git a/src/java/org/apache/cassandra/replication/Offsets.java
b/src/java/org/apache/cassandra/replication/Offsets.java
index dfb4f3f404..8724c02d4d 100644
--- a/src/java/org/apache/cassandra/replication/Offsets.java
+++ b/src/java/org/apache/cassandra/replication/Offsets.java
@@ -186,12 +186,29 @@ public abstract class Offsets implements
Iterable<ShortMutationId>
{
if (size == 0)
return false;
-
int pos = Arrays.binarySearch(bounds, 0, size, offset);
if (pos >= 0) return true; // matches one of the bounds
+ return (-pos - 1) % 2 != 0;
+ }
+
+ /**
+ * @return whether the entire [minOffset, maxOffset] range is contained in
these Offsets
+ */
+ public boolean containsRange(int minOffset, int maxOffset)
+ {
+ if (size == 0)
+ return false;
+
+ // find the range the min offset falls under
+ int pos = Arrays.binarySearch(bounds, 0, size, minOffset);
+ if (pos < 0 && (-pos - 1) % 2 == 0) // min offset is outside any existing range
+ return false;
+
+ int end = pos >= 0
+ ? (pos % 2 == 0 ? pos + 1 : pos)
+ : -pos - 1;
- pos = -pos - 1;
- return (pos - 1) % 2 == 0; // offset falls within bounds of an existing range if the bound to the left is an open one
+ return maxOffset <= bounds[end];
}
public void digest(Digest digest)
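
For readers following the bounds arithmetic in contains() and containsRange(): Offsets stores closed ranges as a flat, sorted array of alternating start/end bounds, so a binarySearch miss with an odd insertion point lands inside a range. A self-contained illustration of the same parity trick (hypothetical class, not the actual Offsets internals; the real code also guards size == 0 and searches a bounded prefix):

    import java.util.Arrays;

    class BoundsSketch
    {
        // closed ranges [2..4] and [6..8], flattened into alternating bounds
        final int[] bounds = { 2, 4, 6, 8 };

        boolean contains(int offset)
        {
            int pos = Arrays.binarySearch(bounds, offset);
            if (pos >= 0) return true;       // matches a bound exactly
            return (-pos - 1) % 2 != 0;      // odd insertion point: inside a range
        }

        boolean containsRange(int min, int max)
        {
            int pos = Arrays.binarySearch(bounds, min);
            if (pos < 0 && (-pos - 1) % 2 == 0)
                return false;                // min falls in a gap between ranges
            int end = pos >= 0
                    ? (pos % 2 == 0 ? pos + 1 : pos) // start bound: partner end is pos + 1
                    : -pos - 1;                      // inside a range: bounds[-pos - 1] is its end
            return max <= bounds[end];       // [min, max] must fit within that single range
        }
    }

With bounds { 2, 4, 6, 8 }, containsRange(3, 4) is true while containsRange(4, 6) is false, since [4, 6] straddles the gap between the two ranges; this matches the new assertions in OffsetsTest.
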
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 0717c22817..7274127fcb 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -276,6 +276,11 @@ public class Shard
return new BroadcastLogOffsets(keyspace, range, offsets, durable);
}
+ void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
+ {
+ logs.values().forEach(log -> log.collectDurablyReconciledOffsets(into));
+ }
+
private CoordinatorLog getOrCreate(Mutation mutation)
{
return getOrCreate(mutation.id());
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index aa568104a2..8669586642 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -78,7 +78,6 @@ import org.apache.cassandra.journal.Params;
import org.apache.cassandra.journal.RecordPointer;
import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.StaticSegment;
-import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
@@ -134,23 +133,10 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier
public AccordJournal(Params params, File directory, ColumnFamilyStore cfs)
{
Version userVersion = Version.fromVersion(params.userVersion());
- this.journal = new Journal<>("AccordJournal", directory, params,
JournalKey.SUPPORT,
- // In Accord, we are using streaming
serialization, i.e. Reader/Writer interfaces instead of materializing objects
- new ValueSerializer<>()
- {
- @Override
- public void serialize(JournalKey key,
Object value, DataOutputPlus out, int userVersion)
- {
- throw new
UnsupportedOperationException();
- }
-
- @Override
- public Object deserialize(JournalKey
key, DataInputPlus in, int userVersion)
- {
- throw new
UnsupportedOperationException();
- }
- },
- compactor(cfs, userVersion));
+ this.journal =
+ Journal.builder("AccordJournal", directory, params,
JournalKey.SUPPORT)
+ .segmentCompactor(compactor(cfs, userVersion))
+ .build();
this.journalTable = new AccordJournalTable<>(journal,
JournalKey.SUPPORT, cfs, userVersion);
this.params = params;
}
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java
b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 9bd42ebaf1..4bb3bf19e2 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
import accord.local.Node.Id;
import accord.primitives.Timestamp;
@@ -177,7 +177,7 @@ public final class JournalKey
}
@Override
- public void updateChecksum(Checksum crc, JournalKey key, int userVersion)
+ public void updateChecksum(CRC32 crc, JournalKey key, int userVersion)
{
byte[] out = AccordJournal.keyCRCBytes.get();
serialize(key, out);
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
index 1c57781c00..13e1e69d10 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java
@@ -263,19 +263,19 @@ public class ReadReconciliations implements ExpiredStatePurger.Expireable
/**
* Remote summaries minus data node summary offsets
- *
+ * <p>
* This calculation combines BOTH reconciled and unreconciled mutations reported by other nodes, and
* then subtracts mutations reported locally for correctness
- *
+ * <p>
* If we subtracted reconciled ids from the unreconciled ids, we could violate read monotonicity in this scenario:
* 1. Read starts locally and doesn't see mutation M.
* 2. During reconciliation, mutation M arrives and is marked reconciled, other replicas report mutation M as reconciled
* 3. If we filtered out reconciled mutations, this read wouldn't augment with M
* 4. A concurrent read could see M in its initial data
* 5. This read returns without M
- *
+ * <p>
* Instead, we include all mutations and rely on token range filtering during actual mutation
- * retrieval (in PartialTrackedRead.augment) to ensure we only augment with mutations
+ * retrieval (in PartialTrackedRead.augment()) to ensure we only augment with mutations
* relevant to this read's range/key
*/
private Log2OffsetsMap.Mutable augmentingOffsets()
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 066ebc22e0..fa6c3193b3 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.service.reads.ReadCoordinator;
@@ -36,16 +35,14 @@ import org.apache.cassandra.replication.MutationSummary;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.jctools.maps.NonBlockingHashMap;
-import org.apache.cassandra.transport.Dispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.function.Consumer;
-
/**
* Since the read reconciliations don't use 2 way callbacks, maps of active reads and reconciliations
* are maintained and expired here.
@@ -69,8 +66,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
ReadCommand command,
ConsistencyLevel consistencyLevel,
int[] summaryNodes,
- Dispatcher.RequestTime requestTime,
- Consumer<PartialTrackedRead> partialReadConsumer)
+ Dispatcher.RequestTime requestTime)
{
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
@@ -100,7 +96,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
}
// TODO: confirm all summaryNodes are present in the replica plan
AsyncPromise<TrackedDataResponse> promise = new AsyncPromise<>();
- beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, partialReadConsumer, promise);
+ beginReadInternal(readId, command, replicaPlan, summaryNodes, requestTime, promise);
return promise;
}
@@ -111,7 +107,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
ReplicaPlan.AbstractForRead<?, ?> replicaPlan,
int[] summaryNodes,
Dispatcher.RequestTime requestTime,
- Consumer<PartialTrackedRead> partialReadConsumer,
AsyncPromise<TrackedDataResponse> promise)
{
PartialTrackedRead read = null;
@@ -137,7 +132,7 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
}
Coordinator coordinator =
- new Coordinator(readId, promise, command.columnFilter(), read, replicaPlan.consistencyLevel(), requestTime);
+ new Coordinator(readId, promise, read, replicaPlan.consistencyLevel(), requestTime);
coordinators.put(readId, coordinator);
// TODO (expected): reconsider the approach to tracked mutation metrics
@@ -187,7 +182,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
{
private final TrackedRead.Id readId;
private final AsyncPromise<TrackedDataResponse> promise;
- private final ColumnFilter selection;
private final PartialTrackedRead read;
private final ConsistencyLevel consistencyLevel;
private final Dispatcher.RequestTime requestTime;
@@ -195,14 +189,12 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
Coordinator(
TrackedRead.Id readId,
AsyncPromise<TrackedDataResponse> promise,
- ColumnFilter selection,
PartialTrackedRead read,
ConsistencyLevel consistencyLevel,
Dispatcher.RequestTime requestTime)
{
this.readId = readId;
this.promise = promise;
- this.selection = selection;
this.read = Preconditions.checkNotNull(read);
this.consistencyLevel = consistencyLevel;
this.requestTime = requestTime;
@@ -246,7 +238,6 @@ public class TrackedLocalReads implements ExpiredStatePurger.Expireable
if (followUp != null)
{
- ReadCommand command = read.command();
followUp.addCallback((newResponse, error) -> {
if (error != null)
{
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
index 267670a63e..975c2742fc 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
-import org.apache.cassandra.replication.MutationSummary;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.service.reads.ReadCoordinator;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
@@ -280,7 +279,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
logger.trace("Locally coordinating {}", readId);
Stage.READ.submit(() -> {
AsyncPromise<TrackedDataResponse> promise =
- MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, partialReadConsumer);
+ MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime);
promise.addCallback((response, error) -> {
if (error != null)
{
@@ -451,7 +450,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
AsyncPromise<TrackedDataResponse> promise =
MutationTrackingService.instance
.localReads()
- .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime, null);
+ .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime);
promise.addCallback((response, error) -> {
if (error != null)
{
@@ -516,11 +515,6 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
ReadReconciliations.instance.handleSummaryRequest((SummaryRequest) message.payload);
}
- public TrackedSummaryResponse makeResponse(MutationSummary summary)
- {
- return new TrackedSummaryResponse(readId, summary, dataNode, summaryNodes);
- }
-
public static final IVersionedSerializer<SummaryRequest> serializer =
new IVersionedSerializer<>()
{
@Override
diff --git a/src/java/org/apache/cassandra/utils/Crc.java
b/src/java/org/apache/cassandra/utils/Crc.java
index f1a31584f3..63b0055f2b 100644
--- a/src/java/org/apache/cassandra/utils/Crc.java
+++ b/src/java/org/apache/cassandra/utils/Crc.java
@@ -83,6 +83,26 @@ public class Crc
buffer.position(savePosition);
}
+ public static void updateWithInt(CRC32 crc, int v)
+ {
+ crc.update((v >>> 24) & 0xFF);
+ crc.update((v >>> 16) & 0xFF);
+ crc.update((v >>> 8) & 0xFF);
+ crc.update((v >>> 0) & 0xFF);
+ }
+
+ public static void updateWithLong(CRC32 crc, long v)
+ {
+ updateWithInt(crc, (int) (v >>> 32));
+ updateWithInt(crc, (int) (v & 0xFFFFFFFFL));
+ }
+
+ public static void validate(CRC32 crc, int expectedCRC) throws InvalidCrc
+ {
+ if (expectedCRC != (int) crc.getValue())
+ throw new InvalidCrc(expectedCRC, (int) crc.getValue());
+ }
+
private static final int CRC24_INIT = 0x875060;
/**
* Polynomial chosen from https://users.ece.cmu.edu/~koopman/crc/index.html, by Philip Koopman
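
The helpers above mirror FBUtilities.updateChecksumInt()/updateChecksumLong() but are typed to CRC32 and add an explicit validation step. A hypothetical round trip, assuming only the methods added in this hunk:

    import java.util.zip.CRC32;
    import org.apache.cassandra.utils.Crc;

    // inside any method:
    CRC32 writer = new CRC32();
    Crc.updateWithInt(writer, 0xCAFEBABE);   // fed big-endian, high byte first
    Crc.updateWithLong(writer, 42L);         // high word first, then low word
    int recorded = (int) writer.getValue();  // the value persisted alongside the data

    CRC32 reader = new CRC32();
    Crc.updateWithInt(reader, 0xCAFEBABE);
    Crc.updateWithLong(reader, 42L);
    Crc.validate(reader, recorded);          // throws Crc.InvalidCrc on mismatch
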
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a4493f30a2..3a3a96285d 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -1184,6 +1184,7 @@ public class FBUtilities
checksum.update((v >>> 0) & 0xFF);
}
+ // TODO: migrate users to Crc class
public static void updateChecksumInt(Checksum checksum, int v)
{
checksum.update((v >>> 24) & 0xFF);
@@ -1192,6 +1193,7 @@ public class FBUtilities
checksum.update((v >>> 0) & 0xFF);
}
+ // TODO: migrate users to Crc class
public static void updateChecksumLong(Checksum checksum, long v)
{
updateChecksumInt(checksum, (int) (v >>> 32));
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java
new file mode 100644
index 0000000000..5d0785454e
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.log.FuzzTestBase;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+
+public class MutationTrackingLogPersisterTest extends FuzzTestBase
+{
+ private static final int POPULATION = 1000;
+
+ @Test
+ public void testLogPersisterClearsStaticSegments() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(3)
+ .withConfig(cfg -> cfg.with(Feature.NETWORK)
+ .with(Feature.GOSSIP)
+ .set("mutation_tracking_enabled", "true"))
+ .start())
+ {
+ int tables = 3;
+ int writesPerKey = 10;
+ int pks = 100;
+
+ withRandom(rng -> {
+ cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} " +
+ "AND
replication_type='tracked'",
+ KEYSPACE));
+
+ List<HistoryBuilder> builders = new ArrayList<>();
+ for (int i = 0; i < tables; i++)
+ {
+ Generator<SchemaSpec> schemaGen = SchemaGenerators.trivialSchema(KEYSPACE, () -> "log_persister_test_" + (builders.size() + 1), POPULATION,
+ SchemaSpec.optionsBuilder());
+
+ SchemaSpec schema = schemaGen.generate(rng);
+ cluster.schemaChange(schema.compile());
+ builders.add(new ReplayingHistoryBuilder(schema.valueGenerators,
+ hb -> InJvmDTestVisitExecutor.builder()
+ .consistencyLevel(ConsistencyLevel.QUORUM)
+ .build(schema, hb, cluster)));
+ }
+
+ int counter = 0;
+ for (int pk = 0; pk < pks; pk++)
+ {
+ for (HistoryBuilder history : builders)
+ for (int i = 0; i < writesPerKey; i++)
+ history.insert(pk);
+
+ if (++counter % 10 == 0)
+ cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+ }
+
+ cluster.forEach(i -> i.nodetoolResult("flush",
KEYSPACE).asserts().success());
+ cluster.forEach(i -> i.runOnInstance(() ->
MutationTrackingService.instance.persistLogStateForTesting()));
+ cluster.forEach(i -> i.runOnInstance(() ->
MutationTrackingService.instance.broadcastOffsetsForTesting()));
+ cluster.forEach(i -> i.runOnInstance(() ->
MutationTrackingService.instance.persistLogStateForTesting()));
+
+ cluster.forEach(i -> i.runOnInstance(() -> {
+ int staticSegments = MutationJournal.instance.countStaticSegmentsForTesting();
+ Assert.assertEquals("Expected no static segments after log persister runs", 0, staticSegments);
+ }));
+ });
+ }
+ }
+}
+
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
index 36040e115c..3ff5f8e8b0 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
import com.google.common.collect.ImmutableMap;
import com.google.common.jimfs.Jimfs;
@@ -47,7 +47,6 @@ import org.apache.cassandra.io.util.File;
import org.apache.cassandra.journal.Journal;
import org.apache.cassandra.journal.KeySupport;
import org.apache.cassandra.journal.RecordPointer;
-import org.apache.cassandra.journal.SegmentCompactor;
import org.apache.cassandra.journal.ValueSerializer;
import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
@@ -75,14 +74,12 @@ public class AccordJournalSimulationTest extends SimulationTestBase
AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec();
spec.flushPeriod = new DurationSpec.IntSecondsBound(1);
- State.journal = new Journal<>("AccordJournal",
- new File("/journal"),
- spec,
- new IdentityKeySerializer(),
- new
IdentityValueSerializer(),
- SegmentCompactor.noop());
+ State.journal =
+ Journal.<String, String>builder("AccordJournal", new
File("/journal"), spec, new IdentityKeySerializer())
+ .valueSerializer(new IdentityValueSerializer())
+ .build();
}),
- () -> check());
+ AccordJournalSimulationTest::check);
}
public static void check()
@@ -237,7 +234,7 @@ public class AccordJournalSimulationTest extends SimulationTestBase
}
@Override
- public void updateChecksum(Checksum crc, String key, int userVersion)
+ public void updateChecksum(CRC32 crc, String key, int userVersion)
{
crc.update(key.getBytes());
}
diff --git a/test/unit/org/apache/cassandra/journal/JournalTest.java
b/test/unit/org/apache/cassandra/journal/JournalTest.java
index 90b394518c..7cdb7edabb 100644
--- a/test/unit/org/apache/cassandra/journal/JournalTest.java
+++ b/test/unit/org/apache/cassandra/journal/JournalTest.java
@@ -52,7 +52,9 @@ public class JournalTest
directory.deleteRecursiveOnExit();
Journal<TimeUUID, Long> journal =
- new Journal<>("TestJournal", directory, TestParams.ACCORD,
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+ Journal.<TimeUUID, Long>builder("TestJournal", directory,
TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+ .valueSerializer(LongSerializer.INSTANCE)
+ .build();
journal.start();
@@ -73,7 +75,10 @@ public class JournalTest
journal.shutdown();
- journal = new Journal<>("TestJournal", directory, TestParams.ACCORD,
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+ journal =
+ Journal.<TimeUUID, Long>builder("TestJournal", directory,
TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+ .valueSerializer(LongSerializer.INSTANCE)
+ .build();
journal.start();
assertEquals(1L, (long) journal.readLast(id1));
@@ -91,7 +96,9 @@ public class JournalTest
directory.deleteRecursiveOnExit();
Journal<TimeUUID, Long> journal =
- new Journal<>("TestJournalReadAll", directory, TestParams.ACCORD,
TimeUUIDKeySupport.INSTANCE, LongSerializer.INSTANCE, SegmentCompactor.noop());
+ Journal.<TimeUUID, Long>builder("TestJournalReadAll", directory,
TestParams.ACCORD, TimeUUIDKeySupport.INSTANCE)
+ .valueSerializer(LongSerializer.INSTANCE)
+ .build();
journal.start();
@@ -127,11 +134,6 @@ public class JournalTest
{
static final LongSerializer INSTANCE = new LongSerializer();
- public int serializedSize(TimeUUID key, Long value, int userVersion)
- {
- return Long.BYTES;
- }
-
public void serialize(TimeUUID key, Long value, DataOutputPlus out, int userVersion) throws IOException
{
out.writeLong(value);
diff --git a/test/unit/org/apache/cassandra/journal/SegmentTest.java
b/test/unit/org/apache/cassandra/journal/SegmentTest.java
index 55c54870d8..05a0f43f54 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentTest.java
@@ -54,7 +54,8 @@ public class SegmentTest
Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
- ActiveSegment<TimeUUID, ByteBuffer> segment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+ ActiveSegment<TimeUUID, ByteBuffer> segment =
+ ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
segment.allocate(record1.remaining()).write(id1, record1);
segment.allocate(record2.remaining()).write(id2, record2);
@@ -101,7 +102,8 @@ public class SegmentTest
Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
- ActiveSegment<TimeUUID, ByteBuffer> activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+ ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
+ ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
activeSegment.allocate(record1.remaining()).write(id1, record1);
activeSegment.allocate(record2.remaining()).write(id2, record2);
@@ -110,7 +112,8 @@ public class SegmentTest
activeSegment.close(null);
- StaticSegment<TimeUUID, ByteBuffer> staticSegment = StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE);
+ StaticSegment<TimeUUID, ByteBuffer> staticSegment =
+ StaticSegment.open(descriptor, TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
// read all 4 entries by id and compare with originals
EntrySerializer.EntryHolder<TimeUUID> holder = new EntrySerializer.EntryHolder<>();
@@ -150,7 +153,8 @@ public class SegmentTest
Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
- ActiveSegment<TimeUUID, ByteBuffer> activeSegment = ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE);
+ ActiveSegment<TimeUUID, ByteBuffer> activeSegment =
+ ActiveSegment.create(descriptor, params(), TimeUUIDKeySupport.INSTANCE, KeyStats.Factory.noop());
activeSegment.allocate(record1.remaining()).write(id1, record1);
activeSegment.allocate(record2.remaining()).write(id2, record2);
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
index ecacb5e197..2e029f5edd 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -35,7 +35,7 @@ public class SegmentsTest
@Test
public void testSelect()
{
- withRandom(0l, rng -> {
+ withRandom(0L, rng -> {
// Create mock segments with different timestamps
java.io.File file = File.createTempFile("segments", "test");
List<Segment<String, String>> segmentList = new ArrayList<>();
@@ -108,6 +108,13 @@ public class SegmentsTest
}
@Override Index<K> index() { throw new UnsupportedOperationException(); }
+
+ @Override
+ public KeyStats<K> keyStats()
+ {
+ return KeyStats.noop();
+ }
+
@Override boolean isFlushed(long position) { throw new UnsupportedOperationException(); }
@Override public void persistMetadata() { throw new UnsupportedOperationException(); }
@Override boolean read(int offset, int size, EntrySerializer.EntryHolder<K> into) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
index 5694a29e7a..c75197511e 100644
--- a/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
+++ b/test/unit/org/apache/cassandra/journal/TimeUUIDKeySupport.java
@@ -19,14 +19,13 @@ package org.apache.cassandra.journal;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.zip.Checksum;
+import java.util.zip.CRC32;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Crc;
import org.apache.cassandra.utils.TimeUUID;
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumLong;
-
class TimeUUIDKeySupport implements KeySupport<TimeUUID>
{
static final TimeUUIDKeySupport INSTANCE = new TimeUUIDKeySupport();
@@ -76,10 +75,10 @@ class TimeUUIDKeySupport implements KeySupport<TimeUUID>
}
@Override
- public void updateChecksum(Checksum crc, TimeUUID key, int userVersion)
+ public void updateChecksum(CRC32 crc, TimeUUID key, int userVersion)
{
- updateChecksumLong(crc, key.uuidTimestamp());
- updateChecksumLong(crc, key.lsb());
+ Crc.updateWithLong(crc, key.uuidTimestamp());
+ Crc.updateWithLong(crc, key.lsb());
}
@Override
diff --git
a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
index ba741799aa..72017bc8a1 100644
--- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
+++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java
@@ -33,13 +33,19 @@ import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Descriptor;
import org.apache.cassandra.journal.TestParams;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.replication.MutationJournal.ActiveOffsetRanges;
+import org.apache.cassandra.replication.MutationJournal.StaticOffsetRanges;
+import org.apache.cassandra.replication.MutationJournal.OffsetRangesFactory;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Tests to sanity-check the integration points with Journal
@@ -51,6 +57,7 @@ public class MutationJournalTest
private static final String TABLE = "mjtt";
private static MutationJournal journal;
+ private static File directory;
@BeforeClass
public static void setUp() throws IOException
@@ -63,7 +70,7 @@ public class MutationJournalTest
.addRegularColumn("value",
UTF8Type.instance)
.build());
- File directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
+ directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
directory.deleteRecursiveOnExit();
journal = new MutationJournal(directory, TestParams.MUTATION_JOURNAL);
@@ -79,13 +86,8 @@ public class MutationJournalTest
@Test
public void testWriteOneReadOne()
{
- Mutation expected =
- new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key")
- .clustering("ck")
- .add("value", "value")
- .build();
-
- ShortMutationId id = new ShortMutationId(100L, 0);
+ ShortMutationId id = id(100L, 0);
+ Mutation expected = mutation("key", "ck", "value");
journal.write(id, expected);
// regular read
@@ -103,22 +105,14 @@ public class MutationJournalTest
@Test
public void testWriteManyReadMany()
{
- Mutation expected1 =
- new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key1")
- .clustering("ck1")
- .add("value", "value1")
- .build();
- Mutation expected2 =
- new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key2")
- .clustering("ck2")
- .add("value", "value2")
- .build();
- List<Mutation> expected = List.of(expected1, expected2);
-
- ShortMutationId id1 = new ShortMutationId(100L, 1);
- ShortMutationId id2 = new ShortMutationId(100L, 2);
+ ShortMutationId id1 = id(100L, 1);
+ ShortMutationId id2 = id(100L, 2);
List<ShortMutationId> ids = List.of(id1, id2);
+ Mutation expected1 = mutation("key1", "ck1", "value1");
+ Mutation expected2 = mutation("key2", "ck2", "value2");
+ List<Mutation> expected = List.of(expected1, expected2);
+
journal.write(id1, expected1);
journal.write(id2, expected2);
@@ -127,6 +121,149 @@ public class MutationJournalTest
assertMutationsEqual(expected, actual);
}
+ @Test
+ public void testActiveOffsetRanges()
+ {
+ {
+ ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+ assertFalse(ranges.mayContain(id(0, 0)));
+ }
+
+ {
+ ActiveOffsetRanges ranges = new ActiveOffsetRanges();
+ ranges.update(id(0, 1));
+ ranges.update(id(0, 9));
+ assertFalse(ranges.mayContain(id(0, 0)));
+ assertFalse(ranges.mayContain(id(0, 10)));
+ for (int i = 1; i < 10; i++)
+ assertTrue(ranges.mayContain(id(0, i)));
+ }
+ }
+
+ @Test
+ public void testStaticOffsetRanges()
+ {
+ Descriptor descriptor = Descriptor.create(directory, 0, 1);
+
+ ActiveOffsetRanges active = new ActiveOffsetRanges();
+ for (int l = 1; l < 11; l++)
+ {
+ for (int o = 5; o > 0; o--) active.update(id(l, o));
+ for (int o = 6; o < 11; o++) active.update(id(l, o));
+ }
+
+ active.persist(descriptor);
+ StaticOffsetRanges loaded = OffsetRangesFactory.INSTANCE.load(descriptor);
+ assertEquals(active.asMap(), loaded.asMap());
+
+ // absent log ids
+ for (int o = 0; o < 11; o++)
+ {
+ assertFalse(active.mayContain(id(0, o)));
+ assertFalse(loaded.mayContain(id(0, o)));
+ assertFalse(active.mayContain(id(11, o)));
+ assertFalse(loaded.mayContain(id(11, o)));
+ }
+
+ // present log ids
+ for (int l = 1; l < 11; l++)
+ {
+ assertFalse(active.mayContain(id(l, 0)));
+ assertFalse(loaded.mayContain(id(l, 0)));
+ assertFalse(active.mayContain(id(l, 11)));
+ assertFalse(loaded.mayContain(id(l, 11)));
+ for (int o = 1; o < 11; o++)
+ {
+ assertTrue(active.mayContain(id(l, o)));
+ assertTrue(loaded.mayContain(id(l, o)));
+ }
+ }
+ }
+
+ @Test
+ public void testDropReconcileSegments()
+ {
+ ShortMutationId id1 = id(100L, 0);
+ ShortMutationId id2 = id(100L, 1);
+ ShortMutationId id3 = id(200L, 2);
+ ShortMutationId id4 = id(200L, 3);
+
+ Mutation mutation1 = mutation("key1", "ck1", "value1");
+ Mutation mutation2 = mutation("key2", "ck2", "value2");
+ Mutation mutation3 = mutation("key3", "ck3", "value3");
+ Mutation mutation4 = mutation("key4", "ck4", "value4");
+
+ // write two mutations to the first segment and flush it to make it static
+ journal.write(id1, mutation1);
+ journal.write(id2, mutation2);
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+ // write two mutations to the second segment and flush it to make it static
+ journal.write(id3, mutation3);
+ journal.write(id4, mutation4);
+ journal.closeCurrentSegmentForTestingIfNonEmpty();
+
+ {
+ // call dropReconciledSegments() with a log2offsets map that covers both segments fully;
+ // *BUT* with the segments still marked as needing replay, nothing should be dropped
+ Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+ builder.add(id1);
+ builder.add(id2);
+ builder.add(id3);
+ builder.add(id4);
+ assertEquals(0, journal.dropReconciledSegments(builder.build()));
+ // confirm that no static segments have been dropped
+ assertEquals(2, journal.countStaticSegmentsForTesting());
+ }
+
+ // mark both segments as not needing replay
+ journal.clearNeedsReplayForTesting();
+
+ {
+ // call dropReconciledSegments() with a log2offsets map that doesn't cover any segments fully
+ Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+ builder.add(id1);
+ assertEquals(0, journal.dropReconciledSegments(builder.build()));
+ // confirm that no static segments got dropped
+ assertEquals(2, journal.countStaticSegmentsForTesting());
+ }
+
+ {
+ // call dropReconciledSegments() with a log2offsets map that covers only the first segment fully
+ Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+ builder.add(id1);
+ builder.add(id2);
+ assertEquals(1, journal.dropReconciledSegments(builder.build()));
+ // confirm that only one static segment got dropped
+ assertEquals(1, journal.countStaticSegmentsForTesting());
+ }
+
+ {
+ // call dropReconciledSegments() with a log2offsets map that covers both segments fully
+ Log2OffsetsMap.Immutable.Builder builder = new Log2OffsetsMap.Immutable.Builder();
+ builder.add(id1);
+ builder.add(id2);
+ builder.add(id3);
+ builder.add(id4);
+ assertEquals(1, journal.dropReconciledSegments(builder.build()));
+ // confirm that all static segments have now been dropped
+ assertEquals(0, journal.countStaticSegmentsForTesting());
+ }
+ }
+
+ private ShortMutationId id(long logId, int offset)
+ {
+ return new ShortMutationId(logId, offset);
+ }
+
+ private Mutation mutation(String pk, String ck, String column)
+ {
return new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, pk)
+ .clustering(ck)
+ .add("value", column)
+ .build();
+ }
+
public static void assertMutationEquals(Mutation expected, Mutation actual)
{
if (!serialize(expected).equals(serialize(actual)))
diff --git a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
index b0d5edda0f..1b39ff000e 100644
--- a/test/unit/org/apache/cassandra/replication/OffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/OffsetsTest.java
@@ -861,9 +861,51 @@ public class OffsetsTest
}
}
+ @Test
public void asListFromListRoundTripTest()
{
for (Offsets.Mutable offsets : new Offsets.Mutable[] { offsets(), offsets(1, 2), offsets(1, 3, 7, 9) })
assertOffsetsEqual(offsets, Offsets.fromList(LOG_ID, offsets.asList()));
}
+
+ @Test
+ public void testContainsRange()
+ {
+ {
+ Offsets.Mutable offsets = offsets();
+ assertFalse(offsets.containsRange(0, 1));
+ }
+
+ {
+ Offsets.Mutable offsets = offsets(2, 4);
+
+ assertTrue(offsets.containsRange(2, 4));
+ assertTrue(offsets.containsRange(3, 4));
+ assertTrue(offsets.containsRange(2, 3));
+
+ assertFalse(offsets.containsRange(1, 1));
+ assertFalse(offsets.containsRange(1, 2));
+ assertFalse(offsets.containsRange(1, 3));
+ assertFalse(offsets.containsRange(1, 4));
+
+ assertFalse(offsets.containsRange(2, 5));
+ assertFalse(offsets.containsRange(3, 5));
+ assertFalse(offsets.containsRange(4, 5));
+ assertFalse(offsets.containsRange(5, 5));
+ }
+
+ {
+ Offsets.Mutable offsets = offsets(2, 4, 6, 8);
+
+ assertTrue(offsets.containsRange(2, 4));
+ assertTrue(offsets.containsRange(6, 8));
+
+ assertFalse(offsets.containsRange(0, 2));
+ assertFalse(offsets.containsRange(3, 5));
+ assertFalse(offsets.containsRange(4, 6));
+ assertFalse(offsets.containsRange(5, 7));
+ assertFalse(offsets.containsRange(7, 9));
+ assertFalse(offsets.containsRange(9, 9));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]