This is an automated email from the ASF dual-hosted git repository.
aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new 4ea4d893c4 CEP-45: Add virtual tables for Mutation Tracking
4ea4d893c4 is described below
commit 4ea4d893c4b30d14175b06dccc16d062797c34ca
Author: Aparna Naik <[email protected]>
AuthorDate: Wed Dec 10 16:19:31 2025 -0500
CEP-45: Add virtual tables for Mutation Tracking
patch by Aparna Naik; reviewed by Abe Ratnofsky and Blake Eggleston for
CASSANDRA-20987
---
.../cassandra/db/virtual/MutationJournalTable.java | 80 ++++++++++
.../db/virtual/MutationTrackingShardsTable.java | 110 ++++++++++++++
.../cassandra/db/virtual/SystemViewsKeyspace.java | 2 +
.../apache/cassandra/journal/ActiveSegment.java | 7 +
src/java/org/apache/cassandra/journal/Journal.java | 5 +
src/java/org/apache/cassandra/journal/Segment.java | 13 ++
.../apache/cassandra/journal/StaticSegment.java | 12 ++
.../cassandra/replication/CoordinatorLog.java | 40 +++++
.../cassandra/replication/MutationJournal.java | 5 +
.../replication/MutationTrackingService.java | 17 +++
.../org/apache/cassandra/replication/Shard.java | 31 ++++
.../db/virtual/MutationJournalTableTest.java | 119 +++++++++++++++
.../virtual/MutationTrackingShardsTableTest.java | 161 +++++++++++++++++++++
.../org/apache/cassandra/journal/SegmentsTest.java | 2 +
14 files changed, 604 insertions(+)
diff --git a/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java
b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java
new file mode 100644
index 0000000000..603e6d521b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.journal.ActiveSegment;
+import org.apache.cassandra.journal.Segment;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.ShortMutationId;
+import org.apache.cassandra.schema.TableMetadata;
+
+final class MutationJournalTable extends AbstractVirtualTable
+{
+ private static final String SEGMENT_ID = "segment_id";
+ private static final String IS_ACTIVE = "is_active";
+ private static final String BYTES_ON_DISK = "bytes_on_disk";
+ private static final String RECORDS_COUNT = "records_count";
+ private static final String WRITTEN_TO = "written_to";
+ private static final String FSYNCED_TO = "fsynced_to";
+ private static final String NEEDS_REPLAY = "needs_replay";
+ private static final String FILE_PATH = "file_path";
+
+ MutationJournalTable(String keyspace)
+ {
+ super(TableMetadata.builder(keyspace, "mutation_journal")
+ .comment("mutation journal segments and their
contents")
+ .kind(TableMetadata.Kind.VIRTUAL)
+ .partitioner(new
LocalPartitioner(LongType.instance))
+ .addPartitionKeyColumn(SEGMENT_ID,
LongType.instance)
+ .addRegularColumn(IS_ACTIVE, BooleanType.instance)
+ .addRegularColumn(BYTES_ON_DISK, LongType.instance)
+ .addRegularColumn(RECORDS_COUNT, Int32Type.instance)
+ .addRegularColumn(WRITTEN_TO, Int32Type.instance)
+ .addRegularColumn(FSYNCED_TO, Int32Type.instance)
+ .addRegularColumn(NEEDS_REPLAY,
BooleanType.instance)
+ .addRegularColumn(FILE_PATH, UTF8Type.instance)
+ .build());
+ }
+
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet result = new SimpleDataSet(metadata());
+
+ for (Segment<ShortMutationId, Mutation> segment :
MutationJournal.instance.getAllSegments())
+ {
+ result.row(segment.id())
+ .column(IS_ACTIVE, segment instanceof ActiveSegment)
+ .column(BYTES_ON_DISK, segment.segmentSizeOnDisk())
+ .column(RECORDS_COUNT, segment.metadata().totalCount())
+ .column(WRITTEN_TO, segment.writtenTo())
+ .column(FSYNCED_TO, segment.fsyncedTo())
+ .column(NEEDS_REPLAY, segment.metadata().needsReplay())
+ .column(FILE_PATH, segment.filePath());
+ }
+
+ return result;
+ }
+}
diff --git
a/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java
b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java
new file mode 100644
index 0000000000..724b6a596d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.replication.CoordinatorLog;
+import org.apache.cassandra.replication.CoordinatorLogId;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.Shard;
+import org.apache.cassandra.schema.TableMetadata;
+
+import java.util.Map;
+
+/**
+ * Virtual table exposing the shards returned by {@code MutationTrackingService.instance.getShards()}.
+ * Each row is identified by (keyspace, log id, range start, range end) and carries the
+ * string renderings of the witnessed, reconciled and persisted offsets of that coordinator log.
+ */
+public class MutationTrackingShardsTable extends AbstractVirtualTable
+{
+    private static final String KEYSPACE = "keyspace";
+    private static final String LOG_ID = "log_id";
+    private static final String RANGE_START = "range_start";
+    private static final String RANGE_END = "range_end";
+    private static final String LOCAL_NODE_ID = "local_node_id";
+    private static final String PARTICIPANTS = "participants";
+    private static final String WITNESSED_OFFSETS = "witnessed_offsets";
+    private static final String RECONCILED_OFFSETS = "reconciled_offsets";
+    private static final String PERSISTED_OFFSETS = "persisted_offsets";
+
+    MutationTrackingShardsTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, "mutation_tracking_shards")
+                           .comment("mutation tracking shards and their offset information")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new LocalPartitioner(UTF8Type.instance))
+                           .addPartitionKeyColumn(KEYSPACE, UTF8Type.instance)
+                           .addClusteringColumn(LOG_ID, UTF8Type.instance)
+                           .addClusteringColumn(RANGE_START, UTF8Type.instance)
+                           .addClusteringColumn(RANGE_END, UTF8Type.instance)
+                           .addRegularColumn(LOCAL_NODE_ID, Int32Type.instance)
+                           .addRegularColumn(PARTICIPANTS, UTF8Type.instance)
+                           .addRegularColumn(WITNESSED_OFFSETS, UTF8Type.instance)
+                           .addRegularColumn(RECONCILED_OFFSETS, UTF8Type.instance)
+                           .addRegularColumn(PERSISTED_OFFSETS, UTF8Type.instance)
+                           .build());
+    }
+
+    /**
+     * Adds one row per coordinator log contained in an already-captured shard snapshot.
+     *
+     * Takes the snapshot rather than the shard so that callers which need the snapshot
+     * for filtering do not have to capture it a second time.
+     *
+     * @param shardDebugInfo snapshot obtained from {@link Shard#getDebugInfo()}
+     * @param result data set the rows are appended to
+     */
+    private static void addShardRows(Shard.DebugInfo shardDebugInfo, SimpleDataSet result)
+    {
+        for (Map.Entry<CoordinatorLogId, CoordinatorLog.DebugInfo> entry : shardDebugInfo.logs.entrySet())
+        {
+            CoordinatorLogId logId = entry.getKey();
+            CoordinatorLog.DebugInfo logDebugInfo = entry.getValue();
+            result.row(shardDebugInfo.keyspace,
+                       logId.toString(),
+                       shardDebugInfo.range.left.toString(),
+                       shardDebugInfo.range.right.toString())
+                  .column(LOCAL_NODE_ID, shardDebugInfo.localNodeId)
+                  .column(PARTICIPANTS, shardDebugInfo.participants.toString())
+                  .column(WITNESSED_OFFSETS, logDebugInfo.witnessedOffsets)
+                  .column(RECONCILED_OFFSETS, logDebugInfo.reconciledOffsets)
+                  .column(PERSISTED_OFFSETS, logDebugInfo.persistedOffsets);
+        }
+    }
+
+    /**
+     * Returns rows for every shard currently reported by the mutation tracking service.
+     */
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        for (Shard shard : MutationTrackingService.instance.getShards())
+            addShardRows(shard.getDebugInfo(), result);
+
+        return result;
+    }
+
+    /**
+     * Returns rows only for the shards whose keyspace equals the requested partition key.
+     *
+     * @param key partition key; decoded as the UTF-8 keyspace name
+     */
+    @Override
+    public DataSet data(DecoratedKey key)
+    {
+        String keyspaceName = UTF8Type.instance.compose(key.getKey());
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        for (Shard shard : MutationTrackingService.instance.getShards())
+        {
+            // capture the snapshot once: building it walks every log of the shard and
+            // takes each log's read lock, so it is reused for both filtering and rows
+            Shard.DebugInfo debugInfo = shard.getDebugInfo();
+            if (debugInfo.keyspace.equals(keyspaceName))
+                addShardRows(debugInfo, result);
+        }
+
+        return result;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 28c6dc8fef..3e52c26ab1 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -67,6 +67,8 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
.add(new LocalTable(VIRTUAL_VIEWS))
.add(new ClusterMetadataLogTable(VIRTUAL_VIEWS))
.add(new ClusterMetadataDirectoryTable(VIRTUAL_VIEWS))
+ .add(new MutationJournalTable(VIRTUAL_VIEWS))
+ .add(new MutationTrackingShardsTable(VIRTUAL_VIEWS))
.addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
.addAll(CIDRFilteringMetricsTable.getAll(VIRTUAL_VIEWS))
.addAll(StorageAttachedIndexTables.getAll(VIRTUAL_VIEWS))
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java
b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index e84ca40bcf..fadec666ff 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -316,6 +316,13 @@ public final class ActiveSegment<K, V> extends Segment<K,
V>
return writtenTo;
}
+ @Override
+ public int writtenTo() // Segment override: exposes this active segment's writtenTo field.
+ {
+ return writtenTo;
+ }
+
+ @Override
public int fsyncedTo()
{
return fsyncedTo;
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index c404c325be..61cac87561 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -945,6 +945,11 @@ public class Journal<K, V> implements Shutdownable
return res;
}
+ public List<Segment<K, V>> getAllSegments() // Lists this journal's segments via segments().allSorted(true).
+ {
+ return segments().allSorted(true); // NOTE(review): the boolean presumably selects the sort direction - confirm against allSorted
+ }
+
public ActiveSegment<K, V> currentActiveSegment()
{
return currentSegment;
diff --git a/src/java/org/apache/cassandra/journal/Segment.java
b/src/java/org/apache/cassandra/journal/Segment.java
index 9bc394c26f..31d37aaee7 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -83,6 +83,19 @@ public abstract class Segment<K, V> implements
SelfRefCounted<Segment<K, V>>, Co
return descriptor.timestamp;
}
+ public long segmentSizeOnDisk() // Length of this segment's file, as reported by file.length().
+ {
+ return file.length();
+ }
+
+ public String filePath() // Absolute path of this segment's file, as reported by file.absolutePath().
+ {
+ return file.absolutePath();
+ }
+
+ public abstract int writtenTo(); // Per segment kind: ActiveSegment returns its writtenTo field, StaticSegment its fsyncLimit.
+ public abstract int fsyncedTo(); // Per segment kind: ActiveSegment returns its fsyncedTo field, StaticSegment its fsyncLimit.
+
public abstract void persistMetadata();
/*
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java
b/src/java/org/apache/cassandra/journal/StaticSegment.java
index efc2c796aa..e752945392 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -309,6 +309,18 @@ public final class StaticSegment<K, V> extends Segment<K,
V>
SyncUtil.trySyncDir(descriptor.directory);
}
+ @Override
+ public int writtenTo() // A static segment reports fsyncLimit here, the same value as fsyncedTo().
+ {
+ return fsyncLimit;
+ }
+
+ @Override
+ public int fsyncedTo() // A static segment reports fsyncLimit here, the same value as writtenTo().
+ {
+ return fsyncLimit;
+ }
+
/**
* Read the entry and specified offset into the entry holder.
* Expects the record to have been written at this offset, but potentially
not flushed and lost.
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index bd39863d67..56180c2594 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -547,6 +547,32 @@ public abstract class CoordinatorLog
}
}
+ public CoordinatorLogId getLogId() // Accessor for this log's logId field.
+ {
+ return logId;
+ }
+
+ public DebugInfo getDebugState() // Captures this log's witnessed, reconciled and persisted offsets as strings.
+ {
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+ String reconciledStr;
+
+ lock.readLock().lock(); // all three offset structures are read under the read lock; released in the finally block
+ try
+ {
+ witnessedOffsets.convertToPrimitiveMap(witnessed); // NOTE(review): presumably fills the passed-in map - confirm
+ persistedOffsets.convertToPrimitiveMap(persisted);
+ reconciledStr = reconciledOffsets.toString();
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ return new DebugInfo(witnessed.toString(), reconciledStr, // the local maps are rendered to strings after the lock is released
persisted.toString());
+ }
+
@Override
public String toString()
{
@@ -557,6 +583,20 @@ public abstract class CoordinatorLog
'}';
}
+ public static class DebugInfo // Immutable holder for the three offset strings produced by getDebugState().
+ {
+ public final String witnessedOffsets; // toString() of the map filled from witnessedOffsets
+ public final String reconciledOffsets; // toString() of reconciledOffsets
+ public final String persistedOffsets; // toString() of the map filled from persistedOffsets
+
+ private DebugInfo(String witnessedOffsets, String reconciledOffsets,
String persistedOffsets)
+ {
+ this.witnessedOffsets = witnessedOffsets;
+ this.reconciledOffsets = reconciledOffsets;
+ this.persistedOffsets = persistedOffsets;
+ }
+ }
+
static class CoordinatorLogPrimary extends CoordinatorLog
{
private final AtomicLong sequenceId = new AtomicLong(-1);
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java
b/src/java/org/apache/cassandra/replication/MutationJournal.java
index 21ff7d49c6..6568452699 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -69,6 +69,11 @@ public class MutationJournal
// in NBHM.
private SegmentStateTracker lastSegmentTracker;
+ public Iterable<Segment<ShortMutationId, Mutation>> getAllSegments() // Delegates to the wrapped journal's getAllSegments().
+ {
+ return journal.getAllSegments();
+ }
+
public static class Snapshot implements AutoCloseable
{
private final Journal.Snapshot<ShortMutationId, Mutation> wrapped;
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index aae8fe4fa3..28f3d6533c 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -478,6 +478,23 @@ public class MutationTrackingService
return count[0];
}
+    /**
+     * Collects the shards of every keyspace into a new list while holding the
+     * shard read lock; the returned list is a copy built by this call.
+     */
+    public Iterable<Shard> getShards()
+    {
+        List<Shard> collected = new ArrayList<>();
+        shardLock.readLock().lock();
+        try
+        {
+            keyspaceShards.forEach((ignoredKeyspace, perKeyspace) -> perKeyspace.forEachShard(collected::add));
+        }
+        finally
+        {
+            shardLock.readLock().unlock();
+        }
+        return collected;
+    }
+
public void collectLocallyMissingMutations(MutationSummary remoteSummary,
Log2OffsetsMap.Mutable into)
{
shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index a264b0141c..5f7f1e7ee6 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -19,8 +19,11 @@ package org.apache.cassandra.replication;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
@@ -402,4 +405,32 @@ public class Shard
{
logs.values().forEach(log -> keyspaceBuilder.put(log.logId,
log.collectReconciledOffsets(), range));
}
+
+    /**
+     * Captures this shard's keyspace, range, local node id and participants together with
+     * the debug state of each of its coordinator logs, ordered by the log id's long value.
+     */
+    public DebugInfo getDebugInfo()
+    {
+        Comparator<CoordinatorLogId> byLongValue = Comparator.comparing(CoordinatorLogId::asLong);
+        SortedMap<CoordinatorLogId, CoordinatorLog.DebugInfo> perLogState = new TreeMap<>(byLongValue);
+        logs.values().forEach(log -> perLogState.put(log.getLogId(), log.getDebugState()));
+        return new DebugInfo(keyspace, range, localNodeId, participants, perLogState);
+    }
+
+ public static class DebugInfo // Holder for the shard fields and per-log debug state assembled by getDebugInfo().
+ {
+ public final String keyspace; // the shard's keyspace field
+ public final Range<Token> range; // the shard's range field
+ public final int localNodeId; // the shard's localNodeId field
+ public final Participants participants; // the shard's participants field
+ public final SortedMap<CoordinatorLogId, CoordinatorLog.DebugInfo>
logs; // per-log debug state; getDebugInfo() orders it by CoordinatorLogId::asLong
+
+ private DebugInfo(String keyspace, Range<Token> range, int
localNodeId, Participants participants, SortedMap<CoordinatorLogId,
CoordinatorLog.DebugInfo> logs)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.localNodeId = localNodeId;
+ this.participants = participants;
+ this.logs = logs; // stored as passed in, without copying
+ }
+ }
}
diff --git
a/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java
new file mode 100644
index 0000000000..42f40fe7e7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.replication.MutationJournal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises the {@code mutation_journal} virtual table through the native protocol:
+ * registers the table under a dedicated virtual keyspace, writes some rows, then checks
+ * the exposed columns and basic invariants of every returned segment row.
+ */
+public class MutationJournalTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        MutationJournalTable table = new MutationJournalTable(KS_NAME);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
+    }
+
+    @Before
+    public void setUp()
+    {
+        schemaChange("CREATE TABLE " + KEYSPACE + ".tbl(pk int PRIMARY KEY, v int)");
+    }
+
+    @Test
+    public void testSelectAll() throws Throwable
+    {
+        // Start the mutation journal
+        MutationJournal.instance.start();
+
+        // Write data to trigger journal writes
+        for (int i = 0; i < 100; i++)
+        {
+            execute("INSERT INTO " + KEYSPACE + ".tbl(pk, v) VALUES (?, ?)", i, i);
+        }
+
+        // Query the virtual table
+        ResultSet result = executeNet("SELECT * FROM vts.mutation_journal");
+
+        // Verify the existence of all columns
+        assertThat(result.getColumnDefinitions().asList()
+                         .stream()
+                         .map(ColumnDefinitions.Definition::getName)
+                         .collect(Collectors.toSet()))
+        .containsAll(Arrays.asList("segment_id",
+                                   "is_active",
+                                   "bytes_on_disk",
+                                   "records_count",
+                                   "written_to",
+                                   "fsynced_to",
+                                   "needs_replay",
+                                   "file_path"));
+
+        boolean foundSegments = false;
+        boolean foundActiveSegment = false;
+
+        for (Row r : result)
+        {
+            foundSegments = true;
+
+            // Extract all columns
+            long segmentId = r.getLong("segment_id");
+            boolean isActive = r.getBool("is_active");
+            long sizeBytes = r.getLong("bytes_on_disk");
+            int recordsCount = r.getInt("records_count");
+            int writtenTo = r.getInt("written_to");
+            int fsyncedTo = r.getInt("fsynced_to");
+            r.getBool("needs_replay"); // Just verify it's accessible
+            String filePath = r.getString("file_path");
+
+            assertThat(segmentId).isGreaterThan(0L);
+
+            foundActiveSegment |= isActive;
+
+            assertThat(sizeBytes).isGreaterThan(0L);
+            assertThat(recordsCount).isGreaterThanOrEqualTo(0);
+            assertThat(writtenTo).isGreaterThanOrEqualTo(0);
+            assertThat(fsyncedTo).isGreaterThanOrEqualTo(0);
+            assertThat(fsyncedTo).isLessThanOrEqualTo(writtenTo);
+
+            // file_path is populated from Segment.filePath(), the segment file's absolute path,
+            // so it must always be a non-empty string; previously the value was read but never checked
+            assertThat(filePath).isNotEmpty();
+        }
+
+        assertThat(foundSegments).isTrue();
+        assertThat(foundActiveSegment).isTrue();
+    }
+}
+
diff --git
a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java
new file mode 100644
index 0000000000..221daa2353
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Exercises the {@code mutation_tracking_shards} virtual table through the native protocol,
+ * both as a full scan and restricted to a single keyspace partition.
+ */
+public class MutationTrackingShardsTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        VirtualTable shardsTable = new MutationTrackingShardsTable(KS_NAME);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(shardsTable)));
+    }
+
+    @Before
+    public void setUp()
+    {
+        // Start required services for mutation tracking
+        MutationJournal.instance.start();
+        MutationTrackingService.instance.start(ClusterMetadata.current());
+
+        // Create a tracked keyspace
+        schemaChange("CREATE KEYSPACE IF NOT EXISTS tracked_ks WITH replication = " +
+                     "{'class': 'SimpleStrategy', 'replication_factor': 1} AND replication_type='tracked'");
+
+        // Create a table in the tracked keyspace
+        schemaChange("CREATE TABLE tracked_ks.tbl(" +
+                     "pk int PRIMARY KEY, " +
+                     "v int)");
+    }
+
+    @After
+    public void tearDown() throws InterruptedException
+    {
+        // Shutdown the service to prevent test hanging
+        MutationTrackingService.instance.shutdownBlocking();
+    }
+
+    /** Writes 100 rows into the tracked table to trigger shard creation. */
+    private void insertTrackedRows()
+    {
+        for (int pk = 0; pk < 100; pk++)
+            execute("INSERT INTO tracked_ks.tbl(pk, v) VALUES (?, ?)", pk, pk);
+    }
+
+    @Test
+    public void testSelectAll()
+    {
+        insertTrackedRows();
+
+        // Query the virtual table
+        ResultSet result = executeNet("SELECT * FROM vts.mutation_tracking_shards");
+
+        // Verify the existence of all columns
+        List<String> expectedColumns = Arrays.asList("keyspace",
+                                                     "range_start",
+                                                     "range_end",
+                                                     "log_id",
+                                                     "local_node_id",
+                                                     "participants",
+                                                     "witnessed_offsets",
+                                                     "reconciled_offsets",
+                                                     "persisted_offsets");
+        assertThat(result.getColumnDefinitions().asList()
+                         .stream()
+                         .map(ColumnDefinitions.Definition::getName)
+                         .collect(Collectors.toSet()))
+        .containsAll(expectedColumns);
+
+        List<Row> shardRows = result.all();
+        for (Row shardRow : shardRows)
+        {
+            assertThat(shardRow.getString("keyspace")).isNotNull();
+
+            assertThat(shardRow.getString("range_start")).isNotNull();
+            assertThat(shardRow.getString("range_end")).isNotNull();
+
+            assertThat(shardRow.getString("log_id")).isNotNull();
+
+            assertThat(shardRow.getInt("local_node_id")).isGreaterThanOrEqualTo(0);
+
+            assertThat(shardRow.getString("participants")).isNotNull(); // should show replica node IDs
+
+            assertThat(shardRow.getString("witnessed_offsets")).isNotNull();
+            assertThat(shardRow.getString("reconciled_offsets")).isNotNull();
+            assertThat(shardRow.getString("persisted_offsets")).isNotNull();
+        }
+
+        // at least one shard row must have been returned
+        assertThat(shardRows.isEmpty()).isFalse();
+    }
+
+    @Test
+    public void testSelectKeyspace()
+    {
+        insertTrackedRows();
+
+        List<Row> unknownKeyspaceRows = executeNet("SELECT * FROM vts.mutation_tracking_shards WHERE \"keyspace\" = 'doesnotexist'").all();
+        Assertions.assertThat(unknownKeyspaceRows).isEmpty();
+
+        List<Row> trackedRows = executeNet("SELECT * FROM vts.mutation_tracking_shards WHERE \"keyspace\" = 'tracked_ks'").all();
+        Assertions.assertThat(trackedRows).isNotEmpty();
+        for (Row trackedRow : trackedRows)
+            Assertions.assertThat(trackedRow.getString("keyspace")).isEqualTo("tracked_ks");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
index 2e029f5edd..cbb97421b9 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -116,6 +116,8 @@ public class SegmentsTest
}
@Override boolean isFlushed(long position) { throw new
UnsupportedOperationException(); }
+ @Override public int writtenTo() { return 0; } // test stub: fixed value so the test segment satisfies the abstract method
+ @Override public int fsyncedTo() { return 0; } // test stub: fixed value so the test segment satisfies the abstract method
@Override public void persistMetadata() { throw new
UnsupportedOperationException(); }
@Override boolean read(int offset, int size,
EntrySerializer.EntryHolder<K> into) { throw new
UnsupportedOperationException(); }
@Override public void readAll(RecordConsumer<K> consumer) { throw new
UnsupportedOperationException(); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]