This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 7a8335c273 Optimize initial skipping logic for SAI queries on large partitions
7a8335c273 is described below

commit 7a8335c2739c207b77e90c05897285b3cbaba166
Author: Sunil Ramchandra Pawar <pawar...@apple.com>
AuthorDate: Thu May 8 17:12:18 2025 +0530

    Optimize initial skipping logic for SAI queries on large partitions

    patch by Sunil Ramchandra Pawar; reviewed by Caleb Rackliffe and David Capwell for CASSANDRA-20191
---
 CHANGES.txt                                        |   1 +
 .../sai/plan/StorageAttachedIndexSearcher.java     |  58 +++-
 .../index/sai/cql/IntraPartitionSkippingTest.java  | 318 +++++++++++++++++++++
 3 files changed, 375 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9a6dbe7ae5..c073719105 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * Optimize initial skipping logic for SAI queries on large partitions (CASSANDRA-20191)
  * Fix reading mmapped trie-index exceeding 2GiB (CASSANDRA-20351)
  * zero copy streaming allocates direct memory that isn't used, but does help to fragment the memory space (CASSANDRA-20577)
  * CQLSSTableWriter supports setting the format (BTI or Big) (CASSANDRA-20609)
diff --git a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
index 9116db0d31..20a9cad58c 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.index.sai.plan;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +33,8 @@ import javax.annotation.Nullable;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
@@ -39,6 +42,10 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -138,6 +145,7 @@ public class StorageAttachedIndexSearcher implements Index.Searcher
         private final PrimaryKey firstPrimaryKey;
         private final PrimaryKey lastPrimaryKey;
         private final Iterator<DataRange> keyRanges;
+        private final DataRange firstDataRange;
         private AbstractBounds<PartitionPosition> currentKeyRange;
 
         private final KeyRangeIterator resultKeyIterator;
