This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch CASSANDRA-20639-trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit c5469594885361370085d221e5c08a1469ea5d31 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Fri May 9 01:34:13 2025 -0500 very rough, likely not efficient WIP on CASSANDRA-20639 --- .../cassandra/db/rows/UnfilteredRowIterators.java | 9 +++++ .../cassandra/db/transform/BaseIterator.java | 5 +++ .../service/reads/ReplicaFilteringProtection.java | 47 ++++++++++++++++++++-- .../apache/cassandra/utils/CloseableIterator.java | 2 + .../MultiNodeTableWalkWithoutReadRepairTest.java | 4 +- .../distributed/test/sai/StrictFilteringTest.java | 27 ++++++++++++- 6 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 7ccc6ff970..2540b5247c 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -100,6 +100,8 @@ public abstract class UnfilteredRowIterators public void close(); + public default void checkpoint() { } + public static MergeListener NOOP = new MergeListener() { public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {} @@ -540,6 +542,13 @@ public abstract class UnfilteredRowIterators listener.close(); } + @Override + public void checkpoint() + { + if (listener != null) + listener.checkpoint(); + } + private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered> { private final MergeListener listener; diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java b/src/java/org/apache/cassandra/db/transform/BaseIterator.java index 8d7de474e0..a788a7d3ea 100644 --- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java +++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java @@ -115,6 +115,11 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O exten maybeFail(fail); } + public void checkpoint() + { + input.checkpoint(); + } + public final O next() { if (next == null && !hasNext()) diff --git a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java index fae560089b..079cfef26b 100644 --- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java @@ -52,6 +52,7 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.RangeTombstoneMarker; import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.Rows; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -201,8 +202,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> for (int i = 0; i < sources.size(); i++) builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats)); - boolean[] silentRowAt = new boolean[builders.size()]; - boolean[] silentColumnAt = new boolean[builders.size()]; + final boolean[] silentRowAt = new boolean[builders.size()]; + final boolean[] silentColumnAt = new boolean[builders.size()]; return new UnfilteredRowIterators.MergeListener() { @@ -270,9 +271,18 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> @Override public void close() + { + checkpoint(); + } + + @Override + public void checkpoint() { for (int i = 0; i < sources.size(); i++) originalPartitions.get(i).add(builders.get(i)); + + Arrays.fill(silentRowAt, false); + Arrays.fill(silentColumnAt, false); } }; } @@ -342,6 +352,7 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> return new UnfilteredPartitionIterator() { final Queue<PartitionBuilder> partitions = originalPartitions.get(source); + RowIterator currentRowIterator = null; @Override public TableMetadata metadata() @@ -359,7 +370,27 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> // will force the RFP merge listener to load at least the next protected partition. if (partitions.isEmpty()) { - PartitionIterators.consumeNext(merged); + if (command.rowFilter().hasStaticExpression() || !command.rowFilter().hasNonKeyExpression()) + { + PartitionIterators.consumeNext(merged); + } + else + { + if (currentRowIterator == null || !currentRowIterator.hasNext()) + { + if (currentRowIterator != null) + currentRowIterator.close(); + + if (merged.hasNext()) + currentRowIterator = merged.next(); + } + + if (currentRowIterator != null && currentRowIterator.hasNext()) + { + currentRowIterator.next(); + currentRowIterator.checkpoint(); + } + } } return !partitions.isEmpty(); @@ -420,6 +451,15 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> contents.add(row); } + private void clearContents() + { + contents.clear(); + toFetch = null; + + if (staticRow != Rows.EMPTY_STATIC_ROW) + addRow(staticRow); + } + private void addRangeTombstoneMarker(RangeTombstoneMarker marker) { if (marker != null) @@ -491,6 +531,7 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> public void close() { releaseCachedRows(partitionRowsCached); + clearContents(); } @Override diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java index 634629f4be..8649af39cd 100644 --- a/src/java/org/apache/cassandra/utils/CloseableIterator.java +++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java @@ -24,6 +24,8 @@ import java.util.NoSuchElementException; public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { public void close(); + + public default void checkpoint() { } public static <T> CloseableIterator<T> wrap(Iterator<T> iter) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java index 5a0ce66ccc..9120235bce 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java @@ -33,7 +33,9 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkB protected void preCheck(Cluster cluster, Property.StatefulBuilder builder) { // if a failing seed is detected, populate here - // Example: builder.withSeed(42L); +// builder.withSeed(1210048824849624538L).withExamples(1); + builder.withSeed(-7862021736520593557L).withExamples(1); +// builder.withExamples(Integer.MAX_VALUE); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value // CQL_DEBUG_APPLY_OPERATOR = true; // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java index 480a93abe3..2bb89ce340 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java @@ -47,7 +47,11 @@ public class StrictFilteringTest extends TestBaseImpl @BeforeClass public static void setUpCluster() throws IOException { - CLUSTER = init(Cluster.build(2).withConfig(config -> config.set("hinted_handoff_enabled", false).with(GOSSIP).with(NETWORK)).start()); + CLUSTER = init(Cluster.build(2).withConfig(config -> config.set("hinted_handoff_enabled", false).set("range_request_timeout", "180s") + .set("read_request_timeout", "180s") + .set("write_request_timeout", "180s") + .set("native_transport_timeout", "180s") + .set("slow_query_log_timeout", "180s").with(GOSSIP).with(NETWORK)).start()); } @Test @@ -216,6 +220,27 @@ public class StrictFilteringTest extends TestBaseImpl assertRows(initialRows, row(0, 1, 2)); } + @Test + public void testNoShortReadAtLimit() + { + CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.short_read_no_srp (k int, c int, a int, b int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'")); + CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.short_read_no_srp(a) USING 'sai'")); + CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.short_read_no_srp(b) USING 'sai'")); + SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE); + + CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 0, 1) USING TIMESTAMP 1")); + CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 0, 2) USING TIMESTAMP 2")); + + CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 1, 1) USING TIMESTAMP 3")); + CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 1, 2) USING TIMESTAMP 4")); + + CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.short_read_no_srp(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5")); + + String select = withKeyspace("SELECT * FROM %s.short_read_no_srp WHERE k = 0 AND a = 1"); + Iterator<Object[]> initialRows = CLUSTER.coordinator(1).executeWithPaging(select, ConsistencyLevel.ALL, 1); + assertRows(initialRows, row(0, 2, 1, null)); + } + @Test public void testShortReadWithStaticColumn() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org