This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new dc13f327eb Fix-up offset broadcasting logic
dc13f327eb is described below

commit dc13f327ebc0a6685aa54e4d8f449464b2652934
Author: Aleksey Yeschenko <alek...@apache.org>
AuthorDate: Wed May 7 18:24:58 2025 +0100

    Fix-up offset broadcasting logic
    
    patch by Aleksey Yeschenko; reviewed by Abe Ratnofsky for CASSANDRA-20576
---
 .../cassandra/replication/CoordinatorLog.java      | 21 +++---
 .../apache/cassandra/replication/Participants.java |  6 ++
 .../MutationTrackingWriteForwardingTest.java       | 81 +++++++++-------------
 3 files changed, 48 insertions(+), 60 deletions(-)

diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 7dad88cf97..0e4bdb7cdb 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -84,7 +84,7 @@ public abstract class CoordinatorLog
             if (!getLocal().contains(mutationId.offset()))
                 return; // local host hasn't witnessed yet -> no cleanup needed
 
-            if (hasWrittenToRemoteReplicas(mutationId.offset()))
+            if (remoteReplicasWitnessed(mutationId.offset()))
             {
                 logger.trace("marking mutation {} as fully reconciled", 
mutationId);
                 // if all replicas have now witnessed the id, remove it from 
the index
@@ -107,9 +107,8 @@ public abstract class CoordinatorLog
             {
                 for (int offset = start; offset <= end; ++offset)
                 {
-                    // TODO (desired): skip checking the host's offsets - all 
just added
                     // TODO (desired): use the fact that Offsets are ordered 
to optimise this look up
-                    if (hasWrittenLocally(offset) && 
hasWrittenToRemoteReplicas(offset))
+                    if (othersWitnessed(offset, onHostId))
                     {
                         reconciledOffsets.add(offset);
                         unreconciledMutations().remove(offset);
@@ -167,7 +166,7 @@ public abstract class CoordinatorLog
 
             unreconciledMutations().finishWriting(mutation);
 
-            if (hasWrittenToRemoteReplicas(offset))
+            if (remoteReplicasWitnessed(offset))
             {
                 reconciledOffsets.add(offset);
                 unreconciledMutations().remove(offset);
@@ -179,22 +178,22 @@ public abstract class CoordinatorLog
         }
     }
 
-    private boolean hasWrittenLocally(int offset)
-    {
-        return getLocal().contains(offset);
-    }
-
-    private boolean hasWrittenToRemoteReplicas(int offset)
+    private boolean othersWitnessed(int offset, int exceptHostId)
     {
         for (int i = 0; i < participants.size(); ++i)
         {
             int hostId = participants.get(i);
-            if (hostId != localHostId && !get(hostId).contains(offset))
+            if (hostId != exceptHostId && !get(hostId).contains(offset))
                 return false;
         }
         return true;
     }
 
+    private boolean remoteReplicasWitnessed(int offset)
+    {
+        return othersWitnessed(offset, localHostId);
+    }
+
     /**
      * Look up unreconciled sequence ids of mutations witnessed by this host 
in this coordinator log.
      * Adds the ids to the supplied collection, so it can be reused to 
aggregate lookups for multiple logs.
diff --git a/src/java/org/apache/cassandra/replication/Participants.java 
b/src/java/org/apache/cassandra/replication/Participants.java
index 494dd3adca..f245797586 100644
--- a/src/java/org/apache/cassandra/replication/Participants.java
+++ b/src/java/org/apache/cassandra/replication/Participants.java
@@ -57,4 +57,10 @@ public class Participants
             throw new IllegalArgumentException("Out of bounds host idx " + 
idx);
         return hosts[idx];
     }
+
+    @Override
+    public String toString()
+    {
+        return Arrays.toString(hosts);
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
index 9eb19a5fc9..6209c1c1b6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -15,14 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.distributed.test.tracking;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -31,7 +28,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -39,7 +35,6 @@ import org.apache.cassandra.replication.MutationSummary;
 import org.apache.cassandra.replication.MutationTrackingService;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.assertj.core.api.Assertions;
 
 import static java.lang.String.format;
 import static 
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
@@ -64,7 +59,7 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
     public void testBasicWriteForwarding() throws Throwable
     {
         // 2 DCs, 1 replica in each, to test forwarding to instances in remote 
DCs and local DCs
-        Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5, 
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", 
"rack2"));
+        Map<Integer, NetworkTopology.DcAndRack> topology = 
networkTopology(NODES, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") 
: dcAndRack("dc2", "rack2"));
 
         // TODO: disable background reconciliation so we can test that writes 
are reconciling immediately
         try (Cluster cluster = Cluster.build(NODES)
@@ -82,56 +77,44 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
                                         "AND replication_type='tracked';", 
keyspaceName));
             cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v 
int, primary key (k, c));", keyspaceName, tableName));
 
-            Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
             int ROWS = 100;
             for (int inserted = 0; inserted < ROWS; inserted++)
             {
                 // Writes should be completed for the client, regardless of 
whether they are forwarded or not
                 cluster.coordinator(inst(inserted)).execute(format("INSERT 
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), 
ConsistencyLevel.ALL, inserted, inserted, inserted);
+            }
 
-                // Writes should be ack'd in the journal too, but these could 
lag behind client acks, so could be
-                // permissive here. Each write should be reconciled on 1 
leader, unreconciled on TOTAL_RF-1
-                // replicas (until background reconciliation broadcast is 
implemented), and ignored on others.
-                Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
-                Set<IInvokableInstance> replicas = new HashSet<>();
-                for (IInvokableInstance instance : cluster)
-                {
-                    int unreconciled = instance.callOnInstance(() -> {
-                        Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
-                        Range<Token> fullRange = new Range<>(token, token);
-                        TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
-                        MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
-                        return summary.unreconciledIds();
-                    });
-                    int lastUnreconciled = 
instanceUnreconciled.getOrDefault(instance, 0);
-                    int newUnreconciled = unreconciled - lastUnreconciled;
+            Thread.sleep(1000); // allow time for all offsets to be broadcasted
 
-                    if (newUnreconciled == 0)
-                    {
-                        // instance already reconciled (as leader) or did not 
receive new mutation ID (non-replica)
-                        leaderOrNonReplica.add(instance);
-                    }
-                    else if (newUnreconciled == 1)
-                    {
-                        // instance has not reconciled, so it's a replica 
(until reconciliation broadcast is implemented)
-                        replicas.add(instance);
-                    }
-                    else
-                    {
-                        Assertions.fail("Should not have more than one new 
unreconciled mutation");
-                    }
-                    instanceUnreconciled.put(instance, unreconciled);
-                }
-                Assertions.assertThat(leaderOrNonReplica).hasSize(2);
-                Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
+            int allReconciled = 0;
+            int allUnreconciled = 0;
+
+            // Writes should be ack'd in the journal too, but these could lag 
behind client acks, so could be
+            // permissive here. Each write should be reconciled everywhere.
+            for (IInvokableInstance instance : cluster)
+            {
+                int reconciled = instance.callOnInstance(() -> {
+                    Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+                    Range<Token> fullRange = new Range<>(token, token);
+                    TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+                    MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
+                    return summary.reconciledIds();
+                });
+
+                int unreconciled = instance.callOnInstance(() -> {
+                    Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+                    Range<Token> fullRange = new Range<>(token, token);
+                    TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+                    MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
+                    return summary.unreconciledIds();
+                });
+
+                allReconciled += reconciled;
+                allUnreconciled += unreconciled;
             }
-            Assertions.assertThat(instanceUnreconciled).matches(map -> {
-                int sum = 0;
-                for (Integer value : map.values())
-                    sum += value;
-                // Each write is reconciled on the leader, unreconciled on all 
other replicas
-                return sum == (ROWS * (TOTAL_RF - 1));
-            });
+
+            Assert.assertEquals(0, allUnreconciled);
+            Assert.assertEquals(ROWS * TOTAL_RF, allReconciled);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to