This is an automated email from the ASF dual-hosted git repository.

aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new 7f4bac3efb Fix failing test: 
org.apache.cassandra.replication.CoordinatorLogOffsetsTest#reconciledBounds
7f4bac3efb is described below

commit 7f4bac3efb08fad610d8a90d8856ed4f04416229
Author: Abe Ratnofsky <[email protected]>
AuthorDate: Mon Sep 29 21:04:41 2025 -0400

    Fix failing test: 
org.apache.cassandra.replication.CoordinatorLogOffsetsTest#reconciledBounds
---
 .../replication/CoordinatorLogOffsetsTest.java     | 37 +++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java 
b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
index 07f4bc5eaf..253d81526b 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.replication;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -49,7 +52,10 @@ import org.apache.cassandra.net.MessagingService;
 import org.assertj.core.api.Assertions;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -263,6 +269,7 @@ public class CoordinatorLogOffsetsTest
         ClusterMetadataTestHelper.createKeyspace(ks, KeyspaceParams.simple(3, 
ReplicationType.tracked));
         ClusterMetadataTestHelper.commit(new 
AlterSchema(SchemaTransformations.addTable(tableMetadata, false)));
 
+        CommitLog.instance.start();
         MutationTrackingService.instance.start(metadata);
 
         // Eventually, will also run perturbations before checking 
isReconciled (like log truncation, durability, etc.)
@@ -276,10 +283,16 @@ public class CoordinatorLogOffsetsTest
             MutationTrackingService.instance.finishWriting(mutation);
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+            MutationTrackingService.instance.persistLogStateForTesting();
 
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
                     .build();
+            Range<Token> range = getShardRange(mutation);
+            List<? extends Offsets> offsets = 
Collections.singletonList(logOffsets.offsets(mutation.id().logId()));
+            MutationTrackingService.instance.updateReplicatedOffsets(ks, 
range, offsets, true, addr2);
+            MutationTrackingService.instance.updateReplicatedOffsets(ks, 
range, offsets, true, addr3);
+
             
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue();
         }
 
@@ -288,6 +301,7 @@ public class CoordinatorLogOffsetsTest
             Mutation mutation = 
MutationTrackingUtils.createMutation(tableMetadata, 2, 2);
             MutationTrackingService.instance.startWriting(mutation);
             MutationTrackingService.instance.finishWriting(mutation);
+            MutationTrackingService.instance.persistLogStateForTesting();
 
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
@@ -302,10 +316,17 @@ public class CoordinatorLogOffsetsTest
 
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+            MutationTrackingService.instance.persistLogStateForTesting();
 
             ImmutableCoordinatorLogOffsets logOffsets = new 
ImmutableCoordinatorLogOffsets.Builder()
                     .add(mutation.id())
                     .build();
+
+            Range<Token> range = getShardRange(mutation);
+            List<? extends Offsets> offsets = 
Collections.singletonList(logOffsets.offsets(mutation.id().logId()));
+            MutationTrackingService.instance.updateReplicatedOffsets(ks, 
range, offsets, true, addr2);
+            MutationTrackingService.instance.updateReplicatedOffsets(ks, 
range, offsets, true, addr3);
+
             
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
         }
 
@@ -317,6 +338,7 @@ public class CoordinatorLogOffsetsTest
             MutationTrackingService.instance.finishWriting(mutation);
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
             
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+            MutationTrackingService.instance.persistLogStateForTesting();
 
             MutationId fakeMutationId = new 
MutationId(CoordinatorLogId.asLong(111, 222), MutationId.sequenceId(333, 444));
             Assertions.assertThat(metadata.directory.version(new 
NodeId(fakeMutationId.hostId()))).isNull();
@@ -326,9 +348,22 @@ public class CoordinatorLogOffsetsTest
 
             ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new 
ImmutableCoordinatorLogOffsets.Builder();
             logOffsetsBuilder.add(fakeMutationId);
-            
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())).isFalse();
+            Assertions.assertThatThrownBy(() -> 
MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build()))
+                    .hasSameClassAs(new IllegalStateException())
+                    .hasMessageMatching("Could not find shard for logId \\d+");
         }
 
         MutationTrackingService.instance.shutdownBlocking();
+        CommitLog.instance.stopUnsafe(true);
+    }
+
+    private Range<Token> getShardRange(Mutation mutation)
+    {
+        Map<String, Range<Token>> ksRanges = new HashMap<>();
+        MutationTrackingService.instance.forEachKeyspace(shards -> {
+            Shard shard = shards.lookUp(mutation);
+            ksRanges.put(shard.keyspace, shard.range);
+        });
+        return ksRanges.get(mutation.getKeyspaceName());
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to