This is an automated email from the ASF dual-hosted git repository.
aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new 4b5bed734d CEP-45: Add metrics for Mutation Tracking
4b5bed734d is described below
commit 4b5bed734d34e47ce7dcdf0b3bc8e3bbf36d285e
Author: Aparna Naik <[email protected]>
AuthorDate: Wed Nov 5 15:42:12 2025 -0800
CEP-45: Add metrics for Mutation Tracking
patch by Aparna Naik; reviewed by Abe Ratnofsky and Blake Eggleston for
CASSANDRA-20986
---
src/java/org/apache/cassandra/journal/Journal.java | 16 +
.../metrics/CassandraMetricsRegistry.java | 1 +
.../cassandra/metrics/MutationTrackingMetrics.java | 57 ++++
.../cassandra/replication/CoordinatorLog.java | 26 ++
.../cassandra/replication/MutationJournal.java | 5 +
.../replication/MutationTrackingService.java | 23 +-
.../org/apache/cassandra/replication/Shard.java | 10 +
.../replication/UnreconciledMutations.java | 5 +
.../test/tracking/MutationTrackingMetricsTest.java | 357 +++++++++++++++++++++
9 files changed, 498 insertions(+), 2 deletions(-)
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index 136d70a9dc..c404c325be 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -832,6 +832,22 @@ public class Journal<K, V> implements Shutdownable
}
}
+    /**
+     * Sums the sizes, in bytes, of the {@code DATA} component files of every segment returned by
+     * {@code selectAndReference} with an accept-everything filter. Data files that do not exist are skipped.
+     * The selected segments stay referenced while they are scanned and are released by try-with-resources.
+     * <p>
+     * NOTE(review): for a segment that is still being written, the data file length may reflect allocated
+     * space rather than bytes actually written — confirm against how active segments are created.
+     */
+    public long getDiskSpaceUsed()
+    {
+        try (ReferencedSegments<K, V> referenced = selectAndReference(candidate -> true))
+        {
+            long bytesOnDisk = 0;
+            for (Segment<K, V> segment : referenced.all())
+            {
+                File dataFile = segment.descriptor.fileFor(Component.DATA);
+                if (!dataFile.exists())
+                    continue;
+                bytesOnDisk += dataFile.length();
+            }
+            return bytesOnDisk;
+        }
+    }
+
private ActiveSegment<K, V> createSegment()
{
Descriptor descriptor = Descriptor.create(directory,
nextSegmentId.getAndIncrement(), params.userVersion());
diff --git
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 44bcf2d6cf..133acedcbf 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -139,6 +139,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
.add(KeyspaceMetrics.TYPE_NAME)
.add(MemtablePool.TYPE_NAME)
.add(MessagingMetrics.TYPE_NAME)
+ .add(MutationTrackingMetrics.TYPE_NAME)
.add(MutualTlsMetrics.TYPE_NAME)
.add(PaxosMetrics.TYPE_NAME)
.add(ReadRepairMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
new file mode 100644
index 0000000000..822d7c5ac1
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Node-level metrics for mutation tracking, registered under the {@code MutationTracking} metric type.
+ * <p>
+ * The gauges are deliberately backed by lambdas rather than method references: a bound method reference such as
+ * {@code MutationTrackingService.instance::getUnreconciledMutationCount} evaluates its receiver once, when this
+ * class is initialised, whereas a lambda re-reads the singleton on every poll.
+ */
+public class MutationTrackingMetrics
+{
+    public static final String TYPE_NAME = "MutationTracking";
+    // Must stay declared before 'instance': static initialisers run in textual order and the constructor uses it.
+    private static final MetricNameFactory factory = new DefaultNameFactory(TYPE_NAME);
+
+    public static final MutationTrackingMetrics instance = new MutationTrackingMetrics();
+
+    /** Newly-witnessed offsets discovered via offset broadcasts. */
+    public final Counter broadcastOffsetsDiscovered;
+    /** Newly-witnessed offsets discovered at write time, when a mutation is written locally. */
+    public final Counter writeTimeOffsetsDiscovered;
+    /** Size (as reported by {@code MutationSummary.size()}) of each summary created for a key or range read. */
+    public final Histogram readSummarySize;
+    /** Number of unreconciled mutations across all shards on this node. */
+    public final Gauge<Long> unreconciledMutationCount;
+    /** Size of the MutationJournal on disk, in bytes. */
+    public final Gauge<Long> journalDiskSpaceUsed;
+
+    @SuppressWarnings("Convert2MethodRef")
+    private MutationTrackingMetrics()
+    {
+        broadcastOffsetsDiscovered = Metrics.counter(factory.createMetricName("BroadcastOffsetsDiscovered"));
+        writeTimeOffsetsDiscovered = Metrics.counter(factory.createMetricName("WriteTimeOffsetsDiscovered"));
+        // NOTE(review): the boolean is the registry's histogram option (believed to be considerZeroes) — confirm.
+        readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true);
+        unreconciledMutationCount = Metrics.register(
+            factory.createMetricName("UnreconciledMutationCount"),
+            () -> MutationTrackingService.instance.getUnreconciledMutationCount()
+        );
+        // NOTE(review): unlike the unreconciled-count gauge, nothing here guards against the journal not being
+        // started yet; confirm MutationJournal.instance is usable whenever this gauge can be polled.
+        journalDiskSpaceUsed = Metrics.register(
+            factory.createMetricName("JournalDiskSpaceUsed"),
+            () -> MutationJournal.instance.getDiskSpaceUsed()
+        );
+    }
+}
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 9dac9af12a..bd39863d67 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -206,8 +207,14 @@ public abstract class CoordinatorLog
private void updateWitnessedReplicatedOffsets(Offsets offsets, int
onNodeId)
{
+ // Track newly-witnessed offsets from broadcasts (use array for lambda)
+ int[] newlyWitnessedCount = {0};
+
witnessedOffsets.get(onNodeId).addAll(offsets, (ignore, start, end) ->
{
+ // Count the newly-witnessed offsets in this range
+ newlyWitnessedCount[0] += (end - start + 1);
+
for (int offset = start; offset <= end; ++offset)
{
// TODO (desired): use the fact that Offsets are ordered to
optimise this look up
@@ -219,6 +226,9 @@ public abstract class CoordinatorLog
logger.trace("done applying WRO, now {}", witnessedOffsets);
}
});
+
+ // Record metric for newly witnessed offsets only
+
MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.inc(newlyWitnessedCount[0]);
}
private void updatePersistedReplicatedOffsets(Offsets offsets, int
onNodeId)
@@ -273,6 +283,19 @@ public abstract class CoordinatorLog
}
}
+    /**
+     * Returns how many mutations this coordinator log currently tracks as unreconciled.
+     * The size is read while holding the log's read lock.
+     */
+    public long getUnreconciledCount()
+    {
+        lock.readLock().lock();
+        try
+        {
+            long tracked = unreconciledMutations.size();
+            return tracked;
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
boolean startWriting(Mutation mutation)
{
lock.writeLock().lock();
@@ -302,6 +325,9 @@ public abstract class CoordinatorLog
if (!witnessedOffsets.get(localNodeId).add(offset))
return;
+ // Track write-time discovery of newly-witnessed offset
+ MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.inc();
+
unreconciledMutations.finishWriting(mutation);
if (remoteReplicasWitnessed(offset))
diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java
b/src/java/org/apache/cassandra/replication/MutationJournal.java
index e2a2a82e64..21ff7d49c6 100644
--- a/src/java/org/apache/cassandra/replication/MutationJournal.java
+++ b/src/java/org/apache/cassandra/replication/MutationJournal.java
@@ -746,4 +746,9 @@ public class MutationJournal
{
return journal.countStaticSegmentsForTesting();
}
+
+    /**
+     * Reports the on-disk footprint, in bytes, of the underlying journal by delegating to its
+     * {@code getDiskSpaceUsed()}. Backs the {@code JournalDiskSpaceUsed} metric.
+     */
+    public long getDiskSpaceUsed()
+    {
+        long bytes = journal.getDiskSpaceUsed();
+        return bytes;
+    }
}
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 5a65ca7d47..aae8fe4fa3 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -57,6 +57,7 @@ import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
@@ -419,7 +420,9 @@ public class MutationTrackingService
shardLock.readLock().lock();
try
{
- return getOrCreateShards(tableId).createSummaryForKey(key,
tableId, includePending);
+ MutationSummary summary =
getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending);
+
MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+ return summary;
}
finally
{
@@ -432,7 +435,9 @@ public class MutationTrackingService
shardLock.readLock().lock();
try
{
- return getOrCreateShards(tableId).createSummaryForRange(range,
tableId, includePending);
+ MutationSummary summary =
getOrCreateShards(tableId).createSummaryForRange(range, tableId,
includePending);
+
MutationTrackingMetrics.instance.readSummarySize.update(summary.size());
+ return summary;
}
finally
{
@@ -459,6 +464,20 @@ public class MutationTrackingService
}
}
+    /**
+     * Returns the total number of unreconciled mutations across every shard of every keyspace visited by
+     * {@code forEachKeyspace}, or 0 if the service has not been started. Backs the
+     * {@code UnreconciledMutationCount} gauge, so it may be invoked from a metrics/JMX polling thread.
+     */
+    public long getUnreconciledMutationCount()
+    {
+        if (!isStarted())
+            return 0L;
+
+        // Every other shard accessor in this class walks the shards under shardLock's read lock; do the same here
+        // so a gauge poll does not iterate shard state that the lock is presumably guarding against concurrent
+        // updates (confirm forEachKeyspace does not already acquire it in a non-reentrant way).
+        shardLock.readLock().lock();
+        try
+        {
+            final long[] total = { 0L };
+            forEachKeyspace(ks -> ks.forEachShard(shard -> total[0] += shard.getUnreconciledCount()));
+            return total[0];
+        }
+        finally
+        {
+            shardLock.readLock().unlock();
+        }
+    }
+
public void collectLocallyMissingMutations(MutationSummary remoteSummary,
Log2OffsetsMap.Mutable into)
{
shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 924ae7f357..a264b0141c 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -314,6 +314,16 @@ public class Shard
return getOrCreate(logId.asLong());
}
+    /**
+     * Returns the sum of {@link CoordinatorLog#getUnreconciledCount()} over every coordinator log in this shard.
+     */
+    public long getUnreconciledCount()
+    {
+        long total = 0L;
+        for (CoordinatorLog coordinatorLog : logs.values())
+            total += coordinatorLog.getUnreconciledCount();
+        return total;
+    }
+
@Nonnull
private CoordinatorLog get(CoordinatorLogId logId)
{
diff --git
a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
index 314e9303d7..22c1550230 100644
--- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
+++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java
@@ -248,6 +248,11 @@ public class UnreconciledMutations
return statesMap.isEmpty();
}
+    /**
+     * Returns the number of entries in the underlying states map, i.e. how many mutations are currently tracked here.
+     */
+    public int size()
+    {
+        int entries = statesMap.size();
+        return entries;
+    }
+
static UnreconciledMutations loadFromJournal(Node2OffsetsMap
witnessedOffsets, int localNodeId)
{
UnreconciledMutations result = new UnreconciledMutations();
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
new file mode 100644
index 0000000000..54d3c6b973
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.metrics.MutationTrackingMetrics;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationJournal;
+import org.apache.cassandra.replication.MutationTrackingService;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * In-JVM distributed tests for {@link MutationTrackingMetrics}. Each test starts a fresh cluster, creates a
+ * tracked keyspace (SimpleStrategy, RF=3) and checks one metric after driving writes, reads or broadcasts.
+ * Lambdas passed to callOnInstance/runOnInstance run inside the target instance, which is why they are kept
+ * as lambdas (see the Convert2MethodRef suppressions).
+ */
+public class MutationTrackingMetricsTest extends TestBaseImpl
+{
+    private static final String CREATE_KEYSPACE =
+        "CREATE KEYSPACE %s WITH replication = " +
+        "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+        "AND replication_type = 'tracked'";
+
+    private static final String CREATE_TABLE =
+        "CREATE TABLE %s.tbl (pk int PRIMARY KEY, val text)";
+
+    /**
+     * Every replica should count newly-witnessed offsets at write time, and the total across the three replicas
+     * should be at least one per write per replica.
+     */
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testWriteTimeOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get initial write-time discovery counts on all nodes
+            long initialNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            // Perform writes at QUORUM: with RF=3 the coordinator waits for 2 acknowledgements, but the test
+            // expects the mutation to reach all 3 replicas eventually (asserted below)
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Wait until every node has discovered offsets at write time; with RF=3 the total across
+            // all nodes should be at least numWrites * 3
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long node1Delta = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode1Count;
+                          long node2Delta = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode2Count;
+                          long node3Delta = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount()) - initialNode3Count;
+                          long totalDiscovered = node1Delta + node2Delta + node3Delta;
+
+                          return node1Delta > 0 && node2Delta > 0 && node3Delta > 0 && totalDiscovered >= (long) numWrites * 3;
+                      });
+
+            // Verify final counts
+            long afterNode1Count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode2Count = cluster.get(2).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.writeTimeOffsetsDiscovered.getCount());
+
+            long node1Delta = afterNode1Count - initialNode1Count;
+            long node2Delta = afterNode2Count - initialNode2Count;
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            assertThat(node1Delta)
+                .as("Node 1 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            assertThat(node2Delta)
+                .as("Node 2 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            assertThat(node3Delta)
+                .as("Node 3 should have discovered offsets at write time")
+                .isGreaterThan(0L);
+
+            long totalDiscovered = node1Delta + node2Delta + node3Delta;
+            assertThat(totalDiscovered)
+                .as("Total write-time discoveries across all nodes should be at least %d (RF=3)", numWrites * 3)
+                .isGreaterThanOrEqualTo((long) numWrites * 3);
+        }
+    }
+
+    /**
+     * A node that missed writes should learn about the missing offsets from a broadcast (incrementing its
+     * broadcast counter), should not count a repeated broadcast of the same offsets again, and should then be
+     * able to reconcile the missing rows once mutation delivery is restored.
+     */
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testBroadcastOffsetsDiscoveredMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Record the initial broadcast count on node 3, which is about to be blocked from receiving mutations
+            long initialNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Drop only MUTATION_REQ messages to node 3; other verbs, including offset broadcasts, still get through
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).to(3).drop();
+
+            // Write data - nodes 1 and 2 will get it, node 3 won't
+            int numWrites = 5;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Verify node 3 missed the writes
+            Object[][] node3Before = cluster.coordinator(3).execute(
+                withKeyspace("SELECT * FROM %s.tbl"),
+                ConsistencyLevel.ONE);
+            assertThat(node3Before.length)
+                .as("Node 3 should have no data (was blocked)")
+                .isEqualTo(0);
+
+            // Broadcast offsets from node 1 to the other nodes.
+            // This tells node 3 about the mutations it is missing.
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Wait until node 3's counter has risen above its initial value and then read the same value on
+            // two consecutive polls, i.e. the broadcast has been fully applied
+            long[] previousCount = {0};
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long currentCount = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                          boolean hasDiscoveredOffsets = currentCount > initialNode3Count;
+                          boolean isStable = hasDiscoveredOffsets && currentCount == previousCount[0];
+                          previousCount[0] = currentCount;
+                          return isStable;
+                      });
+
+            // Get the count after the first broadcast
+            long afterFirstBroadcast = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+
+            // Broadcast the same offsets again (duplicate) - this should NOT increment the metric
+            cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            // Give the duplicate broadcast time to arrive, then check the metric did not change.
+            // NOTE: after the 200ms initial delay this passes on the first poll where the count still equals
+            // afterFirstBroadcast; it does not positively confirm the duplicate was delivered, so it is best-effort.
+            Awaitility.await()
+                      .pollDelay(Duration.ofMillis(200))
+                      .atMost(Duration.ofSeconds(2))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+                          return count == afterFirstBroadcast; // Should remain at the same value (duplicate doesn't increment)
+                      });
+
+            // Clear the filter to allow reconciliation
+            cluster.filters().reset();
+
+            // Read from node 3 to trigger reconciliation using the broadcast offsets.
+            // Node 3 knows it is missing data (from the broadcast offsets) and is expected to request it;
+            // poll until the reconciled read returns every row.
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(10))
+                      .pollInterval(Duration.ofMillis(200))
+                      .until(() -> {
+                          Object[][] result = cluster.coordinator(3).execute(
+                              withKeyspace("SELECT * FROM %s.tbl"),
+                              ConsistencyLevel.QUORUM);
+                          return result.length == numWrites;
+                      });
+
+            // Verify all rows are present after reconciliation
+            Object[][] result = cluster.coordinator(3).execute(
+                withKeyspace("SELECT * FROM %s.tbl"),
+                ConsistencyLevel.QUORUM);
+            assertThat(result.length)
+                .as("Should return all rows after reconciliation")
+                .isEqualTo(numWrites);
+
+            // Check the metric after reconciliation: node 3 must have counted the broadcast offsets
+            long afterNode3Count = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount());
+            long node3Delta = afterNode3Count - initialNode3Count;
+
+            // Node 3 was blocked earlier and must now have applied the broadcast offsets
+            assertThat(node3Delta)
+                .as("Node 3 should have applied broadcast offsets")
+                .isGreaterThan(0L);
+        }
+    }
+
+    /**
+     * Reads coordinated by node 1 should record samples in the read-summary-size histogram on node 1.
+     */
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testReadSummarySizeMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Initial sample count of the histogram on the coordinator node (despite the variable name, this is
+            // Histogram#getCount(), the number of recorded summaries, not a size)
+            long initialSize = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            // Insert test data
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Execute single-partition reads; the histogram records one sample for every summary created
+            int numReads = 10;
+            for (int i = 0; i < numReads; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                                               ConsistencyLevel.QUORUM, i);
+            }
+
+            // The assertion assumes each read creates at least TWO summaries on the coordinator: an initial one
+            // before the read and a secondary one after it, used to detect writes that raced with the read
+            long afterSize = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.readSummarySize.getCount());
+
+            long delta = afterSize - initialSize;
+            assertThat(delta)
+                .as("Should have at least twice of %d summaries", numReads)
+                .isGreaterThanOrEqualTo(2L * numReads);
+        }
+    }
+
+    /**
+     * Writes that one replica never receives should show up in the unreconciled-mutation gauge on the coordinator,
+     * and the gauge should return to 0 once delivery is restored and reads trigger reconciliation.
+     */
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testUnreconciledMutationCountMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Get the initial unreconciled count (should be 0)
+            long initialCount = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(initialCount)
+                .as("Initial unreconciled count should be 0")
+                .isEqualTo(0L);
+
+            // Drop MUTATION_REQ messages from node 1 to node 3
+            cluster.filters().verbs(Verb.MUTATION_REQ.id).from(1).to(3).drop();
+
+            // Write with QUORUM (only nodes 1 and 2 will receive the writes, node 3 won't)
+            int numWrites = 10;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                                               ConsistencyLevel.QUORUM, i, "test" + i);
+            }
+
+            // Node 1 should now have unreconciled mutations (node 3 didn't get them). This is checked
+            // immediately after the QUORUM writes return, without waiting.
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterWrites)
+                .as("Expected %d unreconciled mutations (node 3 blocked)", numWrites)
+                .isEqualTo((long) numWrites);
+
+            // Clear filters to allow reconciliation
+            cluster.filters().reset();
+
+            // Perform reads to trigger reconciliation
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"),
+                                               ConsistencyLevel.QUORUM, i);
+            }
+
+            // Wait for reconciliation to complete
+            Awaitility.await()
+                      .atMost(Duration.ofSeconds(5))
+                      .pollInterval(Duration.ofMillis(100))
+                      .until(() -> {
+                          long count = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+                          return count == 0;
+                      });
+
+            // Verify reconciliation actually happened
+            long afterReconcile = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.unreconciledMutationCount.getValue());
+            assertThat(afterReconcile)
+                .as("Unreconciled count should be 0 after reconciliation")
+                .isEqualTo(0L);
+        }
+    }
+
+    /**
+     * The journal disk-space gauge on a single node should grow after writes that, together with forced
+     * segment closes, produce additional journal segments.
+     */
+    @Test(timeout = 60000)
+    @SuppressWarnings("Convert2MethodRef")
+    public void testJournalDiskSpaceUsedMetric() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(1)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .set("commitlog_segment_size", "1MiB")) // Create smaller segments
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace(CREATE_KEYSPACE));
+            cluster.schemaChange(String.format(CREATE_TABLE, KEYSPACE));
+
+            // Record the baseline disk space. NOTE(review): this is believed to be 2 * 1024 * 1024 (two segments
+            // allocated at startup), but the test only relies on the value increasing — confirm before depending on it.
+            long initialSpace = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            // 200 small rows are unlikely to fill a 1MiB segment on their own; the forced segment closes
+            // below (at i = 20, 40, ..., 180) are what create additional segments
+            int numWrites = 200;
+            for (int i = 0; i < numWrites; i++)
+            {
+                cluster.coordinator(1).execute(
+                    withKeyspace("INSERT INTO %s.tbl (pk, val) VALUES (?, ?)"),
+                    ConsistencyLevel.ONE, i, "test-" + i);
+
+                // Close the current segment (if non-empty) every 20 writes to create multiple segments
+                if (i % 20 == 0 && i > 0)
+                    cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty());
+            }
+
+            // Verify disk space increased
+            long afterWrites = cluster.get(1).callOnInstance(() -> MutationTrackingMetrics.instance.journalDiskSpaceUsed.getValue());
+
+            assertThat(afterWrites)
+                .as("Disk space should increase after writes: before=%d", initialSpace)
+                .isGreaterThan(initialSpace);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]