This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch CASSANDRA-20639-trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c5469594885361370085d221e5c08a1469ea5d31
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri May 9 01:34:13 2025 -0500

    very rough, likely not efficient WIP on CASSANDRA-20639
---
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  9 +++++
 .../cassandra/db/transform/BaseIterator.java       |  5 +++
 .../service/reads/ReplicaFilteringProtection.java  | 47 ++++++++++++++++++++--
 .../apache/cassandra/utils/CloseableIterator.java  |  2 +
 .../MultiNodeTableWalkWithoutReadRepairTest.java   |  4 +-
 .../distributed/test/sai/StrictFilteringTest.java  | 27 ++++++++++++-
 6 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 7ccc6ff970..2540b5247c 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -100,6 +100,8 @@ public abstract class UnfilteredRowIterators
 
         public void close();
 
+        public default void checkpoint() { } // Optional hook; no-op by default (ReplicaFilteringProtection's listener overrides it).
+
         public static MergeListener NOOP = new MergeListener()
         {
             public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions) {}
@@ -540,6 +542,13 @@ public abstract class UnfilteredRowIterators
                 listener.close();
         }
 
+        @Override
+        public void checkpoint() // Forward the checkpoint to the merge listener, if one was supplied.
+        {
+            if (listener != null)
+                listener.checkpoint();
+        }
+
         private class MergeReducer extends MergeIterator.Reducer<Unfiltered, 
Unfiltered>
         {
             private final MergeListener listener;
diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java 
b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
index 8d7de474e0..a788a7d3ea 100644
--- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
@@ -115,6 +115,11 @@ abstract class BaseIterator<V, I extends 
CloseableIterator<? extends V>, O exten
         maybeFail(fail);
     }
 
+    public void checkpoint() // Delegates to input. NOTE(review): an element already buffered in `next` by hasNext() is not covered - confirm callers checkpoint only after next().
+    {
+        input.checkpoint();
+    }
+
     public final O next()
     {
         if (next == null && !hasNext())
diff --git 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index fae560089b..079cfef26b 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -52,6 +52,7 @@ import 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -201,8 +202,8 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
                 for (int i = 0; i < sources.size(); i++)
                     builders.add(i, new PartitionBuilder(partitionKey, 
sources.get(i), columns, stats));
 
-                boolean[] silentRowAt = new boolean[builders.size()];
-                boolean[] silentColumnAt = new boolean[builders.size()];
+                final boolean[] silentRowAt = new boolean[builders.size()]; // per-source flags; cleared (with silentColumnAt) on every checkpoint()
+                final boolean[] silentColumnAt = new boolean[builders.size()];
 
                 return new UnfilteredRowIterators.MergeListener()
                 {
@@ -270,9 +271,18 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
 
                     @Override
                     public void close()
+                    {
+                        checkpoint(); // NOTE(review): re-queues the same builders even if checkpoint() already ran after the last row - confirm duplicates are harmless
+                    }
+
+                    @Override
+                    public void checkpoint() // hand each source's builder to its queue, then reset the per-row silence flags
                     {
                         for (int i = 0; i < sources.size(); i++)
                             originalPartitions.get(i).add(builders.get(i));
+
+                        Arrays.fill(silentRowAt, false);
+                        Arrays.fill(silentColumnAt, false);
                     }
                 };
             }
@@ -342,6 +352,7 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
         return new UnfilteredPartitionIterator()
         {
             final Queue<PartitionBuilder> partitions = 
originalPartitions.get(source);
+            RowIterator currentRowIterator = null; // merged partition that hasNext() steps row by row on the non-static filter path
 
             @Override
             public TableMetadata metadata()
@@ -359,7 +370,27 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
                 // will force the RFP merge listener to load at least the next 
protected partition.
                 if (partitions.isEmpty())
                 {
-                    PartitionIterators.consumeNext(merged);
+                    if (command.rowFilter().hasStaticExpression() || !command.rowFilter().hasNonKeyExpression())
+                    {
+                        PartitionIterators.consumeNext(merged);
+                    }
+                    else // non-static filter: advance one merged row at a time, checkpointing after each
+                    {
+                        if (currentRowIterator == null || !currentRowIterator.hasNext())
+                        {
+                            if (currentRowIterator != null)
+                                currentRowIterator.close();
+
+                            // Never keep a closed iterator here: a later call would close() it again (close() need not be idempotent) and could re-run the merge listener's close().
+                            currentRowIterator = merged.hasNext() ? merged.next() : null;
+                        }
+
+                        if (currentRowIterator != null && currentRowIterator.hasNext())
+                        {
+                            currentRowIterator.next();
+                            currentRowIterator.checkpoint();
+                        }
+                    }
                 }
 
                 return !partitions.isEmpty();
@@ -420,6 +451,15 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
                 contents.add(row);
         }
 
