This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b56edf2a5d Add support in CAS for -= on numeric types, and fixed 
improper handling of empty bytes which lead to NPE
b56edf2a5d is described below

commit b56edf2a5df8c320b33e38116f2742ab69f7b4fd
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Fri Mar 28 11:48:51 2025 -0700

    Add support in CAS for -= on numeric types, and fixed improper handling of 
empty bytes which lead to NPE
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20477
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/cql3/Operation.java  |  12 +-
 .../cassandra/cql3/statements/BatchStatement.java  |  12 +-
 .../cassandra/cql3/statements/CQL3CasRequest.java  |  12 +-
 .../cql3/statements/ModificationStatement.java     |  12 +-
 .../org/apache/cassandra/cql3/terms/Constants.java |  64 ++--
 .../cassandra/db/RegularAndStaticColumns.java      |   7 +
 .../apache/cassandra/db/marshal/AbstractType.java  |  26 +-
 .../org/apache/cassandra/service/CASRequest.java   |   3 +
 .../org/apache/cassandra/service/paxos/Paxos.java  |   9 +-
 .../org/apache/cassandra/transport/Dispatcher.java |   2 +
 .../test/cql3/CasMultiNodeTableWalkBase.java       | 129 ++++++++
 .../test/cql3/MultiNodeTableWalkBase.java          |  21 +-
 .../cql3/MultiNodeTableWalkWithReadRepairTest.java |   4 +-
 .../MultiNodeTableWalkWithoutReadRepairTest.java   |   4 +-
 ...est.java => PaxosV1MultiNodeTableWalkTest.java} |  12 +-
 ...est.java => PaxosV2MultiNodeTableWalkTest.java} |  12 +-
 .../test/cql3/SingleNodeTableWalkTest.java         |  26 +-
 .../distributed/test/cql3/StatefulASTBase.java     |  79 ++++-
 .../cassandra/harry/model/ASTSingleTableModel.java | 358 +++++++++++++++++++--
 .../cassandra/harry/model/BytesPartitionState.java |  17 +-
 .../unit/org/apache/cassandra/cql3/KnownIssue.java |   4 +-
 .../cassandra/cql3/ast/AssignmentOperator.java     |   7 +-
 test/unit/org/apache/cassandra/cql3/ast/Bind.java  |   1 +
 .../apache/cassandra/cql3/ast/CasCondition.java    |  20 +-
 .../org/apache/cassandra/cql3/ast/Conditional.java |  15 +
 .../cassandra/cql3/ast/ExpressionEvaluator.java    |   5 +
 .../org/apache/cassandra/cql3/ast/Literal.java     |   6 +
 .../org/apache/cassandra/cql3/ast/Mutation.java    |  53 ++-
 .../org/apache/cassandra/cql3/ast/Reference.java   |   5 +
 .../cassandra/cql3/ast/ReferenceExpression.java    |   2 +-
 test/unit/org/apache/cassandra/cql3/ast/Value.java |   4 +
 .../org/apache/cassandra/cql3/ast/Visitor.java     |   5 +
 .../org/apache/cassandra/utils/ASTGenerators.java  |  17 +-
 34 files changed, 864 insertions(+), 102 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 80eabfd705..361fb3cd07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add support in CAS for -= on numeric types, and fixed improper handling of 
empty bytes which lead to NPE (CASSANDRA-20477)
  * Do not fail to start a node with materialized views after they are turned 
off in config (CASSANDRA-20452)
  * Fix nodetool gcstats output, support human-readable units and more output 
