This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new e5c101673a Ensure replica filtering protection does not trigger unnecessary short read protection reads
e5c101673a is described below

commit e5c101673a0ec9a097ba41ffd99090944f73124d
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri May 9 01:34:13 2025 -0500

    Ensure replica filtering protection does not trigger unnecessary short read protection reads
    
    patch by Caleb Rackliffe; reviewed by Blake Eggleston and Zhao Yang for CASSANDRA-20639
---
 CHANGES.txt                                        |   1 +
 .../db/partitions/PartitionIterators.java          |  15 --
 .../service/reads/ReplicaFilteringProtection.java  | 291 +++++++++++++--------
 .../repair/PartitionIteratorMergeListener.java     |   2 +-
 .../distributed/test/sai/StrictFilteringTest.java  |  48 ++++
 5 files changed, 234 insertions(+), 123 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e95055e0a3..50f6f851e8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * Ensure replica filtering protection does not trigger unnecessary short read protection reads (CASSANDRA-20639)
  * Unified Compaction does not properly validate min and target sizes (CASSANDRA-20398)
  * Avoid lambda usage in TrieMemoryIndex range queries and ensure queue size tracking is per column (CASSANDRA-20668)
  * Avoid CQLSH throwing an exception loading .cqlshrc on non-supported platforms (CASSANDRA-20478)
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index b8a86d5a1a..5375b2cf0f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -93,21 +93,6 @@ public abstract class PartitionIterators
         }
     }
 
-    /**
-     * Consumes all rows in the next partition of the provided partition iterator.
-     */
-    public static void consumeNext(PartitionIterator iterator)
-    {
-        if (iterator.hasNext())
-        {
-            try (RowIterator partition = iterator.next())
-            {
-                while (partition.hasNext())
-                    partition.next();
-            }
-        }
-    }
-
     /**
      * Wraps the provided iterator so it logs the returned rows for debugging purposes.
      * <p>
diff --git a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index c66c2007d6..72c1c85fc8 100644
--- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -27,13 +27,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.Queue;
 import java.util.function.Function;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
@@ -46,12 +47,12 @@ import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -71,6 +72,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.service.reads.repair.PartitionIteratorMergeListener;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.NoSpamLogger;
@@ -90,6 +92,7 @@ import org.apache.cassandra.utils.btree.BTreeSet;
  * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-15907">CASSANDRA-15907</a>
  * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19018">CASSANDRA-19018</a>
  */
+@NotThreadSafe
 public class ReplicaFilteringProtection<E extends Endpoints<E>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class);
@@ -105,6 +108,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
     private final E sources;
     private final TableMetrics tableMetrics;
 
+    private final QueryMergeListener mergeListener;
+
     private final int cachedRowsWarnThreshold;
     private final int cachedRowsFailThreshold;
 
@@ -119,6 +124,12 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
      */
     private final List<Queue<PartitionBuilder>> originalPartitions;
 
+    /** Whether to consume entire partitions or not in {@link #queryProtectedPartitions}. */
+    private final boolean consumeEntirePartitions;
+
+    /** Tracks the current partition when not consuming entire partitions in {@link #queryProtectedPartitions}. */
+    private RowIterator currentRowIterator = null;
+
     ReplicaFilteringProtection(Keyspace keyspace,
                                ReadCommand command,
                                ConsistencyLevel consistency,
@@ -129,6 +140,7 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
     {
         this.keyspace = keyspace;
         this.command = command;
+        this.consumeEntirePartitions = command.limits().isUnlimited() || !command.isLimitedToOnePartition() || command.rowFilter().hasStaticExpression();
         this.consistency = consistency;
         this.requestTime = requestTime;
         this.sources = sources;
@@ -143,6 +155,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
 
         this.cachedRowsWarnThreshold = cachedRowsWarnThreshold;
         this.cachedRowsFailThreshold = cachedRowsFailThreshold;
+
+        mergeListener = new QueryMergeListener();
     }
 
     private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica source, ReplicaPlan.Shared<EndpointsForToken, ReplicaPlan.ForTokenRead> replicaPlan)
@@ -170,109 +184,136 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
         return resolver.getMessages().get(0).payload.makeIterator(command);
     }
 
