This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new e5c101673a Ensure replica filtering protection does not trigger unnecessary short read protection reads
e5c101673a is described below

commit e5c101673a0ec9a097ba41ffd99090944f73124d
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Fri May 9 01:34:13 2025 -0500

    Ensure replica filtering protection does not trigger unnecessary short read protection reads

    patch by Caleb Rackliffe; reviewed by Blake Eggleston and Zhao Yang for CASSANDRA-20639
---
 CHANGES.txt                                        |   1 +
 .../db/partitions/PartitionIterators.java          |  15 --
 .../service/reads/ReplicaFilteringProtection.java  | 291 +++++++++++++--------
 .../repair/PartitionIteratorMergeListener.java     |   2 +-
 .../distributed/test/sai/StrictFilteringTest.java  |  48 ++++
 5 files changed, 234 insertions(+), 123 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e95055e0a3..50f6f851e8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.5
+ * Ensure replica filtering protection does not trigger unnecessary short read protection reads (CASSANDRA-20639)
 * Unified Compaction does not properly validate min and target sizes (CASSANDRA-20398)
 * Avoid lambda usage in TrieMemoryIndex range queries and ensure queue size tracking is per column (CASSANDRA-20668)
 * Avoid CQLSH throwing an exception loading .cqlshrc on non-supported platforms (CASSANDRA-20478)
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index b8a86d5a1a..5375b2cf0f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -93,21 +93,6 @@ public abstract class PartitionIterators
         }
     }
 
-    /**
-     * Consumes all rows in the next partition of the provided partition iterator.
-     */
-    public static void consumeNext(PartitionIterator iterator)
-    {
-        if (iterator.hasNext())
-        {
-            try (RowIterator partition = iterator.next())
-            {
-                while (partition.hasNext())
-                    partition.next();
-            }
-        }
-    }
-
     /**
      * Wraps the provided iterator so it logs the returned rows for debugging purposes.
      * <p>
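The deleted helper's one caller was ReplicaFilteringProtection (next file), which now bounds consumption by the query LIMIT instead of draining whole partitions. As a rough, self-contained illustration of why that matters (plain java.util types only; the class and all names here are invented, and the paging model is a deliberate simplification of real short-read-protection mechanics):

    import java.util.Iterator;

    public class DrainCostSketch
    {
        // Toy stand-in for a short-read-protected iterator: rows arrive in
        // pages, and each page after the first models an extra replica fetch.
        static class PagedRows implements Iterator<Integer>
        {
            final int totalRows, pageSize;
            int returned = 0, roundTrips = 1; // first page comes with the initial response

            PagedRows(int totalRows, int pageSize) { this.totalRows = totalRows; this.pageSize = pageSize; }

            @Override
            public boolean hasNext() { return returned < totalRows; }

            @Override
            public Integer next()
            {
                if (returned > 0 && returned % pageSize == 0)
                    roundTrips++; // crossed a page boundary: another fetch
                return returned++;
            }
        }

        public static void main(String[] args)
        {
            PagedRows drained = new PagedRows(100, 10);
            while (drained.hasNext()) drained.next(); // old behavior: drain the partition
            System.out.println("drain everything: " + drained.roundTrips + " round trips"); // 10

            PagedRows bounded = new PagedRows(100, 10);
            for (int i = 0; i < 1 && bounded.hasNext(); i++) bounded.next(); // LIMIT 1
            System.out.println("stop at LIMIT:    " + bounded.roundTrips + " round trips"); // 1
        }
    }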
diff --git a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index c66c2007d6..72c1c85fc8 100644
--- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -27,13 +27,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.Queue;
 import java.util.function.Function;
 
+import javax.annotation.concurrent.NotThreadSafe;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
@@ -46,12 +47,12 @@ import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -71,6 +72,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.service.reads.repair.PartitionIteratorMergeListener;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.NoSpamLogger;
@@ -90,6 +92,7 @@ import org.apache.cassandra.utils.btree.BTreeSet;
  * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-15907">CASSANDRA-15907</a>
  * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19018">CASSANDRA-19018</a>
  */
+@NotThreadSafe
 public class ReplicaFilteringProtection<E extends Endpoints<E>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class);
@@ -105,6 +108,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
     private final E sources;
     private final TableMetrics tableMetrics;
 
+    private final QueryMergeListener mergeListener;
+
     private final int cachedRowsWarnThreshold;
     private final int cachedRowsFailThreshold;
 
@@ -119,6 +124,12 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
      */
     private final List<Queue<PartitionBuilder>> originalPartitions;
 
+    /** Whether to consume entire partitions in {@link #queryProtectedPartitions}. */
+    private final boolean consumeEntirePartitions;
+
+    /** Tracks the current partition when not consuming entire partitions in {@link #queryProtectedPartitions}. */
+    private RowIterator currentRowIterator = null;
+
     ReplicaFilteringProtection(Keyspace keyspace,
                                ReadCommand command,
                                ConsistencyLevel consistency,
@@ -129,6 +140,7 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
     {
         this.keyspace = keyspace;
         this.command = command;
+        this.consumeEntirePartitions = command.limits().isUnlimited() || !command.isLimitedToOnePartition() || command.rowFilter().hasStaticExpression();
         this.consistency = consistency;
         this.requestTime = requestTime;
         this.sources = sources;
@@ -143,6 +155,8 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
 
         this.cachedRowsWarnThreshold = cachedRowsWarnThreshold;
         this.cachedRowsFailThreshold = cachedRowsFailThreshold;
+
+        mergeListener = new QueryMergeListener();
     }
 
     private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica source, ReplicaPlan.Shared<EndpointsForToken, ReplicaPlan.ForTokenRead> replicaPlan)
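The new consumeEntirePartitions flag computed in the constructor above selects between the two consumption strategies. Restated as a standalone predicate over plain booleans (an illustrative helper, not a method in the patch):

    // Bounded consumption is only safe when a LIMIT exists, the query reads a
    // single partition, and no static-column expression is involved; a static
    // expression can make rows match because of data outside the rows consumed
    // so far, so the whole partition must still be observed in that case.
    static boolean consumeEntirePartitions(boolean unlimited, boolean limitedToOnePartition, boolean hasStaticExpression)
    {
        return unlimited || !limitedToOnePartition || hasStaticExpression;
    }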
@@ -170,109 +184,136 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
         return resolver.getMessages().get(0).payload.makeIterator(command);
     }
 
-    /**
-     * This listener tracks both the accepted data and the primary keys of the rows that may be incomplete.
-     * That way, once the query results are merged using this listener, subsequent calls to
-     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the
-     * data originally collected from the specified replica, completed with the potentially outdated rows.
-     */
-    UnfilteredPartitionIterators.MergeListener mergeController()
+    private class PartitionMergeListener implements UnfilteredRowIterators.MergeListener
     {
-        return new UnfilteredPartitionIterators.MergeListener()
+        final DecoratedKey key;
+        final List<PartitionBuilder> builders = new ArrayList<>(sources.size());
+        final RegularAndStaticColumns columns;
+        final EncodingStats stats;
+        final boolean[] silentRowAt;
+        final boolean[] silentColumnAt;
+
+        PartitionMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
         {
-            @Override
-            public void close()
-            {
-                // If we hit the failure threshold before consuming a single partition, record the current rows cached.
-                tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
-            }
+            key = partitionKey;
+            columns = PartitionIteratorMergeListener.columns(versions);
+            stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
 
-            @Override
-            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
-            {
-                List<PartitionBuilder> builders = new ArrayList<>(sources.size());
-                RegularAndStaticColumns columns = columns(versions);
-                EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
+            for (int i = 0; i < sources.size(); i++)
+                builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats));
+
+            silentRowAt = new boolean[builders.size()];
+            silentColumnAt = new boolean[builders.size()];
+        }
+
+        @Override
+        public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+        {
+            // cache the deletion time versions to be able to regenerate the original row iterator
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).setDeletionTime(versions[i]);
+        }
 
-                for (int i = 0; i < sources.size(); i++)
-                    builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats));
+        @Override
+        public void onMergedRows(Row merged, Row[] versions)
+        {
+            // Cache the row versions to be able to regenerate the original row iterator:
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).addRow(versions[i]);
 
-                boolean[] silentRowAt = new boolean[builders.size()];
-                boolean[] silentColumnAt = new boolean[builders.size()];
+            // If all versions are empty, there's no divergence to resolve:
+            if (merged.isEmpty())
+                return;
 
-                return new UnfilteredRowIterators.MergeListener()
+            Arrays.fill(silentRowAt, false);
+
+            // Mark replicas silent if they provide no data for the row:
+            for (int i = 0; i < versions.length; i++)
+                if (versions[i] == null || (merged.isStatic() && versions[i].isEmpty()))
+                    silentRowAt[i] = true;
+
+            // Even if there are no completely missing rows, replicas may still be silent about individual
+            // columns, so we need to check for divergence at the column level:
+            for (ColumnMetadata column : merged.isStatic() ? columns.statics : columns.regulars)
+            {
+                Arrays.fill(silentColumnAt, false);
+                boolean allSilent = true;
+
+                for (int i = 0; i < versions.length; i++)
                 {
-                    @Override
-                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
-                    {
-                        // cache the deletion time versions to be able to regenerate the original row iterator
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).setDeletionTime(versions[i]);
-                    }
+                    // If the version at this replica is null, we've already marked it as silent:
+                    if (versions[i] != null && versions[i].getColumnData(column) == null)
+                        silentColumnAt[i] = true;
+                    else
+                        allSilent = false;
+                }
 
-                    @Override
-                    public void onMergedRows(Row merged, Row[] versions)
-                    {
-                        // Cache the row versions to be able to regenerate the original row iterator:
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).addRow(versions[i]);
+                for (int i = 0; i < versions.length; i++)
+                    // Mark the replica silent if it is silent about this column and there is actually
+                    // divergence between the replicas. (i.e. If all replicas are silent for this
+                    // column, there is nothing to fetch to complete the row anyway.)
+                    silentRowAt[i] |= silentColumnAt[i] && !allSilent;
+            }
 
-                        // If all versions are empty, there's no divergence to resolve:
-                        if (merged.isEmpty())
-                            return;
+            for (int i = 0; i < silentRowAt.length; i++)
+                if (silentRowAt[i])
+                    builders.get(i).addToFetch(merged);
+        }
 
-                        Arrays.fill(silentRowAt, false);
+        @Override
+        public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+        {
+            // cache the marker versions to be able to regenerate the original row iterator
+            for (int i = 0; i < versions.length; i++)
+                builders.get(i).addRangeTombstoneMarker(versions[i]);
+        }
 
-                        // Mark replicas silent if they provide no data for the row:
-                        for (int i = 0; i < versions.length; i++)
-                            if (versions[i] == null || (merged.isStatic() && versions[i].isEmpty()))
-                                silentRowAt[i] = true;
+        @Override
+        public void close() {}
 
-                        // Even if there are no completely missing rows, replicas may still be silent about individual
-                        // columns, so we need to check for divergence at the column level:
-                        for (ColumnMetadata column : merged.isStatic() ? columns.statics : columns.regulars)
-                        {
-                            Arrays.fill(silentColumnAt, false);
-                            boolean allSilent = true;
+        public void populate()
+        {
+            for (int i = 0; i < sources.size(); i++)
+                originalPartitions.get(i).add(builders.get(i));
+        }
+    }
 
-                            for (int i = 0; i < versions.length; i++)
-                            {
-                                // If the version at this replica is null, we've already marked it as silent:
-                                if (versions[i] != null && versions[i].getColumnData(column) == null)
-                                    silentColumnAt[i] = true;
-                                else
-                                    allSilent = false;
-                            }
+    private class QueryMergeListener implements UnfilteredPartitionIterators.MergeListener
+    {
+        private PartitionMergeListener currentListener;
 
-                            for (int i = 0; i < versions.length; i++)
-                                // Mark the replica silent if it is silent about this column and there is actually
-                                // divergence between the replicas. (i.e. If all replicas are silent for this
-                                // column, there is nothing to fetch to complete the row anyway.)
-                                silentRowAt[i] |= silentColumnAt[i] && !allSilent;
-                        }
+        @Override
+        public void close()
+        {
+            // If we hit the failure threshold before consuming a single partition, record the current rows cached.
+            tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
+        }
 
-                        for (int i = 0; i < silentRowAt.length; i++)
-                            if (silentRowAt[i])
-                                builders.get(i).addToFetch(merged);
-                    }
+        @Override
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+        {
+            if (currentListener == null || !currentListener.key.equals(partitionKey))
+                currentListener = new PartitionMergeListener(partitionKey, versions);
 
-                    @Override
-                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
-                    {
-                        // cache the marker versions to be able to regenerate the original row iterator
-                        for (int i = 0; i < versions.length; i++)
-                            builders.get(i).addRangeTombstoneMarker(versions[i]);
-                    }
+            return currentListener;
+        }
 
-                    @Override
-                    public void close()
-                    {
-                        for (int i = 0; i < sources.size(); i++)
-                            originalPartitions.get(i).add(builders.get(i));
-                    }
-                };
-            }
-        };
+        public void populate()
+        {
+            if (currentListener != null)
+                currentListener.populate();
+        }
+    }
+
+    /**
+     * This listener tracks both the accepted data and the primary keys of the rows that may be incomplete.
+     * That way, once the query results are merged using this listener, subsequent calls to
+     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the
+     * data originally collected from the specified replica, completed with the potentially outdated rows.
+     */
+    UnfilteredPartitionIterators.MergeListener mergeController()
+    {
+        return mergeListener;
     }
 
     private void incrementCachedRows()
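The column-level bookkeeping in onMergedRows above is easier to follow with plain arrays. In this self-contained restatement (names invented; rowsByReplica[i][c] is true when replica i returned data for column c, and the real code additionally handles fully missing rows and static rows), a replica is marked silent, and therefore owed a completion read, only when it misses a column that some other replica supplied:

    static boolean[] silentReplicas(boolean[][] rowsByReplica, int columnCount)
    {
        int replicas = rowsByReplica.length;
        boolean[] silentRowAt = new boolean[replicas];

        for (int c = 0; c < columnCount; c++)
        {
            boolean[] silentColumnAt = new boolean[replicas];
            boolean allSilent = true;

            for (int i = 0; i < replicas; i++)
            {
                if (!rowsByReplica[i][c])
                    silentColumnAt[i] = true;
                else
                    allSilent = false;
            }

            // A silent column only matters when replicas diverge: if every
            // replica is silent for it, there is nothing to fetch anyway.
            for (int i = 0; i < replicas; i++)
                silentRowAt[i] |= silentColumnAt[i] && !allSilent;
        }

        return silentRowAt;
    }

For example, with two replicas where only replica 0 returned the single column, silentReplicas(new boolean[][] {{true}, {false}}, 1) marks replica 1, which corresponds to builders.get(1).addToFetch(merged) in the diff above.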
@@ -309,22 +350,6 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>>
         currentRowsCached -= count;
     }
 
-    private static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
-    {
-        Columns statics = Columns.NONE;
-        Columns regulars = Columns.NONE;
-        for (UnfilteredRowIterator iter : versions)
-        {
-            if (iter == null)
-                continue;
-
-            RegularAndStaticColumns cols = iter.columns();
-            statics = statics.mergeTo(cols.statics);
-            regulars = regulars.mergeTo(cols.regulars);
-        }
-        return new RegularAndStaticColumns(statics, regulars);
-    }
-
     /**
      * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging
      * them with the cached original filtered results for that replica.
@@ -346,16 +371,66 @@
             }
 
             @Override
-            public void close() { }
+            public void close()
+            {
+                if (currentRowIterator != null)
+                    currentRowIterator.close();
+            }
 
             @Override
             public boolean hasNext()
             {
                 // If there are no cached partition builders for this source, advance the first phase iterator, which
-                // will force the RFP merge listener to load at least the next protected partition.
+                // will force the RFP merge listener to load rows from the next protected partition.
                 if (partitions.isEmpty())
                 {
-                    PartitionIterators.consumeNext(merged);
+                    if (consumeEntirePartitions)
+                    {
+                        if (merged.hasNext())
+                        {
+                            try (RowIterator partition = merged.next())
+                            {
+                                while (partition.hasNext())
+                                    partition.next();
+
+                                mergeListener.populate();
+                            }
+                        }
+                    }
+                    else
+                    {
+                        if (currentRowIterator == null || !currentRowIterator.hasNext())
+                        {
+                            // If there is an iterator, it's done, so just close it.
+                            if (currentRowIterator != null)
+                            {
+                                currentRowIterator.close();
+                                currentRowIterator = null;
+                            }
+
+                            // Take the next filtered partition from the merged partition iterator.
+                            if (merged.hasNext())
+                                currentRowIterator = merged.next();
+                        }
+
+                        if (currentRowIterator != null)
+                        {
+                            int i = 0;
+
+                            // Consume LIMIT filtered rows from the current partition, unless there are fewer results.
+                            // The underlying iterator is short-read protected, and limiting the number of rows we
+                            // consume avoids needless SRP reads when there are many more than LIMIT results.
+                            while (i < command.limits().count() && currentRowIterator.hasNext())
+                            {
+                                currentRowIterator.next();
+                                i++;
+                            }
+
+                            // If we actually consumed a row, checkpoint to populate the builders.
+                            if (i > 0)
+                                mergeListener.populate();
+                        }
+                    }
                 }
 
                 return !partitions.isEmpty();
@@ -487,6 +562,8 @@
             public void close()
             {
                 releaseCachedRows(partitionRowsCached);
+                toFetch = null;
+                // TODO: the counters might not be accurate for the static row at this point?
             }
 
             @Override
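The heart of the fix is the else branch of hasNext() above: instead of draining the merged partition, it consumes at most command.limits().count() rows before checkpointing via populate(). The same loop, extracted over a plain java.util.Iterator (an illustrative helper, not code from the patch):

    // Consume at most "limit" rows, leaving the iterator open so a later call
    // can resume where this one stopped; returns the number of rows consumed.
    // The caller checkpoints (mergeListener.populate() above) only when the
    // result is greater than zero.
    static <T> int consumeUpTo(java.util.Iterator<T> partition, int limit)
    {
        int consumed = 0;
        while (consumed < limit && partition.hasNext())
        {
            partition.next();
            consumed++;
        }
        return consumed;
    }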
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index f77bd4d52c..5aacaf4329 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -49,7 +49,7 @@ public class PartitionIteratorMergeListener<E extends Endpoints<E>>
         return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
     }
 
-    protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+    public static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
    {
         Columns statics = Columns.NONE;
         Columns regulars = Columns.NONE;
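Widening columns(...) to public static lets ReplicaFilteringProtection call PartitionIteratorMergeListener.columns(versions) in place of the private copy deleted above. Both compute the union, across all non-null replica versions, of the static and regular column sets; a minimal generic restatement with invented names and plain java.util types:

    // Union of column names across replica versions; a null version (the
    // replica returned nothing for this partition) contributes no columns.
    static java.util.Set<String> mergedColumns(java.util.List<java.util.Set<String>> versions)
    {
        java.util.Set<String> merged = new java.util.LinkedHashSet<>();
        for (java.util.Set<String> version : versions)
            if (version != null)
                merged.addAll(version);
        return merged;
    }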
diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 6ec80fd0ae..6ab807f77c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -27,11 +27,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
 
+import static org.junit.Assert.assertEquals;
+
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
@@ -222,6 +225,51 @@ public class StrictFilteringTest extends TestBaseImpl
         assertRows(initialRows, row(0, 1, 2));
     }
 
+    @Test
+    public void testNoShortReadAtLimit()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.no_srp_at_limit (k int, c int, a int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.no_srp_at_limit(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.no_srp_at_limit(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.no_srp_at_limit(k, c, a) VALUES (0, 3, 1) USING TIMESTAMP 6"));
+
+        Long srpRequestsBefore = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("no_srp_at_limit").metric.shortReadProtectionRequests.getCount());
+
+        String select = withKeyspace("SELECT * FROM %s.no_srp_at_limit WHERE k = 0 AND a = 1 LIMIT 1");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 2, 1));
+
+        Long srpRequestsAfter = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("no_srp_at_limit").metric.shortReadProtectionRequests.getCount());
+        assertEquals(srpRequestsBefore, srpRequestsAfter);
+    }
+
+    @Test
+    public void testNecessaryShortRead()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.necessary_short_read (k int, c int, a int, PRIMARY KEY (k, c)) WITH read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.necessary_short_read(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.necessary_short_read(k, c, a) VALUES (0, 2, 1) USING TIMESTAMP 5"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.necessary_short_read(k, c, a) VALUES (0, 2, 2) USING TIMESTAMP 6"));
+
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO %s.necessary_short_read(k, c, a) VALUES (0, 3, 1) USING TIMESTAMP 7"));
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.necessary_short_read(k, c, a) VALUES (0, 3, 2) USING TIMESTAMP 8"));
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.necessary_short_read(k, c, a) VALUES (0, 4, 1) USING TIMESTAMP 9"));
+
+        Long srpRequestsBefore = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("necessary_short_read").metric.shortReadProtectionRequests.getCount());
+
+        String select = withKeyspace("SELECT * FROM %s.necessary_short_read WHERE k = 0 AND a = 1 LIMIT 1");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 4, 1));
+
+        Long srpRequestsAfter = CLUSTER.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("necessary_short_read").metric.shortReadProtectionRequests.getCount());
+        assertEquals(srpRequestsBefore + 2L, srpRequestsAfter.longValue());
+    }
+
     @Test
     public void testShortReadWithStaticColumn()
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org