This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 07831c9cc7 SAI marks an index as non-empty when a partial 
partition/row modification is flushed due to repair
07831c9cc7 is described below

commit 07831c9cc7efcb9be7b227260467cf10a7be7724
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Wed Apr 23 15:18:17 2025 -0700

    SAI marks an index as non-empty when a partial partition/row modification 
is flushed due to repair
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-20567
---
 CHANGES.txt                                        |  1 +
 .../index/sai/disk/v1/MemtableIndexWriter.java     | 15 ++--
 .../cassandra/distributed/shared/ClusterUtils.java | 39 +++++++++
 .../test/sai/PartialWritesWithRepairTest.java      | 96 ++++++++++++++++++++++
 4 files changed, 145 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2682765f1c..21b9f481b2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * SAI marks an index as non-empty when a partial partition/row modifications 
is flushed due to repair (CASSANDRA-20567)
  * SAI fails queries when multiple columns exist and a non-indexed column is a 
composite with a map (CASSANDRA-19891)
  * Grant permission on keyspaces system_views and system_virtual_schema not 
possible (CASSANDRA-20171)
  * Fix marking an SSTable as suspected and BufferPool leakage in case of a 
corrupted SSTable read during a compaction (CASSANDRA-20396)
diff --git 
a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java 
b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
index 04d3185bfc..c5f833f4ac 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java
@@ -146,12 +146,15 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
             {
                 final Iterator<Pair<ByteComparable, LongArrayList>> iterator = 
rowMapping.merge(memtable);
 
-                try (MemtableTermsIterator terms = new 
MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator))
+                long cellCount = 0;
+                if (iterator.hasNext())
                 {
-                    long cellCount = flush(terms);
-
-                    completeIndexFlush(cellCount, start, stopwatch);
+                    try (MemtableTermsIterator terms = new 
MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator))
+                    {
+                        cellCount = flush(terms);
+                    }
                 }
+                completeIndexFlush(cellCount, start, stopwatch);
             }
         }
         catch (Throwable t)
@@ -217,8 +220,8 @@ public class MemtableIndexWriter implements 
PerColumnIndexWriter
 
     private void completeIndexFlush(long cellCount, long startTime, Stopwatch 
stopwatch) throws IOException
     {
-        // create a completion marker indicating that the index is complete 
and not-empty
-        ColumnCompletionMarkerUtil.create(indexDescriptor, indexIdentifier, 
false);
+        // create a completion marker indicating that the index is complete
+        ColumnCompletionMarkerUtil.create(indexDescriptor, indexIdentifier, 
cellCount == 0);
 
         indexMetrics.memtableIndexFlushCount.inc();
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 5a8da8c7cf..3bded5cd16 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.shared;
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.security.Permission;
@@ -60,6 +61,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tools.SystemExitException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.Shared;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
@@ -977,4 +979,41 @@ public class ClusterUtils
             }
         });
     }
+
+    @Shared
+    public static class Range implements Serializable
+    {
+        public final String left, right;
+
+        public Range(String left, String right)
+        {
+            this.left = left;
+            this.right = right;
+        }
+
+        public Range(long left, long right)
+        {
+            this(Long.toString(left), Long.toString(right));
+        }
+
+        public long left()
+        {
+            return Long.parseLong(left);
+        }
+
+        public long right()
+        {
+            return Long.parseLong(right);
+        }
+    }
+
+    public static List<Range> getPrimaryRanges(IInvokableInstance instance, 
String keyspace)
+    {
+        return instance.callOnInstance(() -> {
+            var ranges = 
StorageService.instance.getPrimaryRangesForEndpoint(keyspace, 
FBUtilities.getBroadcastAddressAndPort());
+            return ranges.stream()
+                    .flatMap(r -> r.unwrap().stream().map(r2 -> new 
Range(r2.left.toString(), r2.right.toString())))
+                    .collect(Collectors.toList());
+        });
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java
new file mode 100644
index 0000000000..703137c5c1
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.sai;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.ClusterUtils.Range;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+public class PartialWritesWithRepairTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                .withConfig(c -> c.with(Feature.values()))
+                .start())
+        {
+            init(cluster);
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk 
vector<bigint, 2>, ck int, s1 int static, v1 int, v2 int, PRIMARY KEY (pk, 
ck))"));
+            cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(s1) 
USING 'sai'"));
+            cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(v1) 
USING 'sai'"));
+            cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(v2) 
USING 'sai'"));
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            // see org.apache.cassandra.service.StorageService.repair
+            List<Range> partialRanges = ClusterUtils.getPrimaryRanges(node1, 
KEYSPACE);
+            var completeRanges = completeRanges(partialRanges);
+
+            // write to each column for the complete set
+            // avoid writing to one of the columns for the partial set
+            for (var range : completeRanges)
+            {
+                ByteBuffer pk = key(range);
+                node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, 
s1, v1, v2) VALUES (?, ?, ?, ?, ?)"), pk, 0, 0, 0, 0);
+                node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, 
s1, v1, v2) VALUES (?, ?, ?, ?, ?)"), pk, 1, 0, 1, 1);
+            }
+            for (var range : partialRanges)
+            {
+                ByteBuffer pk = key(range);
+                node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, 
v1) VALUES (?, ?, ?)"), pk, 0, 0);
+                node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, 
v1) VALUES (?, ?, ?)"), pk, 1, 1);
+            }
+
+            node1.nodetoolResult("repair", KEYSPACE, 
"-pr").asserts().success();
+        }
+    }
+
+    private static ByteBuffer key(Range range)
+    {
+        return Murmur3Partitioner.LongToken.keyForToken(range.right());
+    }
+
+    private static List<Range> completeRanges(List<Range> ranges)
+    {
+        ranges.sort(Comparator.comparingLong(Range::left));
+        List<Range> list = new ArrayList<>();
+        Range previous = ranges.get(0);
+        if (previous.left() != Long.MIN_VALUE)
+            list.add(new Range(Long.MIN_VALUE, ranges.get(0).left()));
+        for (int i = 1; i < ranges.size(); i++)
+        {
+            Range next = ranges.get(i);
+            if (!previous.right.equals(next.left))
+                list.add(new Range(previous.right, next.left));
+            previous = next;
+        }
+        return list;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to