This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b56edf2a5d Add support in CAS for -= on numeric types, and fixed improper handling of empty bytes which lead to NPE b56edf2a5d is described below commit b56edf2a5df8c320b33e38116f2742ab69f7b4fd Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri Mar 28 11:48:51 2025 -0700 Add support in CAS for -= on numeric types, and fixed improper handling of empty bytes which lead to NPE patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20477 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/cql3/Operation.java | 12 +- .../cassandra/cql3/statements/BatchStatement.java | 12 +- .../cassandra/cql3/statements/CQL3CasRequest.java | 12 +- .../cql3/statements/ModificationStatement.java | 12 +- .../org/apache/cassandra/cql3/terms/Constants.java | 64 ++-- .../cassandra/db/RegularAndStaticColumns.java | 7 + .../apache/cassandra/db/marshal/AbstractType.java | 26 +- .../org/apache/cassandra/service/CASRequest.java | 3 + .../org/apache/cassandra/service/paxos/Paxos.java | 9 +- .../org/apache/cassandra/transport/Dispatcher.java | 2 + .../test/cql3/CasMultiNodeTableWalkBase.java | 129 ++++++++ .../test/cql3/MultiNodeTableWalkBase.java | 21 +- .../cql3/MultiNodeTableWalkWithReadRepairTest.java | 4 +- .../MultiNodeTableWalkWithoutReadRepairTest.java | 4 +- ...est.java => PaxosV1MultiNodeTableWalkTest.java} | 12 +- ...est.java => PaxosV2MultiNodeTableWalkTest.java} | 12 +- .../test/cql3/SingleNodeTableWalkTest.java | 26 +- .../distributed/test/cql3/StatefulASTBase.java | 79 ++++- .../cassandra/harry/model/ASTSingleTableModel.java | 358 +++++++++++++++++++-- .../cassandra/harry/model/BytesPartitionState.java | 17 +- .../unit/org/apache/cassandra/cql3/KnownIssue.java | 4 +- .../cassandra/cql3/ast/AssignmentOperator.java | 7 +- test/unit/org/apache/cassandra/cql3/ast/Bind.java | 1 + .../apache/cassandra/cql3/ast/CasCondition.java | 20 +- .../org/apache/cassandra/cql3/ast/Conditional.java | 15 + .../cassandra/cql3/ast/ExpressionEvaluator.java | 5 + .../org/apache/cassandra/cql3/ast/Literal.java | 6 + .../org/apache/cassandra/cql3/ast/Mutation.java | 53 ++- .../org/apache/cassandra/cql3/ast/Reference.java | 5 + .../cassandra/cql3/ast/ReferenceExpression.java | 2 +- test/unit/org/apache/cassandra/cql3/ast/Value.java | 4 + .../org/apache/cassandra/cql3/ast/Visitor.java | 5 + .../org/apache/cassandra/utils/ASTGenerators.java | 17 +- 34 files changed, 864 insertions(+), 102 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 80eabfd705..361fb3cd07 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Add support in CAS for -= on numeric types, and fixed improper handling of empty bytes which lead to NPE (CASSANDRA-20477) * Do not fail to start a node with materialized views after they are turned off in config (CASSANDRA-20452) * Fix nodetool gcstats output, support human-readable units and more output formats (CASSANDRA-19022) * Various gossip to TCM upgrade fixes (CASSANDRA-20483) diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java index 7a7c0e8420..7c5e02eb63 100644 --- a/src/java/org/apache/cassandra/cql3/Operation.java +++ b/src/java/org/apache/cassandra/cql3/Operation.java @@ -374,8 +374,16 @@ public abstract class Operation { if (!(receiver.type instanceof CollectionType)) { - if (!(receiver.type instanceof CounterColumnType)) - throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name)); + if (canReadExistingState) + { + if (!(receiver.type instanceof NumberType<?>)) + throw new InvalidRequestException(String.format("Invalid operation (%s) for non-numeric type %s", toString(receiver), receiver.name)); + } + else + { + if (!(receiver.type instanceof CounterColumnType)) + throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name)); + } return new Constants.Substracter(receiver, value.prepare(metadata.keyspace, receiver)); } else if (!(receiver.type.isMultiCell())) diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index bfec675464..6c5b121992 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -468,7 +468,7 @@ public class BatchStatement implements CQLStatement private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, Dispatcher.RequestTime requestTime) { - Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state); + Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state, requestTime); CQL3CasRequest casRequest = p.left; Set<ColumnMetadata> columnsWithConditions = p.right; @@ -495,7 +495,7 @@ public class BatchStatement implements CQLStatement } } - private Pair<CQL3CasRequest,Set<ColumnMetadata>> makeCasRequest(BatchQueryOptions options, QueryState state) + private Pair<CQL3CasRequest,Set<ColumnMetadata>> makeCasRequest(BatchQueryOptions options, QueryState state, Dispatcher.RequestTime requestTime) { long batchTimestamp = options.getTimestamp(state); long nowInSeconds = options.getNowInSeconds(state); @@ -514,7 +514,7 @@ public class BatchStatement implements CQLStatement if (key == null) { key = statement.metadata().partitioner.decorateKey(pks.get(0)); - casRequest = new CQL3CasRequest(statement.metadata(), key, conditionColumns, updatesRegularRows, updatesStaticRow); + casRequest = new CQL3CasRequest(statement.metadata(), key, conditionColumns, updatesRegularRows, updatesStaticRow, requestTime); } else if (!key.getKey().equals(pks.get(0))) { @@ -570,7 +570,7 @@ public class BatchStatement implements CQLStatement BatchQueryOptions batchOptions = BatchQueryOptions.withoutPerStatementVariables(options); if (hasConditions) - return executeInternalWithConditions(batchOptions, queryState); + return executeInternalWithConditions(batchOptions, queryState, Dispatcher.RequestTime.forImmediateExecution()); executeInternalWithoutCondition(queryState, batchOptions, Dispatcher.RequestTime.forImmediateExecution()); return new ResultMessage.Void(); @@ -586,9 +586,9 @@ public class BatchStatement implements CQLStatement return null; } - private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state) + private ResultMessage executeInternalWithConditions(BatchQueryOptions options, QueryState state, Dispatcher.RequestTime requestTime) { - Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state); + Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state, requestTime); CQL3CasRequest request = p.left; Set<ColumnMetadata> columnsWithConditions = p.right; diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 0d322691c6..4db98459ec 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -34,6 +34,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.CASRequest; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.TimeUUID; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -49,6 +50,7 @@ public class CQL3CasRequest implements CASRequest private final RegularAndStaticColumns conditionColumns; private final boolean updatesRegularRows; private final boolean updatesStaticRow; + private final Dispatcher.RequestTime requestTime; private boolean hasExists; // whether we have an exist or if not exist condition // Conditions on the static row. We keep it separate from 'conditions' as most things related to the static row are @@ -66,7 +68,8 @@ public class CQL3CasRequest implements CASRequest DecoratedKey key, RegularAndStaticColumns conditionColumns, boolean updatesRegularRows, - boolean updatesStaticRow) + boolean updatesStaticRow, + Dispatcher.RequestTime requestTime) { this.metadata = metadata; this.key = key; @@ -74,6 +77,13 @@ public class CQL3CasRequest implements CASRequest this.conditionColumns = conditionColumns; this.updatesRegularRows = updatesRegularRows; this.updatesStaticRow = updatesStaticRow; + this.requestTime = requestTime; + } + + @Override + public Dispatcher.RequestTime requestTime() + { + return requestTime; } void addRowUpdate(Clustering<?> clustering, ModificationStatement stmt, QueryOptions options, long timestamp, long nowInSeconds) diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index ce9da9a538..21da99c14a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -576,7 +576,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa private ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, Dispatcher.RequestTime requestTime) { - CQL3CasRequest request = makeCasRequest(queryState, options); + CQL3CasRequest request = makeCasRequest(queryState, options, requestTime); try (RowIterator result = StorageProxy.cas(keyspace(), table(), @@ -592,7 +592,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa } } - private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options) + private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options, Dispatcher.RequestTime requestTime) { ClientState clientState = queryState.getClientState(); List<ByteBuffer> keys = buildPartitionKeyNames(options, clientState); @@ -610,7 +610,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa type.isUpdate()? "updates" : "deletions"); Clustering<?> clustering = Iterables.getOnlyElement(createClustering(options, clientState)); - CQL3CasRequest request = new CQL3CasRequest(metadata(), key, conditionColumns(), updatesRegularRows(), updatesStaticRow()); + CQL3CasRequest request = new CQL3CasRequest(metadata(), key, conditionColumns(), updatesRegularRows(), updatesStaticRow(), requestTime); addConditions(clustering, request, options); request.addRowUpdate(clustering, this, options, timestamp, nowInSeconds); @@ -718,7 +718,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa public ResultMessage executeLocally(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException { return hasConditions() - ? executeInternalWithCondition(queryState, options) + ? executeInternalWithCondition(queryState, options, Dispatcher.RequestTime.forImmediateExecution()) : executeInternalWithoutCondition(queryState, options, Dispatcher.RequestTime.forImmediateExecution()); } @@ -732,9 +732,9 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa return null; } - public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) + public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options, Dispatcher.RequestTime requestTime) { - CQL3CasRequest request = makeCasRequest(state, options); + CQL3CasRequest request = makeCasRequest(state, options, requestTime); try (RowIterator result = casInternal(state.getClientState(), request, options.getTimestamp(state), options.getNowInSeconds(state))) { diff --git a/src/java/org/apache/cassandra/cql3/terms/Constants.java b/src/java/org/apache/cassandra/cql3/terms/Constants.java index f63d7ce28b..3b9ae6b4c9 100644 --- a/src/java/org/apache/cassandra/cql3/terms/Constants.java +++ b/src/java/org/apache/cassandra/cql3/terms/Constants.java @@ -44,6 +44,14 @@ import org.apache.cassandra.utils.FastByteOperations; */ public abstract class Constants { + + private static ByteBuffer getCurrentCellBuffer(ColumnMetadata column, DecoratedKey key, UpdateParameters params) + { + Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? Clustering.STATIC_CLUSTERING : params.currentClustering()); + Cell<?> currentCell = currentRow == null ? null : currentRow.getCell(column); + return currentCell == null ? null : currentCell.buffer(); + } + public enum Type { STRING @@ -489,8 +497,10 @@ public abstract class Constants else if (column.type instanceof NumberType<?>) { @SuppressWarnings("unchecked") NumberType<Number> type = (NumberType<Number>) column.type; - ByteBuffer increment = t.bindAndGet(params.options); - ByteBuffer current = getCurrentCellBuffer(partitionKey, params); + ByteBuffer increment = type.sanitize(t.bindAndGet(params.options)); + if (increment == null) + return; + ByteBuffer current = type.sanitize(getCurrentCellBuffer(column, partitionKey, params)); if (current == null) return; ByteBuffer newValue = type.add(type.compose(current), type.compose(increment)); @@ -499,7 +509,9 @@ public abstract class Constants else if (column.type instanceof StringType) { ByteBuffer append = t.bindAndGet(params.options); - ByteBuffer current = getCurrentCellBuffer(partitionKey, params); + if (append == null) + return; + ByteBuffer current = getCurrentCellBuffer(column, partitionKey, params); if (current == null) return; ByteBuffer newValue = ByteBuffer.allocate(current.remaining() + append.remaining()); @@ -508,13 +520,6 @@ public abstract class Constants params.addCell(column, newValue); } } - - private ByteBuffer getCurrentCellBuffer(DecoratedKey key, UpdateParameters params) - { - Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? Clustering.STATIC_CLUSTERING : params.currentClustering()); - Cell<?> currentCell = currentRow == null ? null : currentRow.getCell(column); - return currentCell == null ? null : currentCell.buffer(); - } } public static class Substracter extends Operation @@ -524,19 +529,40 @@ public abstract class Constants super(column, t); } + @Override + public boolean requiresRead() + { + return !column.type.isCounter(); + } + public void execute(DecoratedKey partitionKey, UpdateParameters params) throws InvalidRequestException { - ByteBuffer bytes = t.bindAndGet(params.options); - if (bytes == null) - throw new InvalidRequestException("Invalid null value for counter increment"); - if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER) - return; + if (column.type instanceof CounterColumnType) + { + ByteBuffer bytes = t.bindAndGet(params.options); + if (bytes == null) + throw new InvalidRequestException("Invalid null value for counter increment"); + if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER) + return; - long increment = ByteBufferUtil.toLong(bytes); - if (increment == Long.MIN_VALUE) - throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)"); + long increment = ByteBufferUtil.toLong(bytes); + if (increment == Long.MIN_VALUE) + throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)"); - params.addCounter(column, -increment); + params.addCounter(column, -increment); + } + else if (column.type instanceof NumberType<?>) + { + @SuppressWarnings("unchecked") NumberType<Number> type = (NumberType<Number>) column.type; + ByteBuffer increment = type.sanitize(t.bindAndGet(params.options)); + if (increment == null) + return; + ByteBuffer current = type.sanitize(getCurrentCellBuffer(column, partitionKey, params)); + if (current == null) + return; + ByteBuffer newValue = type.substract(type.compose(current), type.compose(increment)); + params.addCell(column, newValue); + } } } diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java index 4daea2f4f7..2032fe6fe9 100644 --- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java +++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java @@ -18,6 +18,8 @@ package org.apache.cassandra.db; import java.util.*; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import com.google.common.collect.Iterators; @@ -58,6 +60,11 @@ public class RegularAndStaticColumns implements Iterable<ColumnMetadata> column.isStatic() ? regulars : regulars.without(column)); } + public Stream<ColumnMetadata> stream() + { + return StreamSupport.stream(spliterator(), false); + } + public RegularAndStaticColumns mergeTo(RegularAndStaticColumns that) { if (this == that) diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index c624ce505b..5378a4cd3f 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import javax.annotation.Nullable; + import org.apache.cassandra.cql3.AssignmentTestable; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnSpecification; @@ -474,14 +476,6 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm return this; } - /** - * Returns {@code true} for types where empty should be handled like {@code null} like {@link Int32Type}. - */ - public boolean isEmptyValueMeaningless() - { - return false; - } - /** * @param ignoreFreezing if true, the type string will not be wrapped with FrozenType(...), even if this type is frozen. */ @@ -537,6 +531,22 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>, Assignm return false; } + /** + * Returns {@code true} for types where empty should be handled like {@code null} like {@link Int32Type}. + */ + public boolean isEmptyValueMeaningless() + { + return false; + } + + @Nullable + public ByteBuffer sanitize(@Nullable ByteBuffer bb) + { + if (bb == null) return null; + // not checking allowsEmpty as this method assumes that the bb has already passed validation for the type + return bb.remaining() == 0 && isEmptyValueMeaningless() ? null : bb; + } + public boolean isNull(ByteBuffer bb) { return isNull(bb, ByteBufferAccessor.instance); diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java index 6fb5eea20c..50ea5852a6 100644 --- a/src/java/org/apache/cassandra/service/CASRequest.java +++ b/src/java/org/apache/cassandra/service/CASRequest.java @@ -22,12 +22,15 @@ import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.paxos.Ballot; +import org.apache.cassandra.transport.Dispatcher; /** * Abstract the conditions and updates for a CAS operation. */ public interface CASRequest { + Dispatcher.RequestTime requestTime(); + /** * The command to use to fetch the value to compare for the CAS. */ diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index 3412add1a2..06f90907d5 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -737,7 +737,7 @@ public class Paxos Tracing.trace("Reading existing values for CAS precondition"); BeginResult begin = begin(proposeDeadline, readCommand, consistencyForConsensus, - true, minimumBallot, failedAttemptsDueToContention); + true, minimumBallot, failedAttemptsDueToContention, request.requestTime()); Ballot ballot = begin.ballot; Participants participants = begin.participants; failedAttemptsDueToContention = begin.failedAttemptsDueToContention; @@ -914,7 +914,7 @@ public class Paxos while (true) { // does the work of applying in-progress writes; throws UAE or timeout if it can't - final BeginResult begin = begin(deadline, read, consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention); + final BeginResult begin = begin(deadline, read, consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention, requestTime); failedAttemptsDueToContention = begin.failedAttemptsDueToContention; switch (PAXOS_VARIANT) @@ -1034,7 +1034,8 @@ public class Paxos ConsistencyLevel consistencyForConsensus, final boolean isWrite, Ballot minimumBallot, - int failedAttemptsDueToContention) + int failedAttemptsDueToContention, + Dispatcher.RequestTime requestTime) throws WriteTimeoutException, WriteFailureException, ReadTimeoutException, ReadFailureException { boolean acceptEarlyReadPermission = !isWrite; // if we're reading, begin by assuming a read permission is sufficient @@ -1111,7 +1112,7 @@ public class Paxos PaxosPrepare.Success success = prepare.success(); Supplier<Participants> plan = () -> success.participants; - DataResolver<?, ?> resolver = new DataResolver<>(query, plan, NoopReadRepair.instance, new Dispatcher.RequestTime(query.creationTimeNanos())); + DataResolver<?, ?> resolver = new DataResolver<>(query, plan, NoopReadRepair.instance, requestTime); for (int i = 0 ; i < success.responses.size() ; ++i) resolver.preprocess(success.responses.get(i)); diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index d6cb5e822f..f701434d00 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +141,7 @@ public class Dispatcher implements CQLMessageHandler.MessageConsumer<Message.Req public RequestTime(long enqueuedAtNanos, long startedAtNanos) { + Preconditions.checkArgument(enqueuedAtNanos != -1); this.enqueuedAtNanos = enqueuedAtNanos; this.startedAtNanos = startedAtNanos; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java new file mode 100644 index 0000000000..bf8a44dcb9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.cql3; + +import accord.utils.Gen; +import accord.utils.RandomSource; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.cql3.KnownIssue; +import org.apache.cassandra.cql3.ast.CasCondition; +import org.apache.cassandra.cql3.ast.Conditional; +import org.apache.cassandra.cql3.ast.Mutation; +import org.apache.cassandra.cql3.ast.Value; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; +import org.apache.cassandra.utils.ASTGenerators; + +import static org.apache.cassandra.utils.Generators.toGen; + +public abstract class CasMultiNodeTableWalkBase extends MultiNodeTableWalkBase +{ + protected final Config.PaxosVariant paxos_variant; + + protected CasMultiNodeTableWalkBase(Config.PaxosVariant paxos_variant) + { + super(ReadRepairStrategy.NONE); + this.paxos_variant = paxos_variant; + } + + @Override + protected void clusterConfig(IInstanceConfig c) + { + super.clusterConfig(c); + c.set("paxos_variant", paxos_variant); + c.set("cas_contention_timeout", "180s"); + //TODO (now): should these be included? They are in the benchmark clusters +// c.set("paxos_contention_min_wait", 0); +// c.set("paxos_contention_max_wait", "100ms"); +// c.set("paxos_contention_min_delta", 0); + } + + @Override + protected SingleNodeTableWalkTest.State createState(RandomSource rs, Cluster cluster) + { + return new State(rs, cluster); + } + + private static boolean isValueUDTSafe(Value value) + { + var bb = value.valueEncoded(); + return bb == null ? true : bb.hasRemaining(); + } + + protected class State extends MultiNodeState + { + private State(RandomSource rs, Cluster cluster) + { + super(rs, cluster); + } + + @Override + protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder mutationGenBuilder) + { + mutationGenBuilder.withCasGen(i -> true); + // generator might not always generate a cas statement... should fix generator! + Gen<Mutation> gen = toGen(mutationGenBuilder.build()).filter(Mutation::isCas); + if (metadata.regularAndStaticColumns().stream().anyMatch(c -> c.type.isUDT()) + && IGNORED_ISSUES.contains(KnownIssue.CAS_CONDITION_ON_UDT_W_EMPTY_BYTES)) + { + gen = gen.filter(m -> { + CasCondition condition; + switch (m.kind) + { + case INSERT: + return true; + case DELETE: + condition = ((Mutation.Delete) m).casCondition.get(); + break; + case UPDATE: + condition = ((Mutation.Update) m).casCondition.get(); + break; + default: + throw new UnsupportedOperationException(m.kind.name()); + } + return !condition.streamRecursive(true).anyMatch(e -> { + if (!(e instanceof Conditional.Where)) return false; + var where = (Conditional.Where) e; + if (!where.lhs.type().isUDT()) return false; + if (where.lhs instanceof Value && !isValueUDTSafe((Value) where.lhs)) + return true; + if (where.rhs instanceof Value && !isValueUDTSafe((Value) where.rhs)) + return true; + return false; + }); + }); + } + return gen; + } + + @Override + protected ConsistencyLevel selectCl() + { + return ConsistencyLevel.SERIAL; + } + + @Override + protected ConsistencyLevel mutationCl() + { + return ConsistencyLevel.SERIAL; + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java index f10d0edcf4..3e9c1195f6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java @@ -23,6 +23,7 @@ import java.io.IOException; import accord.utils.RandomSource; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; @@ -52,13 +53,17 @@ public abstract class MultiNodeTableWalkBase extends SingleNodeTableWalkTest @Override protected Cluster createCluster() throws IOException { - return createCluster(mockMultiNode ? 1 : 3, c -> { - c.set("range_request_timeout", "180s") - .set("read_request_timeout", "180s") - .set("write_request_timeout", "180s") - .set("native_transport_timeout", "180s") - .set("slow_query_log_timeout", "180s"); - }); + return createCluster(mockMultiNode ? 1 : 3, this::clusterConfig); + } + + @Override + protected void clusterConfig(IInstanceConfig c) + { + c.set("range_request_timeout", "180s") + .set("read_request_timeout", "180s") + .set("write_request_timeout", "180s") + .set("native_transport_timeout", "180s") + .set("slow_query_log_timeout", "180s"); } @Override @@ -67,7 +72,7 @@ public abstract class MultiNodeTableWalkBase extends SingleNodeTableWalkTest return new MultiNodeState(rs, cluster); } - private class MultiNodeState extends State + protected class MultiNodeState extends State { public MultiNodeState(RandomSource rs, Cluster cluster) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java index e8b01f8c71..7727e3a76a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java @@ -38,7 +38,9 @@ public class MultiNodeTableWalkWithReadRepairTest extends MultiNodeTableWalkBase // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value -// CQL_DEBUG_APPLY_OPERATOR = true; + // CQL_DEBUG_APPLY_OPERATOR = true; + // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. + // READ_AFTER_WRITE = true; // When an issue is found, it's a good idea to also run the same seed against MultiNodeTableWalkWithoutReadRepairTest; if Read Repair is given bad input, you should expect bad output! // This test needs to make sure it shares the same random history as MultiNodeTableWalkWithoutReadRepairTest to always allow the ability to maintain this property. } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java index a18b80d68a..5a0ce66ccc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java @@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkB // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value -// CQL_DEBUG_APPLY_OPERATOR = true; + // CQL_DEBUG_APPLY_OPERATOR = true; + // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. + // READ_AFTER_WRITE = true; } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java similarity index 73% copy from test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java copy to test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java index a18b80d68a..0cf333d2ab 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java @@ -19,14 +19,14 @@ package org.apache.cassandra.distributed.test.cql3; import accord.utils.Property; +import org.apache.cassandra.config.Config; import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; -public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkBase +public class PaxosV1MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase { - public MultiNodeTableWalkWithoutReadRepairTest() + public PaxosV1MultiNodeTableWalkTest() { - super(ReadRepairStrategy.NONE); + super(Config.PaxosVariant.v1); } @Override @@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkB // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value -// CQL_DEBUG_APPLY_OPERATOR = true; + // CQL_DEBUG_APPLY_OPERATOR = true; + // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. + // READ_AFTER_WRITE = true; } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java similarity index 73% copy from test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java copy to test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java index a18b80d68a..fa098edaac 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java @@ -19,14 +19,14 @@ package org.apache.cassandra.distributed.test.cql3; import accord.utils.Property; +import org.apache.cassandra.config.Config; import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; -public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkBase +public class PaxosV2MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase { - public MultiNodeTableWalkWithoutReadRepairTest() + public PaxosV2MultiNodeTableWalkTest() { - super(ReadRepairStrategy.NONE); + super(Config.PaxosVariant.v2); } @Override @@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends MultiNodeTableWalkB // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value -// CQL_DEBUG_APPLY_OPERATOR = true; + // CQL_DEBUG_APPLY_OPERATOR = true; + // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. + // READ_AFTER_WRITE = true; } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java index 755d479e92..1feb9c5f86 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java @@ -59,6 +59,7 @@ import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.test.sai.SAIUtil; import org.apache.cassandra.harry.model.BytesPartitionState; import org.apache.cassandra.schema.ColumnMetadata; @@ -92,12 +93,16 @@ public class SingleNodeTableWalkTest extends StatefulASTBase .collect(Collectors.toList())); private static final Logger logger = LoggerFactory.getLogger(SingleNodeTableWalkTest.class); + protected static boolean READ_AFTER_WRITE = false; + protected void preCheck(Cluster cluster, Property.StatefulBuilder builder) { // if a failing seed is detected, populate here // Example: builder.withSeed(42L); // CQL operations may have opertors such as +, -, and / (example 4 + 4), to "apply" them to get a constant value // CQL_DEBUG_APPLY_OPERATOR = true; + // When mutations look to be lost as seen by more complex SELECTs, it can be useful to just SELECT the partition/row right after to write to see if it was safe at the time. + // READ_AFTER_WRITE = true; } protected TypeGenBuilder supportedTypes(RandomSource rs) @@ -345,7 +350,12 @@ public class SingleNodeTableWalkTest extends StatefulASTBase protected Cluster createCluster() throws IOException { - return createCluster(1, i -> {}); + return createCluster(1, this::clusterConfig); + } + + protected void clusterConfig(IInstanceConfig config) + { + } @Test @@ -460,7 +470,8 @@ public class SingleNodeTableWalkTest extends StatefulASTBase { model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowEmpty); } - this.mutationGen = toGen(mutationGenBuilder.build()); + model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowNull); + this.mutationGen = toMutationGen(mutationGenBuilder); var nonPartitionColumns = ImmutableList.<Symbol>builder() .addAll(model.factory.clusteringColumns) @@ -480,6 +491,17 @@ public class SingleNodeTableWalkTest extends StatefulASTBase .collect(Collectors.toList()); } + @Override + protected boolean readAfterWrite() + { + return READ_AFTER_WRITE; + } + + protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder mutationGenBuilder) + { + return toGen(mutationGenBuilder.build()); + } + private boolean isSearchable(Symbol symbol) { // See org.apache.cassandra.cql3.Operator.validateFor diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java index ec530b97fd..a90b467979 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java @@ -81,6 +81,7 @@ import org.apache.cassandra.utils.FastByteOperations; import org.apache.cassandra.utils.Generators; import org.quicktheories.generators.SourceDSL; +import static accord.utils.Property.multistep; import static org.apache.cassandra.distributed.test.JavaDriverUtils.toDriverCL; import static org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport; import static org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator; @@ -179,7 +180,35 @@ public class StatefulASTBase extends TestBaseImpl protected static <S extends CommonState> Property.Command<S, Void, ?> insert(RandomSource rs, S state) { int timestamp = ++state.operations; - return state.command(rs, state.mutationGen().next(rs).withTimestamp(timestamp)); + Mutation mutation = state.mutationGen().next(rs).withTimestamp(timestamp); + + if (!state.readAfterWrite()) + return state.command(rs, mutation); + + return multistep(state.command(rs, mutation), + state.commandSafeRandomHistory(selectForMutation(state, mutation), "Select for Mutation Validation")); + } + + private static <S extends CommonState> Select selectForMutation(S state, Mutation mutation) + { + var select = Select.builder(state.metadata).allowFiltering(); + switch (mutation.kind) + { + case INSERT: + { + var insert = (Mutation.Insert) mutation; + for (var c : state.model.factory.partitionColumns) + select.value(c, insert.values.get(c)); + } + break; + default: + { + select.where(mutation.kind == Mutation.Kind.UPDATE + ? ((Mutation.Update) mutation).where + : ((Mutation.Delete) mutation).where); + } + } + return select.build(); } protected static <S extends BaseState> Property.Command<S, Void, ?> fullTableScan(RandomSource rs, S state) @@ -244,6 +273,11 @@ public class StatefulASTBase extends TestBaseImpl createTable(metadata); } + protected boolean readAfterWrite() + { + return false; + } + protected boolean isMultiNode() { return cluster.size() > 1; @@ -314,6 +348,17 @@ public class StatefulASTBase extends TestBaseImpl }); } + protected <S extends BaseState> Property.Command<S, Void, ?> commandSafeRandomHistory(Select select, @Nullable String annotate) + { + var inst = cluster.firstAlive(); + String postfix = "on " + inst; + if (annotate == null) annotate = postfix; + else annotate += ", " + postfix; + return new Property.SimpleCommand<>(humanReadable(select, annotate), s -> { + s.model.validate(s.executeQuery(inst, Integer.MAX_VALUE, s.selectCl(), select), select); + }); + } + protected ConsistencyLevel selectCl() { return ConsistencyLevel.LOCAL_QUORUM; @@ -333,11 +378,18 @@ public class StatefulASTBase extends TestBaseImpl { var inst = selectInstance(rs); String postfix = "on " + inst; + if (mutation.isCas()) + { + postfix += ", would apply " + model.shouldApply(mutation); + // CAS doesn't allow timestamps + mutation = mutation.withoutTimestamp(); + } if (annotate == null) annotate = postfix; else annotate += ", " + postfix; + Mutation finalMutation = mutation; return new Property.SimpleCommand<>(humanReadable(mutation, annotate), s -> { - s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), mutation); - s.model.update(mutation); + s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), finalMutation); + s.model.update(finalMutation); s.mutation(); }); } @@ -399,7 +451,26 @@ public class StatefulASTBase extends TestBaseImpl SimpleStatement ss = new SimpleStatement(stmt.toCQL(), (Object[]) stmt.bindsEncoded()); if (fetchSize != Integer.MAX_VALUE) ss.setFetchSize(fetchSize); - ss.setConsistencyLevel(toDriverCL(cl)); + if (stmt instanceof Mutation) + { + switch (cl) + { + case SERIAL: + ss.setSerialConsistencyLevel(toDriverCL(cl)); + ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM); + break; + case LOCAL_SERIAL: + ss.setSerialConsistencyLevel(toDriverCL(cl)); + ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM); + break; + default: + ss.setConsistencyLevel(toDriverCL(cl)); + } + } + else + { + ss.setConsistencyLevel(toDriverCL(cl)); + } InetSocketAddress broadcastAddress = instance.config().broadcastAddress(); var host = client.getMetadata().getAllHosts().stream() diff --git a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java index d2fbb6edcc..f6a03bdd17 100644 --- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java +++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java @@ -45,6 +45,7 @@ import com.google.common.collect.Sets; import accord.utils.Invariants; import org.apache.cassandra.cql3.ast.AssignmentOperator; +import org.apache.cassandra.cql3.ast.CasCondition; import org.apache.cassandra.cql3.ast.Conditional; import org.apache.cassandra.cql3.ast.Conditional.Where.Inequality; import org.apache.cassandra.cql3.ast.Element; @@ -54,10 +55,13 @@ import org.apache.cassandra.cql3.ast.FunctionCall; import org.apache.cassandra.cql3.ast.Literal; import org.apache.cassandra.cql3.ast.Mutation; import org.apache.cassandra.cql3.ast.Operator; +import org.apache.cassandra.cql3.ast.Reference; +import org.apache.cassandra.cql3.ast.ReferenceExpression; import org.apache.cassandra.cql3.ast.Select; import org.apache.cassandra.cql3.ast.StandardVisitors; import org.apache.cassandra.cql3.ast.Symbol; import org.apache.cassandra.cql3.ast.Value; +import org.apache.cassandra.cql3.ast.Visitor; import org.apache.cassandra.db.BufferClustering; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.marshal.AbstractType; @@ -75,6 +79,8 @@ import static org.apache.cassandra.harry.model.BytesPartitionState.asCQL; public class ASTSingleTableModel { + private static final ByteBuffer[][] NO_ROWS = new ByteBuffer[0][]; + public final BytesPartitionState.Factory factory; private final TreeMap<BytesPartitionState.Ref, BytesPartitionState> partitions = new TreeMap<>(); @@ -184,6 +190,7 @@ public class ASTSingleTableModel public void update(Mutation mutation) { + if (!shouldApply(mutation)) return; switch (mutation.kind) { case INSERT: @@ -200,7 +207,7 @@ public class ASTSingleTableModel } } - public void update(Mutation.Insert insert) + private void update(Mutation.Insert insert) { Clustering<ByteBuffer> pd = pd(insert); BytesPartitionState partition = partitions.get(factory.createRef(pd)); @@ -229,7 +236,7 @@ public class ASTSingleTableModel true); } - public void update(Mutation.Update update) + private void update(Mutation.Update update) { var split = splitOnPartition(update.where.simplify()); List<Clustering<ByteBuffer>> pks = split.left; @@ -250,9 +257,12 @@ public class ASTSingleTableModel for (Symbol col : Sets.intersection(factory.staticColumns.asSet(), set.keySet())) { ByteBuffer current = partition.staticRow().get(col); - write.put(col, eval(col, current, set.get(col))); + EvalResult result = eval(col, current, set.get(col)); + if (result.kind == EvalResult.Kind.SKIP) continue; + write.put(col, result.value); } - partition.setStaticColumns(write); + if (!write.isEmpty()) + partition.setStaticColumns(write); } // table has clustering but non are in the write, so only pk/static can be updated if (!factory.clusteringColumns.isEmpty() && remaining.isEmpty()) @@ -263,10 +273,13 @@ public class ASTSingleTableModel for (Symbol col : Sets.intersection(factory.regularColumns.asSet(), set.keySet())) { ByteBuffer current = partition.get(cd, col); - write.put(col, eval(col, current, set.get(col))); + EvalResult result = eval(col, current, set.get(col)); + if (result.kind == EvalResult.Kind.SKIP) continue; + write.put(col, result.value); } - partition.setColumns(cd, write, false); + if (!write.isEmpty()) + partition.setColumns(cd, write, false); } } } @@ -274,7 +287,7 @@ public class ASTSingleTableModel private enum DeleteKind {PARTITION, ROW, COLUMN} - public void update(Mutation.Delete delete) + private void update(Mutation.Delete delete) { //TODO (coverage): range deletes var split = splitOnPartition(delete.where.simplify()); @@ -328,6 +341,168 @@ public class ASTSingleTableModel } } + public boolean shouldApply(Mutation mutation) + { + if (!mutation.isCas()) return true; + return shouldApply(mutation, selectPartitionForCAS(mutation)); + } + + private SelectResult selectPartitionForCAS(Mutation mutation) + { + var partition = partitions.get(factory.createRef(pd(mutation))); + if (partition == null) return SelectResult.ordered(factory.selectionOrder, NO_ROWS); + + var cd = cdOrNull(mutation); + var row = cd == null ? null : partition.get(cd); + ImmutableUniqueList<Symbol> columns = cd != null ? factory.selectionOrder : factory.partitionAndStaticColumns; + return SelectResult.ordered(columns, new ByteBuffer[][] { getRowAsByteBuffer(columns, partition, row)}); + } + + private boolean shouldApply(Mutation mutation, SelectResult current) + { + Preconditions.checkArgument(mutation.isCas()); + // process condition + CasCondition condition; + switch (mutation.kind) + { + case INSERT: + condition = CasCondition.Simple.NotExists; + break; + case UPDATE: + condition = ((Mutation.Update) mutation).casCondition.get(); + break; + case DELETE: + condition = ((Mutation.Delete) mutation).casCondition.get(); + break; + default: + throw new UnsupportedOperationException(mutation.kind.name()); + } + if (condition instanceof CasCondition.Simple) + { + boolean hasPartition = current.rows.length > 0; + boolean partitionOrRow = current.columns.equals(factory.partitionAndStaticColumns); + boolean hasRow = partitionOrRow ? hasPartition : current.isAllDefined(factory.clusteringColumns); + var simple = (CasCondition.Simple) condition; + switch (simple) + { + case Exists: + return hasRow; + case NotExists: + return !hasRow; + default: + throw new UnsupportedOperationException(simple.name()); + } + } + var ifCondition = (CasCondition.IfCondition) condition; + String letRow = "row"; + Symbol rowSymbol = Symbol.unknownType(letRow); + Map<String, SelectResult> lets = Map.of(letRow, current); + // point the columns to be row.column that way it matches LET clause in BEGIN TRANSACTION, allowing better reuse + var updatedCondition = ifCondition.conditional.visit(new Visitor() + { + @Override + public ReferenceExpression visit(ReferenceExpression r) + { + Preconditions.checkArgument(!(r instanceof Reference), "Unexpected reference detected: %s", r); + return Reference.of(rowSymbol, r); + } + }); + return process(updatedCondition, lets); + } + + private boolean process(Conditional condition, Map<String, SelectResult> lets) + { + if (condition.getClass() == Conditional.Is.class) + { + var is = (Conditional.Is) condition; + Object result = extract(is.reference, lets); + return result == null + ? is.kind == Conditional.Is.Kind.Null + : is.kind == Conditional.Is.Kind.NotNull; + } + else if (condition.getClass() == Conditional.Where.class) + { + var where = (Conditional.Where) (condition); + if (!where.lhs.type().equals(where.rhs.type())) + throw new UnsupportedOperationException("For now where clause must always have matching types: given " + where.lhs.type() + ' ' + where.rhs.type()); + ByteBuffer lhs = where.lhs instanceof ReferenceExpression + ? (ByteBuffer) extract((ReferenceExpression) where.lhs, lets) + : eval(where.lhs); + ByteBuffer rhs = where.rhs instanceof ReferenceExpression + ? (ByteBuffer) extract((ReferenceExpression) where.rhs, lets) + : eval(where.rhs); + // If anything is null avoid doing the test, but there is a special case where this returns true... both sides are null! + // This logic isn't consistent with other parts of the database and is local to CAS IF clause + // see ML@Inconsistent null handling between WHERE and IF clauses + if (lhs == null || rhs == null) + return lhs == rhs; + return where.kind.test(where.lhs.type(), lhs, rhs); + } + else if (condition.getClass() == Conditional.And.class) + { + var conditions = condition.simplify(); + for (var c : conditions) + { + if (!process(c, lets)) + return false; + } + return true; + } + else + { + throw new UnsupportedOperationException("Unsupported condition type: " + condition.getClass() + "; " + condition.toCQL()); + } + } + + // Either ByteBuffer (cell) or ByteBuffer[] (row) + private static Object extract(ReferenceExpression expr, Map<String, SelectResult> lets) + { + Object result = extract0(expr, lets); + if (result instanceof SelectResult) + { + var rows = ((SelectResult) result).rows; + result = rows.length == 0 ? null : rows[0]; + } + return result; + } + + // o can be Map<String, SelectResult> (lets), SelectResult (row), ByteBuffer (cell) + private static Object extract0(ReferenceExpression expr, @Nullable Object o) + { + if (o == null) return null; + if (expr instanceof Reference) + { + Reference ref = (Reference) expr; + for (var symbol : ref.path) + o = extract0(symbol, o); + return o; + } + else if (expr instanceof Symbol) + { + var symbol = (Symbol) expr; + if (o instanceof Map) + { + Map<String, SelectResult> lets = (Map<String, SelectResult>) o; + return lets.get(symbol.symbol); + } + else if (o instanceof SelectResult) + { + SelectResult result = (SelectResult) o; + if (result.rows.length == 0) + return null; + return result.rows[0][result.columns.indexOf(symbol)]; + } + else + { + throw new UnsupportedOperationException("Unexpected object type: " + o.getClass()); + } + } + else + { + throw new UnsupportedOperationException("Unsupported ref type: " + expr.getClass() + "; " + expr.toCQL()); + } + } + private List<Clustering<ByteBuffer>> clustering(List<Conditional> conditionals) { if (conditionals.isEmpty()) @@ -422,11 +597,74 @@ public class ASTSingleTableModel return Collections.singletonList(BufferClustering.make(bbs)); } + private Clustering<ByteBuffer> pd(Mutation mutation) + { + switch (mutation.kind) + { + case INSERT: + return pd((Mutation.Insert) mutation); + case UPDATE: + return pd((Mutation.Update) mutation); + case DELETE: + return pd((Mutation.Delete) mutation); + default: + throw new UnsupportedOperationException(mutation.kind.name()); + } + } + private Clustering<ByteBuffer> pd(Mutation.Insert mutation) { return key(mutation.values, factory.partitionColumns); } + private Clustering<ByteBuffer> pd(Mutation.Update mutation) + { + return pd("Update", mutation.where.simplify()); + } + + private Clustering<ByteBuffer> pd(Mutation.Delete mutation) + { + return pd("Delete", mutation.where.simplify()); + } + + private Clustering<ByteBuffer> pd(String type, List<Conditional> conditionals) + { + var split = splitOnPartition(conditionals); + List<Clustering<ByteBuffer>> pks = split.left; + Preconditions.checkArgument(pks.size() == 1, "%s had more than 1 partition key! expected 1 but was %s", type, pks.size()); + return pks.get(0); + } + + @Nullable + private Clustering<ByteBuffer> cdOrNull(Mutation mutation) + { + if (factory.clusteringColumns.isEmpty()) return Clustering.EMPTY; + if (mutation.kind == Mutation.Kind.INSERT) + { + var insert = (Mutation.Insert) mutation; + return !insert.values.keySet().containsAll(factory.clusteringColumns) + ? null + : key(insert.values, factory.clusteringColumns); + } + Conditional where; + switch (mutation.kind) + { + case UPDATE: + where = ((Mutation.Update) mutation).where; + break; + case DELETE: + where = ((Mutation.Delete) mutation).where; + break; + default: + throw new UnsupportedOperationException("Unexpected mutation: " + mutation.kind); + } + var partitions = splitOnPartition(where.simplify()); + if (partitions.right.isEmpty()) return null; + var matches = clustering(partitions.right); + Preconditions.checkArgument(matches.size() == 1); + return matches.get(0); + } + public BytesPartitionState get(BytesPartitionState.Ref ref) { return partitions.get(ref); @@ -611,20 +849,53 @@ public class ASTSingleTableModel private static class SelectResult { + private final ImmutableUniqueList<Symbol> columns; private final ByteBuffer[][] rows; private final boolean unordered; - private SelectResult(ByteBuffer[][] rows, boolean unordered) + private SelectResult(ImmutableUniqueList<Symbol> columns, ByteBuffer[][] rows, boolean unordered) { + this.columns = columns; this.rows = rows; this.unordered = unordered; } + + private static SelectResult ordered(ImmutableUniqueList<Symbol> columns, ByteBuffer[][] rows) + { + return new SelectResult(columns, rows, false); + } + + private static SelectResult unordered(ImmutableUniqueList<Symbol> columns, ByteBuffer[][] rows) + { + return new SelectResult(columns, rows, true); + } + + public boolean isAllDefined(ImmutableUniqueList<Symbol> selectColumns) + { + if (rows.length == 0) return false; + for (var row : rows) + { + for (var col : selectColumns) + { + if (row[columns.indexOf(col)] == null) + return false; + } + } + return true; + } + } + + public ImmutableUniqueList<Symbol> columns(Select select) + { + if (select.selections.isEmpty()) return factory.selectionOrder; + throw new UnsupportedOperationException("Getting columns from select other than SELECT * is currently not supported"); } private SelectResult getRowsAsByteBuffer(Select select) { + ImmutableUniqueList<Symbol> columns = columns(select); if (select.where.isEmpty()) - return new SelectResult(getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, select.limit)), false); + return SelectResult.ordered(columns, getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, select.limit))); LookupContext ctx = context(select); List<PrimaryKey> primaryKeys; if (ctx.unmatchable) @@ -652,7 +923,7 @@ public class ASTSingleTableModel } primaryKeys = applyLimits(primaryKeys, select.perPartitionLimit, select.limit); //TODO (correctness): now that we have the rows we need to handle the selections/aggregation/limit/group-by/etc. - return new SelectResult(getRowsAsByteBuffer(primaryKeys), ctx.unordered); + return new SelectResult(columns, getRowsAsByteBuffer(primaryKeys), ctx.unordered); } private List<PrimaryKey> applyLimits(List<PrimaryKey> primaryKeys, Optional<Value> perPartitionLimitOpt, Optional<Value> limitOpt) @@ -709,20 +980,37 @@ public class ASTSingleTableModel } private ByteBuffer[] getRowAsByteBuffer(BytesPartitionState partition, @Nullable BytesPartitionState.Row row) + { + return getRowAsByteBuffer(factory.selectionOrder, partition, row); + } + + private ByteBuffer[] getRowAsByteBuffer(ImmutableUniqueList<Symbol> columns, BytesPartitionState partition, @Nullable BytesPartitionState.Row row) { Clustering<ByteBuffer> pd = partition.key; BytesPartitionState.Row staticRow = partition.staticRow(); - ByteBuffer[] bbs = new ByteBuffer[factory.selectionOrder.size()]; + ByteBuffer[] bbs = new ByteBuffer[columns.size()]; for (Symbol col : factory.partitionColumns) - bbs[factory.selectionOrder.indexOf(col)] = pd.bufferAt(factory.partitionColumns.indexOf(col)); + { + if (!columns.contains(col)) continue; + bbs[columns.indexOf(col)] = pd.bufferAt(factory.partitionColumns.indexOf(col)); + } for (Symbol col : factory.staticColumns) - bbs[factory.selectionOrder.indexOf(col)] = staticRow.get(col); + { + if (!columns.contains(col)) continue; + bbs[columns.indexOf(col)] = staticRow.get(col); + } if (row != null) { for (Symbol col : factory.clusteringColumns) - bbs[factory.selectionOrder.indexOf(col)] = row.clustering.bufferAt(factory.clusteringColumns.indexOf(col)); + { + if (!columns.contains(col)) continue; + bbs[columns.indexOf(col)] = row.clustering.bufferAt(factory.clusteringColumns.indexOf(col)); + } for (Symbol col : factory.regularColumns) - bbs[factory.selectionOrder.indexOf(col)] = row.get(col); + { + if (!columns.contains(col)) continue; + bbs[columns.indexOf(col)] = row.get(col); + } } return bbs; } @@ -942,26 +1230,52 @@ public class ASTSingleTableModel return current.stream().map(BufferClustering::new).collect(Collectors.toList()); } - private static ByteBuffer eval(Symbol col, @Nullable ByteBuffer current, Expression e) + private static class EvalResult + { + private static final EvalResult SKIP = new EvalResult(Kind.SKIP, null); + + private enum Kind { SKIP, ACCEPT } + + private final Kind kind; + private final @Nullable ByteBuffer value; + + private EvalResult(Kind kind, @Nullable ByteBuffer value) + { + this.kind = kind; + this.value = value; + } + + private static EvalResult accept(@Nullable ByteBuffer bb) + { + return new EvalResult(Kind.ACCEPT, bb); + } + } + + private static EvalResult eval(Symbol col, @Nullable ByteBuffer current, Expression e) { - if (!(e instanceof AssignmentOperator)) return eval(e); + if (!(e instanceof AssignmentOperator)) return EvalResult.accept(eval(e)); + current = col.type().sanitize(current); // multi cell collections have the property that they do update even if the current value is null boolean isFancy = col.type().isCollection() && col.type().isMultiCell(); - if (current == null && !isFancy) return null; // null + ? == null + if (current == null && !isFancy) return EvalResult.SKIP; // null + ? == null var assignment = (AssignmentOperator) e; if (isFancy && current == null) { return assignment.kind == AssignmentOperator.Kind.SUBTRACT // if it doesn't exist, then there is nothing to subtract - ? null - : eval(assignment.right); + ? EvalResult.SKIP + : EvalResult.accept(eval(assignment.right)); } + // validate your inputs... + ByteBuffer rhs = col.type().sanitize(eval(assignment.right)); + if (rhs == null) + return EvalResult.SKIP; switch (assignment.kind) { case ADD: - return eval(new Operator(Operator.Kind.ADD, new Literal(current, e.type()), assignment.right)); + return EvalResult.accept(eval(new Operator(Operator.Kind.ADD, new Literal(current, e.type()), assignment.right))); case SUBTRACT: - return eval(new Operator(Operator.Kind.SUBTRACT, new Literal(current, e.type()), assignment.right)); + return EvalResult.accept(eval(new Operator(Operator.Kind.SUBTRACT, new Literal(current, e.type()), assignment.right))); default: throw new UnsupportedOperationException(assignment.kind + ": " + assignment.toCQL()); } diff --git a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java index a10524968a..c2d18e573d 100644 --- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java +++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java @@ -449,9 +449,10 @@ public class BytesPartitionState public final TableMetadata metadata; public final ImmutableUniqueList<Symbol> partitionColumns; public final ImmutableUniqueList<Symbol> clusteringColumns; + public final ImmutableUniqueList<Symbol> primaryColumns; public final ImmutableUniqueList<Symbol> staticColumns; public final ImmutableUniqueList<Symbol> regularColumns; - public final ImmutableUniqueList<Symbol> selectionOrder, regularAndStaticColumns; + public final ImmutableUniqueList<Symbol> selectionOrder, partitionAndStaticColumns, regularAndStaticColumns; public final ClusteringComparator clusteringComparator; @@ -471,9 +472,23 @@ public class BytesPartitionState for (ColumnMetadata pk : metadata.clusteringColumns()) symbolListBuilder.add(Symbol.from(pk)); clusteringColumns = symbolListBuilder.buildAndClear(); + if (clusteringColumns.isEmpty()) primaryColumns = partitionColumns; + else + { + symbolListBuilder.addAll(partitionColumns); + symbolListBuilder.addAll(clusteringColumns); + primaryColumns = symbolListBuilder.buildAndClear(); + } for (ColumnMetadata pk : metadata.staticColumns()) symbolListBuilder.add(Symbol.from(pk)); staticColumns = symbolListBuilder.buildAndClear(); + if (staticColumns.isEmpty()) partitionAndStaticColumns = partitionColumns; + else + { + symbolListBuilder.addAll(partitionColumns); + symbolListBuilder.addAll(staticColumns); + partitionAndStaticColumns = symbolListBuilder.buildAndClear(); + } for (ColumnMetadata pk : metadata.regularColumns()) symbolListBuilder.add(Symbol.from(pk)); regularColumns = symbolListBuilder.buildAndClear(); diff --git a/test/unit/org/apache/cassandra/cql3/KnownIssue.java b/test/unit/org/apache/cassandra/cql3/KnownIssue.java index b1c2c09d66..be2dfe7524 100644 --- a/test/unit/org/apache/cassandra/cql3/KnownIssue.java +++ b/test/unit/org/apache/cassandra/cql3/KnownIssue.java @@ -40,7 +40,9 @@ public enum KnownIssue AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-19007", "When doing multi node/multi column queries, AF can miss data when the nodes are not in-sync"), SAI_AND_VECTOR_COLUMNS("https://issues.apache.org/jira/browse/CASSANDRA-20464", - "When doing an SAI query, if the where clause also contains a vector column bad results can be produced") + "When doing an SAI query, if the where clause also contains a vector column bad results can be produced"), + CAS_CONDITION_ON_UDT_W_EMPTY_BYTES("https://issues.apache.org/jira/browse/CASSANDRA-20479", + "WHERE clause blocks operations on UDTs but CAS allows in IF clause. During this path empty can be confused with null which allows non-existing rows to match empty bytes"), ; KnownIssue(String url, String description) diff --git a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java index 0ffb65411b..e3918f70da 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java +++ b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java @@ -59,10 +59,15 @@ public class AssignmentOperator implements Expression EnumSet<Kind> result = EnumSet.noneOf(Kind.class); if (type instanceof CollectionType && type.isMultiCell()) { - if (type instanceof SetType || type instanceof ListType) + if (type instanceof SetType) return EnumSet.of(Kind.ADD, Kind.SUBTRACT); + if (type instanceof ListType) + return isTransaction + ? EnumSet.of(Kind.ADD, Kind.SUBTRACT) + : EnumSet.of(Kind.ADD); if (type instanceof MapType) { + //TODO (coverage): include SUBTRACT support // map supports subtract, but not map - map; only map - set! // since this is annoying to support, for now dropping - return EnumSet.of(Kind.ADD); diff --git a/test/unit/org/apache/cassandra/cql3/ast/Bind.java b/test/unit/org/apache/cassandra/cql3/ast/Bind.java index ac58dfc7df..788d301dbf 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Bind.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Bind.java @@ -54,6 +54,7 @@ public class Bind implements Value @Override public ByteBuffer valueEncoded() { + if (value == null) return null; return value instanceof ByteBuffer ? (ByteBuffer) value : ((AbstractType) type).decompose(value); } diff --git a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java index 9f8d5867e0..d0d4d0e35b 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java +++ b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java @@ -22,6 +22,8 @@ import java.util.stream.Stream; public interface CasCondition extends Element { + CasCondition visit(Visitor v); + enum Simple implements CasCondition { NotExists("IF NOT EXISTS"), @@ -39,11 +41,17 @@ public interface CasCondition extends Element { sb.append(cql); } + + @Override + public CasCondition visit(Visitor v) + { + return v.visit(this); + } } class IfCondition implements CasCondition { - private final Conditional conditional; + public final Conditional conditional; public IfCondition(Conditional conditional) { @@ -62,5 +70,15 @@ public interface CasCondition extends Element { return Stream.of(conditional); } + + @Override + public CasCondition visit(Visitor v) + { + var u = v.visit(this); + if (u != this) return u; + var c = conditional.visit(v); + if (c == conditional) return this; + return new IfCondition(c); + } } } diff --git a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java index 52f79bb1dc..66012c060c 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java @@ -72,6 +72,21 @@ public interface Conditional extends Expression { this.value = value; } + + public boolean test(AbstractType<?> type, ByteBuffer a, ByteBuffer b) + { + int rc = type.compare(a, b); + switch (this) + { + case EQUAL: return rc == 0; + case NOT_EQUAL: return rc != 0; + case GREATER_THAN: return rc > 0; + case GREATER_THAN_EQ: return rc >=0; + case LESS_THAN: return rc < 0; + case LESS_THAN_EQ: return rc <=0; + default: throw new UnsupportedOperationException(this.name()); + } + } } public final Inequality kind; diff --git a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java index 34acb843c7..a626dcd6b0 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java +++ b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java @@ -55,6 +55,11 @@ public class ExpressionEvaluator Object rhs = eval(e.right); if (rhs instanceof ByteBuffer) rhs = e.right.type().compose((ByteBuffer) rhs); + // null + 42 = null + // 42 + null = null + // if anything is null, everything is null! + if (lhs == null || rhs == null) + return null; switch (e.kind) { case ADD: diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java b/test/unit/org/apache/cassandra/cql3/ast/Literal.java index 3886fe91b4..4bd2f9b631 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java @@ -56,6 +56,7 @@ public class Literal implements Value @Override public ByteBuffer valueEncoded() { + if (value == null) return null; return value instanceof ByteBuffer ? (ByteBuffer) value : ((AbstractType) type).decompose(value); } @@ -69,6 +70,11 @@ public class Literal implements Value public void toCQL(StringBuilder sb, CQLFormatter formatter) { ByteBuffer bytes = valueEncoded(); + if (bytes == null) + { + sb.append("null"); + return; + } if (bytes.remaining() == 0 && !actuallySupportsEmpty(type)) { sb.append("<empty bytes>"); diff --git a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java index 9126d8cbda..95987dc657 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java @@ -52,6 +52,8 @@ public abstract class Mutation implements Statement public abstract boolean isCas(); + public abstract Mutation withoutTimestamp(); + public Mutation withTimestamp(long timestamp) { return withTimestamp(new Timestamp(new Literal(timestamp, LongType.instance))); @@ -172,6 +174,11 @@ public abstract class Mutation implements Statement this.timestamp = timestamp; } + public Using withoutTimestamp() + { + return new Using(ttl, Optional.empty()); + } + public Using withTimestamp(Timestamp timestamp) { return new Using(ttl, Optional.of(timestamp)); @@ -299,6 +306,14 @@ public abstract class Mutation implements Statement return ifNotExists; } + @Override + public Mutation withoutTimestamp() + { + return new Insert(table, values, ifNotExists, using.isEmpty() + ? using + : using.map(u -> u.withoutTimestamp())); + } + @Override public Insert withTimestamp(Timestamp timestamp) { @@ -404,9 +419,20 @@ public abstract class Mutation implements Statement Conditional copiedWhere = where.visit(v); if (where != copiedWhere) updated = true; + Optional<? extends CasCondition> updatedCasCondition = casCondition; + if (casCondition.isPresent()) + { + CasCondition original = casCondition.get(); + var casCopy = original.visit(v); + if (casCopy != original) + { + updatedCasCondition = Optional.ofNullable(casCopy); + updated = true; + } + } if (!updated) return this; - return new Update(table, using, copied, copiedWhere, casCondition); + return new Update(table, using, copied, copiedWhere, updatedCasCondition); } @Override @@ -415,6 +441,12 @@ public abstract class Mutation implements Statement return casCondition.isPresent(); } + @Override + public Mutation withoutTimestamp() + { + return new Update(table, using.isEmpty() ? using : using.map(u -> u.withoutTimestamp()), set, where, casCondition); + } + @Override public Update withTimestamp(Timestamp timestamp) { @@ -520,9 +552,20 @@ WHERE PK_column_conditions var copiedWhere = where.visit(v); if (copiedWhere != where) updated = true; + Optional<? extends CasCondition> updatedCasCondition = casCondition; + if (casCondition.isPresent()) + { + CasCondition original = casCondition.get(); + var casCopy = original.visit(v); + if (casCopy != original) + { + updatedCasCondition = Optional.ofNullable(casCopy); + updated = true; + } + } if (!updated) return this; - return new Delete(copiedColumns, table, timestamp, copiedWhere, casCondition); + return new Delete(copiedColumns, table, timestamp, copiedWhere, updatedCasCondition); } @Override @@ -531,6 +574,12 @@ WHERE PK_column_conditions return casCondition.isPresent(); } + @Override + public Mutation withoutTimestamp() + { + return new Delete(columns, table, Optional.empty(), where, casCondition); + } + @Override public Delete withTimestamp(Timestamp timestamp) { diff --git a/test/unit/org/apache/cassandra/cql3/ast/Reference.java b/test/unit/org/apache/cassandra/cql3/ast/Reference.java index fe836a05dd..ee3b73d6fd 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Reference.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Reference.java @@ -48,6 +48,11 @@ public class Reference implements ReferenceExpression return new Reference(Collections.singletonList(Objects.requireNonNull(top))); } + public static Reference of(ReferenceExpression top, ReferenceExpression next) + { + return new Reference(List.of(top, next)); + } + @Override public AbstractType<?> type() { diff --git a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java index 7a99e030a4..d1727fccdb 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java +++ b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java @@ -26,6 +26,6 @@ public interface ReferenceExpression extends Expression @Override default ReferenceExpression visit(Visitor v) { - return this; + return v.visit(this); } } diff --git a/test/unit/org/apache/cassandra/cql3/ast/Value.java b/test/unit/org/apache/cassandra/cql3/ast/Value.java index f04b5fcd36..92ef010183 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Value.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Value.java @@ -20,11 +20,15 @@ package org.apache.cassandra.cql3.ast; import java.nio.ByteBuffer; +import javax.annotation.Nullable; + import org.apache.cassandra.db.marshal.AbstractType; public interface Value extends Expression { + @Nullable Object value(); + @Nullable ByteBuffer valueEncoded(); Value with(Object value, AbstractType<?> type); diff --git a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java index 71e4797c25..c87415e981 100644 --- a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java +++ b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java @@ -71,6 +71,11 @@ public interface Visitor default Value visit(Value v) { return v; } + default CasCondition visit(CasCondition s) + { + return s; + } + class CompositeVisitor implements Visitor { private final List<Visitor> visitors; diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java b/test/unit/org/apache/cassandra/utils/ASTGenerators.java index 47b2267de3..cdf533cb42 100644 --- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java +++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java @@ -166,6 +166,7 @@ public class ASTGenerators private Gen<?> valueGen; private Gen<Boolean> useOperator = SourceDSL.booleans().all(); private Gen<Boolean> useEmpty = SourceDSL.arbitrary().constant(false); + private Gen<Boolean> useNull = SourceDSL.arbitrary().constant(false); private BiFunction<Object, AbstractType<?>, Gen<Value>> literalOrBindGen = ASTGenerators::valueGen; public ExpressionBuilder(AbstractType<?> type) @@ -182,6 +183,12 @@ public class ASTGenerators return this; } + public ExpressionBuilder allowNull() + { + useNull = SourceDSL.integers().between(1, 100).map(i -> i < 10); + return this; + } + public ExpressionBuilder withOperators() { useOperator = i -> true; @@ -217,6 +224,8 @@ public class ASTGenerators //TODO (coverage): rather than single level operators, allow nested (a + b + c + d) Gen<Value> leaf = rs -> literalOrBindGen.apply(valueGen.generate(rs), type).generate(rs); return rs -> { + if (useNull.generate(rs)) + return new Bind(null, type); if (useEmpty.generate(rs)) return new Bind(ByteBufferUtil.EMPTY_BYTE_BUFFER, type); Expression e = leaf.generate(rs); @@ -395,6 +404,12 @@ public class ASTGenerators return this; } + public MutationGenBuilder allowNull(Symbol symbol) + { + columnExpressions.get(symbol).allowNull(); + return this; + } + public MutationGenBuilder withDeletionKind(Gen<DeleteKind> deleteKindGen) { this.deleteKindGen = deleteKindGen; @@ -432,7 +447,7 @@ public class ASTGenerators public MutationGenBuilder withCasGen(Gen<Boolean> withCasGen) { - withCasGen = Objects.requireNonNull(withCasGen); + this.withCasGen = Objects.requireNonNull(withCasGen); return this; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org