This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push: new dc13f327eb Fix-up offset broadcasting logic dc13f327eb is described below commit dc13f327ebc0a6685aa54e4d8f449464b2652934 Author: Aleksey Yeschenko <alek...@apache.org> AuthorDate: Wed May 7 18:24:58 2025 +0100 Fix-up offset broadcasting logic patch by Aleksey Yeschenko; reviewed by Abe Ratnofsky for CASSANDRA-20576 --- .../cassandra/replication/CoordinatorLog.java | 21 +++--- .../apache/cassandra/replication/Participants.java | 6 ++ .../MutationTrackingWriteForwardingTest.java | 81 +++++++++------------- 3 files changed, 48 insertions(+), 60 deletions(-) diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 7dad88cf97..0e4bdb7cdb 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -84,7 +84,7 @@ public abstract class CoordinatorLog if (!getLocal().contains(mutationId.offset())) return; // local host hasn't witnessed yet -> no cleanup needed - if (hasWrittenToRemoteReplicas(mutationId.offset())) + if (remoteReplicasWitnessed(mutationId.offset())) { logger.trace("marking mutation {} as fully reconciled", mutationId); // if all replicas have now witnessed the id, remove it from the index @@ -107,9 +107,8 @@ public abstract class CoordinatorLog { for (int offset = start; offset <= end; ++offset) { - // TODO (desired): skip checking the host's offsets - all just added // TODO (desired): use the fact that Offsets are ordered to optimise this look up - if (hasWrittenLocally(offset) && hasWrittenToRemoteReplicas(offset)) + if (othersWitnessed(offset, onHostId)) { reconciledOffsets.add(offset); unreconciledMutations().remove(offset); @@ -167,7 +166,7 @@ public abstract class CoordinatorLog unreconciledMutations().finishWriting(mutation); - if (hasWrittenToRemoteReplicas(offset)) + if (remoteReplicasWitnessed(offset)) { reconciledOffsets.add(offset); unreconciledMutations().remove(offset); @@ -179,22 +178,22 @@ public abstract class CoordinatorLog } } - private boolean hasWrittenLocally(int offset) - { - return getLocal().contains(offset); - } - - private boolean hasWrittenToRemoteReplicas(int offset) + private boolean othersWitnessed(int offset, int exceptHostId) { for (int i = 0; i < participants.size(); ++i) { int hostId = participants.get(i); - if (hostId != localHostId && !get(hostId).contains(offset)) + if (hostId != exceptHostId && !get(hostId).contains(offset)) return false; } return true; } + private boolean remoteReplicasWitnessed(int offset) + { + return othersWitnessed(offset, localHostId); + } + /** * Look up unreconciled sequence ids of mutations witnessed by this host in this coordinataor log. * Adds the ids to the supplied collection, so it can be reused to aggregate lookups for multiple logs. diff --git a/src/java/org/apache/cassandra/replication/Participants.java b/src/java/org/apache/cassandra/replication/Participants.java index 494dd3adca..f245797586 100644 --- a/src/java/org/apache/cassandra/replication/Participants.java +++ b/src/java/org/apache/cassandra/replication/Participants.java @@ -57,4 +57,10 @@ public class Participants throw new IllegalArgumentException("Out of bounds host idx " + idx); return hosts[idx]; } + + @Override + public String toString() + { + return Arrays.toString(hosts); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java index 9eb19a5fc9..6209c1c1b6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java @@ -15,14 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.distributed.test.tracking; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; +import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; @@ -31,7 +28,6 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; -import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.TestBaseImpl; @@ -39,7 +35,6 @@ import org.apache.cassandra.replication.MutationSummary; import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; -import org.assertj.core.api.Assertions; import static java.lang.String.format; import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; @@ -64,7 +59,7 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl public void testBasicWriteForwarding() throws Throwable { // 2 DCs, 1 replica in each, to test forwarding to instances in remote DCs and local DCs - Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2")); + Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(NODES, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2")); // TODO: disable background reconciliation so we can test that writes are reconciling immediately try (Cluster cluster = Cluster.build(NODES) @@ -82,56 +77,44 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl "AND replication_type='tracked';", keyspaceName)); cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v int, primary key (k, c));", keyspaceName, tableName)); - Map<IInstance, Integer> instanceUnreconciled = new HashMap<>(); int ROWS = 100; for (int inserted = 0; inserted < ROWS; inserted++) { // Writes should be completed for the client, regardless of whether they are forwarded or not cluster.coordinator(inst(inserted)).execute(format("INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), ConsistencyLevel.ALL, inserted, inserted, inserted); + } - // Writes should be ack'd in the journal too, but these could lag behind client acks, so could be - // permissive here. Each write should be reconciled on 1 leader, unreconciled on TOTAL_RF-1 - // replicas (until background reconciliation broadcast is implemented), and ignored on others. - Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2); - Set<IInvokableInstance> replicas = new HashSet<>(); - for (IInvokableInstance instance : cluster) - { - int unreconciled = instance.callOnInstance(() -> { - Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); - Range<Token> fullRange = new Range<>(token, token); - TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; - MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); - return summary.unreconciledIds(); - }); - int lastUnreconciled = instanceUnreconciled.getOrDefault(instance, 0); - int newUnreconciled = unreconciled - lastUnreconciled; + Thread.sleep(1000); // allow time for all offsets to be broadcasted - if (newUnreconciled == 0) - { - // instance already reconciled (as leader) or did not receive new mutation ID (non-replica) - leaderOrNonReplica.add(instance); - } - else if (newUnreconciled == 1) - { - // instance has not reconciled, so it's a replica (until reconciliation broadcast is implemented) - replicas.add(instance); - } - else - { - Assertions.fail("Should not have more than one new unreconciled mutation"); - } - instanceUnreconciled.put(instance, unreconciled); - } - Assertions.assertThat(leaderOrNonReplica).hasSize(2); - Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1); + int allReconciled = 0; + int allUnreconciled = 0; + + // Writes should be ack'd in the journal too, but these could lag behind client acks, so could be + // permissive here. Each write should be reconciled everywhere. + for (IInvokableInstance instance : cluster) + { + int reconciled = instance.callOnInstance(() -> { + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Range<Token> fullRange = new Range<>(token, token); + TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; + MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + return summary.reconciledIds(); + }); + + int unreconciled = instance.callOnInstance(() -> { + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Range<Token> fullRange = new Range<>(token, token); + TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; + MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + return summary.unreconciledIds(); + }); + + allReconciled += reconciled; + allUnreconciled += unreconciled; } - Assertions.assertThat(instanceUnreconciled).matches(map -> { - int sum = 0; - for (Integer value : map.values()) - sum += value; - // Each write is reconciled on the leader, unreconciled on all other replicas - return sum == (ROWS * (TOTAL_RF - 1)); - }); + + Assert.assertEquals(0, allUnreconciled); + Assert.assertEquals(ROWS * TOTAL_RF, allReconciled); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org