This is an automated email from the ASF dual-hosted git repository.

aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 4ea4d893c4 CEP-45: Add virtual tables for Mutation Tracking
4ea4d893c4 is described below

commit 4ea4d893c4b30d14175b06dccc16d062797c34ca
Author: Aparna Naik <[email protected]>
AuthorDate: Wed Dec 10 16:19:31 2025 -0500

    CEP-45: Add virtual tables for Mutation Tracking
    
    patch by Aparna Naik; reviewed by Abe Ratnofsky and Blake Eggleston for CASSANDRA-20987
---
 .../cassandra/db/virtual/MutationJournalTable.java |  80 ++++++++++
 .../db/virtual/MutationTrackingShardsTable.java    | 110 ++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   2 +
 .../apache/cassandra/journal/ActiveSegment.java    |   7 +
 src/java/org/apache/cassandra/journal/Journal.java |   5 +
 src/java/org/apache/cassandra/journal/Segment.java |  13 ++
 .../apache/cassandra/journal/StaticSegment.java    |  12 ++
 .../cassandra/replication/CoordinatorLog.java      |  40 +++++
 .../cassandra/replication/MutationJournal.java     |   5 +
 .../replication/MutationTrackingService.java       |  17 +++
 .../org/apache/cassandra/replication/Shard.java    |  31 ++++
 .../db/virtual/MutationJournalTableTest.java       | 119 +++++++++++++++
 .../virtual/MutationTrackingShardsTableTest.java   | 161 +++++++++++++++++++++
 .../org/apache/cassandra/journal/SegmentsTest.java |   2 +
 14 files changed, 604 insertions(+)

