This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9d5ebe159c4fd5ad5e7c3c941f3cd2e8c755a46e Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Thu Jan 9 14:14:44 2025 +0000 Introduce MutationJournal for coordinator logs patch by Aleksey Yeschenko; reviewed by Blake Eggleston for CASSANDRA-20353 --- .../apache/cassandra/journal/ActiveSegment.java | 5 + src/java/org/apache/cassandra/journal/Flusher.java | 15 +- src/java/org/apache/cassandra/journal/Journal.java | 5 +- src/java/org/apache/cassandra/journal/Segment.java | 4 +- .../metrics/CassandraMetricsRegistry.java | 1 + .../cassandra/service/tracking/MutationId.java | 84 ++++++++ .../service/tracking/MutationJournal.java | 235 +++++++++++++++++++++ .../service/tracking/MutationJournalTest.java | 154 ++++++++++++++ 8 files changed, 493 insertions(+), 10 deletions(-) diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index af02885547..30e4e8f27d 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -497,6 +497,11 @@ final class ActiveSegment<K, V> extends Segment<K, V> { return start; } + + RecordPointer recordPointer() + { + return new RecordPointer(descriptor.timestamp, start); + } } private int maybeCompleteInProgress() diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index 2aba68d8ea..4648ebcb61 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -383,7 +383,7 @@ final class Flusher<K, V> private interface Mode<K, V> { - void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc); + RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc); RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc); boolean isDurable(RecordPointer recordPointer); } @@ -391,13 +391,14 @@ final class Flusher<K, V> private class BatchMode implements Mode<K, V> { @Override - public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) + public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) { pending.incrementAndGet(); requestExtraFlush(); alloc.awaitDurable(journal.metrics.waitingOnFlush); pending.decrementAndGet(); written.incrementAndGet(); + return alloc.recordPointer(); } @Override @@ -405,7 +406,7 @@ final class Flusher<K, V> { requestExtraFlush(); written.incrementAndGet(); - return new RecordPointer(alloc.descriptor().timestamp, alloc.start()); + return alloc.recordPointer(); } @Override @@ -418,19 +419,20 @@ final class Flusher<K, V> private class GroupMode implements Mode<K, V> { @Override - public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) + public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) { pending.incrementAndGet(); alloc.awaitDurable(journal.metrics.waitingOnFlush); pending.decrementAndGet(); written.incrementAndGet(); + return alloc.recordPointer(); } @Override public RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc) { written.incrementAndGet(); - return new RecordPointer(alloc.descriptor().timestamp, alloc.start()); + return alloc.recordPointer(); } @Override @@ -443,7 +445,7 @@ final class Flusher<K, V> private class PeriodicMode implements Mode<K, V> { @Override - public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) + public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc) { RecordPointer pointer = flushAsync(alloc); @@ -454,6 +456,7 @@ final class Flusher<K, V> awaitFsyncAt(expectedFsyncTime, journal.metrics.waitingOnFlush.time()); pending.decrementAndGet(); } + return pointer; } @Override diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index bfebb71204..cebfb7a702 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -261,12 +261,12 @@ public class Journal<K, V> implements Shutdownable segmentPrepared.signalAll(); // Wake up all threads waiting on the new segment compactor.shutdown(); compactor.awaitTermination(1, TimeUnit.MINUTES); + closeAllSegments(); flusher.shutdown(); closer.shutdown(); releaser.shutdown(); closer.awaitTermination(1, TimeUnit.MINUTES); releaser.awaitTermination(1, TimeUnit.MINUTES); - closeAllSegments(); metrics.deregister(); Invariants.checkState(state.compareAndSet(State.SHUTDOWN, State.TERMINATED), "Unexpected journal state while trying to shut down", state); @@ -446,7 +446,7 @@ public class Journal<K, V> implements Shutdownable * @param id user-provided record id, expected to roughly correlate with time and go up * @param record the record to store */ - public void blockingWrite(K id, V record) + public RecordPointer blockingWrite(K id, V record) { try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) { @@ -454,6 +454,7 @@ public class Journal<K, V> implements Shutdownable ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength()); alloc.writeInternal(id, dob.unsafeGetBufferAndFlip()); flusher.flushAndAwaitDurable(alloc); + return alloc.recordPointer(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java index a5053a9629..05ae0ea86c 100644 --- a/src/java/org/apache/cassandra/journal/Segment.java +++ b/src/java/org/apache/cassandra/journal/Segment.java @@ -86,7 +86,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co int size = Index.readSize(offsetAndSize); if (read(offset, size, into)) { - Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key); + Invariants.checkState(id.equals(into.key)); consumer.accept(descriptor.timestamp, offset, id, into.value, descriptor.userVersion); return true; } @@ -98,7 +98,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co long offsetAndSize = index().lookUpLast(id); if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize), into)) return false; - Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key); + Invariants.checkState(id.equals(into.key)); return true; } diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java index 8cf83f5208..d901324c03 100644 --- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java +++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java @@ -132,6 +132,7 @@ public class CassandraMetricsRegistry extends MetricRegistry .add(HintsServiceMetrics.TYPE_NAME) .add(InternodeInboundMetrics.TYPE_NAME) .add(InternodeOutboundMetrics.TYPE_NAME) + .add(org.apache.cassandra.journal.Metrics.TYPE_NAME) .add(KeyspaceMetrics.TYPE_NAME) .add(MemtablePool.TYPE_NAME) .add(MessagingMetrics.TYPE_NAME) diff --git a/src/java/org/apache/cassandra/service/tracking/MutationId.java b/src/java/org/apache/cassandra/service/tracking/MutationId.java new file mode 100644 index 0000000000..9132676530 --- /dev/null +++ b/src/java/org/apache/cassandra/service/tracking/MutationId.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.tracking; + +public class MutationId +{ + /** + * 4 byte TCM host id + 4 byte host log id packed into a long. + * Host log ID is unique within the host, allocated + * anew on host restart - one per token range replicated by the host, + * persisted on allocation, unique within the host. + */ + public final long logId; + + /** + * 4 byte position + 4 byte timestamp packed into a long. + * Position is incremented, the timestamp is monotonically non-decreasing. + * The position is enough to identify the entry within a coordinator log, + * the timestamp is added for correlation purposes. + */ + public final long sequenceId; + + MutationId(long logId, long sequenceId) + { + this.logId = logId; + this.sequenceId = sequenceId; + } + + public int hostId() + { + return (int) (0xffffffffL & (logId >> 32)); + } + + public int hostLogId() + { + return (int) (0xffffffffL & logId); + } + + public int position() + { + return (int) (0xffffffffL & (sequenceId >> 32)); + } + + public int timestamp() + { + return (int) (0xffffffffL & sequenceId); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof MutationId)) return false; + MutationId that = (MutationId) o; + return this.logId == that.logId && this.sequenceId == that.sequenceId; + } + + @Override + public int hashCode() + { + return Long.hashCode(logId) + 31 * Long.hashCode(sequenceId); + } + + @Override + public String toString() + { + return "MutationId{" + logId + ", " + sequenceId + '}'; + } +} diff --git a/src/java/org/apache/cassandra/service/tracking/MutationJournal.java b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java new file mode 100644 index 0000000000..3f00e4387c --- /dev/null +++ b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.tracking; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.zip.Checksum; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.Journal; +import org.apache.cassandra.journal.KeySupport; +import org.apache.cassandra.journal.Params; +import org.apache.cassandra.journal.RecordConsumer; +import org.apache.cassandra.journal.RecordPointer; +import org.apache.cassandra.journal.SegmentCompactor; +import org.apache.cassandra.journal.ValueSerializer; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; + +public class MutationJournal +{ + private final Journal<MutationId, Mutation> journal; + + private MutationJournal() + { + this(new File(DatabaseDescriptor.getCommitLogLocation()), new JournalParams()); + } + + @VisibleForTesting + MutationJournal(File directory, Params params) + { + journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop()); + } + + public void start() + { + journal.start(); + } + + public void shutdownBlocking() + { + journal.shutdown(); + } + + public RecordPointer write(MutationId id, Mutation mutation) + { + return journal.blockingWrite(id, mutation); + } + + @Nullable + public Mutation read(MutationId id) + { + return journal.readLast(id); + } + + public boolean read(MutationId id, RecordConsumer<MutationId> consumer) + { + return journal.readLast(id, consumer); + } + + public void readAll(Iterable<MutationId> ids, Collection<Mutation> into) + { + for (MutationId id : ids) + { + Mutation mutation = read(id); + Preconditions.checkState(mutation != null); + into.add(mutation); + } + } + + static class JournalParams implements Params + { + @Override + public int segmentSize() + { + return DatabaseDescriptor.getCommitLogSegmentSize(); + } + + @Override + public FailurePolicy failurePolicy() + { + return FailurePolicy.STOP; + } + + @Override + public FlushMode flushMode() + { + return FlushMode.PERIODIC; + } + + @Override + public boolean enableCompaction() + { + return false; + } + + @Override + public long compactionPeriod(TimeUnit units) + { + return 0; + } + + @Override + public long flushPeriod(TimeUnit units) + { + return units.convert(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS); + } + + @Override + public long periodicBlockPeriod(TimeUnit units) + { + return units.convert(DatabaseDescriptor.getPeriodicCommitLogSyncBlock(), TimeUnit.MILLISECONDS); + } + + @Override + public int userVersion() + { + return MessagingService.current_version; + } + } + + static class MutationIdSupport implements KeySupport<MutationId> + { + static final int LOG_ID_OFFSET = 0; + static final int SEQUENCE_ID_OFFSET = LOG_ID_OFFSET + TypeSizes.LONG_SIZE; + + @Override + public int serializedSize(int userVersion) + { + return TypeSizes.LONG_SIZE // logId + + TypeSizes.LONG_SIZE; // sequenceId + } + + @Override + public void serialize(MutationId id, DataOutputPlus out, int userVersion) throws IOException + { + out.writeLong(id.logId); + out.writeLong(id.sequenceId); + } + + @Override + public void serialize(MutationId id, ByteBuffer out, int userVersion) throws IOException + { + out.putLong(id.logId); + out.putLong(id.sequenceId); + } + + @Override + public MutationId deserialize(DataInputPlus in, int userVersion) throws IOException + { + long logId = in.readLong(); + long sequenceId = in.readLong(); + return new MutationId(logId, sequenceId); + } + + @Override + public MutationId deserialize(ByteBuffer buffer, int position, int userVersion) + { + long logId = buffer.getLong(position + LOG_ID_OFFSET); + long sequenceId = buffer.getLong(position + SEQUENCE_ID_OFFSET); + return new MutationId(logId, sequenceId); + } + + @Override + public MutationId deserialize(ByteBuffer buffer, int userVersion) + { + long logId = buffer.getLong(); + long sequenceId = buffer.getLong(); + return new MutationId(logId, sequenceId); + } + + @Override + public void updateChecksum(Checksum crc, MutationId id, int userVersion) + { + FBUtilities.updateChecksumLong(crc, id.logId); + FBUtilities.updateChecksumLong(crc, id.sequenceId); + } + + @Override + public int compareWithKeyAt(MutationId id, ByteBuffer buffer, int position, int userVersion) + { + int cmp = Long.compare(id.logId, buffer.getLong(position + LOG_ID_OFFSET)); + return cmp != 0 ? cmp : Long.compare(id.sequenceId, buffer.getLong(position + SEQUENCE_ID_OFFSET)); + } + + @Override + public int compare(MutationId id1, MutationId id2) + { + int cmp = Long.compare(id1.logId, id2.logId); + return cmp != 0 ? cmp : Long.compare(id1.sequenceId, id2.sequenceId); + } + } + + static class MutationSerializer implements ValueSerializer<MutationId, Mutation> + { + @Override + public void serialize(MutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException + { + Mutation.serializer.serialize(mutation, out, userVersion); + } + + @Override + public Mutation deserialize(MutationId id, DataInputPlus in, int userVersion) throws IOException + { + return Mutation.serializer.deserialize(in, userVersion); + } + } +} diff --git a/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java new file mode 100644 index 0000000000..55e97f57bd --- /dev/null +++ b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.tracking; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.journal.TestParams; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; + +import static org.junit.Assert.assertEquals; + +/** + * Tests to sanity-check the integration points with Journal + * (mutation id and mutation ser/de, comparison, etc.) + */ +public class MutationJournalTest +{ + private static final String KEYSPACE = "mjtks"; + private static final String TABLE = "mjtt"; + + private static MutationJournal journal; + + @BeforeClass + public static void setUp() throws IOException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3), + TableMetadata.builder(KEYSPACE, TABLE) + .addPartitionKeyColumn("pk", UTF8Type.instance) + .addClusteringColumn("ck", UTF8Type.instance) + .addRegularColumn("value", UTF8Type.instance) + .build()); + + File directory = new File(Files.createTempDirectory("mutation-journal-test-simple")); + directory.deleteRecursiveOnExit(); + + journal = new MutationJournal(directory, TestParams.INSTANCE); + journal.start(); + } + + @AfterClass + public static void tearDown() + { + journal.shutdownBlocking(); + } + + @Test + public void testWriteOneReadOne() + { + Mutation expected = + new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key") + .clustering("ck") + .add("value", "value") + .build(); + + MutationId id = new MutationId(100L, 0); + journal.write(id, expected); + + // regular read + Mutation actual = journal.read(id); + assertMutationEquals(expected, actual); + + // read via RecordConsumer + journal.read(id, ((segment, position, key, buffer, userVersion) -> + { + assertEquals(id, key); + assertEquals(serialize(expected), buffer); + })); + } + + @Test + public void testWriteManyReadMany() + { + Mutation expected1 = + new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key1") + .clustering("ck1") + .add("value", "value1") + .build(); + Mutation expected2 = + new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key2") + .clustering("ck2") + .add("value", "value2") + .build(); + List<Mutation> expected = List.of(expected1, expected2); + + MutationId id1 = new MutationId(100L, 1); + MutationId id2 = new MutationId(100L, 2); + List<MutationId> ids = List.of(id1, id2); + + journal.write(id1, expected1); + journal.write(id2, expected2); + + List<Mutation> actual = new ArrayList<>(); + journal.readAll(ids, actual); + assertMutationsEqual(expected, actual); + } + + private static void assertMutationEquals(Mutation expected, Mutation actual) + { + assertEquals(serialize(expected), serialize(actual)); + } + + private static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual) + { + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) + assertMutationEquals(expected.get(i), actual.get(i)); + } + + private static ByteBuffer serialize(Mutation mutation) + { + try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get()) + { + Mutation.serializer.serialize(mutation, out, MessagingService.maximum_version); + return out.asNewBuffer(); + } + catch (IOException e) + { + throw new AssertionError(e); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org