-    /**
-     * This listener tracks both the accepted data and the primary keys of the rows that may be incomplete.
-     * That way, once the query results are merged using this listener, subsequent calls to
-     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the
-     * data originally collected from the specified replica, completed with the potentially outdated rows.
-     */
-    UnfilteredPartitionIterators.MergeListener mergeController()
+    private class PartitionMergeListener implements UnfilteredRowIterators.MergeListener
     {
-        return new UnfilteredPartitionIterators.MergeListener()
+        final DecoratedKey key;
+        final List<PartitionBuilder> builders = new ArrayList<>(sources.size());
+        final RegularAndStaticColumns columns;
+        final EncodingStats stats;
+        final boolean[] silentRowAt;
+        final boolean[] silentColumnAt;
+        
+        PartitionMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
         {
-            @Override
-            public void close()
-            {
-                // If we hit the failure threshold before consuming a single partition, record the current rows cached.
-                tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
-            }
+            key = partitionKey;
+            columns = PartitionIteratorMergeListener.columns(versions);
+            stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
 
-            @Override
-            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
-            {
-                List<PartitionBuilder> builders = new ArrayList<>(sources.size());
-                RegularAndStaticColumns columns = columns(versions);
-                EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
+            for (int i = 0; i < sources.size(); i++)
+                builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats));
+
+            silentRowAt = new boolean[builders.size()];
+            silentColumnAt = new boolean[builders.size()];
+        }
+
+        @Override
+        public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+        {
+            // cache the deletion time versions to be able to regenerate the original row iterator
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).setDeletionTime(versions[i]);
+        }
 
-                for (int i = 0; i < sources.size(); i++)
-                    builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats));
+        @Override
+        public void onMergedRows(Row merged, Row[] versions)
+        {
+            // Cache the row versions to be able to regenerate the original row iterator:
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).addRow(versions[i]);
 
-                boolean[] silentRowAt = new boolean[builders.size()];
-                boolean[] silentColumnAt = new boolean[builders.size()];
+            // If all versions are empty, there's no divergence to resolve:
+            if (merged.isEmpty())
+                return;
 
-                return new UnfilteredRowIterators.MergeListener()
+            Arrays.fill(silentRowAt, false);
+
+            // Mark replicas silent if they provide no data for the row:
+            for (int i = 0; i < versions.length; i++)
+                if (versions[i] == null || (merged.isStatic() && versions[i].isEmpty()))
+                    silentRowAt[i] = true;
+
+            // Even if there are no completely missing rows, replicas may still be silent about individual
+            // columns, so we need to check for divergence at the column level:
+            for (ColumnMetadata column : merged.isStatic() ? columns.statics : columns.regulars)
+            {
+                Arrays.fill(silentColumnAt, false);
+                boolean allSilent = true;
+
+                for (int i = 0; i < versions.length; i++)
                 {
-                    @Override
-                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
-                    {
-                        // cache the deletion time versions to be able to regenerate the original row iterator
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).setDeletionTime(versions[i]);
-                    }
+                    // If the version at this replica is null, we've already marked it as silent:
+                    if (versions[i] != null && versions[i].getColumnData(column) == null)
+                        silentColumnAt[i] = true;
+                    else
+                        allSilent = false;
+                }
 
-                    @Override
-                    public void onMergedRows(Row merged, Row[] versions)
-                    {
-                        // Cache the row versions to be able to regenerate the original row iterator:
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).addRow(versions[i]);
+                for (int i = 0; i < versions.length; i++)
+                    // Mark the replica silent if it is silent about this column and there is actually
+                    // divergence between the replicas. (i.e. If all replicas are silent for this
+                    // column, there is nothing to fetch to complete the row anyway.)
+                    silentRowAt[i] |= silentColumnAt[i] && !allSilent;
+            }
 
-                        // If all versions are empty, there's no divergence to resolve:
-                        if (merged.isEmpty())
-                            return;
+            for (int i = 0; i < silentRowAt.length; i++)
+                if (silentRowAt[i])
+                    builders.get(i).addToFetch(merged);
+        }
 
-                        Arrays.fill(silentRowAt, false);
+        @Override
+        public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+        {
+            // cache the marker versions to be able to regenerate the original row iterator
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).addRangeTombstoneMarker(versions[i]);
+        }
 
