This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9d5ebe159c4fd5ad5e7c3c941f3cd2e8c755a46e
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Thu Jan 9 14:14:44 2025 +0000

    Introduce MutationJournal for coordinator logs
    
    patch by Aleksey Yeschenko; reviewed by Blake Eggleston for
    CASSANDRA-20353
---
 .../apache/cassandra/journal/ActiveSegment.java    |   5 +
 src/java/org/apache/cassandra/journal/Flusher.java |  15 +-
 src/java/org/apache/cassandra/journal/Journal.java |   5 +-
 src/java/org/apache/cassandra/journal/Segment.java |   4 +-
 .../metrics/CassandraMetricsRegistry.java          |   1 +
 .../cassandra/service/tracking/MutationId.java     |  84 ++++++++
 .../service/tracking/MutationJournal.java          | 235 +++++++++++++++++++++
 .../service/tracking/MutationJournalTest.java      | 154 ++++++++++++++
 8 files changed, 493 insertions(+), 10 deletions(-)

diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index af02885547..30e4e8f27d 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -497,6 +497,11 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         {
             return start;
         }
+
+        RecordPointer recordPointer()
+        {
+            return new RecordPointer(descriptor.timestamp, start);
+        }
     }
 
     private int maybeCompleteInProgress()
diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java
index 2aba68d8ea..4648ebcb61 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -383,7 +383,7 @@ final class Flusher<K, V>
 
     private interface Mode<K, V>
     {
-        void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc);
+        RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc);
         RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc);
         boolean isDurable(RecordPointer recordPointer);
     }