diff --git a/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java
new file mode 100644
index 0000000000..603e6d521b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.journal.ActiveSegment;
+import org.apache.cassandra.journal.Segment;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.ShortMutationId;
+import org.apache.cassandra.schema.TableMetadata;
+
+final class MutationJournalTable extends AbstractVirtualTable
+{
+    private static final String SEGMENT_ID = "segment_id";
+    private static final String IS_ACTIVE = "is_active";
+    private static final String BYTES_ON_DISK = "bytes_on_disk";
+    private static final String RECORDS_COUNT = "records_count";
+    private static final String WRITTEN_TO = "written_to";
+    private static final String FSYNCED_TO = "fsynced_to";
+    private static final String NEEDS_REPLAY = "needs_replay";
+    private static final String FILE_PATH = "file_path";
+
+    MutationJournalTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, "mutation_journal")
+                           .comment("mutation journal segments and their contents")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new 
LocalPartitioner(LongType.instance))
+                           .addPartitionKeyColumn(SEGMENT_ID, 
LongType.instance)
+                           .addRegularColumn(IS_ACTIVE, BooleanType.instance)
+                           .addRegularColumn(BYTES_ON_DISK, LongType.instance)
+                           .addRegularColumn(RECORDS_COUNT, Int32Type.instance)
+                           .addRegularColumn(WRITTEN_TO, Int32Type.instance)
+                           .addRegularColumn(FSYNCED_TO, Int32Type.instance)
+                           .addRegularColumn(NEEDS_REPLAY, 
BooleanType.instance)
+                           .addRegularColumn(FILE_PATH, UTF8Type.instance)
+                           .build());
+    }
+
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        for (Segment<ShortMutationId, Mutation> segment : 
MutationJournal.instance.getAllSegments())
+        {
+            result.row(segment.id())
+                  .column(IS_ACTIVE, segment instanceof ActiveSegment)
+                  .column(BYTES_ON_DISK, segment.segmentSizeOnDisk())
+                  .column(RECORDS_COUNT, segment.metadata().totalCount())
+                  .column(WRITTEN_TO, segment.writtenTo())
+                  .column(FSYNCED_TO, segment.fsyncedTo())
+                  .column(NEEDS_REPLAY, segment.metadata().needsReplay())
+                  .column(FILE_PATH, segment.filePath());
+        }
+
+        return result;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java
new file mode 100644
index 0000000000..724b6a596d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.replication.CoordinatorLog;
+import org.apache.cassandra.replication.CoordinatorLogId;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.Shard;
+import org.apache.cassandra.schema.TableMetadata;
+
+import java.util.Map;
+
+public class MutationTrackingShardsTable extends AbstractVirtualTable
+{
+    private static final String KEYSPACE = "keyspace";
+    private static final String LOG_ID = "log_id";
+    private static final String RANGE_START = "range_start";
+    private static final String RANGE_END = "range_end";
+    private static final String LOCAL_NODE_ID = "local_node_id";
+    private static final String PARTICIPANTS = "participants";
+    private static final String WITNESSED_OFFSETS = "witnessed_offsets";
+    private static final String RECONCILED_OFFSETS = "reconciled_offsets";
+    private static final String PERSISTED_OFFSETS = "persisted_offsets";
+
+    MutationTrackingShardsTable(String keyspace) {
+        super(TableMetadata.builder(keyspace, "mutation_tracking_shards")
+                           .comment("mutation tracking shards and their offset information")
+                           .kind(TableMetadata.Kind.VIRTUAL).partitioner(new 
LocalPartitioner(UTF8Type.instance))
+                           .addPartitionKeyColumn(KEYSPACE, UTF8Type.instance)
+                           .addClusteringColumn(LOG_ID, UTF8Type.instance)
+                           .addClusteringColumn(RANGE_START, UTF8Type.instance)
+                           .addClusteringColumn(RANGE_END, UTF8Type.instance)
+                           .addRegularColumn(LOCAL_NODE_ID, Int32Type.instance)
+                           .addRegularColumn(PARTICIPANTS, UTF8Type.instance)
+                           .addRegularColumn(WITNESSED_OFFSETS, 
UTF8Type.instance)
+                           .addRegularColumn(RECONCILED_OFFSETS, 
UTF8Type.instance)
+                           .addRegularColumn(PERSISTED_OFFSETS, 
UTF8Type.instance)
+                           .build());
+    }
+
+    private void addShardRows(Shard shard, SimpleDataSet result)
+    {
+        Shard.DebugInfo shardDebugInfo = shard.getDebugInfo();
+        for (Map.Entry<CoordinatorLogId, CoordinatorLog.DebugInfo> entry : 
shardDebugInfo.logs.entrySet())
+        {
+            CoordinatorLogId logId = entry.getKey();
+            CoordinatorLog.DebugInfo logDebugInfo = entry.getValue();
+            result.row(shardDebugInfo.keyspace,
+                       logId.toString(),
+                       shardDebugInfo.range.left.toString(),
+                       shardDebugInfo.range.right.toString())
+                  .column(LOCAL_NODE_ID, shardDebugInfo.localNodeId)
+                  .column(PARTICIPANTS, shardDebugInfo.participants.toString())
+                  .column(WITNESSED_OFFSETS, logDebugInfo.witnessedOffsets)
+                  .column(RECONCILED_OFFSETS, logDebugInfo.reconciledOffsets)
+                  .column(PERSISTED_OFFSETS, logDebugInfo.persistedOffsets);
+        }
+    }
+
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        for (Shard shard : MutationTrackingService.instance.getShards())
+        {
+            addShardRows(shard, result);
+        }
+
+        return result;
+    }
+
+    @Override
+    public DataSet data(DecoratedKey key)
+    {
+        String keyspaceName = UTF8Type.instance.compose(key.getKey());
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        for (Shard shard : MutationTrackingService.instance.getShards())
+        {
+            Shard.DebugInfo debugInfo = shard.getDebugInfo();
+            if (!debugInfo.keyspace.equals(keyspaceName))
+                continue;
+
+            addShardRows(shard, result);
+        }
+
+        return result;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 28c6dc8fef..3e52c26ab1 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -67,6 +67,8 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new LocalTable(VIRTUAL_VIEWS))
                     .add(new ClusterMetadataLogTable(VIRTUAL_VIEWS))
                     .add(new ClusterMetadataDirectoryTable(VIRTUAL_VIEWS))
+                    .add(new MutationJournalTable(VIRTUAL_VIEWS))
+                    .add(new MutationTrackingShardsTable(VIRTUAL_VIEWS))
                     .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
                     .addAll(CIDRFilteringMetricsTable.getAll(VIRTUAL_VIEWS))
                     .addAll(StorageAttachedIndexTables.getAll(VIRTUAL_VIEWS))
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index e84ca40bcf..fadec666ff 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -316,6 +316,13 @@ public final class ActiveSegment<K, V> extends Segment<K, V>
         return writtenTo;
     }
 
+    @Override
+    public int writtenTo()
+    {
+        return writtenTo;
+    }
+
+    @Override
     public int fsyncedTo()
     {
         return fsyncedTo;
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index c404c325be..61cac87561 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -945,6 +945,11 @@ public class Journal<K, V> implements Shutdownable
         return res;
     }
 