-                        // Mark replicas silent if they provide no data for the row:
-                        for (int i = 0; i < versions.length; i++)
-                            if (versions[i] == null || (merged.isStatic() && versions[i].isEmpty()))
-                                silentRowAt[i] = true;
+        @Override
+        public void close() {}
 
-                        // Even if there are no completely missing rows, replicas may still be silent about individual
-                        // columns, so we need to check for divergence at the column level:
-                        for (ColumnMetadata column : merged.isStatic() ? columns.statics : columns.regulars)
-                        {
-                            Arrays.fill(silentColumnAt, false);
-                            boolean allSilent = true;
+        public void populate()
+        {
+            for (int i = 0; i < sources.size(); i++)
+                originalPartitions.get(i).add(builders.get(i));
+        }
+    }
 
-                            for (int i = 0; i < versions.length; i++)
-                            {
-                                // If the version at this replica is null, we've already marked it as silent:
-                                if (versions[i] != null && versions[i].getColumnData(column) == null)
-                                    silentColumnAt[i] = true;
-                                else
-                                    allSilent = false;
-                            }
+    private class QueryMergeListener implements UnfilteredPartitionIterators.MergeListener
+    {
+        private PartitionMergeListener currentListener;
 
-                            for (int i = 0; i < versions.length; i++)
-                                // Mark the replica silent if it is silent about this column and there is actually
-                                // divergence between the replicas. (i.e. If all replicas are silent for this
-                                // column, there is nothing to fetch to complete the row anyway.)
-                                silentRowAt[i] |= silentColumnAt[i] && !allSilent;
-                        }
+        @Override
+        public void close()
+        {
+            // If we hit the failure threshold before consuming a single partition, record the current rows cached.
+            tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
+        }
 
-                        for (int i = 0; i < silentRowAt.length; i++)
-                            if (silentRowAt[i])
-                                builders.get(i).addToFetch(merged);
-                    }
+        @Override
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+        {
+            if (currentListener == null || !currentListener.key.equals(partitionKey))
+                currentListener = new PartitionMergeListener(partitionKey, versions);
 
-                    @Override
-                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
-                    {
-                        // cache the marker versions to be able to regenerate the original row iterator
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).addRangeTombstoneMarker(versions[i]);
-                    }
+            return currentListener;
+        }
 
-                    @Override
-                    public void close()
-                    {
-                        for (int i = 0; i < sources.size(); i++)
-                            originalPartitions.get(i).add(builders.get(i));
-                    }
-                };
-            }
-        };
+        public void populate()
+        {
+            if (currentListener != null)
+                currentListener.populate();
+        }
+    }
+
+    /**
+     * This listener tracks both the accepted data and the primary keys of the rows that may be incomplete.
+     * That way, once the query results are merged using this listener, subsequent calls to
+     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the
+     * data originally collected from the specified replica, completed with the potentially outdated rows.
+     */
+    UnfilteredPartitionIterators.MergeListener mergeController()
+    {
+        return mergeListener;
     }
 
     private void incrementCachedRows()
@@ -309,22 +350,6 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
         currentRowsCached -= count;
     }
 