formats (CASSANDRA-19022)
  * Various gossip to TCM upgrade fixes (CASSANDRA-20483)
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java 
b/src/java/org/apache/cassandra/cql3/Operation.java
index 7a7c0e8420..7c5e02eb63 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -374,8 +374,16 @@ public abstract class Operation
         {
             if (!(receiver.type instanceof CollectionType))
             {
-                if (!(receiver.type instanceof CounterColumnType))
-                    throw new InvalidRequestException(String.format("Invalid 
operation (%s) for non counter column %s", toString(receiver), receiver.name));
+                if (canReadExistingState)
+                {
+                    if (!(receiver.type instanceof NumberType<?>))
+                        throw new 
InvalidRequestException(String.format("Invalid operation (%s) for non-numeric 
type %s", toString(receiver), receiver.name));
+                }
+                else
+                {
+                    if (!(receiver.type instanceof CounterColumnType))
+                        throw new 
InvalidRequestException(String.format("Invalid operation (%s) for non counter 
column %s", toString(receiver), receiver.name));
+                }
                 return new Constants.Substracter(receiver, 
value.prepare(metadata.keyspace, receiver));
             }
             else if (!(receiver.type.isMultiCell()))
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index bfec675464..6c5b121992 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -468,7 +468,7 @@ public class BatchStatement implements CQLStatement
 
     private ResultMessage executeWithConditions(BatchQueryOptions options, 
QueryState state, Dispatcher.RequestTime requestTime)
     {
-        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state);
+        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state, requestTime);
         CQL3CasRequest casRequest = p.left;
         Set<ColumnMetadata> columnsWithConditions = p.right;
 
@@ -495,7 +495,7 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    private Pair<CQL3CasRequest,Set<ColumnMetadata>> 
makeCasRequest(BatchQueryOptions options, QueryState state)
+    private Pair<CQL3CasRequest,Set<ColumnMetadata>> 
makeCasRequest(BatchQueryOptions options, QueryState state, 
Dispatcher.RequestTime requestTime)
     {
         long batchTimestamp = options.getTimestamp(state);
         long nowInSeconds = options.getNowInSeconds(state);
@@ -514,7 +514,7 @@ public class BatchStatement implements CQLStatement
             if (key == null)
             {
                 key = statement.metadata().partitioner.decorateKey(pks.get(0));
-                casRequest = new CQL3CasRequest(statement.metadata(), key, 
conditionColumns, updatesRegularRows, updatesStaticRow);
+                casRequest = new CQL3CasRequest(statement.metadata(), key, 
conditionColumns, updatesRegularRows, updatesStaticRow, requestTime);
             }
             else if (!key.getKey().equals(pks.get(0)))
             {
@@ -570,7 +570,7 @@ public class BatchStatement implements CQLStatement
         BatchQueryOptions batchOptions = 
BatchQueryOptions.withoutPerStatementVariables(options);
 
         if (hasConditions)
-            return executeInternalWithConditions(batchOptions, queryState);
+            return executeInternalWithConditions(batchOptions, queryState, 
Dispatcher.RequestTime.forImmediateExecution());
 
         executeInternalWithoutCondition(queryState, batchOptions, 
Dispatcher.RequestTime.forImmediateExecution());
         return new ResultMessage.Void();
@@ -586,9 +586,9 @@ public class BatchStatement implements CQLStatement
         return null;
     }
 
-    private ResultMessage executeInternalWithConditions(BatchQueryOptions 
options, QueryState state)
+    private ResultMessage executeInternalWithConditions(BatchQueryOptions 
options, QueryState state, Dispatcher.RequestTime requestTime)
     {
-        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state);
+        Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, 
state, requestTime);
         CQL3CasRequest request = p.left;
         Set<ColumnMetadata> columnsWithConditions = p.right;
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 0d322691c6..4db98459ec 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -34,6 +34,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.TimeUUID;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -49,6 +50,7 @@ public class CQL3CasRequest implements CASRequest
     private final RegularAndStaticColumns conditionColumns;
     private final boolean updatesRegularRows;
     private final boolean updatesStaticRow;
+    private final Dispatcher.RequestTime requestTime;
     private boolean hasExists; // whether we have an exist or if not exist 
condition
 
     // Conditions on the static row. We keep it separate from 'conditions' as 
most things related to the static row are
@@ -66,7 +68,8 @@ public class CQL3CasRequest implements CASRequest
                           DecoratedKey key,
                           RegularAndStaticColumns conditionColumns,
                           boolean updatesRegularRows,
-                          boolean updatesStaticRow)
+                          boolean updatesStaticRow,
+                          Dispatcher.RequestTime requestTime)
     {
         this.metadata = metadata;
         this.key = key;
@@ -74,6 +77,13 @@ public class CQL3CasRequest implements CASRequest
         this.conditionColumns = conditionColumns;
         this.updatesRegularRows = updatesRegularRows;
         this.updatesStaticRow = updatesStaticRow;
+        this.requestTime = requestTime;
+    }
+
+    @Override
+    public Dispatcher.RequestTime requestTime()
+    {
+        return requestTime;
     }
 
     void addRowUpdate(Clustering<?> clustering, ModificationStatement stmt, 
QueryOptions options, long timestamp, long nowInSeconds)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ce9da9a538..21da99c14a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -576,7 +576,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     private ResultMessage executeWithCondition(QueryState queryState, 
QueryOptions options, Dispatcher.RequestTime requestTime)
     {
-        CQL3CasRequest request = makeCasRequest(queryState, options);
+        CQL3CasRequest request = makeCasRequest(queryState, options, 
requestTime);
 
         try (RowIterator result = StorageProxy.cas(keyspace(),
                                                    table(),
@@ -592,7 +592,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         }
     }
 
-    private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions 
options)
+    private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions 
options, Dispatcher.RequestTime requestTime)
     {
         ClientState clientState = queryState.getClientState();
         List<ByteBuffer> keys = buildPartitionKeyNames(options, clientState);
@@ -610,7 +610,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                     type.isUpdate()? "updates" : "deletions");
 
         Clustering<?> clustering = 
Iterables.getOnlyElement(createClustering(options, clientState));
-        CQL3CasRequest request = new CQL3CasRequest(metadata(), key, 
conditionColumns(), updatesRegularRows(), updatesStaticRow());
+        CQL3CasRequest request = new CQL3CasRequest(metadata(), key, 
conditionColumns(), updatesRegularRows(), updatesStaticRow(), requestTime);
 
         addConditions(clustering, request, options);
         request.addRowUpdate(clustering, this, options, timestamp, 
nowInSeconds);
@@ -718,7 +718,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     public ResultMessage executeLocally(QueryState queryState, QueryOptions 
options) throws RequestValidationException, RequestExecutionException
     {
         return hasConditions()
-               ? executeInternalWithCondition(queryState, options)
+               ? executeInternalWithCondition(queryState, options, 
Dispatcher.RequestTime.forImmediateExecution())
                : executeInternalWithoutCondition(queryState, options, 
Dispatcher.RequestTime.forImmediateExecution());
     }
 
@@ -732,9 +732,9 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         return null;
     }
 
-    public ResultMessage executeInternalWithCondition(QueryState state, 
QueryOptions options)
+    public ResultMessage executeInternalWithCondition(QueryState state, 
QueryOptions options, Dispatcher.RequestTime requestTime)
     {
-        CQL3CasRequest request = makeCasRequest(state, options);
+        CQL3CasRequest request = makeCasRequest(state, options, requestTime);
 
         try (RowIterator result = casInternal(state.getClientState(), request, 
options.getTimestamp(state), options.getNowInSeconds(state)))
         {
diff --git a/src/java/org/apache/cassandra/cql3/terms/Constants.java 
b/src/java/org/apache/cassandra/cql3/terms/Constants.java
index f63d7ce28b..3b9ae6b4c9 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Constants.java
@@ -44,6 +44,14 @@ import org.apache.cassandra.utils.FastByteOperations;
  */
 public abstract class Constants
 {
+
+    private static ByteBuffer getCurrentCellBuffer(ColumnMetadata column, 
DecoratedKey key, UpdateParameters params)
+    {
+        Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? 
Clustering.STATIC_CLUSTERING : params.currentClustering());
+        Cell<?> currentCell = currentRow == null ? null : 
currentRow.getCell(column);
+        return currentCell == null ? null : currentCell.buffer();
+    }
+
     public enum Type
     {
         STRING
@@ -489,8 +497,10 @@ public abstract class Constants
             else if (column.type instanceof NumberType<?>)
             {
                 @SuppressWarnings("unchecked") NumberType<Number> type = 
(NumberType<Number>) column.type;
-                ByteBuffer increment = t.bindAndGet(params.options);
-                ByteBuffer current = getCurrentCellBuffer(partitionKey, 
params);
+                ByteBuffer increment = 
type.sanitize(t.bindAndGet(params.options));
+                if (increment == null)
+                    return;
+                ByteBuffer current = 
type.sanitize(getCurrentCellBuffer(column, partitionKey, params));
                 if (current == null)
                     return;
                 ByteBuffer newValue = type.add(type.compose(current), 
type.compose(increment));
@@ -499,7 +509,9 @@ public abstract class Constants
             else if (column.type instanceof StringType)
             {
                 ByteBuffer append = t.bindAndGet(params.options);
-                ByteBuffer current = getCurrentCellBuffer(partitionKey, 
params);
+                if (append == null)
+                    return;
+                ByteBuffer current = getCurrentCellBuffer(column, 
partitionKey, params);
                 if (current == null)
                     return;
                 ByteBuffer newValue = ByteBuffer.allocate(current.remaining() 
+ append.remaining());
@@ -508,13 +520,6 @@ public abstract class Constants
                 params.addCell(column, newValue);
             }
         }
-
-        private ByteBuffer getCurrentCellBuffer(DecoratedKey key, 
UpdateParameters params)
-        {
-            Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? 
Clustering.STATIC_CLUSTERING : params.currentClustering());
-            Cell<?> currentCell = currentRow == null ? null : 
currentRow.getCell(column);
-            return currentCell == null ? null : currentCell.buffer();
-        }
     }
 
     public static class Substracter extends Operation
@@ -524,19 +529,40 @@ public abstract class Constants
             super(column, t);
         }
 
+        @Override
+        public boolean requiresRead()
+        {
+            return !column.type.isCounter();
+        }
+
         public void execute(DecoratedKey partitionKey, UpdateParameters 
params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.options);
-            if (bytes == null)
-                throw new InvalidRequestException("Invalid null value for 
counter increment");
-            if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
-                return;
+            if (column.type instanceof CounterColumnType)
+            {
+                ByteBuffer bytes = t.bindAndGet(params.options);
+                if (bytes == null)
+                    throw new InvalidRequestException("Invalid null value for 
counter increment");
+                if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
+                    return;
 
-            long increment = ByteBufferUtil.toLong(bytes);
-            if (increment == Long.MIN_VALUE)
-                throw new InvalidRequestException("The negation of " + 
increment + " overflows supported counter precision (signed 8 bytes integer)");
+                long increment = ByteBufferUtil.toLong(bytes);
+                if (increment == Long.MIN_VALUE)
+                    throw new InvalidRequestException("The negation of " + 
increment + " overflows supported counter precision (signed 8 bytes integer)");
 
-            params.addCounter(column, -increment);
+                params.addCounter(column, -increment);
+            }
+            else if (column.type instanceof NumberType<?>)
+            {
+                @SuppressWarnings("unchecked") NumberType<Number> type = 
(NumberType<Number>) column.type;
+                ByteBuffer increment = 
type.sanitize(t.bindAndGet(params.options));
+                if (increment == null)
+                    return;
+                ByteBuffer current = 
type.sanitize(getCurrentCellBuffer(column, partitionKey, params));
+                if (current == null)
+                    return;
+                ByteBuffer newValue = type.substract(type.compose(current), 
type.compose(increment));
+                params.addCell(column, newValue);
+            }
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java 
b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index 4daea2f4f7..2032fe6fe9 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.db;
 
 import java.util.*;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import com.google.common.collect.Iterators;
 
@@ -58,6 +60,11 @@ public class RegularAndStaticColumns implements 
Iterable<ColumnMetadata>
                                            column.isStatic() ? regulars : 
regulars.without(column));
     }
 
+    public Stream<ColumnMetadata> stream()
+    {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
     public RegularAndStaticColumns mergeTo(RegularAndStaticColumns that)
     {
         if (this == that)
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java 
b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index c624ce505b..5378a4cd3f 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.cql3.AssignmentTestable;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -474,14 +476,6 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
         return this;
     }
 
-    /**
-     * Returns {@code true} for types where empty should be handled like 
{@code null} like {@link Int32Type}.
-     */
-    public boolean isEmptyValueMeaningless()
-    {
-        return false;
-    }
-
     /**
      * @param ignoreFreezing if true, the type string will not be wrapped with 
FrozenType(...), even if this type is frozen.
      */
@@ -537,6 +531,22 @@ public abstract class AbstractType<T> implements 
Comparator<ByteBuffer>, Assignm
         return false;
     }
 
+    /**
+     * Returns {@code true} for types where empty should be handled like 
{@code null} like {@link Int32Type}.
+     */
+    public boolean isEmptyValueMeaningless()
+    {
+        return false;
+    }
+
+    @Nullable
+    public ByteBuffer sanitize(@Nullable ByteBuffer bb)
+    {
+        if (bb == null) return null;
+        // not checking allowsEmpty as this method assumes that the bb has 
already passed validation for the type
+        return bb.remaining() == 0 && isEmptyValueMeaningless() ? null : bb;
+    }
+
     public boolean isNull(ByteBuffer bb)
     {
         return isNull(bb, ByteBufferAccessor.instance);
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java 
b/src/java/org/apache/cassandra/service/CASRequest.java
index 6fb5eea20c..50ea5852a6 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -22,12 +22,15 @@ import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.transport.Dispatcher;
 
 /**
  * Abstract the conditions and updates for a CAS operation.
  */
 public interface CASRequest
 {
+    Dispatcher.RequestTime requestTime();
+
     /**
      * The command to use to fetch the value to compare for the CAS.
      */
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java 
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 3412add1a2..06f90907d5 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -737,7 +737,7 @@ public class Paxos
                 Tracing.trace("Reading existing values for CAS precondition");
 
                 BeginResult begin = begin(proposeDeadline, readCommand, 
consistencyForConsensus,
-                        true, minimumBallot, failedAttemptsDueToContention);
+                        true, minimumBallot, failedAttemptsDueToContention, 
request.requestTime());
                 Ballot ballot = begin.ballot;
                 Participants participants = begin.participants;
                 failedAttemptsDueToContention = 
begin.failedAttemptsDueToContention;
@@ -914,7 +914,7 @@ public class Paxos
             while (true)
             {
                 // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
-                final BeginResult begin = begin(deadline, read, 
consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention);
+                final BeginResult begin = begin(deadline, read, 
consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention, 
requestTime);
                 failedAttemptsDueToContention = 
begin.failedAttemptsDueToContention;
 
                 switch (PAXOS_VARIANT)
@@ -1034,7 +1034,8 @@ public class Paxos
                                      ConsistencyLevel consistencyForConsensus,
                                      final boolean isWrite,
                                      Ballot minimumBallot,
-                                     int failedAttemptsDueToContention)
+                                     int failedAttemptsDueToContention,
+                                     Dispatcher.RequestTime requestTime)
             throws WriteTimeoutException, WriteFailureException, 
ReadTimeoutException, ReadFailureException
     {
         boolean acceptEarlyReadPermission = !isWrite; // if we're reading, 
begin by assuming a read permission is sufficient
@@ -1111,7 +1112,7 @@ public class Paxos
                     PaxosPrepare.Success success = prepare.success();
 
                     Supplier<Participants> plan = () -> success.participants;
-                    DataResolver<?, ?> resolver = new DataResolver<>(query, 
plan, NoopReadRepair.instance, new 
Dispatcher.RequestTime(query.creationTimeNanos()));
+                    DataResolver<?, ?> resolver = new DataResolver<>(query, 
plan, NoopReadRepair.instance, requestTime);
                     for (int i = 0 ; i < success.responses.size() ; ++i)
                         resolver.preprocess(success.responses.get(i));
 
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java 
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index d6cb5e822f..f701434d00 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,6 +141,7 @@ public class Dispatcher implements 
CQLMessageHandler.MessageConsumer<Message.Req
 
         public RequestTime(long enqueuedAtNanos, long startedAtNanos)
         {
+            Preconditions.checkArgument(enqueuedAtNanos != -1);
             this.enqueuedAtNanos = enqueuedAtNanos;
             this.startedAtNanos = startedAtNanos;
         }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
new file mode 100644
index 0000000000..bf8a44dcb9
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.cql3;
+
+import accord.utils.Gen;
+import accord.utils.RandomSource;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.KnownIssue;
+import org.apache.cassandra.cql3.ast.CasCondition;
+import org.apache.cassandra.cql3.ast.Conditional;
+import org.apache.cassandra.cql3.ast.Mutation;
+import org.apache.cassandra.cql3.ast.Value;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
+import org.apache.cassandra.utils.ASTGenerators;
+
+import static org.apache.cassandra.utils.Generators.toGen;
+
+public abstract class CasMultiNodeTableWalkBase extends MultiNodeTableWalkBase
+{
+    protected final Config.PaxosVariant paxos_variant;
+
+    protected CasMultiNodeTableWalkBase(Config.PaxosVariant paxos_variant)
+    {
+        super(ReadRepairStrategy.NONE);
+        this.paxos_variant = paxos_variant;
+    }
+
+    @Override
+    protected void clusterConfig(IInstanceConfig c)
+    {
+        super.clusterConfig(c);
+        c.set("paxos_variant", paxos_variant);
+        c.set("cas_contention_timeout", "180s");
+        //TODO (now): should these be included?  They are in the benchmark 
clusters
+//        c.set("paxos_contention_min_wait", 0);
+//        c.set("paxos_contention_max_wait", "100ms");
+//        c.set("paxos_contention_min_delta", 0);
+    }
+
+    @Override
+    protected SingleNodeTableWalkTest.State createState(RandomSource rs, 
Cluster cluster)
+    {
+        return new State(rs, cluster);
+    }
+
+    private static boolean isValueUDTSafe(Value value)
+    {
+        var bb = value.valueEncoded();
+        return bb == null ? true : bb.hasRemaining();
+    }
+
+    protected class State extends MultiNodeState
+    {
+        private State(RandomSource rs, Cluster cluster)
+        {
+            super(rs, cluster);
+        }
+
+        @Override
+        protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder 
mutationGenBuilder)
+        {
+            mutationGenBuilder.withCasGen(i -> true);
+            // generator might not always generate a cas statement... should 
fix generator!
+            Gen<Mutation> gen = 
toGen(mutationGenBuilder.build()).filter(Mutation::isCas);
+            if (metadata.regularAndStaticColumns().stream().anyMatch(c -> 
c.type.isUDT())
+                && 
IGNORED_ISSUES.contains(KnownIssue.CAS_CONDITION_ON_UDT_W_EMPTY_BYTES))
+            {
+                gen = gen.filter(m -> {
+                    CasCondition condition;
+                    switch (m.kind)
+                    {
+                        case INSERT:
+                            return true;
+                        case DELETE:
+                            condition = ((Mutation.Delete) 
m).casCondition.get();
+                            break;
+                        case UPDATE:
+                            condition = ((Mutation.Update) 
m).casCondition.get();
+                            break;
+                        default:
+                            throw new 
UnsupportedOperationException(m.kind.name());
+                    }
+                    return !condition.streamRecursive(true).anyMatch(e -> {
+                        if (!(e instanceof Conditional.Where)) return false;
+                        var where = (Conditional.Where) e;
+                        if (!where.lhs.type().isUDT()) return false;
+                        if (where.lhs instanceof Value && 
!isValueUDTSafe((Value) where.lhs))
+                            return true;
+                        if (where.rhs instanceof Value && 
!isValueUDTSafe((Value) where.rhs))
+                            return true;
+                        return false;
+                    });
+                });
+            }
+            return gen;
+        }
+
+        @Override
+        protected ConsistencyLevel selectCl()
+        {
+            return ConsistencyLevel.SERIAL;
+        }
+
+        @Override
+        protected ConsistencyLevel mutationCl()
+        {
+            return ConsistencyLevel.SERIAL;
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
index f10d0edcf4..3e9c1195f6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import accord.utils.RandomSource;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
@@ -52,13 +53,17 @@ public abstract class MultiNodeTableWalkBase extends 
SingleNodeTableWalkTest
     @Override
     protected Cluster createCluster() throws IOException
     {
-        return createCluster(mockMultiNode ? 1 : 3, c -> {
-            c.set("range_request_timeout", "180s")
-             .set("read_request_timeout", "180s")
-             .set("write_request_timeout", "180s")
-             .set("native_transport_timeout", "180s")
-             .set("slow_query_log_timeout", "180s");
-        });
+        return createCluster(mockMultiNode ? 1 : 3, this::clusterConfig);
+    }
+
+    @Override
+    protected void clusterConfig(IInstanceConfig c)
+    {
+        c.set("range_request_timeout", "180s")
+         .set("read_request_timeout", "180s")
+         .set("write_request_timeout", "180s")
+         .set("native_transport_timeout", "180s")
+         .set("slow_query_log_timeout", "180s");
     }
 
     @Override
@@ -67,7 +72,7 @@ public abstract class MultiNodeTableWalkBase extends 
SingleNodeTableWalkTest
         return new MultiNodeState(rs, cluster);
     }
 
-    private class MultiNodeState extends State
+    protected class MultiNodeState extends State
     {
         public MultiNodeState(RandomSource rs, Cluster cluster)
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
index e8b01f8c71..7727e3a76a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
@@ -38,7 +38,9 @@ public class MultiNodeTableWalkWithReadRepairTest extends 
MultiNodeTableWalkBase
         // if a failing seed is detected, populate here
         // Example: builder.withSeed(42L);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
-//         CQL_DEBUG_APPLY_OPERATOR = true;
+        // CQL_DEBUG_APPLY_OPERATOR = true;
+        // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
+        // READ_AFTER_WRITE = true;
         // When an issue is found, it's a good idea to also run the same seed 
against MultiNodeTableWalkWithoutReadRepairTest; if Read Repair is given bad 
input, you should expect bad output!
         // This test needs to make sure it shares the same random history as 
MultiNodeTableWalkWithoutReadRepairTest to always allow the ability to maintain 
this property.
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
index a18b80d68a..5a0ce66ccc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkB
         // if a failing seed is detected, populate here
         // Example: builder.withSeed(42L);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
-//         CQL_DEBUG_APPLY_OPERATOR = true;
+        // CQL_DEBUG_APPLY_OPERATOR = true;
+        // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
+        // READ_AFTER_WRITE = true;
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
similarity index 73%
copy from 
test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
copy to 
test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
index a18b80d68a..0cf333d2ab 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
@@ -19,14 +19,14 @@
 package org.apache.cassandra.distributed.test.cql3;
 
 import accord.utils.Property;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 
-public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkBase
+public class PaxosV1MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase
 {
-    public MultiNodeTableWalkWithoutReadRepairTest()
+    public PaxosV1MultiNodeTableWalkTest()
     {
-        super(ReadRepairStrategy.NONE);
+        super(Config.PaxosVariant.v1);
     }
 
     @Override
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkB
         // if a failing seed is detected, populate here
         // Example: builder.withSeed(42L);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
-//         CQL_DEBUG_APPLY_OPERATOR = true;
+        // CQL_DEBUG_APPLY_OPERATOR = true;
+        // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
+        // READ_AFTER_WRITE = true;
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
similarity index 73%
copy from 
test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
copy to 
test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
index a18b80d68a..fa098edaac 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
@@ -19,14 +19,14 @@
 package org.apache.cassandra.distributed.test.cql3;
 
 import accord.utils.Property;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 
-public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkBase
+public class PaxosV2MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase
 {
-    public MultiNodeTableWalkWithoutReadRepairTest()
+    public PaxosV2MultiNodeTableWalkTest()
     {
-        super(ReadRepairStrategy.NONE);
+        super(Config.PaxosVariant.v2);
     }
 
     @Override
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends 
MultiNodeTableWalkB
         // if a failing seed is detected, populate here
         // Example: builder.withSeed(42L);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
-//         CQL_DEBUG_APPLY_OPERATOR = true;
+        // CQL_DEBUG_APPLY_OPERATOR = true;
+        // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
+        // READ_AFTER_WRITE = true;
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index 755d479e92..1feb9c5f86 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.marshal.InetAddressType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.test.sai.SAIUtil;
 import org.apache.cassandra.harry.model.BytesPartitionState;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -92,12 +93,16 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
                                                                                
                                          .collect(Collectors.toList()));
     private static final Logger logger = 
LoggerFactory.getLogger(SingleNodeTableWalkTest.class);
 
+    protected static boolean READ_AFTER_WRITE = false;
+
     protected void preCheck(Cluster cluster, Property.StatefulBuilder builder)
     {
         // if a failing seed is detected, populate here
         // Example: builder.withSeed(42L);
         // CQL operations may have opertors such as +, -, and / (example 4 + 
4), to "apply" them to get a constant value
         // CQL_DEBUG_APPLY_OPERATOR = true;
+        // When mutations look to be lost as seen by more complex SELECTs, it 
can be useful to just SELECT the partition/row right after to write to see if 
it was safe at the time.
+        // READ_AFTER_WRITE = true;
     }
 
     protected TypeGenBuilder supportedTypes(RandomSource rs)
@@ -345,7 +350,12 @@ public class SingleNodeTableWalkTest extends 
StatefulASTBase
 
     protected Cluster createCluster() throws IOException
     {
-        return createCluster(1, i -> {});
+        return createCluster(1, this::clusterConfig);
+    }
+
+    protected void clusterConfig(IInstanceConfig config)
+    {
+
     }
 
     @Test
@@ -460,7 +470,8 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
             {
                 
model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowEmpty);
             }
-            this.mutationGen = toGen(mutationGenBuilder.build());
+            
model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowNull);
+            this.mutationGen = toMutationGen(mutationGenBuilder);
 
             var nonPartitionColumns = ImmutableList.<Symbol>builder()
                                                    
.addAll(model.factory.clusteringColumns)
@@ -480,6 +491,17 @@ public class SingleNodeTableWalkTest extends 
StatefulASTBase
                                 .collect(Collectors.toList());
         }
 
+        @Override
+        protected boolean readAfterWrite()
+        {
+            return READ_AFTER_WRITE;
+        }
+
+        protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder 
mutationGenBuilder)
+        {
+            return toGen(mutationGenBuilder.build());
+        }
+
         private boolean isSearchable(Symbol symbol)
         {
             // See org.apache.cassandra.cql3.Operator.validateFor
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index ec530b97fd..a90b467979 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -81,6 +81,7 @@ import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.Generators;
 import org.quicktheories.generators.SourceDSL;
 
+import static accord.utils.Property.multistep;
 import static org.apache.cassandra.distributed.test.JavaDriverUtils.toDriverCL;
 import static 
org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport;
 import static 
org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator;
@@ -179,7 +180,35 @@ public class StatefulASTBase extends TestBaseImpl
     protected static <S extends CommonState> Property.Command<S, Void, ?> 
insert(RandomSource rs, S state)
     {
         int timestamp = ++state.operations;
-        return state.command(rs, 
state.mutationGen().next(rs).withTimestamp(timestamp));
+        Mutation mutation = 
state.mutationGen().next(rs).withTimestamp(timestamp);
+
+        if (!state.readAfterWrite())
+            return state.command(rs, mutation);
+
+        return multistep(state.command(rs, mutation),
+                         
state.commandSafeRandomHistory(selectForMutation(state, mutation), "Select for 
Mutation Validation"));
+    }
+
+    private static <S extends CommonState> Select selectForMutation(S state, 
Mutation mutation)
+    {
+        var select = Select.builder(state.metadata).allowFiltering();
+        switch (mutation.kind)
+        {
+            case INSERT:
+            {
+                var insert = (Mutation.Insert) mutation;
+                for (var c : state.model.factory.partitionColumns)
+                    select.value(c, insert.values.get(c));
+            }
+            break;
+            default:
+            {
+                select.where(mutation.kind == Mutation.Kind.UPDATE
+                             ? ((Mutation.Update) mutation).where
+                             : ((Mutation.Delete) mutation).where);
+            }
+        }
+        return select.build();
     }
 
     protected static <S extends BaseState> Property.Command<S, Void, ?> 
fullTableScan(RandomSource rs, S state)
@@ -244,6 +273,11 @@ public class StatefulASTBase extends TestBaseImpl
             createTable(metadata);
         }
 
+        protected boolean readAfterWrite()
+        {
+            return false;
+        }
+
         protected boolean isMultiNode()
         {
             return cluster.size() > 1;
@@ -314,6 +348,17 @@ public class StatefulASTBase extends TestBaseImpl
             });
         }
 
+        protected <S extends BaseState> Property.Command<S, Void, ?> 
commandSafeRandomHistory(Select select, @Nullable String annotate)
+        {
+            var inst = cluster.firstAlive();
+            String postfix = "on " + inst;
+            if (annotate == null) annotate = postfix;
+            else                  annotate += ", " + postfix;
+            return new Property.SimpleCommand<>(humanReadable(select, 
annotate), s -> {
+                s.model.validate(s.executeQuery(inst, Integer.MAX_VALUE, 
s.selectCl(), select), select);
+            });
+        }
+
         protected ConsistencyLevel selectCl()
         {
             return ConsistencyLevel.LOCAL_QUORUM;
@@ -333,11 +378,18 @@ public class StatefulASTBase extends TestBaseImpl
         {
             var inst = selectInstance(rs);
             String postfix = "on " + inst;
+            if (mutation.isCas())
+            {
+                postfix += ", would apply " + model.shouldApply(mutation);
+                // CAS doesn't allow timestamps
+                mutation = mutation.withoutTimestamp();
+            }
             if (annotate == null) annotate = postfix;
             else                  annotate += ", " + postfix;
+            Mutation finalMutation = mutation;
             return new Property.SimpleCommand<>(humanReadable(mutation, 
annotate), s -> {
-                s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), 
mutation);
-                s.model.update(mutation);
+                s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), 
finalMutation);
+                s.model.update(finalMutation);
                 s.mutation();
             });
         }