+    public List<Segment<K, V>> getAllSegments()
+    {
+        return segments().allSorted(true);
+    }
+
     public ActiveSegment<K, V> currentActiveSegment()
     {
         return currentSegment;
diff --git a/src/java/org/apache/cassandra/journal/Segment.java b/src/java/org/apache/cassandra/journal/Segment.java
index 9bc394c26f..31d37aaee7 100644
--- a/src/java/org/apache/cassandra/journal/Segment.java
+++ b/src/java/org/apache/cassandra/journal/Segment.java
@@ -83,6 +83,19 @@ public abstract class Segment<K, V> implements SelfRefCounted<Segment<K, V>>, Co
         return descriptor.timestamp;
     }
 
+    public long segmentSizeOnDisk()
+    {
+        return file.length();
+    }
+
+    public String filePath()
+    {
+        return file.absolutePath();
+    }
+
+    public abstract int writtenTo();
+    public abstract int fsyncedTo();
+
     public abstract void persistMetadata();
 
     /*
diff --git a/src/java/org/apache/cassandra/journal/StaticSegment.java b/src/java/org/apache/cassandra/journal/StaticSegment.java
index efc2c796aa..e752945392 100644
--- a/src/java/org/apache/cassandra/journal/StaticSegment.java
+++ b/src/java/org/apache/cassandra/journal/StaticSegment.java
@@ -309,6 +309,18 @@ public final class StaticSegment<K, V> extends Segment<K, V>
         SyncUtil.trySyncDir(descriptor.directory);
     }
 
+    @Override
+    public int writtenTo()
+    {
+        return fsyncLimit;
+    }
+
+    @Override
+    public int fsyncedTo()
+    {
+        return fsyncLimit;
+    }
+
     /**
      * Read the entry and specified offset into the entry holder.
      * Expects the record to have been written at this offset, but potentially 
not flushed and lost.
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index bd39863d67..56180c2594 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -547,6 +547,32 @@ public abstract class CoordinatorLog
         }
     }
 
+    public CoordinatorLogId getLogId()
+    {
+        return logId;
+    }
+
+    public DebugInfo getDebugState()
+    {
+        Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+        Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+        String reconciledStr;
+
+        lock.readLock().lock();
+        try
+        {
+            witnessedOffsets.convertToPrimitiveMap(witnessed);
+            persistedOffsets.convertToPrimitiveMap(persisted);
+            reconciledStr = reconciledOffsets.toString();
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+
+        return new DebugInfo(witnessed.toString(), reconciledStr, 
persisted.toString());
+    }
+
     @Override
     public String toString()
     {
@@ -557,6 +583,20 @@ public abstract class CoordinatorLog
                '}';
     }
 
+    public static class DebugInfo
+    {
+        public final String witnessedOffsets;
+        public final String reconciledOffsets;
+        public final String persistedOffsets;
+
+        private DebugInfo(String witnessedOffsets, String reconciledOffsets, 
String persistedOffsets)
+        {
+            this.witnessedOffsets = witnessedOffsets;
+            this.reconciledOffsets = reconciledOffsets;
+            this.persistedOffsets = persistedOffsets;
+        }
+    }
+
     static class CoordinatorLogPrimary extends CoordinatorLog
     {
         private final AtomicLong sequenceId = new AtomicLong(-1);
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java
index 21ff7d49c6..6568452699 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -69,6 +69,11 @@ public class MutationJournal
     // in NBHM.
     private SegmentStateTracker lastSegmentTracker;
 
+    public Iterable<Segment<ShortMutationId, Mutation>> getAllSegments()
+    {
+        return journal.getAllSegments();
+    }
+
     public static class Snapshot implements AutoCloseable
     {
         private final Journal.Snapshot<ShortMutationId, Mutation> wrapped;
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index aae8fe4fa3..28f3d6533c 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -478,6 +478,23 @@ public class MutationTrackingService
         return count[0];
     }
 
+    public Iterable<Shard> getShards()
+    {
+        List<Shard> shards = new ArrayList<>();
+        shardLock.readLock().lock();
+        try
+        {
+            keyspaceShards.forEach((keyspace, ksShards) -> {
+                ksShards.forEachShard(shards::add);
+            });
+        }
+        finally
+        {
+            shardLock.readLock().unlock();
+        }
+        return shards;
+    }
+
     public void collectLocallyMissingMutations(MutationSummary remoteSummary, 
Log2OffsetsMap.Mutable into)
     {
         shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index a264b0141c..5f7f1e7ee6 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -19,8 +19,11 @@ package org.apache.cassandra.replication;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
 
@@ -402,4 +405,32 @@ public class Shard
     {
         logs.values().forEach(log -> keyspaceBuilder.put(log.logId, 
log.collectReconciledOffsets(), range));
     }
+
+    public DebugInfo getDebugInfo()
+    {
+        SortedMap<CoordinatorLogId, CoordinatorLog.DebugInfo> logDebugState = 
new TreeMap<>(Comparator.comparing(CoordinatorLogId::asLong));
+        for (CoordinatorLog log : logs.values())
+        {
+            logDebugState.put(log.getLogId(), log.getDebugState());
+        }
+        return new DebugInfo(keyspace, range, localNodeId, participants, 
logDebugState);
+    }
+
+    public static class DebugInfo
+    {
+        public final String keyspace;
+        public final Range<Token> range;
+        public final int localNodeId;
+        public final Participants participants;
+        public final SortedMap<CoordinatorLogId, CoordinatorLog.DebugInfo> 
logs;
+
+        private DebugInfo(String keyspace, Range<Token> range, int 
localNodeId, Participants participants, SortedMap<CoordinatorLogId, 
CoordinatorLog.DebugInfo> logs)
+        {
+            this.keyspace = keyspace;
+            this.range = range;
+            this.localNodeId = localNodeId;
+            this.participants = participants;
+            this.logs = logs;
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java
new file mode 100644
index 0000000000..42f40fe7e7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.replication.MutationJournal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MutationJournalTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        MutationJournalTable table = new MutationJournalTable(KS_NAME);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, 
ImmutableList.of(table)));
+    }
+
+    @Before
+    public void setUp()
+    {
+        schemaChange("CREATE TABLE " + KEYSPACE + ".tbl(pk int PRIMARY KEY, v int)");
+    }
+
+    @Test
+    public void testSelectAll() throws Throwable
+    {
+        // Start the mutation journal
+        MutationJournal.instance.start();
+
+        // Write data to trigger journal writes
+        for (int i = 0; i < 100; i++)
+        {
+            execute("INSERT INTO " + KEYSPACE + ".tbl(pk, v) VALUES (?, ?)", 
i, i);
+        }
+
+        // Query the virtual table
+        ResultSet result = executeNet("SELECT * FROM vts.mutation_journal");
+
+        // Verify the existence of all columns
+        assertThat(result.getColumnDefinitions().asList()
+                         .stream()
+                         .map(ColumnDefinitions.Definition::getName)
+                         .collect(Collectors.toSet()))
+            .containsAll(Arrays.asList(
+                "segment_id",
+                "is_active",
+                "bytes_on_disk",
+                "records_count",
+                "written_to",
+                "fsynced_to",
+                "needs_replay",
+                "file_path"
+            ));
+
+        boolean foundSegments = false;
+        boolean foundActiveSegment = false;
+
+        for (Row r : result)
+        {
+            foundSegments = true;
+
+            // Extract all columns
+            long segmentId = r.getLong("segment_id");
+            boolean isActive = r.getBool("is_active");
+            long sizeBytes = r.getLong("bytes_on_disk");
+            int recordsCount = r.getInt("records_count");
+            int writtenTo = r.getInt("written_to");
+            int fsyncedTo = r.getInt("fsynced_to");
+            r.getBool("needs_replay"); // Just verify it's accessible
+            String filePath = r.getString("file_path");
+
+            assertThat(segmentId).isGreaterThan(0L);
+
+            if (isActive) { foundActiveSegment = true; }
+
+            assertThat(sizeBytes).isGreaterThan(0L);
+            assertThat(recordsCount).isGreaterThanOrEqualTo(0);
+            assertThat(writtenTo).isGreaterThanOrEqualTo(0);
+            assertThat(fsyncedTo).isGreaterThanOrEqualTo(0);
+            assertThat(fsyncedTo).isLessThanOrEqualTo(writtenTo);
+        }
+
+        assertThat(foundSegments).isTrue();
+        assertThat(foundActiveSegment).isTrue();
+    }
+}
+
diff --git a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java
new file mode 100644
index 0000000000..221daa2353
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MutationTrackingShardsTableTest extends CQLTester
+{
+    private static final String KS_NAME = "vts";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        CQLTester.setUpClass();
+        MutationTrackingShardsTable table = new 
MutationTrackingShardsTable(KS_NAME);
+        VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, 
ImmutableList.of(table)));
+    }
+
+    @Before
+    public void setUp()
+    {
+        // Start required services for mutation tracking
+        MutationJournal.instance.start();
+        MutationTrackingService.instance.start(ClusterMetadata.current());
+
+        // Create a tracked keyspace
+        schemaChange("CREATE KEYSPACE IF NOT EXISTS tracked_ks WITH replication = " +
+                     "{'class': 'SimpleStrategy', 'replication_factor': 1} AND replication_type='tracked'");
+
+        // Create a table in the tracked keyspace
+        schemaChange("CREATE TABLE tracked_ks.tbl(" +
+                     "pk int PRIMARY KEY, " +
+                     "v int)");
+    }
+
+    @After
+    public void tearDown() throws InterruptedException
+    {
+        // Shutdown the service to prevent test hanging
+        MutationTrackingService.instance.shutdownBlocking();
+    }
+
+    @Test
+    public void testSelectAll()
+    {
+        // Write data to trigger shard creation
+        for (int i = 0; i < 100; i++)
+        {
+            execute("INSERT INTO tracked_ks.tbl(pk, v) VALUES (?, ?)", i, i);
+        }
+
+        // Query the virtual table
+        ResultSet result = executeNet("SELECT * FROM vts.mutation_tracking_shards");
+
+        // Verify the existence of all columns
+        assertThat(result.getColumnDefinitions().asList()
+                         .stream()
+                         .map(ColumnDefinitions.Definition::getName)
+                         .collect(Collectors.toSet()))
+            .containsAll(Arrays.asList(
+                "keyspace",
+                "range_start",
+                "range_end",
+                "log_id",
+                "local_node_id",
+                "participants",
+                "witnessed_offsets",
+                "reconciled_offsets",
+                "persisted_offsets"
+            ));
+
+        boolean foundShards = false;
+
+        for (Row r : result)
+        {
+            foundShards = true;
+
+            // Extract all columns
+            String keyspace = r.getString("keyspace");
+            String rangeStart = r.getString("range_start");
+            String rangeEnd = r.getString("range_end");
+            String logId = r.getString("log_id");
+            int localNodeId = r.getInt("local_node_id");
+            String participants = r.getString("participants");
+            String witnessedOffsets = r.getString("witnessed_offsets");
+            String reconciledOffsets = r.getString("reconciled_offsets");
+            String persistedOffsets = r.getString("persisted_offsets");
+
+            assertThat(keyspace).isNotNull();
+
+            assertThat(rangeStart).isNotNull();
+            assertThat(rangeEnd).isNotNull();
+
+            assertThat(logId).isNotNull();
+
+            assertThat(localNodeId).isGreaterThanOrEqualTo(0);
+
+            assertThat(participants).isNotNull(); // should show replica node 
IDs
+
+            assertThat(witnessedOffsets).isNotNull();
+            assertThat(reconciledOffsets).isNotNull();
+            assertThat(persistedOffsets).isNotNull();
+        }
+
+        assertThat(foundShards).isTrue();
+    }
+
+    @Test
+    public void testSelectKeyspace()
+    {
+        // Write data to trigger shard creation
+        for (int i = 0; i < 100; i++)
+        {
+            execute("INSERT INTO tracked_ks.tbl(pk, v) VALUES (?, ?)", i, i);
+        }
+
+        ResultSet empty = executeNet("SELECT * FROM vts.mutation_tracking_shards WHERE \"keyspace\" = 'doesnotexist'");
+        Assertions.assertThat(empty.all()).isEmpty();
+
+        ResultSet result = executeNet("SELECT * FROM vts.mutation_tracking_shards WHERE \"keyspace\" = 'tracked_ks'");
+        List<Row> rows = result.all();
+        Assertions.assertThat(rows).isNotEmpty();
+        rows.forEach(row -> 
Assertions.assertThat(row.getString("keyspace")).isEqualTo("tracked_ks"));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/journal/SegmentsTest.java b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
index 2e029f5edd..cbb97421b9 100644
--- a/test/unit/org/apache/cassandra/journal/SegmentsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SegmentsTest.java
@@ -116,6 +116,8 @@ public class SegmentsTest
         }
 
         @Override boolean isFlushed(long position) { throw new 
UnsupportedOperationException(); }
+        @Override public int writtenTo() { return 0; }
+        @Override public int fsyncedTo() { return 0; }
         @Override public void persistMetadata() { throw new 
UnsupportedOperationException(); }
         @Override boolean read(int offset, int size, 
EntrySerializer.EntryHolder<K> into)  { throw new 
UnsupportedOperationException(); }
         @Override public void readAll(RecordConsumer<K> consumer) { throw new 
UnsupportedOperationException(); }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to