-    private static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
-    {
-        Columns statics = Columns.NONE;
-        Columns regulars = Columns.NONE;
-        for (UnfilteredRowIterator iter : versions)
-        {
-            if (iter == null)
-                continue;
-
-            RegularAndStaticColumns cols = iter.columns();
-            statics = statics.mergeTo(cols.statics);
-            regulars = regulars.mergeTo(cols.regulars);
-        }
-        return new RegularAndStaticColumns(statics, regulars);
-    }
-
     /**
      * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging
      * them with the cached original filtered results for that replica.
@@ -346,16 +371,66 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
             }
 
             @Override
-            public void close() { }
+            public void close()
+            {
+                if (currentRowIterator != null)
+                    currentRowIterator.close();
+            }
 
             @Override
             public boolean hasNext()
             {
                 // If there are no cached partition builders for this source, advance the first phase iterator, which
-                // will force the RFP merge listener to load at least the next protected partition.
+                // will force the RFP merge listener to load rows from the next protected partition.
                 if (partitions.isEmpty())
                 {
-                    PartitionIterators.consumeNext(merged);
+                    if (consumeEntirePartitions)
+                    {
+                        if (merged.hasNext())
+                        {
+                            try (RowIterator partition = merged.next())
+                            {
+                                while (partition.hasNext())
+                                    partition.next();
+
+                                mergeListener.populate();
+                            }
+                        }
+                    }
+                    else
+                    {
+                        if (currentRowIterator == null || !currentRowIterator.hasNext())
+                        {
+                            // If there is an iterator, it's done, so just close it.
+                            if (currentRowIterator != null)
+                            {
+                                currentRowIterator.close();
+                                currentRowIterator = null;
+                            }
+
+                            // Take the next filtered partition from the merged partition iterator.
+                            if (merged.hasNext())
+                                currentRowIterator = merged.next();
+                        }
+
+                        if (currentRowIterator != null)
+                        {
+                            int i = 0;
+
+                            // Consume LIMIT filtered rows from the current partition, unless there are fewer results.
+                            // The underlying iterator is short-read protected, and limiting the number of rows we
+                            // consume avoids needless SRP reads when there are many more than LIMIT results.
+                            while (i < command.limits().count() && currentRowIterator.hasNext())
+                            {
+                                currentRowIterator.next();
+                                i++;
+                            }
+
+                            // If we actually consumed a row, checkpoint to populate the builders.
+                            if (i > 0)
+                                mergeListener.populate();
+                        }
+                    }
                 }
 
                 return !partitions.isEmpty();
@@ -487,6 +562,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
                 public void close()
                 {
                     releaseCachedRows(partitionRowsCached);
+                    toFetch = null;
+                    // TODO: the counters might not be accurate for the static row at this point?
                 }
 
                 @Override
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index f77bd4d52c..5aacaf4329 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -49,7 +49,7 @@ public class PartitionIteratorMergeListener<E extends Endpoints<E>>
         return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
     }
 
-    protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+    public static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
     {
         Columns statics = Columns.NONE;
         Columns regulars = Columns.NONE;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 6ec80fd0ae..6ab807f77c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -27,11 +27,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
 
+import static org.junit.Assert.assertEquals;
+
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
@@ -222,6 +225,51 @@ public class StrictFilteringTest extends TestBaseImpl
         assertRows(initialRows, row(0, 1, 2));
     }
 
+    @Test
+    public void testNoShortReadAtLimit()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.no_srp_at_limit (k 
int, c int, a int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.no_srp_at_limit(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.no_srp_at_limit(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.no_srp_at_limit(k, c, a) VALUES (0, 3, 1) USING TIMESTAMP 6"));
+
+        Long srpRequestsBefore = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("no_srp_at_limit").metric.shortReadProtectionRequests.getCount());
+
+        String select = withKeyspace("SELECT * FROM %s.no_srp_at_limit WHERE k = 0 AND a = 1 LIMIT 1");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 2, 1));
+
+        Long srpRequestsAfter = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("no_srp_at_limit").metric.shortReadProtectionRequests.getCount());
+        assertEquals(srpRequestsBefore, srpRequestsAfter);
+    }
+
+    @Test
+    public void testNecessaryShortRead()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE 
%s.necessary_short_read (k int, c int, a int, PRIMARY KEY (k, c)) WITH 
read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.necessary_short_read(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.necessary_short_read(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.necessary_short_read(k, c, a) VALUES (0, 2, 2) USING TIMESTAMP 6"));
+
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.necessary_short_read(k, c, a) VALUES (0, 3, 1) USING TIMESTAMP 7"));
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.necessary_short_read(k, c, a) VALUES (0, 3, 2) USING TIMESTAMP 8"));
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.necessary_short_read(k, c, a) VALUES (0, 4, 1) USING TIMESTAMP 9"));
+
+        Long srpRequestsBefore = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("necessary_short_read").metric.shortReadProtectionRequests.getCount());
+
+        String select = withKeyspace("SELECT * FROM %s.necessary_short_read WHERE k = 0 AND a = 1 LIMIT 1");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 4, 1));
+
+        Long srpRequestsAfter = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("necessary_short_read").metric.shortReadProtectionRequests.getCount());
+        assertEquals(srpRequestsBefore + 2L, srpRequestsAfter.longValue());
+    }
+
     @Test
     public void testShortReadWithStaticColumn()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
