This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 779bae02c8 Add LIMIT and PER PARTITION LIMIT to the AST SingleNodeTableWalkTest
779bae02c8 is described below

commit 779bae02c8c9902c36ac0fca47610c5704abb357
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Mar 17 14:07:00 2025 -0700

    Add LIMIT and PER PARTITION LIMIT to the AST SingleNodeTableWalkTest

    patch by David Capwell; reviewed by Abe Ratnofsky for CASSANDRA-20399
---
 .../db/compaction/AbstractCompactionStrategy.java  |   4 +-
 .../db/compaction/LeveledCompactionStrategy.java   |   6 +-
 .../SizeTieredCompactionStrategyOptions.java       |   6 +-
 .../db/compaction/unified/Controller.java          |  18 +-
 .../test/cql3/SingleNodeTableWalkTest.java         |   7 +-
 .../distributed/test/cql3/StatefulASTBase.java     |  53 ++-
 .../cassandra/harry/model/ASTSingleTableModel.java |  39 ++-
 .../harry/model/ASTSingleTableModelTest.java       | 102 +++++-
 .../unit/org/apache/cassandra/cql3/ast/Select.java |  58 +++-
 .../cassandra/utils/CassandraGenerators.java       | 361 ++++++++++++++++++++-
 10 files changed, 615 insertions(+), 39 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9d189b9975..99a509c5f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -72,10 +72,10 @@ public abstract class AbstractCompactionStrategy
     protected static final boolean DEFAULT_UNCHECKED_TOMBSTONE_COMPACTION_OPTION = false;
     protected static final boolean DEFAULT_LOG_ALL_OPTION = false;
 
-    protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
+    public static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
     protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
     // disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
-    protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
+    public static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
     protected static final String LOG_ALL_OPTION = "log_all";
     protected static final String COMPACTION_ENABLED = "enabled";
     public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ff90219224..a68efa120c 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -47,10 +47,10 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.TOLERATE_S
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
-    private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
+    public static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
     private static final boolean tolerateSstableSize = TOLERATE_SSTABLE_SIZE.getBoolean();
-    private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
-    private static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
+    public static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
+    public static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
     public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10;
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index eb1d8f97af..f95a19bc02 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,9 +26,9 @@ public final class SizeTieredCompactionStrategyOptions
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
     protected static final double DEFAULT_BUCKET_LOW = 0.5;
     protected static final double DEFAULT_BUCKET_HIGH = 1.5;
-    protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
-    protected static final String BUCKET_LOW_KEY = "bucket_low";
-    protected static final String BUCKET_HIGH_KEY = "bucket_high";
+    public static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
+    public static final String BUCKET_LOW_KEY = "bucket_low";
+    public static final String BUCKET_HIGH_KEY = "bucket_high";
 
     protected long minSSTableSize;
     protected double bucketLow;
diff --git a/src/java/org/apache/cassandra/db/compaction/unified/Controller.java b/src/java/org/apache/cassandra/db/compaction/unified/Controller.java
index df2a66590c..28684aa596 100644
--- a/src/java/org/apache/cassandra/db/compaction/unified/Controller.java
+++ b/src/java/org/apache/cassandra/db/compaction/unified/Controller.java
@@ -50,7 +50,7 @@ public class Controller
      * The scaling parameters W, one per bucket index and separated by a comma.
      * Higher indexes will use the value of the last index with a W specified.
      */
-    final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
+    public final static String SCALING_PARAMETERS_OPTION = "scaling_parameters";
     private final static String DEFAULT_SCALING_PARAMETERS = CassandraRelevantProperties.UCS_SCALING_PARAMETER.getString();
 
@@ -58,7 +58,7 @@ public class Controller
      * The minimum sstable size. Sharded writers split sstables over shard only if they are at least as large as the
      * minimum size.
      */
-    static final String MIN_SSTABLE_SIZE_OPTION = "min_sstable_size";
+    public static final String MIN_SSTABLE_SIZE_OPTION = "min_sstable_size";
     private static final String DEFAULT_MIN_SSTABLE_SIZE = CassandraRelevantProperties.UCS_MIN_SSTABLE_SIZE.getString();
 
@@ -66,9 +66,9 @@ public class Controller
      * Override for the flush size in MB. The database should be able to calculate this from executing flushes, this
      * should only be necessary in rare cases.
      */
-    static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
+    public static final String FLUSH_SIZE_OVERRIDE_OPTION = "flush_size_override";
 
-    static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
+    public static final String BASE_SHARD_COUNT_OPTION = "base_shard_count";
     /**
      * Default base shard count, used when a base count is not explicitly supplied. This value applies as long as the
      * table is not a system one, and directories are not defined.
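For context, the visibility changes in the four files above widen the option-key
constants from protected/package-private to public so that test code (the
generators added to CassandraGenerators below) can reference them by constant
instead of duplicating the raw strings. A minimal sketch of that intended use,
assuming an illustrative options map (the values are arbitrary examples, not
part of this patch):

    import java.util.Map;
    import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;

    // Build an LCS options map against the now-public constants rather than
    // hard-coded strings such as "sstable_size_in_mb".
    Map<String, String> lcsOptions = Map.of(
        LeveledCompactionStrategy.SSTABLE_SIZE_OPTION, "160",
        LeveledCompactionStrategy.LEVEL_FANOUT_SIZE_OPTION, "10",
        LeveledCompactionStrategy.SINGLE_SSTABLE_UPLEVEL_OPTION, "true");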
@@ -79,10 +79,10 @@ public class Controller
      */
     public static final int DEFAULT_BASE_SHARD_COUNT = CassandraRelevantProperties.UCS_BASE_SHARD_COUNT.getInt();
 
-    static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
+    public static final String TARGET_SSTABLE_SIZE_OPTION = "target_sstable_size";
     public static final long DEFAULT_TARGET_SSTABLE_SIZE = CassandraRelevantProperties.UCS_TARGET_SSTABLE_SIZE.getSizeInBytes();
-    static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
+    public static final long MIN_TARGET_SSTABLE_SIZE = 1L << 20;
 
     /**
      * Provision for growth of the constructed SSTables as the size of the data grows. By default, the target SSTable
@@ -109,7 +109,7 @@ public class Controller
      * base count of 4, the number of SSTables will be 4 (~256GiB each) for a growth value of 1, 128 (~8GiB each) for
      * a growth value of 0.333, and 64 (~16GiB each) for a growth value of 0.5.
      */
-    static final String SSTABLE_GROWTH_OPTION = "sstable_growth";
+    public static final String SSTABLE_GROWTH_OPTION = "sstable_growth";
     private static final double DEFAULT_SSTABLE_GROWTH = CassandraRelevantProperties.UCS_SSTABLE_GROWTH.getDouble();
 
     /**
@@ -127,7 +127,7 @@ public class Controller
      *
      * If the fanout factor is larger than the maximum number of sstables, the strategy will ignore the latter.
      */
-    static final String MAX_SSTABLES_TO_COMPACT_OPTION = "max_sstables_to_compact";
+    public static final String MAX_SSTABLES_TO_COMPACT_OPTION = "max_sstables_to_compact";
 
     static final String ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_OPTION = "unsafe_aggressive_sstable_expiration";
     static final boolean ALLOW_UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION =
@@ -147,7 +147,7 @@ public class Controller
      * that overlap with participating (LCS-like, higher concurrency during upgrades but some double compaction),
      * TRANSITIVE to include overlaps of overlaps (likely to trigger whole level compactions, safest).
      */
-    static final String OVERLAP_INCLUSION_METHOD_OPTION = "overlap_inclusion_method";
+    public static final String OVERLAP_INCLUSION_METHOD_OPTION = "overlap_inclusion_method";
     static final Overlaps.InclusionMethod DEFAULT_OVERLAP_INCLUSION_METHOD =
         CassandraRelevantProperties.UCS_OVERLAP_INCLUSION_METHOD.getEnum(Overlaps.InclusionMethod.TRANSITIVE);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index c18d6cfecf..2ba02ae769 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@ -130,7 +130,7 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
             }
         }
         Select select = builder.build();
-        return state.command(rs, select, (wholePartition ? "Whole Partition" : "Single Row"));
+        return state.command(rs, select, (wholePartition ? "By Partition Key" : "By Primary Key"));
     }
 
     public Property.Command<State, Void, ?> selectToken(RandomSource rs, State state)
@@ -357,7 +357,10 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
             //TODO (coverage): partition is defined at the cluster level, so have to hard code in this model as the table is changed rather than cluster being recreated... this limits coverage
             return toGen(new TableMetadataBuilder()
                          .withTableKinds(TableMetadata.Kind.REGULAR)
-                         .withKnownMemtables()
+                         .withParams(b -> b.withKnownMemtables()
+                                           .withCaching()
+                                           .withCompaction()
+                                           .withCompression())
                          .withKeyspaceName(ks).withTableName("tbl")
                          .withSimpleColumnNames()
                          .withDefaultTypeGen(supportedTypes())
diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index ac5df71ddf..ec530b97fd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -99,8 +99,7 @@ public class StatefulASTBase extends TestBaseImpl
      */
    protected static boolean CQL_DEBUG_APPLY_OPERATOR = false;
 
-    protected static final Gen<Gen<Boolean>> BIND_OR_LITERAL_DISTRO = Gens.bools().mixedDistribution();
-    protected static final Gen<Gen<Boolean>> BETWEEN_EQ_DISTRO = Gens.bools().mixedDistribution();
+    protected static final Gen<Gen<Boolean>> BOOL_DISTRIBUTION = Gens.bools().mixedDistribution();
     protected static final Gen<Gen<Conditional.Where.Inequality>> LESS_THAN_DISTRO = Gens.mixedDistribution(Stream.of(Conditional.Where.Inequality.values())
                                                                                                                   .filter(i -> i == Conditional.Where.Inequality.LESS_THAN || i == Conditional.Where.Inequality.LESS_THAN_EQ)
                                                                                                                   .collect(Collectors.toList()));
@@ -111,6 +110,7 @@ public class StatefulASTBase extends TestBaseImpl
                                                                                                                   .filter(i -> i != Conditional.Where.Inequality.EQUAL && i != Conditional.Where.Inequality.NOT_EQUAL)
                                                                                                                   .collect(Collectors.toList()));
     protected static final Gen<Gen.IntGen> FETCH_SIZE_DISTRO = Gens.mixedDistribution(new int[] {1, 10, 100, 1000, 5000});
+    protected static final Gen<Gen.IntGen> LIMIT_DISTRO = Gens.mixedDistribution(1, 1001);
 
     static
     {
@@ -196,6 +196,8 @@ public class StatefulASTBase extends TestBaseImpl
         protected final Session session;
         protected final Gen<Boolean> bindOrLiteralGen;
         protected final Gen<Boolean> betweenEqGen;
+        protected final Gen<Boolean> useFetchSizeGen, usePerPartitionLimitGen, useLimitGen;
+        protected final Gen.IntGen perPartitionLimitGen, limitGen;
         protected final Gen<Conditional.Where.Inequality> lessThanGen;
         protected final Gen<Conditional.Where.Inequality> greaterThanGen;
         protected final Gen<Conditional.Where.Inequality> rangeInequalityGen;
@@ -221,12 +223,17 @@ public class StatefulASTBase extends TestBaseImpl
             this.debug = CQL_DEBUG_APPLY_OPERATOR ? CompositeVisitor.of(StandardVisitors.APPLY_OPERATOR, StandardVisitors.DEBUG) : StandardVisitors.DEBUG;
-            this.bindOrLiteralGen = BIND_OR_LITERAL_DISTRO.next(rs);
-            this.betweenEqGen = BETWEEN_EQ_DISTRO.next(rs);
+            this.bindOrLiteralGen = BOOL_DISTRIBUTION.next(rs);
+            this.betweenEqGen = BOOL_DISTRIBUTION.next(rs);
             this.lessThanGen = LESS_THAN_DISTRO.next(rs);
             this.greaterThanGen = GREATER_THAN_DISTRO.next(rs);
             this.rangeInequalityGen = RANGE_INEQUALITY_DISTRO.next(rs);
             this.fetchSizeGen = FETCH_SIZE_DISTRO.next(rs);
+            this.useFetchSizeGen = BOOL_DISTRIBUTION.next(rs);
+            this.usePerPartitionLimitGen = BOOL_DISTRIBUTION.next(rs);
+            this.useLimitGen = BOOL_DISTRIBUTION.next(rs);
+            this.perPartitionLimitGen = LIMIT_DISTRO.next(rs);
+            this.limitGen = LIMIT_DISTRO.next(rs);
             this.enoughMemtables = rs.pickInt(3, 10, 50);
             this.enoughSSTables = rs.pickInt(3, 10, 50);
@@ -260,18 +267,50 @@ public class StatefulASTBase extends TestBaseImpl
             return command(rs, select, null);
         }
 
+        protected boolean allowLimit(Select select)
+        {
+            //TODO (coverage): allow this in the model!
+            // LIMIT with IN clause on partition columns is non-deterministic which is not currently supported by the model
+            if (select.where.isEmpty()) return true;
+            return !select.where.get()
+                                .streamRecursive(true)
+                                .filter(e -> e instanceof Conditional.In)
+                                .anyMatch(e -> {
+                                    var in = (Conditional.In) e;
+                                    // when expression is size 1, then this is deterministic
+                                    if (in.expressions.size() == 1) return false;
+                                    return model.factory.partitionColumns.contains(in.ref);
+                                });
+        }
+
+        protected boolean allowPerPartitionLimit(Select select)
+        {
+            return true;
+        }
+
+        protected boolean allowPaging(Select select)
+        {
+            return true;
+        }
+
         protected <S extends BaseState> Property.Command<S, Void, ?> command(RandomSource rs, Select select, @Nullable String annotate)
         {
             var inst = selectInstance(rs);
-            //TODO (coverage): don't limit this to all selects, only those doing range queries!
-            int fetchSize = fetchSizeGen.nextInt(rs);
+            if (allowPerPartitionLimit(select) && usePerPartitionLimitGen.next(rs))
+                select = select.withPerPartitionLimit(perPartitionLimitGen.nextInt(rs));
+            if (allowLimit(select) && useLimitGen.next(rs))
+                select = select.withLimit(limitGen.nextInt(rs));
+            int fetchSize = allowPaging(select) && useFetchSizeGen.next(rs)
+                            ? fetchSizeGen.nextInt(rs)
+                            : Integer.MAX_VALUE;
             String postfix = "on " + inst;
             if (fetchSize != Integer.MAX_VALUE)
                 postfix += ", fetch size " + fetchSize;
             if (annotate == null) annotate = postfix;
             else annotate += ", " + postfix;
+            Select finalSelect = select;
             return new Property.SimpleCommand<>(humanReadable(select, annotate), s -> {
-                s.model.validate(s.executeQuery(inst, fetchSize, s.selectCl(), select), select);
+                s.model.validate(s.executeQuery(inst, fetchSize, s.selectCl(), finalSelect), finalSelect);
             });
         }
diff --git a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index 13180c061b..4c0a5f5cf5 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.IntFunction;
@@ -52,9 +53,11 @@ import org.apache.cassandra.cql3.ast.Mutation;
 import org.apache.cassandra.cql3.ast.Select;
 import org.apache.cassandra.cql3.ast.StandardVisitors;
 import org.apache.cassandra.cql3.ast.Symbol;
+import org.apache.cassandra.cql3.ast.Value;
 import org.apache.cassandra.db.BufferClustering;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.harry.model.BytesPartitionState.PrimaryKey;
 import org.apache.cassandra.harry.util.StringUtils;
@@ -572,7 +575,7 @@ public class ASTSingleTableModel
     private SelectResult getRowsAsByteBuffer(Select select)
     {
         if (select.where.isEmpty())
-            return all();
+            return new SelectResult(getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, select.limit)), false);
         LookupContext ctx = context(select);
         List<PrimaryKey> primaryKeys;
         if (ctx.unmatchable)
@@ -598,11 +601,41 @@ public class ASTSingleTableModel
             // partial tested (handles many columns, tests are single column)
             primaryKeys = search(ctx);
         }
+        primaryKeys = applyLimits(primaryKeys, select.perPartitionLimit, select.limit);
         //TODO (correctness): now that we have the rows we need to handle the selections/aggregation/limit/group-by/etc.
         return new SelectResult(getRowsAsByteBuffer(primaryKeys), ctx.unordered);
     }
 
-    private SelectResult all()
+    private List<PrimaryKey> applyLimits(List<PrimaryKey> primaryKeys, Optional<Value> perPartitionLimitOpt, Optional<Value> limitOpt)
+    {
+        if (perPartitionLimitOpt.isPresent())
+        {
+            int limit = Int32Type.instance.compose(eval(perPartitionLimitOpt.get()));
+            var it = primaryKeys.iterator();
+            BytesPartitionState.Ref current = null;
+            int count = 0;
+            while (it.hasNext())
+            {
+                PrimaryKey next = it.next();
+                if (current == null || !current.equals(next.partition))
+                {
+                    current = next.partition;
+                    count = 0;
+                }
+                if (++count > limit)
+                    it.remove();
+            }
+        }
+        if (limitOpt.isPresent())
+        {
+            int limit = Int32Type.instance.compose(eval(limitOpt.get()));
+            if (primaryKeys.size() > limit)
+                primaryKeys = primaryKeys.subList(0, limit);
+        }
+        return primaryKeys;
+    }
+
+    private List<PrimaryKey> all()
     {
         List<PrimaryKey> primaryKeys = new ArrayList<>();
         for (var partition : partitions.values())
@@ -610,7 +643,7 @@ public class ASTSingleTableModel
             if (partition.staticOnly()) primaryKeys.add(partition.partitionRowRef());
             else partition.rows().stream().map(BytesPartitionState.Row::ref).forEach(primaryKeys::add);
         }
-        return new SelectResult(getRowsAsByteBuffer(primaryKeys), false);
+        return primaryKeys;
     }
 
     public ByteBuffer[][] getRowsAsByteBuffer(List<PrimaryKey> primaryKeys)
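The applyLimits addition above relies on the primaryKeys list keeping rows of
the same partition adjacent: the first pass resets its counter whenever the
partition ref changes and removes any row past the per-partition limit, and the
second pass truncates the whole list for LIMIT. A simplified, self-contained
sketch of those two passes (plain strings stand in for PrimaryKey; this is an
illustration, not code from the patch):

    import java.util.ArrayList;
    import java.util.List;

    // Keys are "partition:row" strings, already grouped by partition.
    static List<String> applyLimits(List<String> keys, int perPartitionLimit, int limit)
    {
        List<String> kept = new ArrayList<>();
        String currentPartition = null;
        int count = 0;
        for (String key : keys)
        {
            String partition = key.substring(0, key.indexOf(':'));
            if (!partition.equals(currentPartition))
            {
                currentPartition = partition; // new partition: reset the counter
                count = 0;
            }
            if (++count <= perPartitionLimit)
                kept.add(key);                // PER PARTITION LIMIT pass
        }
        return kept.size() > limit ? kept.subList(0, limit) : kept; // LIMIT pass
    }

For two partitions p1 and p2 with rows r1..r3 each, perPartitionLimit = 2 and
limit = 3 yields [p1:r1, p1:r2, p2:r1].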
diff --git a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
index af0aa22de9..16f4d66819 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -83,8 +84,36 @@ public class ASTSingleTableModelTest
             Select.Builder builder = Select.builder().table(metadata);
             for (var pk : metadata.partitionKeyColumns())
                 builder.value(new Symbol(pk), ZERO);
-            Select select = builder.build();
-            model.validate(expected, select);
+
+            model.validate(expected, builder.build());
+        }
+    }
+
+    @Test
+    public void singlePartitionLimit()
+    {
+        for (TableMetadata metadata : defaultTables())
+        {
+            if (metadata.clusteringColumns().isEmpty()) continue;
+            ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+            Function<ByteBuffer, ColumnValue> update = bb -> partitionLevelUpdate(ZERO, bb);
+            ByteBuffer[][] expected = new ByteBuffer[][]{
+                insert(model, ZERO),
+                insert(model, update.apply(ONE)),
+                insert(model, update.apply(TWO)),
+                insert(model, update.apply(THREE)),
+            };
+
+            Select.Builder builder = Select.builder().table(metadata);
+            for (var pk : metadata.partitionKeyColumns())
+                builder.value(new Symbol(pk), ZERO);
+            // without limit
+            model.validate(expected, builder.build());
+            for (int limit = 1; limit <= expected.length; limit++)
+            {
+                builder.limit(limit);
+                model.validate(Arrays.copyOf(expected, limit), builder.build());
+            }
         }
     }
 
@@ -328,7 +357,38 @@ public class ASTSingleTableModelTest
                 modelModel.add(insert(model, value));
             }
 
-            model.validate(modelModel.all(), Select.builder(metadata).build());
+            var builder = Select.builder(metadata);
+            ByteBuffer[][] all = modelModel.all();
+            model.validate(all, builder.build());
+            for (int i = 1; i < all.length; i++)
+                model.validate(Arrays.copyOf(all, i), builder.limit(i).build());
+            model.validate(all, builder.limit(all.length).build());
+            model.validate(all, builder.limit(all.length + 1).build());
+        }
+    }
+
+    @Test
+    public void selectStarPerPartitionLimit()
+    {
+        List<ByteBuffer> values = Arrays.asList(ZERO, ONE, TWO, THREE);
+        for (TableMetadata metadata : defaultTables())
+        {
+            if (metadata.clusteringColumns().isEmpty()) continue;
+
+            ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+            ModelModel modelModel = new ModelModel(model);
+            for (ByteBuffer pk : values)
+            {
+                for (ByteBuffer row : values)
+                    modelModel.add(insert(model, partitionLevelUpdate(pk, row)));
+            }
+
+            var builder = Select.builder(metadata);
+            model.validate(modelModel.all(), builder.build());
+            for (int i = 1; i < values.size(); i++)
+                model.validate(modelModel.allPerPartitionLimit(i), builder.perPartitionLimit(i).build());
+
+            model.validate(modelModel.all(), builder.perPartitionLimit(values.size()).build());
         }
     }
 
@@ -522,6 +582,20 @@ public class ASTSingleTableModelTest
         return insert(model, (i1, i2) -> value);
     }
 
+    private static ColumnValue partitionLevelUpdate(ByteBuffer partitionLevel, ByteBuffer rowLevel)
+    {
+        return (kind, offset) -> {
+            switch (kind)
+            {
+                case PARTITION_KEY:
+                case STATIC:
+                    return partitionLevel;
+                default:
+                    return rowLevel;
+            }
+        };
+    }
+
     private static ByteBuffer[] insert(ASTSingleTableModel model, ColumnValue fn)
     {
         TableMetadata metadata = model.factory.metadata;
@@ -617,6 +691,28 @@ public class ASTSingleTableModelTest
             return allWhere(i -> true);
         }
 
+        public ByteBuffer[][] allPerPartitionLimit(int limit)
+        {
+            class State
+            {
+                BytesPartitionState.Ref current = null;
+                int count = 0;
+
+                boolean nextPartition(BytesPartitionState.Ref ref)
+                {
+                    current = ref;
+                    count = 0;
+                    return true;
+                }
+
+                boolean nextRow(ByteBuffer[] row)
+                {
+                    return ++count <= limit;
+                }
+            }
+            State state = new State();
+            return allWhere(state::nextPartition, state::nextRow);
+        }
+
         public ByteBuffer[][] allEq(Symbol column, ByteBuffer value)
        {
             return allWhere(column, Inequality.EQUAL, value);
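A note on the expected counts in selectStarPerPartitionLimit above: the test
writes a 4x4 grid (the four `values` as partition keys, four rows in each), so
for PER PARTITION LIMIT i the model must return 4 * i rows; at i = 4 the limit
equals the partition size, so the result must match modelModel.all() (all 16
rows), which is exactly the final validate call.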
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Select.java b/test/unit/org/apache/cassandra/cql3/ast/Select.java
index 55f984df91..7fa497e16c 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Select.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Select.java
@@ -39,6 +39,7 @@ FROM [keyspace_name.] table_name
 [AND clustering_filters
 [AND static_filters]]]
 [ORDER BY PK_column_name ASC|DESC]
+[PER PARTITION LIMIT N]
 [LIMIT N]
 [ALLOW FILTERING]
 */
@@ -49,7 +50,7 @@ FROM [keyspace_name.] table_name
     // where
     public final Optional<Conditional> where;
     public final Optional<OrderBy> orderBy;
-    public final Optional<Value> limit;
+    public final Optional<Value> perPartitionLimit, limit;
     public final boolean allowFiltering;
 
     public Select(List<Expression> selections)
@@ -59,15 +60,16 @@ FROM [keyspace_name.] table_name
 
     public Select(List<Expression> selections, Optional<TableReference> source, Optional<Conditional> where, Optional<OrderBy> orderBy, Optional<Value> limit)
     {
-        this(selections, source, where, orderBy, limit, false);
+        this(selections, source, where, orderBy, Optional.empty(), limit, false);
     }
 
-    public Select(List<Expression> selections, Optional<TableReference> source, Optional<Conditional> where, Optional<OrderBy> orderBy, Optional<Value> limit, boolean allowFiltering)
+    public Select(List<Expression> selections, Optional<TableReference> source, Optional<Conditional> where, Optional<OrderBy> orderBy, Optional<Value> perPartitionLimit, Optional<Value> limit, boolean allowFiltering)
     {
         this.selections = selections;
         this.source = source;
         this.where = where;
         this.orderBy = orderBy;
+        this.perPartitionLimit = perPartitionLimit;
         this.limit = limit;
         this.allowFiltering = allowFiltering;
@@ -96,7 +98,17 @@ FROM [keyspace_name.] table_name
 
     public Select withAllowFiltering()
     {
-        return new Select(selections, source, where, orderBy, limit, true);
+        return new Select(selections, source, where, orderBy, perPartitionLimit, limit, true);
+    }
+
+    public Select withLimit(int limit)
+    {
+        return new Select(selections, source, where, orderBy, perPartitionLimit, Optional.of(Literal.of(limit)), allowFiltering);
+    }
+
+    public Select withPerPartitionLimit(int perPartitionLimit)
+    {
+        return new Select(selections, source, where, orderBy, Optional.of(Literal.of(perPartitionLimit)), limit, allowFiltering);
     }
 
     @Override
@@ -132,6 +144,12 @@ FROM [keyspace_name.] table_name
             sb.append("ORDER BY ");
             orderBy.get().toCQL(sb, formatter);
         }
+        if (perPartitionLimit.isPresent())
+        {
+            formatter.section(sb);
+            sb.append("PER PARTITION LIMIT ");
+            perPartitionLimit.get().toCQL(sb, formatter);
+        }
         if (limit.isPresent())
         {
             formatter.section(sb);
@@ -154,6 +172,7 @@ FROM [keyspace_name.] table_name
                + (source.isPresent() ? 1 : 0)
                + (where.isPresent() ? 1 : 0)
                + (orderBy.isPresent() ? 1 : 0)
+               + (perPartitionLimit.isPresent() ? 1 : 0)
                + (limit.isPresent() ? 1 : 0));
         es.addAll(selections);
         if (source.isPresent())
@@ -162,6 +181,8 @@ FROM [keyspace_name.] table_name
             es.add(where.get());
         if (orderBy.isPresent())
             es.add(orderBy.get());
+        if (perPartitionLimit.isPresent())
+            es.add(perPartitionLimit.get());
         if (limit.isPresent())
             es.add(limit.get());
         return es.stream();
@@ -204,6 +225,18 @@ FROM [keyspace_name.] table_name
         {
             where = this.where;
         }
+        Optional<Value> perPartitionLimit;
+        if (this.perPartitionLimit.isPresent())
+        {
+            var l = this.perPartitionLimit.get();
+            var update = l.visit(v);
+            updated |= l != update;
+            perPartitionLimit = Optional.ofNullable(update);
+        }
+        else
+        {
+            perPartitionLimit = this.perPartitionLimit;
+        }
         Optional<Value> limit;
         if (this.limit.isPresent())
         {
@@ -217,9 +250,10 @@ FROM [keyspace_name.] table_name
             limit = this.limit;
         }
         if (!updated) return this;
-        return new Select(selections, source, where, orderBy, limit, allowFiltering);
+        return new Select(selections, source, where, orderBy, perPartitionLimit, limit, allowFiltering);
     }
 
+
     public static class OrderBy implements Element
     {
         public enum Ordering
@@ -313,6 +347,7 @@ FROM [keyspace_name.] table_name
         protected Optional<TableReference> source = Optional.empty();
         private Conditional.Builder where = new Conditional.Builder();
         private OrderBy.Builder orderBy = new OrderBy.Builder();
+        private Optional<Value> perPartitionLimit = Optional.empty();
         private Optional<Value> limit = Optional.empty();
         private boolean allowFiltering = false;
 
@@ -377,6 +412,17 @@ FROM [keyspace_name.] table_name
             return (T) this;
         }
 
+        public T perPartitionLimit(Value limit)
+        {
+            this.perPartitionLimit = Optional.of(limit);
+            return (T) this;
+        }
+
+        public T perPartitionLimit(int limit)
+        {
+            return perPartitionLimit(Bind.of(limit));
+        }
+
         public T limit(Value limit)
         {
             this.limit = Optional.of(limit);
@@ -394,7 +440,7 @@ FROM [keyspace_name.] table_name
                               source,
                               where.isEmpty() ? Optional.empty() : Optional.of(where.build()),
                               orderBy.isEmpty() ? Optional.empty() : Optional.of(orderBy.build()),
-                              limit,
+                              perPartitionLimit, limit,
                               allowFiltering);
         }
     }
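The builder methods and the immutable with* copies above compose in either
direction; a small usage sketch (the `metadata` variable is an illustrative
TableMetadata, not from this patch):

    // Builder form: perPartitionLimit(int) wraps the value via Bind.of,
    // so the rendered CQL carries a bind marker.
    Select viaBuilder = Select.builder()
                              .table(metadata)
                              .perPartitionLimit(2)
                              .limit(10)
                              .build();

    // Copy form: withPerPartitionLimit/withLimit wrap via Literal.of,
    // so the values are inlined in the rendered CQL.
    Select viaCopies = viaBuilder.withPerPartitionLimit(3).withLimit(5);

Either way, toCQL emits PER PARTITION LIMIT before LIMIT, matching the grammar
comment at the top of the file.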
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index ca2576cdcc..8867d6141e 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -51,6 +52,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 
+import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.Duration;
@@ -60,6 +62,12 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.SchemaCQLHelper;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategyOptions;
+import org.apache.cassandra.db.compaction.UnifiedCompactionStrategy;
+import org.apache.cassandra.db.compaction.unified.Controller;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -82,6 +90,8 @@ import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.ZstdCompressor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -94,7 +104,10 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.PingRequest;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.MemtableParams;
@@ -196,6 +209,71 @@ public final class CassandraGenerators
     }
 
+    private static String humanReadableSignPrefix(RandomnessSource rnd)
+    {
+        switch (SourceDSL.integers().between(0, 2).generate(rnd))
+        {
+            case 0: return "";
+            case 1: return "-";
+            case 2: return "+";
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public static Gen<String> humanReadableStorageValue()
+    {
+        Gen<Long> valueGen = SourceDSL.longs().between(0, 1000);
+        return rnd -> {
+            // [+-]?\d+(\.\d+)?([eE]([+-]?)\d+)?
+            StringBuilder sb = new StringBuilder();
+            sb.append(humanReadableSignPrefix(rnd));
+            sb.append(valueGen.generate(rnd));
+            if (nextBoolean(rnd))
+            {
+                sb.append('.');
+                sb.append(valueGen.generate(rnd));
+            }
+            if (nextBoolean(rnd))
+            {
+                sb.append('E');
+                sb.append(humanReadableSignPrefix(rnd));
+                sb.append(valueGen.generate(rnd));
+            }
+            return sb.toString();
+        };
+    }
+
+    public static Gen<String> humanReadableStorage()
+    {
+        Gen<DataStorageSpec.DataStorageUnit> unitGen = SourceDSL.arbitrary().enumValues(DataStorageSpec.DataStorageUnit.class);
+        return rnd -> {
+            DataStorageSpec.DataStorageUnit unit = unitGen.generate(rnd);
+            String value;
+            switch (SourceDSL.integers().between(0, 2).generate(rnd))
+            {
+                case 0:
+                    value = "NaN";
+                    break;
+                case 1:
+                    value = humanReadableSignPrefix(rnd) + "Infinity";
+                    break;
+                case 2:
+                    value = humanReadableStorageValue().generate(rnd);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+            return value + ' ' + unit.getSymbol();
+        };
+    }
+
+    public static Gen<String> humanReadableStorageSimple()
+    {
+        Gen<DataStorageSpec.DataStorageUnit> unitGen = SourceDSL.arbitrary().enumValues(DataStorageSpec.DataStorageUnit.class);
+        return rnd -> humanReadableStorageValue().generate(rnd) + ' ' + unitGen.generate(rnd).getSymbol();
+    }
+
     public static Set<UserType> extractUDTs(TableMetadata metadata)
     {
         Set<UserType> matches = new HashSet<>();
@@ -413,10 +491,261 @@ public final class CassandraGenerators
         }
     }
 
+    public static Gen<CachingParams> cachingParamsGen()
+    {
+        return rnd -> {
+            boolean cacheKeys = nextBoolean(rnd);
+            int rowsPerPartitionToCache;
+            switch (SourceDSL.integers().between(1, 3).generate(rnd))
+            {
+                case 1: // ALL
+                    rowsPerPartitionToCache = Integer.MAX_VALUE;
+                    break;
+                case 2: // NONE
+                    rowsPerPartitionToCache = 0;
+                    break;
+                case 3: // num values
+                    rowsPerPartitionToCache = Math.toIntExact(rnd.next(Constraint.between(1, Integer.MAX_VALUE - 1)));
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+            return new CachingParams(cacheKeys, rowsPerPartitionToCache);
+        };
+    }
+
+    public enum KnownCompactionAlgo
+    {
+        SizeTiered(SizeTieredCompactionStrategy.class),
+        Leveled(LeveledCompactionStrategy.class),
+        Unified(UnifiedCompactionStrategy.class);
+
+        private final Class<? extends AbstractCompactionStrategy> klass;
+
+        KnownCompactionAlgo(Class<? extends AbstractCompactionStrategy> klass)
+        {
+            this.klass = klass;
+        }
+    }
+
+    public static class CompactionParamsBuilder
+    {
+        private Gen<KnownCompactionAlgo> algoGen = SourceDSL.arbitrary().enumValues(KnownCompactionAlgo.class);
+        private Gen<CompactionParams.TombstoneOption> tombstoneOptionGen = SourceDSL.arbitrary().enumValues(CompactionParams.TombstoneOption.class);
+        private Gen<Map<String, String>> sizeTieredOptions = rnd -> {
+            if (nextBoolean(rnd)) return Map.of();
+            Map<String, String> options = new HashMap<>();
+            if (nextBoolean(rnd))
+                // computes mb then converts to bytes
+                options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, Long.toString(SourceDSL.longs().between(1, 100).generate(rnd) * 1024L * 1024L));
+            if (nextBoolean(rnd))
+                options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, Double.toString(SourceDSL.doubles().between(0.1, 0.9).generate(rnd)));
+            if (nextBoolean(rnd))
+                options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, Double.toString(SourceDSL.doubles().between(1.1, 1.9).generate(rnd)));
+            return options;
+        };
+        private Gen<Map<String, String>> leveledOptions = rnd -> {
+            if (nextBoolean(rnd)) return Map.of();
+            Map<String, String> options = new HashMap<>();
+            if (nextBoolean(rnd))
+                options.putAll(sizeTieredOptions.generate(rnd));
+            if (nextBoolean(rnd))
+                // size in mb
+                options.put(LeveledCompactionStrategy.SSTABLE_SIZE_OPTION, SourceDSL.integers().between(1, 2_000).generate(rnd).toString());
+            if (nextBoolean(rnd))
+                options.put(LeveledCompactionStrategy.LEVEL_FANOUT_SIZE_OPTION, SourceDSL.integers().between(1, 100).generate(rnd).toString());
+            if (nextBoolean(rnd))
+                options.put(LeveledCompactionStrategy.SINGLE_SSTABLE_UPLEVEL_OPTION, nextBoolean(rnd).toString());
+            return options;
+        };
+        private Gen<Map<String, String>> unifiedOptions = rnd -> {
+            if (nextBoolean(rnd)) return Map.of();
+            Gen<String> storageSizeGen = Generators.filter(humanReadableStorageSimple(), s -> Controller.MIN_TARGET_SSTABLE_SIZE <= FBUtilities.parseHumanReadableBytes(s));
+            Map<String, String> options = new HashMap<>();
+            if (nextBoolean(rnd))
+                options.put(Controller.BASE_SHARD_COUNT_OPTION, SourceDSL.integers().between(1, 10).generate(rnd).toString());
+            if (nextBoolean(rnd))
+                options.put(Controller.FLUSH_SIZE_OVERRIDE_OPTION, storageSizeGen.generate(rnd));
+            if (nextBoolean(rnd))
+                options.put(Controller.MAX_SSTABLES_TO_COMPACT_OPTION, SourceDSL.integers().between(0, 32).generate(rnd).toString());
+            if (nextBoolean(rnd))
+                options.put(Controller.SSTABLE_GROWTH_OPTION, SourceDSL.integers().between(0, 100).generate(rnd) + "%");
+            if (nextBoolean(rnd))
+                options.put(Controller.OVERLAP_INCLUSION_METHOD_OPTION, SourceDSL.arbitrary().enumValues(Overlaps.InclusionMethod.class).generate(rnd).name());
+            if (nextBoolean(rnd))
+            {
+                int numLevels = SourceDSL.integers().between(1, 10).generate(rnd);
+                String[] scalingParams = new String[numLevels];
+                Gen<Integer> levelSize = SourceDSL.integers().between(2, 10);
+                for (int i = 0; i < numLevels; i++)
+                {
+                    String value;
+                    switch (SourceDSL.integers().between(0, 3).generate(rnd))
+                    {
+                        case 0:
+                            value = "N";
+                            break;
+                        case 1:
+                            value = "L" + levelSize.generate(rnd);
+                            break;
+                        case 2:
+                            value = "T" + levelSize.generate(rnd);
+                            break;
+                        case 3:
+                            value = SourceDSL.integers().all().generate(rnd).toString();
+                            break;
+                        default:
+                            throw new AssertionError();
+                    }
+                    scalingParams[i] = value;
+                }
+                options.put(Controller.SCALING_PARAMETERS_OPTION, String.join(",", scalingParams));
+            }
+            if (nextBoolean(rnd))
+            {
+                // Calculate TARGET then compute the MIN from that.  The issue is that there is a hidden relationship
+                // between these 2 fields more complex than simple comparability, MIN must be < 70% * TARGET!
+                // See CASSANDRA-20398
+                // 1MiB to 128MiB target
+                long targetBytes = SourceDSL.longs().between(1L << 20, 1L << 27).generate(rnd);
+                long limit = (long) Math.ceil(targetBytes * Math.sqrt(0.5));
+                long minBytes = SourceDSL.longs().between(1, limit - 1).generate(rnd);
+                options.put(Controller.MIN_SSTABLE_SIZE_OPTION, minBytes + "B");
+                options.put(Controller.TARGET_SSTABLE_SIZE_OPTION, targetBytes + "B");
+            }
+            return options;
+        };
+        //TODO (coverage): doesn't look to validate > 1, what does that even mean?
+        private Gen<Float> tombstoneThreshold = SourceDSL.floats().between(0, 1);
+        private Gen<Boolean> uncheckedTombstoneCompaction = SourceDSL.booleans().all();
+        private Gen<Boolean> onlyPurgeRepairedTombstones = SourceDSL.booleans().all();
+
+        public Gen<CompactionParams> build()
+        {
+            return rnd -> {
+                KnownCompactionAlgo algo = algoGen.generate(rnd);
+                Map<String, String> options = new HashMap<>();
+                if (nextBoolean(rnd))
+                    options.put(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(), tombstoneOptionGen.generate(rnd).name());
+                if (CompactionParams.supportsThresholdParams(algo.klass) && nextBoolean(rnd))
+                {
+                    options.put(CompactionParams.Option.MIN_THRESHOLD.toString(), Long.toString(rnd.next(Constraint.between(2, 4))));
+                    options.put(CompactionParams.Option.MAX_THRESHOLD.toString(), Long.toString(rnd.next(Constraint.between(5, 32))));
+                }
+                if (nextBoolean(rnd))
+                    options.put(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION, tombstoneThreshold.generate(rnd).toString());
+                if (nextBoolean(rnd))
+                    options.put(AbstractCompactionStrategy.UNCHECKED_TOMBSTONE_COMPACTION_OPTION, uncheckedTombstoneCompaction.generate(rnd).toString());
+                if (nextBoolean(rnd))
+                    options.put(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES, onlyPurgeRepairedTombstones.generate(rnd).toString());
+                switch (algo)
+                {
+                    case SizeTiered:
+                        options.putAll(sizeTieredOptions.generate(rnd));
+                        break;
+                    case Leveled:
+                        options.putAll(leveledOptions.generate(rnd));
+                        break;
+                    case Unified:
+                        options.putAll(unifiedOptions.generate(rnd));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(algo.name());
+                }
+                return CompactionParams.create(algo.klass, options);
+            };
+        }
+    }
+
+    private static Boolean nextBoolean(RandomnessSource rnd)
+    {
+        return SourceDSL.booleans().all().generate(rnd);
+    }
+
+    public static Gen<CompactionParams> compactionParamsGen()
+    {
+        return new CompactionParamsBuilder().build();
+    }
+
+    public enum KnownCompressionAlgo
+    {
+        snappy("SnappyCompressor"),
+        deflate("DeflateCompressor"),
+        lz4("LZ4Compressor"),
+        zstd("ZstdCompressor"),
+        noop("NoopCompressor");
+
+        private final String compressor;
+
+        KnownCompressionAlgo(String compressor)
+        {
+            this.compressor = compressor;
+        }
+    }
+
+    public static class CompressionParamsBuilder
+    {
+        private Gen<Boolean> enabledGen = SourceDSL.booleans().all();
+        private Gen<KnownCompressionAlgo> algoGen = SourceDSL.arbitrary().enumValues(KnownCompressionAlgo.class);
+        private Gen<Map<String, String>> lz4OptionsGen = rnd -> {
+            if (nextBoolean(rnd))
+                return Map.of();
+            Map<String, String> options = new HashMap<>();
+            if (nextBoolean(rnd))
+                options.put(LZ4Compressor.LZ4_COMPRESSOR_TYPE, nextBoolean(rnd) ? LZ4Compressor.LZ4_FAST_COMPRESSOR : LZ4Compressor.LZ4_HIGH_COMPRESSOR);
+            if (nextBoolean(rnd))
+                options.put(LZ4Compressor.LZ4_HIGH_COMPRESSION_LEVEL, Integer.toString(Math.toIntExact(rnd.next(Constraint.between(1, 17)))));
+            return options;
+        };
+        private Gen<Map<String, String>> zstdOptionsGen = rnd -> {
+            if (nextBoolean(rnd))
+                return Map.of();
+            int level = Math.toIntExact(rnd.next(Constraint.between(ZstdCompressor.FAST_COMPRESSION_LEVEL, ZstdCompressor.BEST_COMPRESSION_LEVEL)));
+            return Map.of(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(level));
+        };
+
+        public Gen<CompressionParams> build()
+        {
+            return rnd -> {
+                if (!enabledGen.generate(rnd))
+                    return CompressionParams.noCompression();
+                KnownCompressionAlgo algo = algoGen.generate(rnd);
+                if (algo == KnownCompressionAlgo.noop)
+                    return CompressionParams.noop();
+                // when null disabled
+                int chunkLength = CompressionParams.DEFAULT_CHUNK_LENGTH;
+                double minCompressRatio = CompressionParams.DEFAULT_MIN_COMPRESS_RATIO;
+                Map<String, String> options;
+                switch (algo)
+                {
+                    case lz4:
+                        options = lz4OptionsGen.generate(rnd);
+                        break;
+                    case zstd:
+                        options = zstdOptionsGen.generate(rnd);
+                        break;
+                    default:
+                        options = Map.of();
+                }
+                return new CompressionParams(algo.compressor, options, chunkLength, minCompressRatio);
+            };
+        }
+    }
+
+    public static Gen<CompressionParams> compressionParamsGen()
+    {
+        return new CompressionParamsBuilder().build();
+    }
+
     public static class TableParamsBuilder
     {
         @Nullable
         private Gen<String> memtableKeyGen = null;
+        @Nullable
+        private Gen<CachingParams> cachingParamsGen = null;
+        @Nullable
+        private Gen<CompactionParams> compactionParamsGen = null;
+        @Nullable
+        private Gen<CompressionParams> compressionParamsGen = null;
 
         public TableParamsBuilder withKnownMemtables()
         {
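On the MIN/TARGET pairing in unifiedOptions above: the bound used is
sqrt(0.5) ~= 0.707, slightly above the "70% of target" phrasing in the comment.
Worked through for one illustrative draw: a target of 64 MiB is 67,108,864
bytes, the cap is ceil(67,108,864 * 0.7071...) = 47,453,133 bytes, and
min_sstable_size is then drawn from [1, 47,453,132], so the CASSANDRA-20398
constraint holds for every generated pair.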
@@ -427,12 +756,36 @@ public final class CassandraGenerators
             return this;
         }
 
+        public TableParamsBuilder withCaching()
+        {
+            cachingParamsGen = cachingParamsGen();
+            return this;
+        }
+
+        public TableParamsBuilder withCompaction()
+        {
+            compactionParamsGen = compactionParamsGen();
+            return this;
+        }
+
+        public TableParamsBuilder withCompression()
+        {
+            compressionParamsGen = compressionParamsGen();
+            return this;
+        }
+
         public Gen<TableParams> build()
         {
             return rnd -> {
                 TableParams.Builder params = TableParams.builder();
                 if (memtableKeyGen != null)
                     params.memtable(MemtableParams.get(memtableKeyGen.generate(rnd)));
+                if (cachingParamsGen != null)
+                    params.caching(cachingParamsGen.generate(rnd));
+                if (compactionParamsGen != null)
+                    params.compaction(compactionParamsGen.generate(rnd));
+                if (compressionParamsGen != null)
+                    params.compression(compressionParamsGen.generate(rnd));
                 return params.build();
             };
         }
@@ -515,6 +868,12 @@ public final class CassandraGenerators
             return this;
         }
 
+        public TableMetadataBuilder withParams(Consumer<TableParamsBuilder> fn)
+        {
+            fn.accept(paramsBuilder);
+            return this;
+        }
+
         public TableMetadataBuilder withKeyspaceName(Gen<String> ksNameGen)
         {
             this.ksNameGen = ksNameGen;
@@ -1358,7 +1717,7 @@ public final class CassandraGenerators
     public static Gen<Epoch> epochs()
     {
         return rnd -> {
-            if (SourceDSL.booleans().all().generate(rnd))
+            if (nextBoolean(rnd))
             {
                 switch (SourceDSL.arbitrary().enumValues(EpochConstants.class).generate(rnd))
                 {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org