This is an automated email from the ASF dual-hosted git repository.
aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new 7f4bac3efb Fix failing test:
org.apache.cassandra.replication.CoordinatorLogOffsetsTest#reconciledBounds
7f4bac3efb is described below
commit 7f4bac3efb08fad610d8a90d8856ed4f04416229
Author: Abe Ratnofsky <[email protected]>
AuthorDate: Mon Sep 29 21:04:41 2025 -0400
Fix failing test:
org.apache.cassandra.replication.CoordinatorLogOffsetsTest#reconciledBounds
---
.../replication/CoordinatorLogOffsetsTest.java | 37 +++++++++++++++++++++-
1 file changed, 36 insertions(+), 1 deletion(-)
diff --git
a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
index 07f4bc5eaf..253d81526b 100644
--- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.replication;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -49,7 +52,10 @@ import org.apache.cassandra.net.MessagingService;
import org.assertj.core.api.Assertions;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -263,6 +269,7 @@ public class CoordinatorLogOffsetsTest
ClusterMetadataTestHelper.createKeyspace(ks, KeyspaceParams.simple(3,
ReplicationType.tracked));
ClusterMetadataTestHelper.commit(new
AlterSchema(SchemaTransformations.addTable(tableMetadata, false)));
+ CommitLog.instance.start();
MutationTrackingService.instance.start(metadata);
// Eventually, will also run perturbations before checking
isReconciled (like log truncation, durability, etc.)
@@ -276,10 +283,16 @@ public class CoordinatorLogOffsetsTest
MutationTrackingService.instance.finishWriting(mutation);
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+ MutationTrackingService.instance.persistLogStateForTesting();
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
.build();
+ Range<Token> range = getShardRange(mutation);
+ List<? extends Offsets> offsets =
Collections.singletonList(logOffsets.offsets(mutation.id().logId()));
+ MutationTrackingService.instance.updateReplicatedOffsets(ks,
range, offsets, true, addr2);
+ MutationTrackingService.instance.updateReplicatedOffsets(ks,
range, offsets, true, addr3);
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue();
}
@@ -288,6 +301,7 @@ public class CoordinatorLogOffsetsTest
Mutation mutation =
MutationTrackingUtils.createMutation(tableMetadata, 2, 2);
MutationTrackingService.instance.startWriting(mutation);
MutationTrackingService.instance.finishWriting(mutation);
+ MutationTrackingService.instance.persistLogStateForTesting();
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
@@ -302,10 +316,17 @@ public class CoordinatorLogOffsetsTest
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+ MutationTrackingService.instance.persistLogStateForTesting();
ImmutableCoordinatorLogOffsets logOffsets = new
ImmutableCoordinatorLogOffsets.Builder()
.add(mutation.id())
.build();
+
+ Range<Token> range = getShardRange(mutation);
+ List<? extends Offsets> offsets =
Collections.singletonList(logOffsets.offsets(mutation.id().logId()));
+ MutationTrackingService.instance.updateReplicatedOffsets(ks,
range, offsets, true, addr2);
+ MutationTrackingService.instance.updateReplicatedOffsets(ks,
range, offsets, true, addr3);
+
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse();
}
@@ -317,6 +338,7 @@ public class CoordinatorLogOffsetsTest
MutationTrackingService.instance.finishWriting(mutation);
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2);
MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3);
+ MutationTrackingService.instance.persistLogStateForTesting();
MutationId fakeMutationId = new
MutationId(CoordinatorLogId.asLong(111, 222), MutationId.sequenceId(333, 444));
Assertions.assertThat(metadata.directory.version(new
NodeId(fakeMutationId.hostId()))).isNull();
@@ -326,9 +348,22 @@ public class CoordinatorLogOffsetsTest
ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new
ImmutableCoordinatorLogOffsets.Builder();
logOffsetsBuilder.add(fakeMutationId);
-
Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())).isFalse();
+ Assertions.assertThatThrownBy(() ->
MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build()))
+ .hasSameClassAs(new IllegalStateException())
+ .hasMessageMatching("Could not find shard for logId \\d+");
}
MutationTrackingService.instance.shutdownBlocking();
+ CommitLog.instance.stopUnsafe(true);
+ }
+
+ private Range<Token> getShardRange(Mutation mutation)
+ {
+ Map<String, Range<Token>> ksRanges = new HashMap<>();
+ MutationTrackingService.instance.forEachKeyspace(shards -> {
+ Shard shard = shards.lookUp(mutation);
+ ksRanges.put(shard.keyspace, shard.range);
+ });
+ return ksRanges.get(mutation.getKeyspaceName());
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]