@@ -399,7 +451,26 @@ public class StatefulASTBase extends TestBaseImpl
                 SimpleStatement ss = new SimpleStatement(stmt.toCQL(), 
(Object[]) stmt.bindsEncoded());
                 if (fetchSize != Integer.MAX_VALUE)
                     ss.setFetchSize(fetchSize);
-                ss.setConsistencyLevel(toDriverCL(cl));
+                if (stmt instanceof Mutation)
+                {
+                    switch (cl)
+                    {
+                        case SERIAL:
+                            ss.setSerialConsistencyLevel(toDriverCL(cl));
+                            
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
+                            break;
+                        case LOCAL_SERIAL:
+                            ss.setSerialConsistencyLevel(toDriverCL(cl));
+                            
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
+                            break;
+                        default:
+                            ss.setConsistencyLevel(toDriverCL(cl));
+                    }
+                }
+                else
+                {
+                    ss.setConsistencyLevel(toDriverCL(cl));
+                }
 
                 InetSocketAddress broadcastAddress = 
instance.config().broadcastAddress();
                 var host = client.getMetadata().getAllHosts().stream()
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java 
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index d2fbb6edcc..f6a03bdd17 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -45,6 +45,7 @@ import com.google.common.collect.Sets;
 
 import accord.utils.Invariants;
 import org.apache.cassandra.cql3.ast.AssignmentOperator;
