This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new 07831c9cc7 SAI marks an index as non-empty when a partial partition/row modifications is flushed due to repair 07831c9cc7 is described below commit 07831c9cc7efcb9be7b227260467cf10a7be7724 Author: David Capwell <dcapw...@apache.org> AuthorDate: Wed Apr 23 15:18:17 2025 -0700 SAI marks an index as non-empty when a partial partition/row modifications is flushed due to repair patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-20567 --- CHANGES.txt | 1 + .../index/sai/disk/v1/MemtableIndexWriter.java | 15 ++-- .../cassandra/distributed/shared/ClusterUtils.java | 39 +++++++++ .../test/sai/PartialWritesWithRepairTest.java | 96 ++++++++++++++++++++++ 4 files changed, 145 insertions(+), 6 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2682765f1c..21b9f481b2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0.5 + * SAI marks an index as non-empty when a partial partition/row modifications is flushed due to repair (CASSANDRA-20567) * SAI fails queries when multiple columns exist and a non-indexed column is a composite with a map (CASSANDRA-19891) * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171) * Fix marking an SSTable as suspected and BufferPool leakage in case of a corrupted SSTable read during a compaction (CASSANDRA-20396) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index 04d3185bfc..c5f833f4ac 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -146,12 +146,15 @@ public class MemtableIndexWriter implements PerColumnIndexWriter { final Iterator<Pair<ByteComparable, LongArrayList>> iterator = rowMapping.merge(memtable); - try (MemtableTermsIterator terms = new MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator)) + long cellCount = 0; + if (iterator.hasNext()) { - long cellCount = flush(terms); - - completeIndexFlush(cellCount, start, stopwatch); + try (MemtableTermsIterator terms = new MemtableTermsIterator(memtable.getMinTerm(), memtable.getMaxTerm(), iterator)) + { + cellCount = flush(terms); + } } + completeIndexFlush(cellCount, start, stopwatch); } } catch (Throwable t) @@ -217,8 +220,8 @@ public class MemtableIndexWriter implements PerColumnIndexWriter private void completeIndexFlush(long cellCount, long startTime, Stopwatch stopwatch) throws IOException { - // create a completion marker indicating that the index is complete and not-empty - ColumnCompletionMarkerUtil.create(indexDescriptor, indexIdentifier, false); + // create a completion marker indicating that the index is complete + ColumnCompletionMarkerUtil.create(indexDescriptor, indexIdentifier, cellCount == 0); indexMetrics.memtableIndexFlushCount.inc(); diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index 5a8da8c7cf..3bded5cd16 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -18,6 +18,7 @@ package org.apache.cassandra.distributed.shared; +import java.io.Serializable; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.security.Permission; @@ -60,6 +61,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tools.SystemExitException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.Shared; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS; @@ -977,4 +979,41 @@ public class ClusterUtils } }); } + + @Shared + public static class Range implements Serializable + { + public final String left, right; + + public Range(String left, String right) + { + this.left = left; + this.right = right; + } + + public Range(long left, long right) + { + this(Long.toString(left), Long.toString(right)); + } + + public long left() + { + return Long.parseLong(left); + } + + public long right() + { + return Long.parseLong(right); + } + } + + public static List<Range> getPrimaryRanges(IInvokableInstance instance, String keyspace) + { + return instance.callOnInstance(() -> { + var ranges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort()); + return ranges.stream() + .flatMap(r -> r.unwrap().stream().map(r2 -> new Range(r2.left.toString(), r2.right.toString()))) + .collect(Collectors.toList()); + }); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java new file mode 100644 index 0000000000..703137c5c1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialWritesWithRepairTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.sai; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.ClusterUtils.Range; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class PartialWritesWithRepairTest extends TestBaseImpl +{ + @Test + public void test() throws IOException + { + try (Cluster cluster = Cluster.build(2) + .withConfig(c -> c.with(Feature.values())) + .start()) + { + init(cluster); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk vector<bigint, 2>, ck int, s1 int static, v1 int, v2 int, PRIMARY KEY (pk, ck))")); + cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(s1) USING 'sai'")); + cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(v1) USING 'sai'")); + cluster.schemaChange(withKeyspace("CREATE INDEX ON %s.tbl(v2) USING 'sai'")); + IInvokableInstance node1 = cluster.get(1); + IInvokableInstance node2 = cluster.get(2); + // see org.apache.cassandra.service.StorageService.repair + List<Range> partialRanges = ClusterUtils.getPrimaryRanges(node1, KEYSPACE); + var completeRanges = completeRanges(partialRanges); + + // write to each column for the complete set + // avoid writing to one of the columns for the partial set + for (var range : completeRanges) + { + ByteBuffer pk = key(range); + node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, s1, v1, v2) VALUES (?, ?, ?, ?, ?)"), pk, 0, 0, 0, 0); + node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, s1, v1, v2) VALUES (?, ?, ?, ?, ?)"), pk, 1, 0, 1, 1); + } + for (var range : partialRanges) + { + ByteBuffer pk = key(range); + node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, v1) VALUES (?, ?, ?)"), pk, 0, 0); + node2.executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, ck, v1) VALUES (?, ?, ?)"), pk, 1, 1); + } + + node1.nodetoolResult("repair", KEYSPACE, "-pr").asserts().success(); + } + } + + private static ByteBuffer key(Range range) + { + return Murmur3Partitioner.LongToken.keyForToken(range.right()); + } + + private static List<Range> completeRanges(List<Range> ranges) + { + ranges.sort(Comparator.comparingLong(Range::left)); + List<Range> list = new ArrayList<>(); + Range previous = ranges.get(0); + if (previous.left() != Long.MIN_VALUE) + list.add(new Range(Long.MIN_VALUE, ranges.get(0).left())); + for (int i = 1; i < ranges.size(); i++) + { + Range next = ranges.get(i); + if (!previous.right.equals(next.left)) + list.add(new Range(previous.right, next.left)); + previous = next; + } + return list; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org