@@ -391,13 +391,14 @@ final class Flusher<K, V>
     private class BatchMode implements Mode<K, V>
     {
         @Override
-        public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+        public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
         {
             pending.incrementAndGet();
             requestExtraFlush();
             alloc.awaitDurable(journal.metrics.waitingOnFlush);
             pending.decrementAndGet();
             written.incrementAndGet();
+            return alloc.recordPointer();
         }
 
         @Override
@@ -405,7 +406,7 @@ final class Flusher<K, V>
         {
             requestExtraFlush();
             written.incrementAndGet();
-            return new RecordPointer(alloc.descriptor().timestamp, alloc.start());
+            return alloc.recordPointer();
         }
 
         @Override
@@ -418,19 +419,20 @@ final class Flusher<K, V>
     private class GroupMode implements Mode<K, V>
     {
         @Override
-        public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+        public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
         {
             pending.incrementAndGet();
             alloc.awaitDurable(journal.metrics.waitingOnFlush);
             pending.decrementAndGet();
             written.incrementAndGet();
+            return alloc.recordPointer();
         }
 
         @Override
         public RecordPointer flushAsync(ActiveSegment<K, V>.Allocation alloc)
         {
             written.incrementAndGet();
-            return new RecordPointer(alloc.descriptor().timestamp, alloc.start());
+            return alloc.recordPointer();
         }
 
         @Override
@@ -443,7 +445,7 @@ final class Flusher<K, V>
     private class PeriodicMode implements Mode<K, V>
     {
         @Override
-        public void flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
+        public RecordPointer flushAndAwaitDurable(ActiveSegment<K, V>.Allocation alloc)
         {
             RecordPointer pointer = flushAsync(alloc);
 
@@ -454,6 +456,7 @@ final class Flusher<K, V>
                 awaitFsyncAt(expectedFsyncTime, journal.metrics.waitingOnFlush.time());
                 pending.decrementAndGet();
             }
+            return pointer;
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index bfebb71204..cebfb7a702 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -261,12 +261,12 @@ public class Journal<K, V> implements Shutdownable
             segmentPrepared.signalAll(); // Wake up all threads waiting on the new segment
             compactor.shutdown();
             compactor.awaitTermination(1, TimeUnit.MINUTES);
+            closeAllSegments();
             flusher.shutdown();
             closer.shutdown();
             releaser.shutdown();
             closer.awaitTermination(1, TimeUnit.MINUTES);
             releaser.awaitTermination(1, TimeUnit.MINUTES);
-            closeAllSegments();
             metrics.deregister();
             Invariants.checkState(state.compareAndSet(State.SHUTDOWN, State.TERMINATED),
                                   "Unexpected journal state while trying to shut down", state);
@@ -446,7 +446,7 @@ public class Journal<K, V> implements Shutdownable
      * @param id user-provided record id, expected to roughly correlate with time and go up
      * @param record the record to store
      */
-    public void blockingWrite(K id, V record)
+    public RecordPointer blockingWrite(K id, V record)
     {
         try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
         {
@@ -454,6 +454,7 @@ public class Journal<K, V> implements Shutdownable
             ActiveSegment<K, V>.Allocation alloc = allocate(dob.getLength());
             alloc.writeInternal(id, dob.unsafeGetBufferAndFlip());
             flusher.flushAndAwaitDurable(alloc);
+            return alloc.recordPointer();
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/journal/Segment.java 
b/src/java/org/apache/cassandra/journal/Segment.java
index a5053a9629..05ae0ea86c 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -86,7 +86,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
         int size = Index.readSize(offsetAndSize);
         if (read(offset, size, into))
         {
-            Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key);
+            Invariants.checkState(id.equals(into.key));
             consumer.accept(descriptor.timestamp, offset, id, into.value, descriptor.userVersion);
             return true;
         }
@@ -98,7 +98,7 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
         long offsetAndSize = index().lookUpLast(id);
         if (offsetAndSize == -1 || !read(Index.readOffset(offsetAndSize), Index.readSize(offsetAndSize), into))
             return false;
-        Invariants.checkState(id.equals(into.key), "Index for %s read incorrect key: expected %s but read %s", descriptor, id, into.key);
+        Invariants.checkState(id.equals(into.key));
         return true;
     }
 
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 8cf83f5208..d901324c03 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -132,6 +132,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
                                    .add(HintsServiceMetrics.TYPE_NAME)
                                    .add(InternodeInboundMetrics.TYPE_NAME)
                                    .add(InternodeOutboundMetrics.TYPE_NAME)
+                                   .add(org.apache.cassandra.journal.Metrics.TYPE_NAME)
                                    .add(KeyspaceMetrics.TYPE_NAME)
                                    .add(MemtablePool.TYPE_NAME)
                                    .add(MessagingMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/service/tracking/MutationId.java b/src/java/org/apache/cassandra/service/tracking/MutationId.java
new file mode 100644
index 0000000000..9132676530
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/tracking/MutationId.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+public class MutationId
+{
+    /**
+     * 4 byte TCM host id (high 32 bits) + 4 byte host log id (low 32 bits) packed into a long.
+     * A host log id is unique within the host and is persisted on allocation.
+     * Host log ids are allocated anew on host restart,
+     * one per token range replicated by the host.
+     */
+    public final long logId;
+
+    /**
+     * 4 byte position (high 32 bits) + 4 byte timestamp (low 32 bits) packed into a long.
+     * The position is incremented; the timestamp is monotonically non-decreasing.
+     * The position is enough to identify the entry within a coordinator log;
+     * the timestamp is added for correlation purposes.
+     */
+    public final long sequenceId;
+
+    MutationId(long logId, long sequenceId)
+    {
+        this.logId = logId;
+        this.sequenceId = sequenceId;
+    }
+
+    public int hostId()
+    {
+        return (int) (0xffffffffL & (logId >> 32));
+    }
+
+    public int hostLogId()
+    {
+        return (int) (0xffffffffL & logId);
+    }
+
+    public int position()
+    {
+        return (int) (0xffffffffL & (sequenceId >> 32));
+    }
+
+    public int timestamp()
+    {
+        return (int) (0xffffffffL & sequenceId);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof MutationId)) return false;
+        MutationId that = (MutationId) o;
+        return this.logId == that.logId && this.sequenceId == that.sequenceId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Long.hashCode(logId) + 31 * Long.hashCode(sequenceId);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "MutationId{" + logId + ", " + sequenceId + '}';
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/tracking/MutationJournal.java b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java
new file mode 100644
index 0000000000..3f00e4387c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/tracking/MutationJournal.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.Checksum;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.KeySupport;
+import org.apache.cassandra.journal.Params;
+import org.apache.cassandra.journal.RecordConsumer;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.SegmentCompactor;
+import org.apache.cassandra.journal.ValueSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class MutationJournal
+{
+    private final Journal<MutationId, Mutation> journal;
+
+    private MutationJournal()
+    {
+        this(new File(DatabaseDescriptor.getCommitLogLocation()), new JournalParams());
+    }
+
+    @VisibleForTesting
+    MutationJournal(File directory, Params params)
+    {
+        journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop());
+    }
+
+    public void start()
+    {
+        journal.start();
+    }
+
+    public void shutdownBlocking()
+    {
+        journal.shutdown();
+    }
+
+    public RecordPointer write(MutationId id, Mutation mutation)
+    {
+        return journal.blockingWrite(id, mutation);
+    }
+
+    @Nullable
+    public Mutation read(MutationId id)
+    {
+        return journal.readLast(id);
+    }
+
+    public boolean read(MutationId id, RecordConsumer<MutationId> consumer)
+    {
+        return journal.readLast(id, consumer);
+    }
+
+    public void readAll(Iterable<MutationId> ids, Collection<Mutation> into)
+    {
+        for (MutationId id : ids)
+        {
+            Mutation mutation = read(id);
+            Preconditions.checkState(mutation != null);
+            into.add(mutation);
+        }
+    }
+
+    static class JournalParams implements Params
+    {
+        @Override
+        public int segmentSize()
+        {
+            return DatabaseDescriptor.getCommitLogSegmentSize();
+        }
+
+        @Override
+        public FailurePolicy failurePolicy()
+        {
+            return FailurePolicy.STOP;
+        }
+
+        @Override
+        public FlushMode flushMode()
+        {
+            return FlushMode.PERIODIC;
+        }
+
+        @Override
+        public boolean enableCompaction()
+        {
+            return false;
+        }
+
+        @Override
+        public long compactionPeriod(TimeUnit units)
+        {
+            return 0;
+        }
+
+        @Override
+        public long flushPeriod(TimeUnit units)
+        {
+            return units.convert(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public long periodicBlockPeriod(TimeUnit units)
+        {
+            return units.convert(DatabaseDescriptor.getPeriodicCommitLogSyncBlock(), TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int userVersion()
+        {
+            return MessagingService.current_version;
+        }
+    }
+
+    static class MutationIdSupport implements KeySupport<MutationId>
+    {
+        static final int LOG_ID_OFFSET = 0;
+        static final int SEQUENCE_ID_OFFSET = LOG_ID_OFFSET + TypeSizes.LONG_SIZE;
+
+        @Override
+        public int serializedSize(int userVersion)
+        {
+            return TypeSizes.LONG_SIZE  // logId
+                 + TypeSizes.LONG_SIZE; // sequenceId
+        }
+
+        @Override
+        public void serialize(MutationId id, DataOutputPlus out, int userVersion) throws IOException
+        {
+            out.writeLong(id.logId);
+            out.writeLong(id.sequenceId);
+        }
+
+        @Override
+        public void serialize(MutationId id, ByteBuffer out, int userVersion) throws IOException
+        {
+            out.putLong(id.logId);
+            out.putLong(id.sequenceId);
+        }
+
+        @Override
+        public MutationId deserialize(DataInputPlus in, int userVersion) throws IOException
+        {
+            long logId = in.readLong();
+            long sequenceId = in.readLong();
+            return new MutationId(logId, sequenceId);
+        }
+
+        @Override
+        public MutationId deserialize(ByteBuffer buffer, int position, int userVersion)
+        {
+            long logId = buffer.getLong(position + LOG_ID_OFFSET);
+            long sequenceId = buffer.getLong(position + SEQUENCE_ID_OFFSET);
+            return new MutationId(logId, sequenceId);
+        }
+
+        @Override
+        public MutationId deserialize(ByteBuffer buffer, int userVersion)
+        {
+            long logId = buffer.getLong();
+            long sequenceId = buffer.getLong();
+            return new MutationId(logId, sequenceId);
+        }
+
+        @Override
+        public void updateChecksum(Checksum crc, MutationId id, int userVersion)
+        {
+            FBUtilities.updateChecksumLong(crc, id.logId);
+            FBUtilities.updateChecksumLong(crc, id.sequenceId);
+        }
+
+        @Override
+        public int compareWithKeyAt(MutationId id, ByteBuffer buffer, int position, int userVersion)
+        {
+            int cmp = Long.compare(id.logId, buffer.getLong(position + LOG_ID_OFFSET));
+            return cmp != 0 ? cmp : Long.compare(id.sequenceId, buffer.getLong(position + SEQUENCE_ID_OFFSET));
+        }
+
+        @Override
+        public int compare(MutationId id1, MutationId id2)
+        {
+            int cmp = Long.compare(id1.logId, id2.logId);
+            return cmp != 0 ? cmp : Long.compare(id1.sequenceId, id2.sequenceId);
+        }
+    }
+
+    static class MutationSerializer implements ValueSerializer<MutationId, Mutation>
+    {
+        @Override
+        public void serialize(MutationId id, Mutation mutation, DataOutputPlus out, int userVersion) throws IOException
+        {
+            Mutation.serializer.serialize(mutation, out, userVersion);
+        }
+
+        @Override
+        public Mutation deserialize(MutationId id, DataInputPlus in, int userVersion) throws IOException
+        {
+            return Mutation.serializer.deserialize(in, userVersion);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java
new file mode 100644
index 0000000000..55e97f57bd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/tracking/MutationJournalTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.tracking;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.journal.TestParams;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to sanity-check the integration points with Journal
+ * (mutation id and mutation ser/de, comparison, etc.)
+ */
+public class MutationJournalTest
+{
+    private static final String KEYSPACE = "mjtks";
+    private static final String TABLE = "mjtt";
+
+    private static MutationJournal journal;
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(3),
+                                    TableMetadata.builder(KEYSPACE, TABLE)
+                                                 .addPartitionKeyColumn("pk", UTF8Type.instance)
+                                                 .addClusteringColumn("ck", UTF8Type.instance)
+                                                 .addRegularColumn("value", UTF8Type.instance)
+                                                 .build());
+
+        File directory = new File(Files.createTempDirectory("mutation-journal-test-simple"));
+        directory.deleteRecursiveOnExit();
+
+        journal = new MutationJournal(directory, TestParams.INSTANCE);
+        journal.start();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        journal.shutdownBlocking();
+    }
+
+    @Test
+    public void testWriteOneReadOne()
+    {
+        Mutation expected =
+            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key")
+               .clustering("ck")
+               .add("value", "value")
+               .build();
+
+        MutationId id = new MutationId(100L, 0);
+        journal.write(id, expected);
+
+        // regular read
+        Mutation actual = journal.read(id);
+        assertMutationEquals(expected, actual);
+
+        // read via RecordConsumer
+        journal.read(id, ((segment, position, key, buffer, userVersion) ->
+                          {
+                              assertEquals(id, key);
+                              assertEquals(serialize(expected), buffer);
+                          }));
+    }
+
+    @Test
+    public void testWriteManyReadMany()
+    {
+        Mutation expected1 =
+            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key1")
+               .clustering("ck1")
+               .add("value", "value1")
+               .build();
+        Mutation expected2 =
+            new RowUpdateBuilder(Schema.instance.getTableMetadata(KEYSPACE, TABLE), 0, "key2")
+               .clustering("ck2")
+               .add("value", "value2")
+               .build();
+        List<Mutation> expected = List.of(expected1, expected2);
+
+        MutationId id1 = new MutationId(100L, 1);
+        MutationId id2 = new MutationId(100L, 2);
+        List<MutationId> ids = List.of(id1, id2);
+
+        journal.write(id1, expected1);
+        journal.write(id2, expected2);
+
+        List<Mutation> actual = new ArrayList<>();
+        journal.readAll(ids, actual);
+        assertMutationsEqual(expected, actual);
+    }
+
+    private static void assertMutationEquals(Mutation expected, Mutation actual)
+    {
+        assertEquals(serialize(expected), serialize(actual));
+    }
+
+    private static void assertMutationsEqual(List<Mutation> expected, List<Mutation> actual)
+    {
+        assertEquals(expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++)
+            assertMutationEquals(expected.get(i), actual.get(i));
+    }
+
+    private static ByteBuffer serialize(Mutation mutation)
+    {
+        try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+        {
+            Mutation.serializer.serialize(mutation, out, MessagingService.maximum_version);
+            return out.asNewBuffer();
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to