+import org.apache.cassandra.cql3.ast.CasCondition;
 import org.apache.cassandra.cql3.ast.Conditional;
 import org.apache.cassandra.cql3.ast.Conditional.Where.Inequality;
 import org.apache.cassandra.cql3.ast.Element;
@@ -54,10 +55,13 @@ import org.apache.cassandra.cql3.ast.FunctionCall;
 import org.apache.cassandra.cql3.ast.Literal;
 import org.apache.cassandra.cql3.ast.Mutation;
 import org.apache.cassandra.cql3.ast.Operator;
+import org.apache.cassandra.cql3.ast.Reference;
+import org.apache.cassandra.cql3.ast.ReferenceExpression;
 import org.apache.cassandra.cql3.ast.Select;
 import org.apache.cassandra.cql3.ast.StandardVisitors;
 import org.apache.cassandra.cql3.ast.Symbol;
 import org.apache.cassandra.cql3.ast.Value;
+import org.apache.cassandra.cql3.ast.Visitor;
 import org.apache.cassandra.db.BufferClustering;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -75,6 +79,8 @@ import static 
org.apache.cassandra.harry.model.BytesPartitionState.asCQL;
 
 public class ASTSingleTableModel
 {
+    private static final ByteBuffer[][] NO_ROWS = new ByteBuffer[0][];
+
     public final BytesPartitionState.Factory factory;
     private final TreeMap<BytesPartitionState.Ref, BytesPartitionState> 
partitions = new TreeMap<>();
 
@@ -184,6 +190,7 @@ public class ASTSingleTableModel
 
     public void update(Mutation mutation)
     {
+        if (!shouldApply(mutation)) return;
         switch (mutation.kind)
         {
             case INSERT:
@@ -200,7 +207,7 @@ public class ASTSingleTableModel
         }
     }
 
-    public void update(Mutation.Insert insert)
+    private void update(Mutation.Insert insert)
     {
         Clustering<ByteBuffer> pd = pd(insert);
         BytesPartitionState partition = partitions.get(factory.createRef(pd));
@@ -229,7 +236,7 @@ public class ASTSingleTableModel
                              true);
     }
 