+        private void clearContents() // reset this builder's contents and toFetch; called from the built partition's close()
+        {
+            contents.clear();
+            toFetch = null;
+            // re-add the static row (if any) via addRow() so it survives the reset
+            if (staticRow != Rows.EMPTY_STATIC_ROW)
+                addRow(staticRow);
+        }
+
         private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
         {
             if (marker != null)
@@ -491,6 +531,7 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
                 public void close()
                 {
                     releaseCachedRows(partitionRowsCached);
+                    clearContents(); // presumably so the builder can be re-queued by a later merge-listener checkpoint() - confirm
                 }
 
                 @Override
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java 
b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 634629f4be..8649af39cd 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -24,6 +24,8 @@ import java.util.NoSuchElementException;
 public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
 {
     public void close();
+
+    public default void checkpoint() { } // no-op by default; BaseIterator forwards it to its input, UnfilteredRowIterators' merge iterator to its MergeListener
 
     public static <T> CloseableIterator<T> wrap(Iterator<T> iter)
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
index 5a0ce66ccc..9120235bce 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
@@ -33,7 +33,9 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkB
     protected void preCheck(Cluster cluster, Property.StatefulBuilder builder)
     {
         // if a failing seed is detected, populate here
-        // Example: builder.withSeed(42L);
+//        builder.withSeed(1210048824849624538L).withExamples(1);
+        builder.withSeed(-7862021736520593557L).withExamples(1);
+//        builder.withExamples(Integer.MAX_VALUE);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
         // CQL_DEBUG_APPLY_OPERATOR = true;
         // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 480a93abe3..2bb89ce340 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -47,7 +47,11 @@ public class StrictFilteringTest extends TestBaseImpl
     @BeforeClass
     public static void setUpCluster() throws IOException
     {
-        CLUSTER = init(Cluster.build(2).withConfig(config -> 
config.set("hinted_handoff_enabled", 
false).with(GOSSIP).with(NETWORK)).start());
+        // NOTE(review): the 180s timeouts look like debugging leftovers (a hung query would stall the suite for minutes) - revert before merge?
+        CLUSTER = init(Cluster.build(2).withConfig(config -> config.set("hinted_handoff_enabled", false).set("range_request_timeout", "180s")
+                .set("read_request_timeout", "180s").set("write_request_timeout", "180s")
+                .set("native_transport_timeout", "180s")
+                .set("slow_query_log_timeout", "180s").with(GOSSIP).with(NETWORK)).start());
     }
 
     @Test
@@ -216,6 +220,27 @@ public class StrictFilteringTest extends TestBaseImpl
         assertRows(initialRows, row(0, 1, 2));
     }
 
+    @Test
+    public void testNoShortReadAtLimit()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.short_read_no_srp 
(k int, c int, a int, b int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.short_read_no_srp(a) USING 'sai'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.short_read_no_srp(b) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+        // c = 0: node 1 has a = 1 (ts 1), node 2 a newer a = 2 (ts 2), so the reconciled row does not match a = 1
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.short_read_no_srp(k, c, a) VALUES (0, 0, 1) USING TIMESTAMP 1"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.short_read_no_srp(k, c, a) VALUES (0, 0, 2) USING TIMESTAMP 2"));
+        // c = 1: same pattern with timestamps 3 and 4
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.short_read_no_srp(k, c, a) VALUES (0, 1, 1) USING TIMESTAMP 3"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.short_read_no_srp(k, c, a) VALUES (0, 1, 2) USING TIMESTAMP 4"));
+        // c = 2: written only to node 1, so it is the one row that really matches a = 1
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.short_read_no_srp(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
+        // Page size 1 at ALL: the stale matches for c = 0 and c = 1 must not cut the query short before c = 2 is returned
+        String select = withKeyspace("SELECT * FROM %s.short_read_no_srp WHERE 
k = 0 AND a = 1");
+        Iterator<Object[]> initialRows = 
CLUSTER.coordinator(1).executeWithPaging(select, ConsistencyLevel.ALL, 1);
+        assertRows(initialRows, row(0, 2, 1, null));
+    }
+
     @Test
     public void testShortReadWithStaticColumn()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to