This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25390eb8d5 Log queries scanning too many SSTables per read
25390eb8d5 is described below
commit 25390eb8d54d71fcbc519e030d215f44252f0dde
Author: Alan Wang <[email protected]>
AuthorDate: Fri Dec 12 12:38:28 2025 -0800
Log queries scanning too many SSTables per read
patch by Alan Wang; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-21048
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 4 +
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../cassandra/db/PartitionRangeReadCommand.java | 5 +
.../cassandra/db/SinglePartitionReadCommand.java | 13 ++-
.../apache/cassandra/service/StorageService.java | 11 ++
.../cassandra/service/StorageServiceMBean.java | 5 +
.../distributed/test/SSTableReadLogsQueryTest.java | 120 +++++++++++++++++++++
9 files changed, 169 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b2df8cb44..2391d10537 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Log queries scanning too many SSTables per read (CASSANDRA-21048)
* Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)
* Fix CompressionDictionary being closed while still in use (CASSANDRA-21047)
* When updating a multi cell collection element, if the update is rejected then the shared Row.Builder is not freed causing all future mutations to be rejected (CASSANDRA-21055)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 106f5e01e2..5736890b51 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1990,6 +1990,10 @@ transparent_data_encryption_options:
# SAFETY THRESHOLDS #
#####################
+# Log the query if it reads more than
+# sstables_per_read_log_threshold SSTables
+sstables_per_read_log_threshold: 100
+
# When executing a scan, within or across a partition, we need to keep the
# tombstones seen in memory so we can return them to the coordinator, which
# will use them to make sure other replicas also know about the deleted rows.
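
The new setting is log-only: nothing fails or warns back to the client, a message is simply emitted (rate-limited by NoSpamLogger, as in the patch below) when a single read merges more SSTables than the configured value. As a rough illustration of the comparison the read path applies, here is a minimal, self-contained sketch; it is not the patched code, and the mergedSSTables value is a hypothetical per-read count:

    public class SSTablesPerReadThresholdSketch
    {
        public static void main(String[] args)
        {
            int threshold = 100;      // sstables_per_read_log_threshold from cassandra.yaml
            int mergedSSTables = 120; // hypothetical number of SSTables merged for one read

            // The patch logs only when the count is strictly greater than the threshold.
            if (mergedSSTables > threshold)
                System.out.printf("The following query '%s' has read %d SSTables.%n",
                                  "SELECT * FROM ks.tbl WHERE pk = 2", mergedSSTables);
        }
    }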
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 986346da41..d314cb0a8d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -579,6 +579,7 @@ public class Config
public volatile DataStorageSpec.LongBytesBound row_index_read_size_warn_threshold = null;
public volatile DataStorageSpec.LongBytesBound row_index_read_size_fail_threshold = null;
+ public volatile int sstables_per_read_log_threshold = 100;
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 23e0aa8341..0c4eb44496 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3215,6 +3215,16 @@ public class DatabaseDescriptor
return conf.max_mutation_size.toBytes();
}
+ public static int getSSTablesPerReadLogThreshold()
+ {
+ return conf.sstables_per_read_log_threshold;
+ }
+
+ public static void setSSTablesPerReadLogThreshold(int threshold)
+ {
+ conf.sstables_per_read_log_threshold = threshold;
+ }
+
public static int getTombstoneWarnThreshold()
{
return conf.tombstone_warn_threshold;
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index f448bcf872..d49cf986cb 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -58,12 +58,14 @@ import org.apache.cassandra.service.reads.ReadCoordinator;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.NoSpamLogger;
/**
* A read command that selects a (part of a) range of partitions.
*/
public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery
{
+ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
protected final Slices requestedSlices;
@@ -434,6 +436,9 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR
final int finalSelectedSSTables = selectedSSTablesCnt;
+ if (finalSelectedSSTables > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+ noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), finalSelectedSSTables);
+
// iterators can be empty for offline tools
if (inputCollector.isEmpty())
return EmptyIterators.unfilteredPartition(metadata());
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 780a8c083f..f8a9c41010 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -87,12 +87,14 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.NoSpamLogger;
/**
* A read command that selects a (part of a) single partition.
*/
public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
{
+ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
protected static final Function<Seekable, SelectionDeserializer> accordSelectionDeserializer = AccordDeserializer::new;
@@ -876,7 +878,13 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());
List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
- return withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+ UnfilteredRowIterator result = withSSTablesIterated(iterators, cfs.metric, metricsCollector);
+
+ if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+ noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
+
+ return result;
}
catch (RuntimeException | Error e)
{
@@ -1084,6 +1092,9 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
cfs.metric.updateSSTableIterated(metricsCollector.getMergedSSTables());
+ if (metricsCollector.getMergedSSTables() > DatabaseDescriptor.getSSTablesPerReadLogThreshold())
+ noSpamLogger.info("The following query '{}' has read {} SSTables.", this.toCQLString(), metricsCollector.getMergedSSTables());
+
if (result == null || result.isEmpty())
return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1ba8cbdf51..3dc5adf5a5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4719,6 +4719,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
DatabaseDescriptor.setInvalidateKeycacheOnSSTableDeletion(invalidate);
}
+ public int getSSTablesPerReadLogThreshold()
+ {
+ return DatabaseDescriptor.getSSTablesPerReadLogThreshold();
+ }
+
+ public void setSSTablesPerReadLogThreshold(int threshold)
+ {
+ DatabaseDescriptor.setSSTablesPerReadLogThreshold(threshold);
+ logger.info("updated sstables_per_read_log_threshold to {}", threshold);
+ }
+
public int getTombstoneWarnThreshold()
{
return DatabaseDescriptor.getTombstoneWarnThreshold();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4c18dc34db..803edf855c 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1036,6 +1036,11 @@ public interface StorageServiceMBean extends NotificationEmitter
/** Returns the cluster partitioner */
public String getPartitionerName();
+ /** Returns the threshold for logging queries that read more than the threshold number of SSTables */
+ public int getSSTablesPerReadLogThreshold();
+ /** Sets the threshold for logging queries that read more than the threshold number of SSTables */
+ public void setSSTablesPerReadLogThreshold(int threshold);
+
/** Returns the threshold for warning of queries with many tombstones */
public int getTombstoneWarnThreshold();
/** Sets the threshold for warning queries with many tombstones */
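
Because the threshold is exposed on StorageServiceMBean, it can be read and changed on a live node over JMX without a restart (the patch adds no dedicated nodetool command). The following is a hedged client sketch, not part of the patch; the class name is illustrative, and the host, port and object name assume stock Cassandra defaults (localhost:7199, org.apache.cassandra.db:type=StorageService):

    import javax.management.JMX;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import org.apache.cassandra.service.StorageServiceMBean;

    public class AdjustSSTablesPerReadLogThreshold
    {
        public static void main(String[] args) throws Exception
        {
            // Default Cassandra JMX endpoint; adjust host/port for your deployment.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                StorageServiceMBean ss = JMX.newMBeanProxy(connector.getMBeanServerConnection(),
                                                           new ObjectName("org.apache.cassandra.db:type=StorageService"),
                                                           StorageServiceMBean.class);
                System.out.println("current sstables_per_read_log_threshold: " + ss.getSSTablesPerReadLogThreshold());
                ss.setSSTablesPerReadLogThreshold(50); // example: log reads that merge more than 50 SSTables
            }
            finally
            {
                connector.close();
            }
        }
    }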
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java b/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java
new file mode 100644
index 0000000000..1336c10e8e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SSTableReadLogsQueryTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+
+public class SSTableReadLogsQueryTest extends TestBaseImpl
+{
+ @Test
+ public void logQueryTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.build(1)
+ .start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
+
+ cluster.get(1).runOnInstance(() -> {
+ Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+ });
+
+ for (int i = 0; i <= 100; i++)
+ {
+ cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
+ cluster.get(1).flush(withKeyspace("%s"));
+ }
+
+ cluster.get(1).runOnInstance(() -> {
+ assertEquals(101, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+ });
+
+ String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
+ cluster.get(1).executeInternalWithResult(query);
+
+ assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+ }
+ }
+
+ @Test
+ public void setSSTablesPerReadLogThresholdTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.build(1)
+ .start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v counter)");
+
+ cluster.get(1).runOnInstance(() -> {
+ Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+ });
+
+ cluster.get(1).runOnInstance(() -> {
+ DatabaseDescriptor.setSSTablesPerReadLogThreshold(25);
+ });
+
+ for (int i = 0; i <= 25; i++)
+ {
+ cluster.get(1).executeInternal("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 2");
+ cluster.get(1).flush(withKeyspace("%s"));
+ }
+
+ cluster.get(1).runOnInstance(() -> {
+ assertEquals(26, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+ });
+
+ String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 2";
+ cluster.get(1).executeInternalWithResult(query);
+
+ assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+ }
+ }
+
+ @Test
+ public void logRangeReadQueryTest() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.build(1)
+ .start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
+
+ cluster.get(1).runOnInstance(() -> {
+ Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").disableAutoCompaction();
+ });
+
+ for (int i = 0; i <= 100; i++)
+ {
+ cluster.get(1).executeInternal(String.format("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (%s, %s)", i, i));
+ cluster.get(1).flush(withKeyspace("%s"));
+ }
+
+ cluster.get(1).runOnInstance(() -> {
+ assertEquals(101, Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").getTracker().getView().liveSSTables().size());
+ });
+
+ String query = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk >= 0 AND pk < 51 ALLOW FILTERING";
+ cluster.get(1).executeInternalWithResult(query);
+
+ assertEquals(1, cluster.get(1).logs().watchFor("The following query").getResult().size());
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]