-    public void update(Mutation.Update update)
+    private void update(Mutation.Update update)
     {
         var split = splitOnPartition(update.where.simplify());
         List<Clustering<ByteBuffer>> pks = split.left;
@@ -250,9 +257,12 @@ public class ASTSingleTableModel
                 for (Symbol col : 
Sets.intersection(factory.staticColumns.asSet(), set.keySet()))
                 {
                     ByteBuffer current = partition.staticRow().get(col);
-                    write.put(col, eval(col, current, set.get(col)));
+                    EvalResult result = eval(col, current, set.get(col));
+                    if (result.kind == EvalResult.Kind.SKIP) continue;
+                    write.put(col, result.value);
                 }
-                partition.setStaticColumns(write);
+                if (!write.isEmpty())
+                    partition.setStaticColumns(write);
             }
             // table has clustering but non are in the write, so only 
pk/static can be updated
             if (!factory.clusteringColumns.isEmpty() && remaining.isEmpty())
@@ -263,10 +273,13 @@ public class ASTSingleTableModel
                 for (Symbol col : 
Sets.intersection(factory.regularColumns.asSet(), set.keySet()))
                 {
                     ByteBuffer current = partition.get(cd, col);
-                    write.put(col, eval(col, current, set.get(col)));
+                    EvalResult result = eval(col, current, set.get(col));
+                    if (result.kind == EvalResult.Kind.SKIP) continue;
+                    write.put(col, result.value);
                 }
 
-                partition.setColumns(cd, write, false);
+                if (!write.isEmpty())
+                    partition.setColumns(cd, write, false);
             }
         }
     }
@@ -274,7 +287,7 @@ public class ASTSingleTableModel
     private enum DeleteKind
     {PARTITION, ROW, COLUMN}
 
-    public void update(Mutation.Delete delete)
+    private void update(Mutation.Delete delete)
     {
         //TODO (coverage): range deletes
         var split = splitOnPartition(delete.where.simplify());
@@ -328,6 +341,168 @@ public class ASTSingleTableModel
         }
     }
 