@@ -152,7 +160,8 @@ public class StorageAttachedIndexSearcher implements Index.Searcher
         private ResultRetriever(ReadExecutionController executionController, boolean topK)
         {
             this.keyRanges = queryController.dataRanges().iterator();
-            this.currentKeyRange = keyRanges.next().keyRange();
+            this.firstDataRange = keyRanges.next();
+            this.currentKeyRange = firstDataRange.keyRange();
             this.resultKeyIterator = Operation.buildIterator(queryController);
             this.filterTree = Operation.buildFilter(queryController, queryController.usesStrictFiltering());
             this.executionController = executionController;
@@ -175,7 +184,52 @@ public class StorageAttachedIndexSearcher implements Index.Searcher
             // We can't put this code in the constructor because it may throw and the caller
             // may not be prepared for that.
             if (lastKey == null)
-                resultKeyIterator.skipTo(firstPrimaryKey);
+            {
+                PrimaryKey skipTarget = firstPrimaryKey;
+                ClusteringComparator comparator = command.metadata().comparator;
+
+                // If there are no clusterings, the first data range selects an entire partition, or we have static
+                // expressions, don't bother trying to skip forward within the partition.
+                if (comparator.size() > 0 && !firstDataRange.selectsAllPartition() && !command.rowFilter().hasStaticExpression())
+                {
+                    // Only attempt to skip if the first data range covers a single partition.
+                    if (currentKeyRange.left.equals(currentKeyRange.right) && currentKeyRange.left instanceof DecoratedKey)
+                    {
+                        DecoratedKey decoratedKey = (DecoratedKey) currentKeyRange.left;
+                        ClusteringIndexFilter filter = firstDataRange.clusteringIndexFilter(decoratedKey);
+
+                        if (filter instanceof ClusteringIndexSliceFilter)
+                        {
+                            Slices slices = ((ClusteringIndexSliceFilter) filter).requestedSlices();
+
+                            if (!slices.isEmpty())
+                            {
+                                ClusteringBound<?> startBound = slices.get(0).start();
+
+                                if (!startBound.isEmpty())
+                                {
+                                    ByteBuffer[] rawValues = startBound.getBufferArray();
+
+                                    if (rawValues.length == comparator.size())
+                                        skipTarget = keyFactory.create(decoratedKey, Clustering.make(rawValues));
+                                }
+                            }
+                        }
+                        else if (filter instanceof ClusteringIndexNamesFilter)
+                        {
+                            ClusteringIndexNamesFilter namesFilter = (ClusteringIndexNamesFilter) filter;
+
+                            if (!namesFilter.requestedRows().isEmpty())
+                            {
+                                Clustering<?> skipClustering = namesFilter.requestedRows().iterator().next();
+                                skipTarget = keyFactory.create(decoratedKey, skipClustering);
+                            }
+                        }
+                    }
+                }
+
+                resultKeyIterator.skipTo(skipTarget);
+            }
 
             // Theoretically we wouldn't need this if the caller of computeNext always ran the
             // returned iterators to the completion. Unfortunately, we have no control over the caller behavior here.
diff --git a/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java b/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java
new file mode 100644
index 0000000000..b9e42640e7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.cql;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.HdrHistogram.Histogram;
+import org.apache.cassandra.index.sai.SAITester;
+
+/**
+ * Tests for verifying intra-partition and partition-level skipping optimizations
+ * introduced in CASSANDRA-20191 for SAI.
+ * <p>
+ * These tests validate that Cassandra can efficiently skip over rows
+ * within a partition using clustering filters (name and slice), paging, reversed order,
+ * and sparse matches.
+ * <p>
+ * Each test documents a scenario where the skipping logic is expected to apply, along with a few where it does not.
+ */
+public class IntraPartitionSkippingTest extends SAITester
+{
+    @Test
+    public void testNameFilterExactMatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck = 0; ck < 10; ck++)
+        {
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, "val" + ck);
+        }
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 5 AND val = 'val5' ALLOW FILTERING"),
+                                             row(1, 5, "val5")));
+    }
+
+    @Test
+    public void testSliceFilterRangeMatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck = 0; ck < 100; ck++)
+        {
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, "val" + ck);
+        }
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck > 90 AND val = 'val99' ALLOW FILTERING"),
+                                             row(1, 99, "val99")));
+    }
+
+    @Test
+    public void testReversedClustering() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck)) WITH CLUSTERING ORDER BY (ck DESC)");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck = 0; ck < 20; ck++)
+        {
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, "val" + ck);
+        }
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck < 10 AND val = 'val5' ALLOW FILTERING"),
+                                             row(1, 5, "val5")));
+    }
+
+    @Test
+    public void testSkippingWithPaging() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val int, PRIMARY KEY (pk, ck))");
+
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck = 0; ck < 100; ck++)
+        {
+            int val = 1000 + ck;
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, val);
+        }
+
+        beforeAndAfterFlush(() -> assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE pk = 1 AND ck > 90 AND val > 1090 ALLOW FILTERING", 5),
+                                                row(1, 91, 1091),
+                                                row(1, 92, 1092),
+                                                row(1, 93, 1093),
+                                                row(1, 94, 1094),
+                                                row(1, 95, 1095),
+                                                row(1, 96, 1096),
+                                                row(1, 97, 1097),
+                                                row(1, 98, 1098),
+                                                row(1, 99, 1099)));
+    }
+
+    @Test
+    public void testCompositeClusteringKeySkipping() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, val text, PRIMARY KEY (pk, ck1, ck2))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck1 = 0; ck1 < 10; ck1++)
+            for (int ck2 = 0; ck2 < 10; ck2++)
+                execute("INSERT INTO %s (pk, ck1, ck2, val) VALUES (?, ?, ?, ?)", 1, ck1, ck2, "v" + (ck1 * 10 + ck2));
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck1 = 9 AND ck2 = 9 AND val = 'v99' ALLOW FILTERING"),
+                                             row(1, 9, 9, "v99")));
+    }
+
+    @Test
+    public void testSparseMatch() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int ck = 0; ck < 1000; ck++)
+        {
+            String value = (ck % 450 == 0) ? "insert" : "skip";
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, value);
+        }
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck > 899 AND val = 'insert' ALLOW FILTERING"),
+                                             row(1, 900, "insert")));
+    }
+
+    @Test
+    public void testMultipleNameFilters() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int i = 0; i < 20; i++)
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, i, "v5");
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck IN (5, 10, 15) AND val = 'v5' ALLOW FILTERING"),
+                                             row(1, 5, "v5"), row(1, 10, "v5"), row(1, 15, "v5")));
+    }
+
+    // Multiple partition range scans won't skip
+    @Test
+    public void testPartitionRangeSkipping() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        for (int pk = 0; pk < 10; pk++)
+            for (int ck = 0; ck < 5; ck++)
+                execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk, ck, "value" + pk);
+
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE val = 'value9' AND ck > 2 ALLOW FILTERING"),
+                                             row(9, 3, "value9"), row(9, 4, "value9")));
+    }
+
+    @Test
+    public void testStaticColumns() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, s text static, val text, PRIMARY KEY (pk, ck))");
+        createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+        execute("INSERT INTO %s (pk, s) VALUES (?, ?)", 1, "static1");
+
+        for (int ck = 0; ck < 200; ck++)
+        {
+            execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck, "val" + ck);
+        }
+
+        // We will not skip
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck > 100 AND s = 'static1' AND val = 'val101' ALLOW FILTERING"),
+                                             row(1, 101, "static1", "val101")));
+
+        // We will skip
+        beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck > 100 AND val = 'val101' ALLOW FILTERING"),
+                                             row(1, 101, "static1", "val101")));
+    }
+
+    @Test
+    public void testNextKeyClusteringIndexNamesFilter() throws Throwable
+    {
+        createTable("CREATE TABLE %s (" +
+                    "pk int," +
+                    "ck int," +
+                    "v int," +
+                    "PRIMARY KEY (pk, ck))");
+
+        createIndex("CREATE INDEX ON %s(v) USING 'sai'");
+
+        int pk = 1;
+        for (int ck = 0; ck < 10; ck++)
+        {
+            int v = ck + 1000;
+            execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk, ck, v);
+        }
+
+        int pk1 = 2;
+        for (int ck = 0; ck < 100; ck++)
+        {
+            execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk1, ck, ck);
+        }
+
+        beforeAndAfterFlush(() -> {
+            assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 5 AND v > 1004 ALLOW FILTERING"),
+                       row(1, 5, 1005));
+
+            assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 5 AND v > 1004 AND v < 20000 ALLOW FILTERING"),
+                       row(1, 5, 1005));
+        });
+    }
+
+    // Performance testing test cases; can be ignored.
+ @Ignore ("performance test case for Index Slice filter.") + @Test + public void testNextKeyPerfClusteringIndexSliceFilter() + { + createTable("CREATE TABLE %S (" + + "pk int, " + + "ck int, " + + "val text, " + + "PRIMARY KEY (pk, ck))"); + + createIndex("CREATE INDEX ON %s(val) USING 'sai'"); + + int pk = 1; + for (int ck = 0; ck < 10000; ck++) + { + execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk, ck, "hello1"); + } + + int pk1 = 2; + for (int ck = 0; ck < 100; ck++) + { + execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk1, ck, "hello2"); + } + + Histogram histogram = new Histogram(4); + + + for (int i = 0; i < 10000; i++) + { + long start = System.nanoTime(); + execute("SELECT * FROM %s WHERE pk = 1 AND ck > 9000 AND val = 'hello1' ALLOW FILTERING"); + histogram.recordValue(System.nanoTime() - start); + + if (i % 1000 == 0) + { + System.out.println("50th: " + histogram.getValueAtPercentile(0.5)); + System.out.println("95th: " + histogram.getValueAtPercentile(0.95)); + System.out.println("99th: " + histogram.getValueAtPercentile(0.99)); + } + } + + } + + + @Ignore ("performance test case for Index Names filter.") + @Test + public void testNextKeyPerfClusteringIndexNamesFilter() + { + createTable("CREATE TABLE %S (" + + "pk int," + + "ck int," + + "v int," + + "PRIMARY KEY (pk, ck))"); + + createIndex("CREATE INDEX ON %s(v) USING 'sai'"); + + int pk = 1; + for (int ck = 0; ck < 20000; ck++) + { + int v = ck + 10; + execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk, ck, v); + } + + int pk1 = 2; + for (int ck = 0; ck < 100; ck++) + { + execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk1, ck, ck); + } + + Histogram histogram = new Histogram(4); + + for (int i = 0; i < 10000; i++) + { + long start = System.nanoTime(); + execute("SELECT * FROM %s WHERE pk = 1 AND ck = 15000 AND v > 9000 ALLOW FILTERING"); + histogram.recordValue(System.nanoTime() - start); + + if (i % 1000 == 0) + { + System.out.println("50th: " + histogram.getValueAtPercentile(0.5)); + System.out.println("95th: " + histogram.getValueAtPercentile(0.95)); + System.out.println("99th: " + histogram.getValueAtPercentile(0.99)); + } + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org