+    public boolean shouldApply(Mutation mutation)
+    {
+        if (!mutation.isCas()) return true;
+        return shouldApply(mutation, selectPartitionForCAS(mutation));
+    }
+
+    private SelectResult selectPartitionForCAS(Mutation mutation)
+    {
+        var partition = partitions.get(factory.createRef(pd(mutation)));
+        if (partition == null) return 
SelectResult.ordered(factory.selectionOrder, NO_ROWS);
+
+        var cd = cdOrNull(mutation);
+        var row = cd == null ? null : partition.get(cd);
+        ImmutableUniqueList<Symbol> columns = cd != null ? 
factory.selectionOrder : factory.partitionAndStaticColumns;
+        return SelectResult.ordered(columns, new ByteBuffer[][] { 
getRowAsByteBuffer(columns, partition, row)});
+    }
+
+    private boolean shouldApply(Mutation mutation, SelectResult current)
+    {
+        Preconditions.checkArgument(mutation.isCas());
+        // process condition
+        CasCondition condition;
+        switch (mutation.kind)
+        {
+            case INSERT:
+                condition = CasCondition.Simple.NotExists;
+                break;
+            case UPDATE:
+                condition = ((Mutation.Update) mutation).casCondition.get();
+                break;
+            case DELETE:
+                condition = ((Mutation.Delete) mutation).casCondition.get();
+                break;
+            default:
+                throw new UnsupportedOperationException(mutation.kind.name());
+        }
+        if (condition instanceof CasCondition.Simple)
+        {
+            boolean hasPartition = current.rows.length > 0;
+            boolean partitionOrRow = 
current.columns.equals(factory.partitionAndStaticColumns);
+            boolean hasRow = partitionOrRow ? hasPartition : 
current.isAllDefined(factory.clusteringColumns);
+            var simple = (CasCondition.Simple) condition;
+            switch (simple)
+            {
+                case Exists:
+                    return hasRow;
+                case NotExists:
+                    return !hasRow;
+                default:
+                    throw new UnsupportedOperationException(simple.name());
+            }
+        }
+        var ifCondition = (CasCondition.IfCondition) condition;
+        String letRow = "row";
+        Symbol rowSymbol = Symbol.unknownType(letRow);
+        Map<String, SelectResult> lets = Map.of(letRow, current);
+        // point the columns to be row.column that way it matches LET clause 
in BEGIN TRANSACTION, allowing better reuse
+        var updatedCondition = ifCondition.conditional.visit(new Visitor()
+        {
+            @Override
+            public ReferenceExpression visit(ReferenceExpression r)
+            {
+                Preconditions.checkArgument(!(r instanceof Reference), 
"Unexpected reference detected: %s", r);
+                return Reference.of(rowSymbol, r);
+            }
+        });
+        return process(updatedCondition, lets);
+    }
+
+    private boolean process(Conditional condition, Map<String, SelectResult> 
lets)
+    {
+        if (condition.getClass() == Conditional.Is.class)
+        {
+            var is = (Conditional.Is) condition;
+            Object result = extract(is.reference, lets);
+            return result == null
+                   ? is.kind == Conditional.Is.Kind.Null
+                   : is.kind == Conditional.Is.Kind.NotNull;
+        }
+        else if (condition.getClass() == Conditional.Where.class)
+        {
+            var where = (Conditional.Where) (condition);
+            if (!where.lhs.type().equals(where.rhs.type()))
+                throw new UnsupportedOperationException("For now where clause 
must always have matching types: given " + where.lhs.type() + ' ' + 
where.rhs.type());
+            ByteBuffer lhs = where.lhs instanceof ReferenceExpression
+                             ? (ByteBuffer) extract((ReferenceExpression) 
where.lhs, lets)
+                             : eval(where.lhs);
+            ByteBuffer rhs = where.rhs instanceof ReferenceExpression
+                             ? (ByteBuffer) extract((ReferenceExpression) 
where.rhs, lets)
+                             : eval(where.rhs);
+            // If anything is null avoid doing the test, but there is a 
special case where this returns true... both sides are null!
+            // This logic isn't consistent with other parts of the database 
and is local to CAS IF clause
+            // see ML@Inconsistent null handling between WHERE and IF clauses
+            if (lhs == null || rhs == null)
+                return lhs == rhs;
+            return where.kind.test(where.lhs.type(), lhs, rhs);
+        }
+        else if (condition.getClass() == Conditional.And.class)
+        {
+            var conditions = condition.simplify();
+            for (var c : conditions)
+            {
+                if (!process(c, lets))
+                    return false;
+            }
+            return true;
+        }
+        else
+        {
+            throw new UnsupportedOperationException("Unsupported condition 
type: " + condition.getClass() + "; " + condition.toCQL());
+        }
+    }
+
+    // Either ByteBuffer (cell) or ByteBuffer[] (row)
+    private static Object extract(ReferenceExpression expr, Map<String, 
SelectResult> lets)
+    {
+        Object result = extract0(expr, lets);
+        if (result instanceof SelectResult)
+        {
+            var rows = ((SelectResult) result).rows;
+            result = rows.length == 0 ? null : rows[0];
+        }
+        return result;
+    }
+
+    // o can be Map<String, SelectResult> (lets), SelectResult (row), 
ByteBuffer (cell)
+    private static Object extract0(ReferenceExpression expr, @Nullable Object 
o)
+    {
+        if (o == null) return null;
+        if (expr instanceof Reference)
+        {
+            Reference ref = (Reference) expr;
+            for (var symbol : ref.path)
+                o = extract0(symbol, o);
+            return o;
+        }
+        else if (expr instanceof Symbol)
+        {
+            var symbol = (Symbol) expr;
+            if (o instanceof Map)
+            {
+                Map<String, SelectResult> lets = (Map<String, SelectResult>) o;
+                return lets.get(symbol.symbol);
+            }
+            else if (o instanceof SelectResult)
+            {
+                SelectResult result = (SelectResult) o;
+                if (result.rows.length == 0)
+                    return null;
+                return result.rows[0][result.columns.indexOf(symbol)];
+            }
+            else
+            {
+                throw new UnsupportedOperationException("Unexpected object 
type: " + o.getClass());
+            }
+        }
+        else
+        {
+            throw new UnsupportedOperationException("Unsupported ref type: " + 
expr.getClass() + "; " + expr.toCQL());
+        }
+    }
+
     private List<Clustering<ByteBuffer>> clustering(List<Conditional> 
conditionals)
     {
         if (conditionals.isEmpty())
@@ -422,11 +597,74 @@ public class ASTSingleTableModel
         return Collections.singletonList(BufferClustering.make(bbs));
     }
 
+    private Clustering<ByteBuffer> pd(Mutation mutation)
+    {
+        switch (mutation.kind)
+        {
+            case INSERT:
+                return pd((Mutation.Insert) mutation);
+            case UPDATE:
+                return pd((Mutation.Update) mutation);
+            case DELETE:
+                return pd((Mutation.Delete) mutation);
+            default:
+                throw new UnsupportedOperationException(mutation.kind.name());
+        }
+    }
+
     private Clustering<ByteBuffer> pd(Mutation.Insert mutation)
     {
         return key(mutation.values, factory.partitionColumns);
     }
 
+    private Clustering<ByteBuffer> pd(Mutation.Update mutation)
+    {
+        return pd("Update", mutation.where.simplify());
+    }
+
+    private Clustering<ByteBuffer> pd(Mutation.Delete mutation)
+    {
+        return pd("Delete", mutation.where.simplify());
+    }
+
+    private Clustering<ByteBuffer> pd(String type, List<Conditional> 
conditionals)
+    {
+        var split = splitOnPartition(conditionals);
+        List<Clustering<ByteBuffer>> pks = split.left;
+        Preconditions.checkArgument(pks.size() == 1, "%s had more than 1 
partition key!  expected 1 but was %s", type, pks.size());
+        return pks.get(0);
+    }
+
+    @Nullable
+    private Clustering<ByteBuffer> cdOrNull(Mutation mutation)
+    {
+        if (factory.clusteringColumns.isEmpty()) return Clustering.EMPTY;
+        if (mutation.kind == Mutation.Kind.INSERT)
+        {
+            var insert = (Mutation.Insert) mutation;
+            return 
!insert.values.keySet().containsAll(factory.clusteringColumns)
+                   ? null
+                   : key(insert.values, factory.clusteringColumns);
+        }
+        Conditional where;
+        switch (mutation.kind)
+        {
+            case UPDATE:
+                where = ((Mutation.Update) mutation).where;
+                break;
+            case DELETE:
+                where = ((Mutation.Delete) mutation).where;
+            break;
+            default:
+                throw new UnsupportedOperationException("Unexpected mutation: 
" + mutation.kind);
+        }
+        var partitions = splitOnPartition(where.simplify());
+        if (partitions.right.isEmpty()) return null;
+        var matches = clustering(partitions.right);
+        Preconditions.checkArgument(matches.size() == 1);
+        return matches.get(0);
+    }
+
     public BytesPartitionState get(BytesPartitionState.Ref ref)
     {
         return partitions.get(ref);
@@ -611,20 +849,53 @@ public class ASTSingleTableModel
 
     private static class SelectResult
     {
+        private final ImmutableUniqueList<Symbol> columns;
         private final ByteBuffer[][] rows;
         private final boolean unordered;
 
-        private SelectResult(ByteBuffer[][] rows, boolean unordered)
+        private SelectResult(ImmutableUniqueList<Symbol> columns, 
ByteBuffer[][] rows, boolean unordered)
         {
+            this.columns = columns;
             this.rows = rows;
             this.unordered = unordered;
         }
+
+        private static SelectResult ordered(ImmutableUniqueList<Symbol> 
columns, ByteBuffer[][] rows)
+        {
+            return new SelectResult(columns, rows, false);
+        }
+
+        private static SelectResult unordered(ImmutableUniqueList<Symbol> 
columns, ByteBuffer[][] rows)
+        {
+            return new SelectResult(columns, rows, true);
+        }
+
+        public boolean isAllDefined(ImmutableUniqueList<Symbol> selectColumns)
+        {
+            if (rows.length == 0) return false;
+            for (var row : rows)
+            {
+                for (var col : selectColumns)
+                {
+                    if (row[columns.indexOf(col)] == null)
+                        return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    public ImmutableUniqueList<Symbol> columns(Select select)
+    {
+        if (select.selections.isEmpty()) return factory.selectionOrder;
+        throw new UnsupportedOperationException("Getting columns from select 
other than SELECT * is currently not supported");
     }
 
     private SelectResult getRowsAsByteBuffer(Select select)
     {
+        ImmutableUniqueList<Symbol> columns = columns(select);
         if (select.where.isEmpty())
-            return new SelectResult(getRowsAsByteBuffer(applyLimits(all(), 
select.perPartitionLimit, select.limit)), false);
+            return SelectResult.ordered(columns, 
getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, 
select.limit)));
         LookupContext ctx = context(select);
         List<PrimaryKey> primaryKeys;
         if (ctx.unmatchable)
@@ -652,7 +923,7 @@ public class ASTSingleTableModel
         }
         primaryKeys = applyLimits(primaryKeys, select.perPartitionLimit, 
select.limit);
         //TODO (correctness): now that we have the rows we need to handle the 
selections/aggregation/limit/group-by/etc.
-        return new SelectResult(getRowsAsByteBuffer(primaryKeys), 
ctx.unordered);
+        return new SelectResult(columns, getRowsAsByteBuffer(primaryKeys), 
ctx.unordered);
     }
 
     private List<PrimaryKey> applyLimits(List<PrimaryKey> primaryKeys, 
Optional<Value> perPartitionLimitOpt, Optional<Value> limitOpt)
@@ -709,20 +980,37 @@ public class ASTSingleTableModel
     }
 
     private ByteBuffer[] getRowAsByteBuffer(BytesPartitionState partition, 
@Nullable BytesPartitionState.Row row)
+    {
+        return getRowAsByteBuffer(factory.selectionOrder, partition, row);
+    }
+
+    private ByteBuffer[] getRowAsByteBuffer(ImmutableUniqueList<Symbol> 
columns, BytesPartitionState partition, @Nullable BytesPartitionState.Row row)
     {
         Clustering<ByteBuffer> pd = partition.key;
         BytesPartitionState.Row staticRow = partition.staticRow();
-        ByteBuffer[] bbs = new ByteBuffer[factory.selectionOrder.size()];
+        ByteBuffer[] bbs = new ByteBuffer[columns.size()];
         for (Symbol col : factory.partitionColumns)
-            bbs[factory.selectionOrder.indexOf(col)] = 
pd.bufferAt(factory.partitionColumns.indexOf(col));
+        {
+            if (!columns.contains(col)) continue;
+            bbs[columns.indexOf(col)] = 
pd.bufferAt(factory.partitionColumns.indexOf(col));
+        }
         for (Symbol col : factory.staticColumns)
-            bbs[factory.selectionOrder.indexOf(col)] = staticRow.get(col);
+        {
+            if (!columns.contains(col)) continue;
+            bbs[columns.indexOf(col)] = staticRow.get(col);
+        }
         if (row != null)
         {
             for (Symbol col : factory.clusteringColumns)
-                bbs[factory.selectionOrder.indexOf(col)] = 
row.clustering.bufferAt(factory.clusteringColumns.indexOf(col));
+            {
+                if (!columns.contains(col)) continue;
+                bbs[columns.indexOf(col)] = 
row.clustering.bufferAt(factory.clusteringColumns.indexOf(col));
+            }
             for (Symbol col : factory.regularColumns)
-                bbs[factory.selectionOrder.indexOf(col)] = row.get(col);
+            {
+                if (!columns.contains(col)) continue;
+                bbs[columns.indexOf(col)] = row.get(col);
+            }
         }
         return bbs;
     }
@@ -942,26 +1230,52 @@ public class ASTSingleTableModel
         return 
current.stream().map(BufferClustering::new).collect(Collectors.toList());
     }
 
-    private static ByteBuffer eval(Symbol col, @Nullable ByteBuffer current, 
Expression e)
+    private static class EvalResult
+    {
+        private static final EvalResult SKIP = new EvalResult(Kind.SKIP, null);
+
+        private enum Kind { SKIP, ACCEPT }
+
+        private final Kind kind;
+        private final @Nullable ByteBuffer value;
+
+        private EvalResult(Kind kind, @Nullable ByteBuffer value)
+        {
+            this.kind = kind;
+            this.value = value;
+        }
+
+        private static EvalResult accept(@Nullable ByteBuffer bb)
+        {
+            return new EvalResult(Kind.ACCEPT, bb);
+        }
+    }
+
+    private static EvalResult eval(Symbol col, @Nullable ByteBuffer current, 
Expression e)
     {
-        if (!(e instanceof AssignmentOperator)) return eval(e);
+        if (!(e instanceof AssignmentOperator)) return 
EvalResult.accept(eval(e));
+        current = col.type().sanitize(current);
         // multi cell collections have the property that they do update even 
if the current value is null
         boolean isFancy = col.type().isCollection() && 
col.type().isMultiCell();
-        if (current == null && !isFancy) return null; // null + ? == null
+        if (current == null && !isFancy) return EvalResult.SKIP; // null + ? 
== null
         var assignment = (AssignmentOperator) e;
         if (isFancy && current == null)
         {
             return assignment.kind == AssignmentOperator.Kind.SUBTRACT
                    // if it doesn't exist, then there is nothing to subtract
-                   ? null
-                   : eval(assignment.right);
+                   ? EvalResult.SKIP
+                   : EvalResult.accept(eval(assignment.right));
         }
+        // validate your inputs...
+        ByteBuffer rhs = col.type().sanitize(eval(assignment.right));
+        if (rhs == null)
+            return EvalResult.SKIP;
         switch (assignment.kind)
         {
             case ADD:
-                return eval(new Operator(Operator.Kind.ADD, new 
Literal(current, e.type()), assignment.right));
+                return EvalResult.accept(eval(new Operator(Operator.Kind.ADD, 
new Literal(current, e.type()), assignment.right)));
             case SUBTRACT:
-                return eval(new Operator(Operator.Kind.SUBTRACT, new 
Literal(current, e.type()), assignment.right));
+                return EvalResult.accept(eval(new 
Operator(Operator.Kind.SUBTRACT, new Literal(current, e.type()), 
assignment.right)));
             default:
                 throw new UnsupportedOperationException(assignment.kind + ": " 
+ assignment.toCQL());
         }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java 
b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
index a10524968a..c2d18e573d 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
@@ -449,9 +449,10 @@ public class BytesPartitionState
         public final TableMetadata metadata;
         public final ImmutableUniqueList<Symbol> partitionColumns;
         public final ImmutableUniqueList<Symbol> clusteringColumns;
+        public final ImmutableUniqueList<Symbol> primaryColumns;
         public final ImmutableUniqueList<Symbol> staticColumns;
         public final ImmutableUniqueList<Symbol> regularColumns;
-        public final ImmutableUniqueList<Symbol> selectionOrder, 
regularAndStaticColumns;
+        public final ImmutableUniqueList<Symbol> selectionOrder, 
partitionAndStaticColumns, regularAndStaticColumns;
         public final ClusteringComparator clusteringComparator;
 
 
@@ -471,9 +472,23 @@ public class BytesPartitionState
             for (ColumnMetadata pk : metadata.clusteringColumns())
                 symbolListBuilder.add(Symbol.from(pk));
             clusteringColumns = symbolListBuilder.buildAndClear();
+            if (clusteringColumns.isEmpty()) primaryColumns = partitionColumns;
+            else
+            {
+                symbolListBuilder.addAll(partitionColumns);
+                symbolListBuilder.addAll(clusteringColumns);
+                primaryColumns = symbolListBuilder.buildAndClear();
+            }
             for (ColumnMetadata pk : metadata.staticColumns())
                 symbolListBuilder.add(Symbol.from(pk));
             staticColumns = symbolListBuilder.buildAndClear();
+            if (staticColumns.isEmpty()) partitionAndStaticColumns = 
partitionColumns;
+            else
+            {
+                symbolListBuilder.addAll(partitionColumns);
+                symbolListBuilder.addAll(staticColumns);
+                partitionAndStaticColumns = symbolListBuilder.buildAndClear();
+            }
             for (ColumnMetadata pk : metadata.regularColumns())
                 symbolListBuilder.add(Symbol.from(pk));
             regularColumns = symbolListBuilder.buildAndClear();
diff --git a/test/unit/org/apache/cassandra/cql3/KnownIssue.java 
b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
index b1c2c09d66..be2dfe7524 100644
--- a/test/unit/org/apache/cassandra/cql3/KnownIssue.java
+++ b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
@@ -40,7 +40,9 @@ public enum KnownIssue
     
AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-19007";,
                                                      "When doing multi 
node/multi column queries, AF can miss data when the nodes are not in-sync"),
     
SAI_AND_VECTOR_COLUMNS("https://issues.apache.org/jira/browse/CASSANDRA-20464";,
-                           "When doing an SAI query, if the where clause also 
contains a vector column bad results can be produced")
+                           "When doing an SAI query, if the where clause also 
contains a vector column bad results can be produced"),
+    
CAS_CONDITION_ON_UDT_W_EMPTY_BYTES("https://issues.apache.org/jira/browse/CASSANDRA-20479";,
+                                       "WHERE clause blocks operations on UDTs 
but CAS allows in IF clause.  During this path empty can be confused with null 
which allows non-existing rows to match empty bytes"),
     ;
 
     KnownIssue(String url, String description)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java 
b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
index 0ffb65411b..e3918f70da 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
@@ -59,10 +59,15 @@ public class AssignmentOperator implements Expression
         EnumSet<Kind> result = EnumSet.noneOf(Kind.class);
         if (type instanceof CollectionType && type.isMultiCell())
         {
-            if (type instanceof SetType || type instanceof ListType)
+            if (type instanceof SetType)
                 return EnumSet.of(Kind.ADD, Kind.SUBTRACT);
+            if (type instanceof ListType)
+                return isTransaction
+                       ? EnumSet.of(Kind.ADD, Kind.SUBTRACT)
+                       : EnumSet.of(Kind.ADD);
             if (type instanceof MapType)
             {
+                //TODO (coverage): include SUBTRACT support
                 // map supports subtract, but not map - map; only map - set!
                 // since this is annoying to support, for now dropping -
                 return EnumSet.of(Kind.ADD);
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Bind.java 
b/test/unit/org/apache/cassandra/cql3/ast/Bind.java
index ac58dfc7df..788d301dbf 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Bind.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Bind.java
@@ -54,6 +54,7 @@ public class Bind implements Value
     @Override
     public ByteBuffer valueEncoded()
     {
+        if (value == null) return null;
         return value instanceof ByteBuffer ? (ByteBuffer) value : 
((AbstractType) type).decompose(value);
     }
 
diff --git a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java 
b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
index 9f8d5867e0..d0d4d0e35b 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
@@ -22,6 +22,8 @@ import java.util.stream.Stream;
 
 public interface CasCondition extends Element
 {
+    CasCondition visit(Visitor v);
+
     enum Simple implements CasCondition
     {
         NotExists("IF NOT EXISTS"),
@@ -39,11 +41,17 @@ public interface CasCondition extends Element
         {
             sb.append(cql);
         }
+
+        @Override
+        public CasCondition visit(Visitor v)
+        {
+            return v.visit(this);
+        }
     }
 
     class IfCondition implements CasCondition
     {
-        private final Conditional conditional;
+        public final Conditional conditional;
 
         public IfCondition(Conditional conditional)
         {
@@ -62,5 +70,15 @@ public interface CasCondition extends Element
         {
             return Stream.of(conditional);
         }
+
+        @Override
+        public CasCondition visit(Visitor v)
+        {
+            var u = v.visit(this);
+            if (u != this) return u;
+            var c = conditional.visit(v);
+            if (c == conditional) return this;
+            return new IfCondition(c);
+        }
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java 
b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
index 52f79bb1dc..66012c060c 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
@@ -72,6 +72,21 @@ public interface Conditional extends Expression
             {
                 this.value = value;
             }
+
+            public boolean test(AbstractType<?> type, ByteBuffer a, ByteBuffer 
b)
+            {
+                int rc = type.compare(a, b);
+                switch (this)
+                {
+                    case EQUAL: return rc == 0;
+                    case NOT_EQUAL: return rc != 0;
+                    case GREATER_THAN: return rc > 0;
+                    case GREATER_THAN_EQ: return rc >=0;
+                    case LESS_THAN: return rc < 0;
+                    case LESS_THAN_EQ: return rc <=0;
+                    default: throw new 
UnsupportedOperationException(this.name());
+                }
+            }
         }
 
         public final Inequality kind;
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java 
b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
index 34acb843c7..a626dcd6b0 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
@@ -55,6 +55,11 @@ public class ExpressionEvaluator
         Object rhs = eval(e.right);
         if (rhs instanceof ByteBuffer)
             rhs = e.right.type().compose((ByteBuffer) rhs);
+        // null + 42 = null
+        // 42 + null = null
+        // if anything is null, everything is null!
+        if (lhs == null || rhs == null)
+            return null;
         switch (e.kind)
         {
             case ADD:
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java 
b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
index 3886fe91b4..4bd2f9b631 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
@@ -56,6 +56,7 @@ public class Literal implements Value
     @Override
     public ByteBuffer valueEncoded()
     {
+        if (value == null) return null;
         return value instanceof ByteBuffer ? (ByteBuffer) value : 
((AbstractType) type).decompose(value);
     }
 
@@ -69,6 +70,11 @@ public class Literal implements Value
     public void toCQL(StringBuilder sb, CQLFormatter formatter)
     {
         ByteBuffer bytes = valueEncoded();
+        if (bytes == null)
+        {
+            sb.append("null");
+            return;
+        }
         if (bytes.remaining() == 0 && !actuallySupportsEmpty(type))
         {
             sb.append("<empty bytes>");
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java 
b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
index 9126d8cbda..95987dc657 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
@@ -52,6 +52,8 @@ public abstract class Mutation implements Statement
 
     public abstract boolean isCas();
 
+    public abstract Mutation withoutTimestamp();
+
     public Mutation withTimestamp(long timestamp)
     {
         return withTimestamp(new Timestamp(new Literal(timestamp, 
LongType.instance)));
@@ -172,6 +174,11 @@ public abstract class Mutation implements Statement
             this.timestamp = timestamp;
         }
 
+        public Using withoutTimestamp()
+        {
+            return new Using(ttl, Optional.empty());
+        }
+
         public Using withTimestamp(Timestamp timestamp)
         {
             return new Using(ttl, Optional.of(timestamp));
@@ -299,6 +306,14 @@ public abstract class Mutation implements Statement
             return ifNotExists;
         }
 
+        @Override
+        public Mutation withoutTimestamp()
+        {
+            return new Insert(table, values, ifNotExists, using.isEmpty()
+                                                          ? using
+                                                          : using.map(u -> 
u.withoutTimestamp()));
+        }
+
         @Override
         public Insert withTimestamp(Timestamp timestamp)
         {
@@ -404,9 +419,20 @@ public abstract class Mutation implements Statement
             Conditional copiedWhere = where.visit(v);
             if (where != copiedWhere)
                 updated = true;
+            Optional<? extends CasCondition> updatedCasCondition = 
casCondition;
+            if (casCondition.isPresent())
+            {
+                CasCondition original = casCondition.get();
+                var casCopy = original.visit(v);
+                if (casCopy != original)
+                {
+                    updatedCasCondition = Optional.ofNullable(casCopy);
+                    updated = true;
+                }
+            }
 
             if (!updated) return this;
-            return new Update(table, using, copied, copiedWhere, casCondition);
+            return new Update(table, using, copied, copiedWhere, 
updatedCasCondition);
         }
 
         @Override
@@ -415,6 +441,12 @@ public abstract class Mutation implements Statement
             return casCondition.isPresent();
         }
 
+        @Override
+        public Mutation withoutTimestamp()
+        {
+            return new Update(table, using.isEmpty() ? using : using.map(u -> 
u.withoutTimestamp()), set, where, casCondition);
+        }
+
         @Override
         public Update withTimestamp(Timestamp timestamp)
         {
@@ -520,9 +552,20 @@ WHERE PK_column_conditions
             var copiedWhere = where.visit(v);
             if (copiedWhere != where)
                 updated = true;
+            Optional<? extends CasCondition> updatedCasCondition = 
casCondition;
+            if (casCondition.isPresent())
+            {
+                CasCondition original = casCondition.get();
+                var casCopy = original.visit(v);
+                if (casCopy != original)
+                {
+                    updatedCasCondition = Optional.ofNullable(casCopy);
+                    updated = true;
+                }
+            }
 
             if (!updated) return this;
-            return new Delete(copiedColumns, table, timestamp, copiedWhere, 
casCondition);
+            return new Delete(copiedColumns, table, timestamp, copiedWhere, 
updatedCasCondition);
         }
 
         @Override
@@ -531,6 +574,12 @@ WHERE PK_column_conditions
             return casCondition.isPresent();
         }
 
+        @Override
+        public Mutation withoutTimestamp()
+        {
+            return new Delete(columns, table, Optional.empty(), where, 
casCondition);
+        }
+
         @Override
         public Delete withTimestamp(Timestamp timestamp)
         {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Reference.java 
b/test/unit/org/apache/cassandra/cql3/ast/Reference.java
index fe836a05dd..ee3b73d6fd 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Reference.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Reference.java
@@ -48,6 +48,11 @@ public class Reference implements ReferenceExpression
         return new 
Reference(Collections.singletonList(Objects.requireNonNull(top)));
     }
 
+    public static Reference of(ReferenceExpression top, ReferenceExpression 
next)
+    {
+        return new Reference(List.of(top, next));
+    }
+
     @Override
     public AbstractType<?> type()
     {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java 
b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
index 7a99e030a4..d1727fccdb 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
@@ -26,6 +26,6 @@ public interface ReferenceExpression extends Expression
     @Override
     default ReferenceExpression visit(Visitor v)
     {
-        return this;
+        return v.visit(this);
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Value.java 
b/test/unit/org/apache/cassandra/cql3/ast/Value.java
index f04b5fcd36..92ef010183 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Value.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Value.java
@@ -20,11 +20,15 @@ package org.apache.cassandra.cql3.ast;
 
 import java.nio.ByteBuffer;
 
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.db.marshal.AbstractType;
 
 public interface Value extends Expression
 {
+    @Nullable
     Object value();
+    @Nullable
     ByteBuffer valueEncoded();
     Value with(Object value, AbstractType<?> type);
 
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java 
b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
index 71e4797c25..c87415e981 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
@@ -71,6 +71,11 @@ public interface Visitor
 
     default Value visit(Value v) { return v; }
 
+    default CasCondition visit(CasCondition s)
+    {
+        return s;
+    }
+
     class CompositeVisitor implements Visitor
     {
         private final List<Visitor> visitors;
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java 
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 47b2267de3..cdf533cb42 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -166,6 +166,7 @@ public class ASTGenerators
         private Gen<?> valueGen;
         private Gen<Boolean> useOperator = SourceDSL.booleans().all();
         private Gen<Boolean> useEmpty = SourceDSL.arbitrary().constant(false);
+        private Gen<Boolean> useNull = SourceDSL.arbitrary().constant(false);
         private BiFunction<Object, AbstractType<?>, Gen<Value>> 
literalOrBindGen = ASTGenerators::valueGen;
 
         public ExpressionBuilder(AbstractType<?> type)
@@ -182,6 +183,12 @@ public class ASTGenerators
             return this;
         }
 
+        public ExpressionBuilder allowNull()
+        {
+            useNull = SourceDSL.integers().between(1, 100).map(i -> i < 10);
+            return this;
+        }
+
         public ExpressionBuilder withOperators()
         {
             useOperator = i -> true;
@@ -217,6 +224,8 @@ public class ASTGenerators
             //TODO (coverage): rather than single level operators, allow 
nested (a + b + c + d)
             Gen<Value> leaf = rs -> 
literalOrBindGen.apply(valueGen.generate(rs), type).generate(rs);
             return rs -> {
+                if (useNull.generate(rs))
+                    return new Bind(null, type);
                 if (useEmpty.generate(rs))
                     return new Bind(ByteBufferUtil.EMPTY_BYTE_BUFFER, type);
                 Expression e = leaf.generate(rs);
@@ -395,6 +404,12 @@ public class ASTGenerators
             return this;
         }
 
+        public MutationGenBuilder allowNull(Symbol symbol)
+        {
+            columnExpressions.get(symbol).allowNull();
+            return this;
+        }
+
         public MutationGenBuilder withDeletionKind(Gen<DeleteKind> 
deleteKindGen)
         {
             this.deleteKindGen = deleteKindGen;
@@ -432,7 +447,7 @@ public class ASTGenerators
 
         public MutationGenBuilder withCasGen(Gen<Boolean> withCasGen)
         {
-            withCasGen = Objects.requireNonNull(withCasGen);
+            this.withCasGen = Objects.requireNonNull(withCasGen);
             return this;
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to