This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d06e496773 Update AST Harry CAS fuzz tests to validate the CAS response
d06e496773 is described below

commit d06e49677330b19db99691b8f2bd3f5faedeba5d
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Apr 8 18:26:15 2025 -0700

    Update AST Harry CAS fuzz tests to validate the CAS response
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20517
---
 .../cassandra/cql3/statements/CQL3CasRequest.java  |  23 +-
 .../test/cql3/CasMultiNodeTableWalkBase.java       |   3 +-
 .../test/cql3/SingleNodeTableWalkTest.java         |   3 +-
 .../test/cql3/SingleNodeTokenConflictTest.java     |   7 +-
 .../distributed/test/cql3/StatefulASTBase.java     |   8 +-
 .../apache/cassandra/harry/gen/BijectionCache.java |   6 +-
 .../cassandra/harry/model/ASTSingleTableModel.java | 615 ++++++++++++++++-----
 .../harry/model/ASTSingleTableModelTest.java       |  87 ++-
 .../cassandra/harry/model/BytesPartitionState.java | 153 +++--
 .../cassandra/harry/model/PartitionState.java      |   6 +-
 .../apache/cassandra/harry/util/StringUtils.java   |   2 +-
 .../unit/org/apache/cassandra/cql3/KnownIssue.java |   4 +
 .../cassandra/cql3/ast/AssignmentOperator.java     |   6 +
 .../apache/cassandra/cql3/ast/CasCondition.java    |  11 +
 .../org/apache/cassandra/cql3/ast/Conditional.java |   5 +
 .../org/apache/cassandra/cql3/ast/Elements.java    |   9 +
 .../org/apache/cassandra/cql3/ast/Expression.java  |   5 +
 .../org/apache/cassandra/cql3/ast/Literal.java     |   6 +
 .../org/apache/cassandra/cql3/ast/Mutation.java    | 147 ++++-
 .../org/apache/cassandra/cql3/ast/Operator.java    |   6 +
 .../unit/org/apache/cassandra/cql3/ast/Select.java |  15 +
 .../cassandra/cql3/ast/StandardVisitors.java       |  10 +
 .../org/apache/cassandra/cql3/ast/Statement.java   |   7 +-
 test/unit/org/apache/cassandra/cql3/ast/Value.java |   5 +
 .../org/apache/cassandra/utils/ASTGenerators.java  |  97 +++-
 .../cassandra/utils/ImmutableUniqueList.java       |  54 +-
 26 files changed, 1048 insertions(+), 252 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 4db98459ec..332f8b9388 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -149,7 +149,7 @@ public class CQL3CasRequest implements CASRequest
         }
         else if (!(condition instanceof ColumnsConditions))
         {
-            throw new InvalidRequestException("Cannot mix IF conditions and IF 
NOT EXISTS for the same row");
+            throw new InvalidRequestException("Cannot mix IF conditions and " 
+ ((ToCQL) condition).toCQL() + " for the same row");
         }
         ((ColumnsConditions)condition).addConditions(conds, options);
     }
@@ -352,7 +352,12 @@ public class CQL3CasRequest implements CASRequest
         public abstract boolean appliesTo(FilteredPartition current) throws 
InvalidRequestException;
     }
 
-    private static class NotExistCondition extends RowCondition
+    private interface ToCQL
+    {
+        String toCQL();
+    }
+
+    private static class NotExistCondition extends RowCondition implements 
ToCQL
     {
         private NotExistCondition(Clustering<?> clustering)
         {
@@ -363,9 +368,15 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) == null;
         }
+
+        @Override
+        public String toCQL()
+        {
+            return "IF NOT EXISTS";
+        }
     }
 
-    private static class ExistCondition extends RowCondition
+    private static class ExistCondition extends RowCondition implements ToCQL
     {
         private ExistCondition(Clustering<?> clustering)
         {
@@ -376,6 +387,12 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) != null;
         }
+
+        @Override
+        public String toCQL()
+        {
+            return "IF EXISTS";
+        }
     }
 
     private static class ColumnsConditions extends RowCondition
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
index bf8a44dcb9..31d1aab31f 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
@@ -78,7 +78,8 @@ public abstract class CasMultiNodeTableWalkBase extends 
MultiNodeTableWalkBase
         @Override
         protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder 
mutationGenBuilder)
         {
-            mutationGenBuilder.withCasGen(i -> true);
+            mutationGenBuilder.withCasGen(i -> true)
+                              .withAllowUpdateMultipleClusteringKeys(false); 
// paxos supports but the model doesn't yet
             // generator might not always generate a cas statement... should 
fix generator!
             Gen<Mutation> gen = 
toGen(mutationGenBuilder.build()).filter(Mutation::isCas);
             if (metadata.regularAndStaticColumns().stream().anyMatch(c -> 
c.type.isUDT())
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index 1feb9c5f86..785db954c8 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@ -458,7 +458,8 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
                                                                   .withoutTtl()
                                                                   
.withoutTimestamp()
                                                                   
.withPartitions(Generators.fromGen(Gens.mixedDistribution(uniquePartitions).next(rs)))
-                                                                  
.withColumnExpressions(e -> 
e.withOperators(Generators.fromGen(BOOLEAN_DISTRIBUTION.next(rs))));
+                                                                  
.withColumnExpressions(e -> 
e.withOperators(Generators.fromGen(BOOLEAN_DISTRIBUTION.next(rs))))
+                                                                  
.withIgnoreIssues(IGNORED_ISSUES);
             if (IGNORED_ISSUES.contains(KnownIssue.SAI_EMPTY_TYPE))
             {
                 model.factory.regularAndStaticColumns.stream()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
index 179b107eae..dd15b99d55 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
@@ -335,9 +335,9 @@ public class SingleNodeTokenConflictTest extends 
StatefulASTBase
                 this.neighbors = rs.nextBoolean() ? Collections.emptyList() : 
extractNeighbors(pkValues);
                 // in case neighbors conflicts with pkValues or tokenValues, 
use ImmutableUniqueList which will ignore rather than fail
                 this.pkValues = ImmutableUniqueList.<ByteBuffer>builder()
-                                                   .mayAddAll(pkValues)
-                                                   .mayAddAll(tokenValues)
-                                                   .mayAddAll(neighbors)
+                                                   .addAll(pkValues)
+                                                   .addAll(tokenValues)
+                                                   .addAll(neighbors)
                                                    .build();
                 this.pkGen = Gens.pick(pkValues);
                 this.order = new TreeSet<>(PK_TYPE);
@@ -367,6 +367,7 @@ public class SingleNodeTokenConflictTest extends 
StatefulASTBase
                                      .withoutTtl()
                                      .withoutTimestamp()
                                      
.withPartitions(SourceDSL.arbitrary().pick(uniquePartitions))
+                                     .withIgnoreIssues(IGNORED_ISSUES)
                                      .build());
         }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index a90b467979..4a4a1e6a61 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -269,7 +269,7 @@ public class StatefulASTBase extends TestBaseImpl
 
             this.metadata = metadata;
             this.tableRef = TableReference.from(metadata);
-            this.model = new ASTSingleTableModel(metadata);
+            this.model = new ASTSingleTableModel(metadata, IGNORED_ISSUES);
             createTable(metadata);
         }
 
@@ -388,8 +388,8 @@ public class StatefulASTBase extends TestBaseImpl
             else                  annotate += ", " + postfix;
             Mutation finalMutation = mutation;
             return new Property.SimpleCommand<>(humanReadable(mutation, 
annotate), s -> {
-                s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(), 
finalMutation);
-                s.model.update(finalMutation);
+                var result = s.executeQuery(inst, Integer.MAX_VALUE, 
s.mutationCl(), finalMutation);
+                s.model.updateAndValidate(result, finalMutation);
                 s.mutation();
             });
         }
@@ -451,7 +451,7 @@ public class StatefulASTBase extends TestBaseImpl
                 SimpleStatement ss = new SimpleStatement(stmt.toCQL(), 
(Object[]) stmt.bindsEncoded());
                 if (fetchSize != Integer.MAX_VALUE)
                     ss.setFetchSize(fetchSize);
-                if (stmt instanceof Mutation)
+                if (stmt.kind() == Statement.Kind.MUTATION)
                 {
                     switch (cl)
                     {
diff --git a/test/harry/main/org/apache/cassandra/harry/gen/BijectionCache.java 
b/test/harry/main/org/apache/cassandra/harry/gen/BijectionCache.java
index a388f195e9..d1f9e0db7b 100644
--- a/test/harry/main/org/apache/cassandra/harry/gen/BijectionCache.java
+++ b/test/harry/main/org/apache/cassandra/harry/gen/BijectionCache.java
@@ -41,6 +41,8 @@ public class BijectionCache<T> implements 
Bijections.Bijection<T>
     @Override
     public T inflate(long descriptor)
     {
+        if (MagicConstants.NIL_DESCR == descriptor)
+            throw new IllegalArgumentException("Asked for NIL_DESCR");
         T value = valueToDescriptor.inverse().get(descriptor);
         if (value == null)
             throw new IllegalArgumentException(String.format("Attempted to 
inflate %d, but it is undefined", descriptor));
@@ -124,6 +126,8 @@ public class BijectionCache<T> implements 
Bijections.Bijection<T>
     @Override
     public int compare(long l, long r)
     {
-        throw new UnsupportedOperationException();
+        T lhs = inflate(l);
+        T rhs = inflate(r);
+        return comparator.compare(lhs, rhs);
     }
 }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java 
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index f6a03bdd17..ee08634e1a 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -24,8 +24,10 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -33,17 +35,18 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import accord.utils.Invariants;
+import org.apache.cassandra.cql3.KnownIssue;
 import org.apache.cassandra.cql3.ast.AssignmentOperator;
 import org.apache.cassandra.cql3.ast.CasCondition;
 import org.apache.cassandra.cql3.ast.Conditional;
@@ -65,6 +68,7 @@ import org.apache.cassandra.cql3.ast.Visitor;
 import org.apache.cassandra.db.BufferClustering;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.harry.model.BytesPartitionState.PrimaryKey;
@@ -75,18 +79,32 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ImmutableUniqueList;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.cql3.ast.Elements.symbols;
 import static org.apache.cassandra.harry.model.BytesPartitionState.asCQL;
 
 public class ASTSingleTableModel
 {
     private static final ByteBuffer[][] NO_ROWS = new ByteBuffer[0][];
+    private static final Symbol CAS_APPLIED = new 
Symbol.UnquotedSymbol("[applied]", BooleanType.instance);
+    private static final ImmutableUniqueList<Symbol> CAS_APPLIED_COLUMNS = 
ImmutableUniqueList.<Symbol>builder().add(CAS_APPLIED).build();
+    private static final ByteBuffer[][] CAS_SUCCESS_RESULT = new 
ByteBuffer[][] { new ByteBuffer[] {BooleanType.instance.decompose(true)} };
+    private static final ByteBuffer FALSE = 
BooleanType.instance.decompose(false);
+    private static final ByteBuffer[][] CAS_REJECTION_RESULT = new 
ByteBuffer[][] { new ByteBuffer[] {FALSE} };
 
     public final BytesPartitionState.Factory factory;
+    private final EnumSet<KnownIssue> ignoredIssues;
     private final TreeMap<BytesPartitionState.Ref, BytesPartitionState> 
partitions = new TreeMap<>();
+    private long numMutations = 0;
 
     public ASTSingleTableModel(TableMetadata metadata)
+    {
+        this(metadata, EnumSet.noneOf(KnownIssue.class));
+    }
+
+    public ASTSingleTableModel(TableMetadata metadata, EnumSet<KnownIssue> 
ignoredIssues)
     {
         this.factory = new BytesPartitionState.Factory(metadata);
+        this.ignoredIssues = Objects.requireNonNull(ignoredIssues);
     }
 
     public NavigableSet<BytesPartitionState.Ref> partitionKeys()
@@ -191,6 +209,212 @@ public class ASTSingleTableModel
     public void update(Mutation mutation)
     {
         if (!shouldApply(mutation)) return;
+        updateInternal(mutation);
+    }
+
+    public void updateAndValidate(ByteBuffer[][] actual, Mutation mutation)
+    {
+        if (!shouldApply(mutation))
+        {
+            if (mutation.isCas())
+                validateCasNotApplied(actual, mutation);
+            return;
+        }
+        if (mutation.isCas())
+            validate(CAS_APPLIED_COLUMNS, actual, CAS_SUCCESS_RESULT);
+        updateInternal(mutation);
+    }
+
+    private void validateCasNotApplied(ByteBuffer[][] actual, Mutation 
mutation)
+    {
+        // see 
org.apache.cassandra.cql3.statements.ModificationStatement.buildCasFailureResultSet
+        var condition = mutation.casCondition().get();
+        var partition = partitions.get(referencePartition(mutation));
+        var cd = cdOrNull(mutation);
+        BytesPartitionState.Row row = partition == null ? null : 
partition.get(cd);
+        boolean touchesStaticColumns = !factory.staticColumns.isEmpty()
+                                       && 
symbols(mutation).anyMatch(factory.staticColumns::contains);
+        ImmutableUniqueList<Symbol> columns;
+        ByteBuffer[][] expected;
+        if (partition == null)
+        {
+            columns = CAS_APPLIED_COLUMNS;
+            expected = CAS_REJECTION_RESULT;
+        }
+        else if (condition instanceof CasCondition.IfCondition)
+        {
+            if (touchesStaticColumns
+                && cd == null
+                && ignoredIssues.contains(KnownIssue.CAS_ON_STATIC_ROW))
+            {
+                if (casOnStaticRowCouldReturnData(partition))
+                {
+                    // if the static row exists, we can match the col condition
+                    // if the static row doesn't exist, and there are rows, 
then we can return null
+                    List<Symbol> conditionReferencedColumns = 
conditionReferencedColumns(mutation);
+                    columns = 
ImmutableUniqueList.<Symbol>builder(conditionReferencedColumns.size() + 1)
+                                                 .add(CAS_APPLIED)
+                                                 
.addAll(conditionReferencedColumns)
+                                                 .build();
+                    ByteBuffer[] result = getRowAsByteBuffer(columns, 
partition, row);
+                    result[0] = FALSE;
+
+                    expected = new ByteBuffer[][]{ result };
+                }
+                else
+                {
+                    // static/row don't exist, so can't return a current state
+                    columns = CAS_APPLIED_COLUMNS;
+                    expected = CAS_REJECTION_RESULT;
+                }
+            }
+            else if (partition.staticRow().isEmpty()
+                && (cd == null || row == null))
+            {
+                // static/row don't exist, so can't return a current state
+                columns = CAS_APPLIED_COLUMNS;
+                expected = CAS_REJECTION_RESULT;
+            }
+            else
+            {
+                List<Symbol> conditionReferencedColumns = 
conditionReferencedColumns(mutation);
+                columns = 
ImmutableUniqueList.<Symbol>builder(conditionReferencedColumns.size() + 1)
+                                             .add(CAS_APPLIED)
+                                             
.addAll(conditionReferencedColumns)
+                                             .build();
+                ByteBuffer[] result = getRowAsByteBuffer(columns, partition, 
row);
+                result[0] = FALSE;
+
+                expected = new ByteBuffer[][]{ result };
+            }
+        }
+        else if (condition == CasCondition.Simple.Exists)
+        {
+            if (touchesStaticColumns
+                && cd == null
+                && ignoredIssues.contains(KnownIssue.CAS_ON_STATIC_ROW))
+            {
+                if (casOnStaticRowCouldReturnData(partition))
+                {
+                    if (!partition.rows().isEmpty())
+                        row = partition.rows().get(0);
+                    // Partition level IF EXISTS checks if the static row 
exists (which is defined as notEmpty), so its known that the static row is 
empty!
+                    // One would expect that the DELETE just returns 
[[applied]] but it actually returns a row... but we are not working with rows, 
we are working with partitions...
+                    // This is a leaky implementation detail!  Checking for 
the partition to exist is the following ReadCommand:
+                    // SELECT s0, s1 WHERE pk = ? LIMIT 1
+                    // this doesn't include the row columns, only the static 
columns... but the LIMIT returned a row and not
+                    // the static row (because the static row is empty)!
+                    columns = 
ImmutableUniqueList.<Symbol>builder(factory.selectionOrder.size() + 1)
+                                                 .add(CAS_APPLIED)
+                                                 
.addAll(factory.selectionOrder)
+                                                 .build();
+                    ByteBuffer[] result = getRowAsByteBuffer(columns, 
partition, row);
+                    result[0] = FALSE;
+                    if (row != null)
+                    {
+                        for (var c : factory.regularColumns)
+                            // null out the row columns....
+                            result[columns.indexOf(c)] = null;
+                    }
+
+                    expected = new ByteBuffer[][]{ result };
+                }
+                else
+                {
+                    // static/row don't exist, so can't return a current state
+                    columns = CAS_APPLIED_COLUMNS;
+                    expected = CAS_REJECTION_RESULT;
+                }
+            }
+            else if (!touchesStaticColumns || partition.staticRow().isEmpty())
+            {
+                columns = CAS_APPLIED_COLUMNS;
+                expected = CAS_REJECTION_RESULT;
+            }
+            else
+            {
+                columns = 
ImmutableUniqueList.<Symbol>builder(factory.selectionOrder.size() + 1)
+                                             .add(CAS_APPLIED)
+                                             .addAll(factory.selectionOrder)
+                                             .build();
+                ByteBuffer[] result = getRowAsByteBuffer(columns, partition, 
row);
+                result[0] = FALSE;
+
+                expected = new ByteBuffer[][]{ result };
+            }
+        }
+        else if (condition == CasCondition.Simple.NotExists)
+        {
+            if (touchesStaticColumns
+                && cd == null
+                && ignoredIssues.contains(KnownIssue.CAS_ON_STATIC_ROW)
+                && !partition.rows().isEmpty())
+                row = partition.rows().get(0);
+            columns = 
ImmutableUniqueList.<Symbol>builder(factory.selectionOrder.size() + 1)
+                                         .add(CAS_APPLIED)
+                                         .addAll(factory.selectionOrder)
+                                         .build();
+            ByteBuffer[] result = getRowAsByteBuffer(columns, partition, row);
+            result[0] = FALSE;
+            if (!touchesStaticColumns)
+            {
+                for (var s : factory.staticColumns)
+                    result[columns.indexOf(s)] = null;
+            }
+
+            if (cd == null
+                && ignoredIssues.contains(KnownIssue.CAS_ON_STATIC_ROW)
+                && row != null)
+            {
+                for (var c : factory.regularColumns)
+                    // null out the row columns....
+                    result[columns.indexOf(c)] = null;
+            }
+
+            expected = new ByteBuffer[][]{ result };
+        }
+        else
+        {
+            throw new AssertionError();
+        }
+        validate(columns, actual, expected);
+    }
+
+    private static boolean casOnStaticRowCouldReturnData(BytesPartitionState 
partition)
+    {
+        return !partition.staticRow().isEmpty()
+               || !partition.rows().isEmpty();
+    }
+    private List<Symbol> conditionReferencedColumns(Mutation mutation)
+    {
+        //TODO (correctness): does ast.AND support the correct "order" as seen 
from CAS?
+        LinkedHashSet<Symbol> regularCols = null, staticCols = null;
+        for (var c : (Iterable<Symbol>) () -> 
symbols(mutation.casCondition().get()).distinct().iterator())
+        {
+            if (factory.staticColumns.contains(c))
+            {
+                if (staticCols == null)
+                    staticCols = new LinkedHashSet<>();
+                staticCols.add(c);
+            }
+            else
+            {
+                if (regularCols == null)
+                    regularCols = new LinkedHashSet<>();
+                regularCols.add(c);
+            }
+        }
+        List<Symbol> ordered = new ArrayList<>();
+        if (regularCols != null)
+            ordered.addAll(regularCols);
+        if (staticCols != null)
+            ordered.addAll(staticCols);
+        return ordered;
+    }
+
+    private void updateInternal(Mutation mutation)
+    {
+        numMutations++;
         switch (mutation.kind)
         {
             case INSERT:
@@ -209,6 +433,7 @@ public class ASTSingleTableModel
 
     private void update(Mutation.Insert insert)
     {
+        long nowTs = insert.timestampOrDefault(numMutations);
         Clustering<ByteBuffer> pd = pd(insert);
         BytesPartitionState partition = partitions.get(factory.createRef(pd));
         if (partition == null)
@@ -219,25 +444,25 @@ public class ASTSingleTableModel
         Map<Symbol, Expression> values = insert.values;
         if (!factory.staticColumns.isEmpty() && 
!Sets.intersection(factory.staticColumns.asSet(), values.keySet()).isEmpty())
         {
-            // static columns to add in.  If we are doing something like += to 
a row that doesn't exist, we still update statics...
-            Map<Symbol, ByteBuffer> write = new HashMap<>();
-            for (Symbol col : Sets.intersection(factory.staticColumns.asSet(), 
values.keySet()))
-                write.put(col, eval(values.get(col)));
-            partition.setStaticColumns(write);
+            
maybeUpdateColumns(Sets.intersection(factory.staticColumns.asSet(), 
values.keySet()),
+                               partition.staticRow(),
+                               nowTs, values,
+                               partition::setStaticColumns);
         }
         // table has clustering but non are in the write, so only pk/static 
can be updated
         if (!factory.clusteringColumns.isEmpty() && 
Sets.intersection(factory.clusteringColumns.asSet(), values.keySet()).isEmpty())
             return;
-        Map<Symbol, ByteBuffer> write = new HashMap<>();
-        for (Symbol col : Sets.intersection(factory.regularColumns.asSet(), 
values.keySet()))
-            write.put(col, eval(values.get(col)));
-        partition.setColumns(key(insert.values, factory.clusteringColumns),
-                             write,
-                             true);
+        BytesPartitionState finalPartition = partition;
+        var cd = key(insert.values, factory.clusteringColumns);
+        maybeUpdateColumns(Sets.intersection(factory.regularColumns.asSet(), 
values.keySet()),
+                           partition.get(cd),
+                           nowTs, values,
+                           (ts, write) -> finalPartition.setColumns(cd, ts, 
write, true));
     }
 
     private void update(Mutation.Update update)
     {
+        long nowTs = update.timestampOrDefault(numMutations);
         var split = splitOnPartition(update.where.simplify());
         List<Clustering<ByteBuffer>> pks = split.left;
         List<Conditional> remaining = split.right;
@@ -252,43 +477,30 @@ public class ASTSingleTableModel
             Map<Symbol, Expression> set = update.set;
             if (!factory.staticColumns.isEmpty() && 
!Sets.intersection(factory.staticColumns.asSet(), set.keySet()).isEmpty())
             {
-                // static columns to add in.  If we are doing something like 
+= to a row that doesn't exist, we still update statics...
-                Map<Symbol, ByteBuffer> write = new HashMap<>();
-                for (Symbol col : 
Sets.intersection(factory.staticColumns.asSet(), set.keySet()))
-                {
-                    ByteBuffer current = partition.staticRow().get(col);
-                    EvalResult result = eval(col, current, set.get(col));
-                    if (result.kind == EvalResult.Kind.SKIP) continue;
-                    write.put(col, result.value);
-                }
-                if (!write.isEmpty())
-                    partition.setStaticColumns(write);
+                
maybeUpdateColumns(Sets.intersection(factory.staticColumns.asSet(), 
set.keySet()),
+                                   partition.staticRow(),
+                                   nowTs, set,
+                                   partition::setStaticColumns);
             }
             // table has clustering but non are in the write, so only 
pk/static can be updated
             if (!factory.clusteringColumns.isEmpty() && remaining.isEmpty())
                 return;
+            BytesPartitionState finalPartition = partition;
             for (Clustering<ByteBuffer> cd : clustering(remaining))
             {
-                Map<Symbol, ByteBuffer> write = new HashMap<>();
-                for (Symbol col : 
Sets.intersection(factory.regularColumns.asSet(), set.keySet()))
-                {
-                    ByteBuffer current = partition.get(cd, col);
-                    EvalResult result = eval(col, current, set.get(col));
-                    if (result.kind == EvalResult.Kind.SKIP) continue;
-                    write.put(col, result.value);
-                }
-
-                if (!write.isEmpty())
-                    partition.setColumns(cd, write, false);
+                
maybeUpdateColumns(Sets.intersection(factory.regularColumns.asSet(), 
set.keySet()),
+                                   partition.get(cd),
+                                   nowTs, set,
+                                   (ts, write) -> 
finalPartition.setColumns(cd, ts, write, false));
             }
         }
     }
 
     private enum DeleteKind
     {PARTITION, ROW, COLUMN}
-
     private void update(Mutation.Delete delete)
     {
+        long nowTs = delete.timestampOrDefault(numMutations);
         //TODO (coverage): range deletes
         var split = splitOnPartition(delete.where.simplify());
         List<Clustering<ByteBuffer>> pks = split.left;
@@ -313,7 +525,7 @@ public class ASTSingleTableModel
                 case ROW:
                     for (Clustering<ByteBuffer> cd : clusterings)
                     {
-                        partition.deleteRow(cd);
+                        partition.deleteRow(cd, nowTs);
                         if (partition.shouldDelete())
                             partitions.remove(partition.ref());
                     }
@@ -321,7 +533,7 @@ public class ASTSingleTableModel
                 case COLUMN:
                     if (clusterings.isEmpty())
                     {
-                        partition.deleteStaticColumns(columns);
+                        partition.deleteStaticColumns(nowTs, columns);
                         if (partition.shouldDelete())
                             partitions.remove(partition.ref());
                     }
@@ -329,7 +541,7 @@ public class ASTSingleTableModel
                     {
                         for (Clustering<ByteBuffer> cd : clusterings)
                         {
-                            partition.deleteColumns(cd, columns);
+                            partition.deleteColumns(cd, nowTs, columns);
                             if (partition.shouldDelete())
                                 partitions.remove(partition.ref());
                         }
@@ -341,54 +553,68 @@ public class ASTSingleTableModel
         }
     }
 
+    private static void maybeUpdateColumns(Set<Symbol> columns,
+                                           @Nullable BytesPartitionState.Row 
row,
+                                           long nowTs, Map<Symbol, Expression> 
set,
+                                           ColumnUpdate update)
+    {
+        if (columns.isEmpty())
+        {
+            update.update(nowTs, Collections.emptyMap());
+            return;
+        }
+        // static columns to add in.  If we are doing something like += to a 
row that doesn't exist, we still update statics...
+        Map<Symbol, ByteBuffer> write = new HashMap<>();
+        for (Symbol col : columns)
+        {
+            ByteBuffer current = row == null ? null : row.get(col);
+            EvalResult result = eval(col, current, set.get(col));
+            if (result.kind == EvalResult.Kind.SKIP) continue;
+            write.put(col, result.value);
+        }
+        if (!write.isEmpty())
+            update.update(nowTs, write);
+    }
+
     public boolean shouldApply(Mutation mutation)
     {
         if (!mutation.isCas()) return true;
         return shouldApply(mutation, selectPartitionForCAS(mutation));
     }
 
-    private SelectResult selectPartitionForCAS(Mutation mutation)
+    private CasContext selectPartitionForCAS(Mutation mutation)
     {
-        var partition = partitions.get(factory.createRef(pd(mutation)));
-        if (partition == null) return 
SelectResult.ordered(factory.selectionOrder, NO_ROWS);
-
-        var cd = cdOrNull(mutation);
-        var row = cd == null ? null : partition.get(cd);
-        ImmutableUniqueList<Symbol> columns = cd != null ? 
factory.selectionOrder : factory.partitionAndStaticColumns;
-        return SelectResult.ordered(columns, new ByteBuffer[][] { 
getRowAsByteBuffer(columns, partition, row)});
+        BytesPartitionState.Ref ref = referencePartition(mutation);
+        Clustering<ByteBuffer> cd = cdOrNull(mutation);
+        BytesPartitionState partition = partitions.get(ref);
+        return new CasContext(ref, cd, partition);
     }
 
-    private boolean shouldApply(Mutation mutation, SelectResult current)
+    private boolean shouldApply(Mutation mutation, CasContext ctx)
     {
         Preconditions.checkArgument(mutation.isCas());
         // process condition
-        CasCondition condition;
-        switch (mutation.kind)
-        {
-            case INSERT:
-                condition = CasCondition.Simple.NotExists;
-                break;
-            case UPDATE:
-                condition = ((Mutation.Update) mutation).casCondition.get();
-                break;
-            case DELETE:
-                condition = ((Mutation.Delete) mutation).casCondition.get();
-                break;
-            default:
-                throw new UnsupportedOperationException(mutation.kind.name());
-        }
+        CasCondition condition = mutation.casCondition().get();
+        boolean partitionOrRow = ctx.clustering == null;
+        boolean partitionKnown = ctx.partition != null;
+        BytesPartitionState.Row row = partitionKnown && !partitionOrRow
+                                      ? ctx.partition.get(ctx.clustering)
+                                      : null;
         if (condition instanceof CasCondition.Simple)
         {
-            boolean hasPartition = current.rows.length > 0;
-            boolean partitionOrRow = 
current.columns.equals(factory.partitionAndStaticColumns);
-            boolean hasRow = partitionOrRow ? hasPartition : 
current.isAllDefined(factory.clusteringColumns);
+            if (partitionOrRow && factory.staticColumns.isEmpty())
+                throw new AssertionError("Attempted to create a EXISTS 
condition on partition without static columns; " + mutation.toCQL());
+            // CAS's definition of partition EXISTS isn't based off the 
partition existing, it's based off the static row
+            // existing (aka at least 1 static column exists and is not null).
+            boolean hasPartition = partitionKnown && 
!ctx.partition.staticRow().isEmpty();
+            boolean hasRow = row != null; // don't do !isEmpty here as 
liveness dictates the existence of a row.  If you INSERT a row then delete all 
its columns, it still exists!
             var simple = (CasCondition.Simple) condition;
             switch (simple)
             {
                 case Exists:
-                    return hasRow;
+                    return partitionOrRow ? hasPartition : hasRow;
                 case NotExists:
-                    return !hasRow;
+                    return partitionOrRow ? !hasPartition : !hasRow;
                 default:
                     throw new UnsupportedOperationException(simple.name());
             }
@@ -396,6 +622,11 @@ public class ASTSingleTableModel
         var ifCondition = (CasCondition.IfCondition) condition;
         String letRow = "row";
         Symbol rowSymbol = Symbol.unknownType(letRow);
+        ImmutableUniqueList<Symbol> columns = partitionOrRow ? 
factory.partitionAndStaticColumns : factory.selectionOrder;
+        SelectResult current = SelectResult.ordered(columns,
+                                                    partitionKnown
+                                                    ? new ByteBuffer[][] { 
getRowAsByteBuffer(columns, ctx.partition, row)}
+                                                    : NO_ROWS);
         Map<String, SelectResult> lets = Map.of(letRow, current);
         // point the columns to be row.column that way it matches LET clause 
in BEGIN TRANSACTION, allowing better reuse
         var updatedCondition = ifCondition.conditional.visit(new Visitor()
@@ -410,6 +641,11 @@ public class ASTSingleTableModel
         return process(updatedCondition, lets);
     }
 
+    public BytesPartitionState.Ref referencePartition(Mutation mutation)
+    {
+        return factory.createRef(pd(mutation));
+    }
+
     private boolean process(Conditional condition, Map<String, SelectResult> 
lets)
     {
         if (condition.getClass() == Conditional.Is.class)
@@ -531,7 +767,7 @@ public class ASTSingleTableModel
     private Pair<List<Clustering<ByteBuffer>>, List<Conditional>> 
splitOn(ImmutableUniqueList<Symbol>.AsSet columns, List<Conditional> 
conditionals)
     {
         // pk requires equality
-        Map<Symbol, Set<ByteBuffer>> pks = new HashMap<>();
+        Map<Symbol, List<ByteBuffer>> pks = new HashMap<>();
         List<Conditional> other = new ArrayList<>();
         for (Conditional c : conditionals)
         {
@@ -544,7 +780,7 @@ public class ASTSingleTableModel
                     ByteBuffer bb = eval(w.rhs);
                     if (pks.containsKey(col))
                         throw new IllegalArgumentException("Partition column " 
+ col + " was defined multiple times in the WHERE clause");
-                    pks.put(col, Collections.singleton(bb));
+                    pks.put(col, Collections.singletonList(bb));
                 }
                 else
                 {
@@ -559,8 +795,8 @@ public class ASTSingleTableModel
                     Symbol col = (Symbol) i.ref;
                     if (pks.containsKey(col))
                         throw new IllegalArgumentException("Partition column " 
+ col + " was defined multiple times in the WHERE clause");
-                    var set = 
i.expressions.stream().map(ASTSingleTableModel::eval).collect(Collectors.toSet());
-                    pks.put(col, set);
+                    var list = 
i.expressions.stream().map(ASTSingleTableModel::eval).collect(Collectors.toList());
+                    pks.put(col, list);
                 }
                 else
                 {
@@ -582,19 +818,51 @@ public class ASTSingleTableModel
         return Pair.create(partitionKeys, other);
     }
 
-    private List<Clustering<ByteBuffer>> keys(Collection<Symbol> columns, 
Map<Symbol, Set<ByteBuffer>> pks)
+    private static ImmutableUniqueList<Clustering<ByteBuffer>> 
keys(Collection<Symbol> columns, Map<Symbol, List<ByteBuffer>> columnValues)
     {
-        //TODO (coverage): handle IN
-        ByteBuffer[] bbs = new ByteBuffer[columns.size()];
+        return keys(columns, columnValues, Function.identity());
+    }
+
+    private static ImmutableUniqueList<Clustering<ByteBuffer>> 
keys(Map<Symbol, List<? extends Expression>> values, Collection<Symbol> columns)
+    {
+        return keys(columns, values, ASTSingleTableModel::eval);
+    }
+
+    private static <T> ImmutableUniqueList<Clustering<ByteBuffer>> 
keys(Collection<Symbol> columns,
+                                                                        
Map<Symbol, ? extends List<? extends T>> columnValues,
+                                                                        
Function<T, ByteBuffer> eval)
+    {
+        if (columns.isEmpty()) return ImmutableUniqueList.empty();
+        List<ByteBuffer[]> current = new ArrayList<>();
+        current.add(new ByteBuffer[columns.size()]);
         int idx = 0;
-        for (Symbol s : columns)
+        for (Symbol symbol : columns)
         {
-            Set<ByteBuffer> values = pks.get(s);
-            if (values.size() > 1)
-                throw new UnsupportedOperationException("IN clause is 
currently unsupported... its on the backlog!");
-            bbs[idx++] = Iterables.getFirst(values, null);
+            int position = idx++;
+            List<? extends T> expressions = columnValues.get(symbol);
+            ByteBuffer firstBB = eval.apply(expressions.get(0));
+            current.forEach(bbs -> bbs[position] = firstBB);
+            if (expressions.size() > 1)
+            {
+                // this has a multiplying effect... if there is 1 row and 
there are 2 expressions, then we have 2 rows
+                // if there are 2 rows and 2 expressions, we have 4 rows... 
and so on...
+                List<ByteBuffer[]> copy = new ArrayList<>(current);
+                for (int i = 1; i < expressions.size(); i++)
+                {
+                    ByteBuffer bb = eval.apply(expressions.get(i));
+                    for (ByteBuffer[] bbs : copy)
+                    {
+                        bbs = bbs.clone();
+                        bbs[position] = bb;
+                        current.add(bbs);
+                    }
+                }
+            }
         }
-        return Collections.singletonList(BufferClustering.make(bbs));
+        var builder = ImmutableUniqueList.<Clustering<ByteBuffer>>builder();
+        for (var row : current)
+            builder.add(new BufferClustering(row));
+        return builder.build();
     }
 
     private Clustering<ByteBuffer> pd(Mutation mutation)
@@ -683,6 +951,18 @@ public class ASTSingleTableModel
 
     public void validate(ByteBuffer[][] actual, Select select)
     {
+        if (select.source.isEmpty())
+            throw new AssertionError("SELECT without a FROM only allowed in a 
BEGIN TRANSACTION");
+        {
+            var ref = select.source.get();
+            if (ref.keyspace.isPresent())
+            {
+                if (!factory.metadata.keyspace.equals(ref.keyspace.get()))
+                    throw new AssertionError("Incorrect keyspace: expected " + 
factory.metadata.keyspace + " but given " + ref.keyspace.get());
+            }
+            if (!factory.metadata.name.equals(ref.name))
+                throw new AssertionError("Incorrect table: expected " + 
factory.metadata.name + " but given " + ref.name);
+        }
         SelectResult results = getRowsAsByteBuffer(select);
         try
         {
@@ -692,7 +972,7 @@ public class ASTSingleTableModel
             }
             else
             {
-                validate(actual, results.rows);
+                validate(results.columns, actual, results.rows);
             }
         }
         catch (AssertionError e)
@@ -704,13 +984,19 @@ public class ASTSingleTableModel
         }
     }
 
-    public void validate(ByteBuffer[][] actual, ByteBuffer[][] expected)
-    {
-        validate(factory.selectionOrder, actual, expected);
-    }
-
     private static void validate(ImmutableUniqueList<Symbol> columns, 
ByteBuffer[][] actual, ByteBuffer[][] expected)
     {
+        int expectedLength = columns.size();
+        for (var a : actual)
+        {
+            if (a.length != expectedLength)
+                throw new AssertionError("actual rows do not match the schema 
" + columns + "; found " + Arrays.toString(a));
+        }
+        for (var e : expected)
+        {
+            if (e.length != expectedLength)
+                throw new AssertionError("expected rows do not match the 
schema " + columns + "; found " + Arrays.toString(e));
+        }
         // check any order
         validateAnyOrder(columns, toRow(columns, actual), toRow(columns, 
expected));
         // all rows match, but are they in the right order?
@@ -722,27 +1008,9 @@ public class ASTSingleTableModel
         var unexpected = Sets.difference(actual, expected);
         var missing = Sets.difference(expected, actual);
         StringBuilder sb = null;
-        if (!unexpected.isEmpty())
-        {
-            sb = new StringBuilder();
-            sb.append("Unexpected rows found:\n").append(table(columns, 
unexpected));
-        }
-
-        if (!missing.isEmpty())
-        {
-            if (sb == null)
-            {
-                sb = new StringBuilder();
-            }
-            else
-            {
-                sb.append('\n');
-            }
-            if (actual.isEmpty()) sb.append("No rows returned");
-            else sb.append("Missing rows:\n").append(table(columns, missing));
-        }
         if (!unexpected.isEmpty() && unexpected.size() == missing.size())
         {
+            sb = new StringBuilder();
             // good chance a column differs
             StringBuilder finalSb = sb;
             Runnable runOnce = new Runnable()
@@ -780,6 +1048,35 @@ public class ASTSingleTableModel
                 sb.append(table(eSmall.columns, Arrays.asList(eSmall, 
aSmall)));
             }
         }
+        else
+        {
+            if (!unexpected.isEmpty())
+            {
+                if (sb == null)
+                {
+                    sb = new StringBuilder();
+                }
+                else
+                {
+                    sb.append('\n');
+                }
+                sb.append("Unexpected rows found:\n").append(table(columns, 
unexpected));
+            }
+
+            if (!missing.isEmpty())
+            {
+                if (sb == null)
+                {
+                    sb = new StringBuilder();
+                }
+                else
+                {
+                    sb.append('\n');
+                }
+                if (actual.isEmpty()) sb.append("No rows returned");
+                else sb.append("Missing rows:\n").append(table(columns, 
missing));
+            }
+        }
         if (sb != null)
         {
             sb.append("\nExpected:\n").append(table(columns, expected));
@@ -847,6 +1144,22 @@ public class ASTSingleTableModel
         return set;
     }
 
+    private static class CasContext
+    {
+        private final BytesPartitionState.Ref ref;
+        @Nullable
+        private final Clustering<ByteBuffer> clustering;
+        @Nullable
+        private final BytesPartitionState partition;
+
+        private CasContext(BytesPartitionState.Ref ref, @Nullable 
Clustering<ByteBuffer> clustering, @Nullable BytesPartitionState partition)
+        {
+            this.ref = ref;
+            this.clustering = clustering;
+            this.partition = partition;
+        }
+    }
+
     private static class SelectResult
     {
         private final ImmutableUniqueList<Symbol> columns;
@@ -885,17 +1198,47 @@ public class ASTSingleTableModel
         }
     }
 
-    public ImmutableUniqueList<Symbol> columns(Select select)
+    private ImmutableUniqueList<Symbol> columns(Select select)
     {
         if (select.selections.isEmpty()) return factory.selectionOrder;
-        throw new UnsupportedOperationException("Getting columns from select 
other than SELECT * is currently not supported");
+        var builder = ImmutableUniqueList.<Symbol>builder();
+        for (var e : select.selections)
+        {
+            if (!(e instanceof Symbol))
+                throw new UnsupportedOperationException("Only column selection 
is currently supported");
+            builder.add((Symbol) e);
+        }
+        return builder.build();
+    }
+
+    private static ByteBuffer[][] filter(ByteBuffer[][] rows, 
ImmutableUniqueList<Symbol> actualOrder, ImmutableUniqueList<Symbol> 
targetOrder)
+    {
+        if (actualOrder.equals(targetOrder)) return rows;
+        if (rows.length == 0) return rows;
+        if (!actualOrder.containsAll(targetOrder))
+            throw new UnsupportedOperationException("Only column selection is 
currently supported");
+        ByteBuffer[][] result = new ByteBuffer[rows.length][];
+        for (int i = 0; i < rows.length; i++)
+        {
+            ByteBuffer[] actual = rows[i];
+            ByteBuffer[] target = new ByteBuffer[targetOrder.size()];
+            for (int j = 0; j < targetOrder.size(); j++)
+            {
+                Symbol col = targetOrder.get(j);
+                int actualIndex = actualOrder.indexOf(col);
+                target[j] = actual[actualIndex];
+            }
+            result[i] = target;
+        }
+        return result;
     }
 
     private SelectResult getRowsAsByteBuffer(Select select)
     {
-        ImmutableUniqueList<Symbol> columns = columns(select);
+        ImmutableUniqueList<Symbol> selectOrder = factory.selectionOrder;
+        ImmutableUniqueList<Symbol> targetOrder = columns(select);
         if (select.where.isEmpty())
-            return SelectResult.ordered(columns, 
getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, 
select.limit)));
+            return SelectResult.ordered(targetOrder, 
filter(getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit, 
select.limit)), selectOrder, targetOrder));
         LookupContext ctx = context(select);
         List<PrimaryKey> primaryKeys;
         if (ctx.unmatchable)
@@ -923,7 +1266,7 @@ public class ASTSingleTableModel
         }
         primaryKeys = applyLimits(primaryKeys, select.perPartitionLimit, 
select.limit);
         //TODO (correctness): now that we have the rows we need to handle the 
selections/aggregation/limit/group-by/etc.
-        return new SelectResult(columns, getRowsAsByteBuffer(primaryKeys), 
ctx.unordered);
+        return new SelectResult(targetOrder, 
filter(getRowsAsByteBuffer(primaryKeys), selectOrder, targetOrder), 
ctx.unordered);
     }
 
     private List<PrimaryKey> applyLimits(List<PrimaryKey> primaryKeys, 
Optional<Value> perPartitionLimitOpt, Optional<Value> limitOpt)
@@ -1199,37 +1542,6 @@ public class ASTSingleTableModel
         return keys.get(0);
     }
 
-    private List<Clustering<ByteBuffer>> keys(Map<Symbol, List<? extends 
Expression>> values, ImmutableUniqueList<Symbol> columns)
-    {
-        if (columns.isEmpty()) return 
Collections.singletonList(Clustering.EMPTY);
-        List<ByteBuffer[]> current = new ArrayList<>();
-        current.add(new ByteBuffer[columns.size()]);
-        for (Symbol symbol : columns)
-        {
-            int position = columns.indexOf(symbol);
-            List<? extends Expression> expressions = values.get(symbol);
-            ByteBuffer firstBB = eval(expressions.get(0));
-            current.forEach(bbs -> bbs[position] = firstBB);
-            if (expressions.size() > 1)
-            {
-                // this has a multiplying effect... if there is 1 row and 
there are 2 expressions, then we have 2 rows
-                // if there are 2 rows and 2 expressions, we have 4 rows... 
and so on...
-                List<ByteBuffer[]> copy = new ArrayList<>(current);
-                for (int i = 1; i < expressions.size(); i++)
-                {
-                    ByteBuffer bb = eval(expressions.get(i));
-                    for (ByteBuffer[] bbs : copy)
-                    {
-                        bbs = bbs.clone();
-                        bbs[position] = bb;
-                        current.add(bbs);
-                    }
-                }
-            }
-        }
-        return 
current.stream().map(BufferClustering::new).collect(Collectors.toList());
-    }
-
     private static class EvalResult
     {
         private static final EvalResult SKIP = new EvalResult(Kind.SKIP, null);
@@ -1393,6 +1705,22 @@ public class ASTSingleTableModel
             maybeNormalizeTokenBounds();
         }
 
+        private LookupContext(Mutation mutation)
+        {
+            if (mutation.kind == Mutation.Kind.INSERT)
+            {
+                var insert = mutation.asInsert();
+                for (var e : insert.values.entrySet())
+                    eq.put(e.getKey(), 
Collections.singletonList(e.getValue()));
+            }
+            else
+            {
+                addConditional(mutation.kind == Mutation.Kind.UPDATE
+                               ? mutation.asUpdate().where
+                               : mutation.asDelete().where);
+            }
+        }
+
         private void maybeNormalizeTokenBounds()
         {
             if (tokenLowerBound != null && tokenUpperBound != null)
@@ -1685,4 +2013,9 @@ public class ASTSingleTableModel
             this.token = token;
         }
     }
+
+    private interface ColumnUpdate
+    {
+        void update(long nowTs, Map<Symbol, ByteBuffer> write);
+    }
 }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java 
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
index a04425f827..3bdf742ce8 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
@@ -651,17 +651,102 @@ public class ASTSingleTableModelTest
         model.validate(rows(row(metadata, 0, List.of(42, 42), Set.of(0, 42), 
Map.of(42, 0), List.of(42, 42), Set.of(0, 42), Map.of(42, 0))), 
Select.builder(metadata).build());
     }
 
+    @Test
+    public void insertEmptyRow()
+    {
+        TableMetadata metadata = defaultTable()
+                                 .addPartitionKeyColumn("pk", 
Int32Type.instance)
+                                 .addStaticColumn("s", Int32Type.instance)
+                                 .addClusteringColumn("ck", Int32Type.instance)
+                                 .addRegularColumn("r", Int32Type.instance)
+                                 .build();
+        ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+
+        model.update(Mutation.insert(metadata)
+                             .value("pk", 0)
+                             .value("s", 0)
+                             .value("ck", 0)
+                             .build());
+        model.validate(rows(row(metadata, 0, 0, 0, null)), 
Select.builder(metadata).build());
+    }
+
+    @Test
+    public void updateEmptyRow()
+    {
+        TableMetadata metadata = defaultTable()
+                                 .addPartitionKeyColumn("pk", 
Int32Type.instance)
+                                 .addStaticColumn("s", Int32Type.instance)
+                                 .addClusteringColumn("ck", Int32Type.instance)
+                                 .addRegularColumn("r", Int32Type.instance)
+                                 .build();
+        ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+
+        model.update(Mutation.update(metadata)
+                             .set("s", 0)
+                             .value("pk", 0)
+                             .value("ck", 0)
+                             .build());
+        model.validate(rows(row(metadata, 0, null, 0, null)), 
Select.builder(metadata).build());
+    }
+
+    @Test
+    public void deleteColumnUpdateDoesntHavePartitionState()
+    {
+        TableMetadata metadata = defaultTable()
+                                 .addPartitionKeyColumn("pk", 
Int32Type.instance)
+                                 .addStaticColumn("s", Int32Type.instance)
+                                 .addClusteringColumn("ck", Int32Type.instance)
+                                 .addRegularColumn("r", 
ListType.getInstance(Int32Type.instance, true))
+                                 .build();
+        ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+
+        model.update(Mutation.update(metadata)
+                             .set("r", List.of(0))
+                             .set("s", 0)
+                             .value("pk", 0)
+                             .value("ck", 0)
+                             .build());
+        model.update(Mutation.update(metadata)
+                             .set("r", List.of(1))
+                             .value("pk", 0)
+                             .value("ck", 1)
+                             .build());
+        model.validate(rows(row(metadata, 0, 0, 0, List.of(0)),
+                            row(metadata, 0, 1, 0, List.of(1))), 
Select.builder(metadata).build());
+
+        model.update(Mutation.delete(metadata)
+                             .columns("r", "s")
+                             .value("pk", 0)
+                             .value("ck", 0)
+                             .build());
+        model.validate(rows(row(metadata, 0, 1, null, List.of(1))), 
Select.builder(metadata).build());
+    }
+
+    private interface SimpleWrite<T>
+    {
+        void write(String name, T value, long ts);
+    }
+
     private static ByteBuffer[][] rows(ByteBuffer[]... rows)
     {
         return rows;
     }
 
+    private static ByteBuffer[] row(ByteBuffer... values)
+    {
+        return values;
+    }
+
     private static ByteBuffer[] row(TableMetadata metadata, Object... values)
     {
         ByteBuffer[] row = new ByteBuffer[values.length];
         var it = metadata.allColumnsInSelectOrder();
         for (int i = 0; i < values.length && it.hasNext(); i++)
-            row[i] = it.next().type.decomposeUntyped(values[i]);
+        {
+            ColumnMetadata column = it.next();
+            Object value = values[i];
+            row[i] = value == null ? null : 
column.type.decomposeUntyped(value);
+        }
         return row;
     }
 
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java 
b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
index c2d18e573d..70988e7801 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.harry.model;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -32,6 +32,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Sets;
+
 import org.apache.cassandra.cql3.ast.Symbol;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringComparator;
@@ -44,6 +46,7 @@ import org.apache.cassandra.harry.gen.ValueGenerators;
 import org.apache.cassandra.harry.util.BitSet;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.ImmutableUniqueList;
 
@@ -62,31 +65,36 @@ public class BytesPartitionState
         this.state = factory.partitionState(key);
     }
 
-    public void deleteRow(Clustering<ByteBuffer> clustering)
+    public void deleteRow(Clustering<ByteBuffer> clustering, long ts)
     {
         long cd = factory.clusteringCache.deflateOrUndefined(clustering);
         if (MagicConstants.UNSET_DESCR == cd)
             return;
-        state.delete(cd, MagicConstants.NO_TIMESTAMP);
+        deleteRow(cd, ts);
     }
 
-    public void deleteColumns(Clustering<ByteBuffer> clustering, Set<Symbol> 
columns)
+    private void deleteRow(long cd, long ts)
+    {
+        state.delete(cd, ts);
+    }
+
+    public void deleteColumns(Clustering<ByteBuffer> clustering, long ts, 
Set<Symbol> columns)
     {
         long cd = factory.clusteringCache.deflateOrUndefined(clustering);
         if (cd != MagicConstants.UNSET_DESCR)
         {
             BitSet regularColumns = bitset(columns, true);
             if (!regularColumns.allUnset())
-                state.deleteRegularColumns(MagicConstants.NO_TIMESTAMP, cd, 
regularColumns);
+                state.deleteRegularColumns(ts, cd, regularColumns);
         }
-        deleteStaticColumns(columns);
+        deleteStaticColumns(ts, columns);
     }
 
-    public void deleteStaticColumns(Set<Symbol> columns)
+    public void deleteStaticColumns(long ts, Set<Symbol> columns)
     {
         BitSet staticColumns = bitset(columns, false);
         if (!staticColumns.allUnset())
-            state.deleteStaticColumns(MagicConstants.NO_TIMESTAMP, 
staticColumns);
+            state.deleteStaticColumns(ts, staticColumns);
     }
 
     private BitSet bitset(Set<Symbol> columns, boolean regular)
@@ -109,28 +117,27 @@ public class BytesPartitionState
 
     public PrimaryKey partitionRowRef()
     {
-        return new PrimaryKey(ref(), null);
+        return new PrimaryKey(factory, ref(), null);
     }
 
-    public void setStaticColumns(Map<Symbol, ByteBuffer> values)
+    public void setStaticColumns(long ts, Map<Symbol, ByteBuffer> values)
     {
         if (factory.staticColumns.isEmpty() || values.isEmpty())
             throw new IllegalStateException("Attempt to write to static 
columns; but they do not exist");
-        long[] sds = toDescriptor(factory.staticColumns, values);
-        state.writeStatic(sds, MagicConstants.NO_TIMESTAMP);
+
+        state.writeStatic(toDescriptor(factory.staticColumns, values), ts);
     }
 
-    public void setColumns(Clustering<ByteBuffer> clustering, Map<Symbol, 
ByteBuffer> values, boolean writePrimaryKeyLiveness)
+    public void setColumns(Clustering<ByteBuffer> clustering, long ts, 
Map<Symbol, ByteBuffer> values, boolean writePrimaryKeyLiveness)
     {
         long cd = factory.clusteringCache.deflate(clustering);
-        long[] vds = toDescriptor(factory.regularColumns, values);
-        state.writeRegular(cd, vds, MagicConstants.NO_TIMESTAMP, 
writePrimaryKeyLiveness);
+        state.writeRegular(cd, toDescriptor(factory.regularColumns, values), 
ts, writePrimaryKeyLiveness);
 
         // UDT's have the ability to "update" that triggers a delete; this 
allows creating an "empty" row.
         // When an empty row exists without liveness info, then purge the row
         var row = state.rows.get(cd);
         if (row.isEmpty() && !row.hasPrimaryKeyLivenessInfo)
-            state.delete(cd, MagicConstants.NO_TIMESTAMP);
+            deleteRow(cd, ts);
     }
 
     private long[] toDescriptor(ImmutableUniqueList<Symbol> positions, 
Map<Symbol, ByteBuffer> values)
@@ -200,6 +207,8 @@ public class BytesPartitionState
     @Nullable
     public Row get(Clustering<ByteBuffer> clustering)
     {
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return staticRow();
         long cd = factory.clusteringCache.deflateOrUndefined(clustering);
         if (cd == MagicConstants.UNSET_DESCR)
             return null;
@@ -216,6 +225,12 @@ public class BytesPartitionState
         return row == null ? null : row.get(column);
     }
 
+    public long timestamp(Clustering<ByteBuffer> clustering, Symbol column)
+    {
+        Row row = get(clustering);
+        return row == null ? MagicConstants.NO_TIMESTAMP : 
row.timestamp(column);
+    }
+
     private Row toRow(PartitionState.RowState rowState)
     {
         Clustering<ByteBuffer> clustering;
@@ -230,10 +245,10 @@ public class BytesPartitionState
             clustering = factory.clusteringCache.inflate(rowState.cd);
             values = fromDescriptor(factory.regularColumns, rowState.vds);
         }
-        return new Row(clustering, values);
+        return new Row(clustering, values, rowState.lts);
     }
 
-    public Collection<Row> rows()
+    public List<Row> rows()
     {
         return 
state.rows().values().stream().map(this::toRow).collect(Collectors.toList());
     }
@@ -281,18 +296,26 @@ public class BytesPartitionState
             sb.append(')');
     }
 
-    public class PrimaryKey implements Comparable<PrimaryKey>
+    public static class PrimaryKey implements Comparable<PrimaryKey>
     {
+        private final Factory factory;
         public final BytesPartitionState.Ref partition;
         @Nullable
         public final Clustering<ByteBuffer> clustering;
 
-        public PrimaryKey(BytesPartitionState.Ref partition, @Nullable 
Clustering<ByteBuffer> clustering)
+        private PrimaryKey(Factory factory, BytesPartitionState.Ref partition, 
@Nullable Clustering<ByteBuffer> clustering)
         {
+            this.factory = factory;
             this.partition = partition;
             this.clustering = clustering;
         }
 
+        public boolean isPartitionLevel()
+        {
+            return clustering == null                       // has clustering, 
but only referencing partition
+                   || Clustering.EMPTY.equals(clustering);  // doesn't have 
clustering
+        }
+
         @Override
         public int compareTo(PrimaryKey o)
         {
@@ -324,7 +347,8 @@ public class BytesPartitionState
             StringBuilder sb = new StringBuilder("(partition=");
             sb.append(partition);
             sb.append(", clustering=");
-            appendValues(sb, factory.clusteringColumns, clustering);
+            if (clustering == null) sb.append("null");
+            else                    appendValues(sb, 
factory.clusteringColumns, clustering);
             sb.append(')');
             return sb.toString();
         }
@@ -415,12 +439,22 @@ public class BytesPartitionState
         public final Clustering<ByteBuffer> clustering;
         private final ImmutableUniqueList<Symbol> columnNames;
         private final ByteBuffer[] columns;
+        private final long[] lts;
 
-        private Row(Clustering<ByteBuffer> clustering, ByteBuffer[] columns)
+        private Row(Clustering<ByteBuffer> clustering, ByteBuffer[] columns, 
long[] lts)
         {
             this.clustering = clustering;
             this.columnNames = clustering == Clustering.STATIC_CLUSTERING ? 
factory.staticColumns : factory.regularColumns;
             this.columns = columns;
+            this.lts = lts;
+        }
+
+        private Row(Clustering<ByteBuffer> clustering, 
ImmutableUniqueList<Symbol> columnNames, ByteBuffer[] columns, long[] lts)
+        {
+            this.clustering = clustering;
+            this.columnNames = columnNames;
+            this.columns = columns;
+            this.lts = lts;
         }
 
         public ByteBuffer get(Symbol col)
@@ -433,15 +467,50 @@ public class BytesPartitionState
             return columns[offset];
         }
 
+        public long timestamp(Symbol col)
+        {
+            return lts[columnNames.indexOf(col)];
+        }
+
+        public long timestamp(int offset)
+        {
+            return lts[offset];
+        }
+
         public PrimaryKey ref()
         {
-            return new PrimaryKey(BytesPartitionState.this.ref(), clustering);
+            return new PrimaryKey(factory, BytesPartitionState.this.ref(), 
clustering);
         }
 
         public boolean isEmpty()
         {
             return Stream.of(columns).allMatch(b -> b == null );
         }
+
+        /**
+         * Projects this row onto the requested columns.
+         *
+         * Returns this row unchanged when the selection equals the row's own
+         * column list; otherwise builds a new row holding only the selected
+         * columns, in the (de-duplicated) order given.
+         *
+         * @param selection columns to keep; each must be a column of this row
+         * @return a row exposing only the selected columns
+         * @throws AssertionError if the selection names a column this row does not have
+         */
+        public Row select(List<Symbol> selection)
+        {
+            if (columnNames.equals(selection)) return this;
+            selection = validateSelect(selection);
+            ByteBuffer[] selected = new ByteBuffer[selection.size()];
+            // timestamp(Symbol) looks up lts by the position of the column in
+            // columnNames, so the timestamps must be projected with the values
+            long[] selectedLts = new long[selected.length];
+            ImmutableUniqueList.Builder<Symbol> names = ImmutableUniqueList.builder(selected.length);
+            for (int i = 0; i < selected.length; i++)
+            {
+                Symbol col = selection.get(i);
+                int source = columnNames.indexOf(col);
+                selected[i] = columns[source];
+                selectedLts[i] = lts[source];
+                names.add(col);
+            }
+
+            return new Row(clustering, names.build(), selected, selectedLts);
+        }
+
+        private List<Symbol> validateSelect(List<Symbol> selection)
+        {
+            LinkedHashSet<Symbol> uniqueSelection = new 
LinkedHashSet<>(selection);
+            var unknown = Sets.difference(uniqueSelection, 
columnNames.asSet());
+            if (!unknown.isEmpty())
+                throw new AssertionError("Unable to select columns " + 
selection + "; has unknown columns " + unknown);
+            return uniqueSelection.size() == selection.size() ? selection : 
new ArrayList<>(uniqueSelection);
+        }
     }
 
     public static class Factory
@@ -452,14 +521,19 @@ public class BytesPartitionState
         public final ImmutableUniqueList<Symbol> primaryColumns;
         public final ImmutableUniqueList<Symbol> staticColumns;
         public final ImmutableUniqueList<Symbol> regularColumns;
-        public final ImmutableUniqueList<Symbol> selectionOrder, 
partitionAndStaticColumns, regularAndStaticColumns;
+        public final ImmutableUniqueList<Symbol> selectionOrder, 
partitionAndStaticColumns, clusteringAndRegularColumns, regularAndStaticColumns;
         public final ClusteringComparator clusteringComparator;
 
 
         // translation layer for harry interop
         private final BijectionCache<Clustering<ByteBuffer>> partitionCache = 
new BijectionCache<>(Reject.instance.as());
         private final BijectionCache<Clustering<ByteBuffer>> clusteringCache;
-        private final BijectionCache<Value> valueCache = new 
BijectionCache<>(Reject.instance.as());
+        private final BijectionCache<Value> valueCache = new 
BijectionCache<>((l, r) -> {
+            if (!l.type.equals(r.type))
+                throw new IllegalArgumentException("Unable to compare 
different types: " + l.type.asCQL3Type() + " != " + r.type.asCQL3Type());
+            // Cells resolve based off unsigned byte order and not type order
+            return ByteBufferUtil.compareUnsigned(l.value, r.value);
+        });
         private final ValueGenerators<Clustering<ByteBuffer>, 
Clustering<ByteBuffer>> valueGenerators;
 
         public Factory(TableMetadata metadata)
@@ -475,27 +549,27 @@ public class BytesPartitionState
             if (clusteringColumns.isEmpty()) primaryColumns = partitionColumns;
             else
             {
-                symbolListBuilder.addAll(partitionColumns);
-                symbolListBuilder.addAll(clusteringColumns);
-                primaryColumns = symbolListBuilder.buildAndClear();
+                primaryColumns = symbolListBuilder.addAll(partitionColumns)
+                                                  .addAll(clusteringColumns)
+                                                  .buildAndClear();
             }
-            for (ColumnMetadata pk : metadata.staticColumns())
-                symbolListBuilder.add(Symbol.from(pk));
+            metadata.staticColumns().selectOrderIterator().forEachRemaining(cm 
-> symbolListBuilder.add(Symbol.from(cm)));
             staticColumns = symbolListBuilder.buildAndClear();
             if (staticColumns.isEmpty()) partitionAndStaticColumns = 
partitionColumns;
             else
             {
-                symbolListBuilder.addAll(partitionColumns);
-                symbolListBuilder.addAll(staticColumns);
-                partitionAndStaticColumns = symbolListBuilder.buildAndClear();
+                partitionAndStaticColumns = 
symbolListBuilder.addAll(partitionColumns)
+                                                             
.addAll(staticColumns)
+                                                             .buildAndClear();
             }
-            for (ColumnMetadata pk : metadata.regularColumns())
-                symbolListBuilder.add(Symbol.from(pk));
+            
metadata.regularColumns().selectOrderIterator().forEachRemaining(cm -> 
symbolListBuilder.add(Symbol.from(cm)));
             regularColumns = symbolListBuilder.buildAndClear();
+            clusteringAndRegularColumns = 
symbolListBuilder.addAll(clusteringColumns)
+                                                           
.addAll(regularColumns)
+                                                           .buildAndClear();
             metadata.allColumnsInSelectOrder().forEachRemaining(cm -> 
symbolListBuilder.add(Symbol.from(cm)));
             selectionOrder = symbolListBuilder.buildAndClear();
-            metadata.regularAndStaticColumns().forEach(cm -> 
symbolListBuilder.add(Symbol.from(cm)));
-            regularAndStaticColumns = symbolListBuilder.buildAndClear();
+            regularAndStaticColumns = 
symbolListBuilder.addAll(staticColumns).addAll(regularColumns).buildAndClear();
 
             clusteringComparator = new 
ClusteringComparator(clusteringColumns.stream().map(Symbol::rawType).collect(Collectors.toList()));
 
@@ -569,6 +643,11 @@ public class BytesPartitionState
             return new BytesPartitionState.Ref(this, token, 
nullKeyGtMatchingToken);
         }
 
+        public PrimaryKey createPrimaryKey(Ref pk, @Nullable 
Clustering<ByteBuffer> cd)
+        {
+            return new BytesPartitionState.PrimaryKey(this, pk, cd);
+        }
+
         private PartitionState partitionState(Clustering<ByteBuffer> key)
         {
             return new PartitionState(partitionCache.deflate(key), 
valueGenerators);
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java 
b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
index cf4d70bd95..57ec2ad4af 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/PartitionState.java
@@ -236,8 +236,12 @@ public class PartitionState implements 
Iterable<PartitionState.RowState>
                 {
                     // Timestamp collision case
                     Bijections.Bijection<?> column = columns.apply(i);
-                    if (column.compare(vds[i], currentState.vds[i]) > 0)
+                    if (vds[i] == MagicConstants.NIL_DESCR // writing a null 
is the same as a tombstone, which has higher priority
+                        || (currentState.vds[i] != MagicConstants.NIL_DESCR
+                            && column.compare(vds[i], currentState.vds[i]) > 
0))
+                    {
                         currentState.vds[i] = vds[i];
+                    }
                 }
                 else
                 {
diff --git a/test/harry/main/org/apache/cassandra/harry/util/StringUtils.java 
b/test/harry/main/org/apache/cassandra/harry/util/StringUtils.java
index 0b3b94b73f..b33a407e52 100644
--- a/test/harry/main/org/apache/cassandra/harry/util/StringUtils.java
+++ b/test/harry/main/org/apache/cassandra/harry/util/StringUtils.java
@@ -35,7 +35,7 @@ public class StringUtils
         for (int i = 0; i < input.length(); i++)
         {
             char c = input.charAt(i);
-            if (Character.isISOControl(c))
+            if (Character.isISOControl(c) && c != '\n')
                 result.append(String.format("\\u%04X", (int) c));
             else
                 result.append(c);
diff --git a/test/unit/org/apache/cassandra/cql3/KnownIssue.java 
b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
index be2dfe7524..4b2bec6d3c 100644
--- a/test/unit/org/apache/cassandra/cql3/KnownIssue.java
+++ b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
@@ -43,6 +43,10 @@ public enum KnownIssue
                            "When doing an SAI query, if the where clause also 
contains a vector column bad results can be produced"),
     
CAS_CONDITION_ON_UDT_W_EMPTY_BYTES("https://issues.apache.org/jira/browse/CASSANDRA-20479",
                                        "WHERE clause blocks operations on UDTs 
but CAS allows in IF clause.  During this path empty can be confused with null 
which allows non-existing rows to match empty bytes"),
+    CAS_ON_STATIC_ROW("",
+                      "When you do a CAS to the partition level the read is 
SELECT statics LIMIT 1, if the CAS doesn't apply the response includes the 
first row in the partition with its values redacted... this statement is 
partition level and not row level, would expect just the applied column like 
the other cases where the static row isn't present"),
+    STATIC_LIST_APPEND_WITH_CLUSTERING_IN("",
+                                          "When an 'UPDATE SET s += [0] WHERE 
pk = ? AND ck IN (?, ?)' happens the static operation happens twice, so the 
list append adds 2 elements!"),
     ;
 
     KnownIssue(String url, String description)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java 
b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
index e3918f70da..499b36551f 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
@@ -123,4 +123,10 @@ public class AssignmentOperator implements Expression
         if (r == right) return this;
         return new AssignmentOperator(kind, r);
     }
+
+    @Override
+    public String toString()
+    {
+        return debugCQL();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java 
b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
index d0d4d0e35b..c60e27a85d 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
@@ -24,6 +24,11 @@ public interface CasCondition extends Element
 {
     CasCondition visit(Visitor v);
 
+    default String debugCQL()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
+
     enum Simple implements CasCondition
     {
         NotExists("IF NOT EXISTS"),
@@ -80,5 +85,11 @@ public interface CasCondition extends Element
             if (c == conditional) return this;
             return new IfCondition(c);
         }
+
+        @Override
+        public String toString()
+        {
+            return toCQL();
+        }
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java 
b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
index 66012c060c..4fc8a20850 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
@@ -50,6 +50,11 @@ public interface Conditional extends Expression
         return v.visit(this);
     }
 
+    default String debugCQL()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
+
     default List<Conditional> simplify()
     {
         return Collections.singletonList(this);
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Elements.java 
b/test/unit/org/apache/cassandra/cql3/ast/Elements.java
index f750c4efc1..4713c30c15 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Elements.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Elements.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.cql3.ast;
 
+import java.util.stream.Stream;
+
 public final class Elements
 {
     private Elements()
@@ -29,4 +31,11 @@ public final class Elements
         for (int i = 0; i < indent; i++)
             sb.append(' ');
     }
+
+    public static Stream<Symbol> symbols(Element element)
+    {
+        return element.streamRecursive(true)
+                      .filter(e -> e instanceof Symbol)
+                      .map(e -> (Symbol) e);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Expression.java 
b/test/unit/org/apache/cassandra/cql3/ast/Expression.java
index 96bc41a78c..9a0e554968 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Expression.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Expression.java
@@ -32,4 +32,9 @@ public interface Expression extends Element
     {
         return v.visit(this);
     }
+
+    default String debugCQL()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java 
b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
index 4bd2f9b631..eb6d83df41 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.StringType;
 
 public class Literal implements Value
@@ -41,6 +42,11 @@ public class Literal implements Value
         return new Literal(value, Int32Type.instance);
     }
 
+    public static Literal of(long value)
+    {
+        return new Literal(value, LongType.instance);
+    }
+
     @Override
     public AbstractType<?> type()
     {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java 
b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
index 95987dc657..7c764853e5 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
@@ -27,10 +27,10 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -50,6 +50,23 @@ public abstract class Mutation implements Statement
         this.table = table;
     }
 
+    public Insert asInsert()
+    {
+        return (Insert) this;
+    }
+
+    public Update asUpdate()
+    {
+        return (Update) this;
+    }
+
+    public Delete asDelete()
+    {
+        return (Delete) this;
+    }
+
+    public abstract long timestampOrDefault(long defaultValue);
+
     public abstract boolean isCas();
 
     public abstract Mutation withoutTimestamp();
@@ -61,6 +78,9 @@ public abstract class Mutation implements Statement
 
     public abstract Mutation withTimestamp(Timestamp timestamp);
 
+
+    public abstract Optional<? extends CasCondition> casCondition();
+
     public final Kind mutationKind()
     {
         return kind;
@@ -161,6 +181,13 @@ public abstract class Mutation implements Statement
         {
             return Stream.of(value);
         }
+
+        public long get()
+        {
+            if (value.value() instanceof Long)
+                return (long) value.value();
+            return LongType.instance.compose(value.valueEncoded());
+        }
     }
 
     public static class Using implements Element
@@ -168,15 +195,24 @@ public abstract class Mutation implements Statement
         public final Optional<TTL> ttl;
         public final Optional<Timestamp> timestamp;
 
-        public Using(Optional<TTL> ttl, Optional<Timestamp> timestamp)
+        private Using(Optional<TTL> ttl, Optional<Timestamp> timestamp)
         {
             this.ttl = ttl;
             this.timestamp = timestamp;
+            if (ttl.isEmpty() && timestamp.isEmpty())
+                throw new IllegalStateException("Empty USING isnt allowed");
+        }
+
+        public static Optional<Using> create(Optional<TTL> ttl, 
Optional<Timestamp> timestamp)
+        {
+            if (ttl.isEmpty() && timestamp.isEmpty()) return Optional.empty();
+            return Optional.of(new Using(ttl, timestamp));
         }
 
-        public Using withoutTimestamp()
+        public Optional<Using> withoutTimestamp()
         {
-            return new Using(ttl, Optional.empty());
+            if (ttl.isEmpty()) return Optional.empty();
+            return Optional.of(new Using(ttl, Optional.empty()));
         }
 
         public Using withTimestamp(Timestamp timestamp)
@@ -187,8 +223,6 @@ public abstract class Mutation implements Statement
         @Override
         public void toCQL(StringBuilder sb, CQLFormatter formatter)
         {
-            if (ttl.isEmpty() && timestamp.isEmpty())
-                return;
             sb.append("USING ");
             if (ttl.isPresent())
                 ttl.get().toCQL(sb, formatter);
@@ -227,6 +261,16 @@ public abstract class Mutation implements Statement
             this.using = using;
         }
 
+        @Override
+        public long timestampOrDefault(long defaultValue)
+        {
+            if (using.isEmpty()) return defaultValue;
+            var opt = using.get().timestamp;
+            if (opt.isEmpty()) return defaultValue;
+            var timestamp = opt.get();
+            return timestamp.get();
+        }
+
         @Override
         public void toCQL(StringBuilder sb, CQLFormatter formatter)
         {
@@ -311,7 +355,7 @@ public abstract class Mutation implements Statement
         {
             return new Insert(table, values, ifNotExists, using.isEmpty()
                                                           ? using
-                                                          : using.map(u -> 
u.withoutTimestamp()));
+                                                          : using.flatMap(u -> 
u.withoutTimestamp()));
         }
 
         @Override
@@ -321,6 +365,12 @@ public abstract class Mutation implements Statement
                                                           ? Optional.of(new 
Using(Optional.empty(), Optional.of(timestamp)))
                                                           : using.map(u -> 
u.withTimestamp(timestamp)));
         }
+
+        @Override
+        public Optional<? extends CasCondition> casCondition()
+        {
+            return ifNotExists ? Optional.of(CasCondition.Simple.NotExists) : 
Optional.empty();
+        }
     }
 
     public static class Update extends Mutation
@@ -339,6 +389,16 @@ public abstract class Mutation implements Statement
             this.casCondition = casCondition;
         }
 
+        @Override
+        public long timestampOrDefault(long defaultValue)
+        {
+            if (using.isEmpty()) return defaultValue;
+            var opt = using.get().timestamp;
+            if (opt.isEmpty()) return defaultValue;
+            var timestamp = opt.get();
+            return timestamp.get();
+        }
+
         @Override
         public void toCQL(StringBuilder sb, CQLFormatter formatter)
         {
@@ -444,7 +504,7 @@ public abstract class Mutation implements Statement
         @Override
         public Mutation withoutTimestamp()
         {
-            return new Update(table, using.isEmpty() ? using : using.map(u -> 
u.withoutTimestamp()), set, where, casCondition);
+            return new Update(table, using.isEmpty() ? using : using.flatMap(u 
-> u.withoutTimestamp()), set, where, casCondition);
         }
 
         @Override
@@ -455,6 +515,12 @@ public abstract class Mutation implements Statement
                           : using.map(u -> u.withTimestamp(timestamp));
             return new Update(table, updated, set, where, casCondition);
         }
+
+        @Override
+        public Optional<? extends CasCondition> casCondition()
+        {
+            return casCondition;
+        }
     }
 
     public static class Delete extends Mutation
@@ -477,6 +543,15 @@ public abstract class Mutation implements Statement
             this.casCondition = casCondition;
         }
 
+        @Override
+        public long timestampOrDefault(long defaultValue)
+        {
+            var opt = timestamp;
+            if (opt.isEmpty()) return defaultValue;
+            var timestamp = opt.get();
+            return timestamp.get();
+        }
+
         /*
 DELETE [column_name (term)][, ...]
 FROM [keyspace_name.] table_name
@@ -585,6 +660,12 @@ WHERE PK_column_conditions
         {
             return new Delete(columns, table, Optional.of(timestamp), where, 
casCondition);
         }
+
+        @Override
+        public Optional<? extends CasCondition> casCondition()
+        {
+            return casCondition;
+        }
     }
 
     public static abstract class BaseBuilder<T, B extends BaseBuilder<T, B>> 
implements Conditional.EqBuilderPlus<B>
@@ -612,6 +693,11 @@ WHERE PK_column_conditions
             neededPks.addAll(partitionColumns);
         }
 
+        protected Symbol find(String name)
+        {
+            return allColumns.stream().filter(s -> 
s.symbol.equals(name)).findAny().get();
+        }
+
         public abstract T build();
 
         @Override
@@ -678,6 +764,11 @@ WHERE PK_column_conditions
             return this;
         }
 
+        public InsertBuilder timestamp(long value)
+        {
+            return timestamp(Literal.of(value));
+        }
+
         public InsertBuilder timestamp(Value value)
         {
             this.timestamp = new Timestamp(value);
@@ -727,6 +818,11 @@ WHERE PK_column_conditions
             super(Kind.UPDATE, table);
         }
 
+        public UpdateBuilder timestamp(long value)
+        {
+            return timestamp(Literal.of(value));
+        }
+
         public UpdateBuilder timestamp(Value value)
         {
             this.timestamp = new Timestamp(value);
@@ -766,18 +862,32 @@ WHERE PK_column_conditions
 
         public UpdateBuilder set(String column, int value)
         {
-            return set(new Symbol(column, Int32Type.instance), Bind.of(value));
+            Symbol symbol = find(column);
+            if (!symbol.type().equals(Int32Type.instance))
+                throw new AssertionError("Expected int type but given " + 
symbol.type().asCQL3Type());
+            return set(symbol, Bind.of(value));
+        }
+
+        public UpdateBuilder set(String column, Object value)
+        {
+            Symbol symbol = find(column);
+            return set(symbol, new Bind(value, symbol.type()));
         }
 
         public UpdateBuilder set(String column, Expression expression)
         {
-            Symbol symbol = new Symbol(metadata.getColumn(new 
ColumnIdentifier(column, true)));
-            return set(symbol, expression);
+            return set(find(column), expression);
+        }
+
+        public UpdateBuilder set(String column, Function<Symbol, Expression> 
fn)
+        {
+            Symbol symbol = find(column);
+            return set(symbol, fn.apply(symbol));
         }
 
         public UpdateBuilder set(String column, String value)
         {
-            Symbol symbol = new Symbol(metadata.getColumn(new 
ColumnIdentifier(column, true)));
+            Symbol symbol = find(column);
             return set(symbol, new 
Bind(symbol.type().asCQL3Type().fromCQLLiteral(value), symbol.type()));
         }
 
@@ -857,9 +967,15 @@ WHERE PK_column_conditions
             return Collections.unmodifiableList(columns);
         }
 
+        public DeleteBuilder columns(String... names)
+        {
+            Stream.of(names).map(this::find).forEach(this::column);
+            return this;
+        }
+
         public DeleteBuilder column(String columnName)
         {
-            return column(Symbol.from(metadata.getColumn(new 
ColumnIdentifier(columnName, true))));
+            return column(find(columnName));
         }
 
         public DeleteBuilder column(Symbol symbol)
@@ -881,6 +997,11 @@ WHERE PK_column_conditions
             return this;
         }
 
+        public DeleteBuilder timestamp(long value)
+        {
+            return timestamp(Literal.of(value));
+        }
+
         public DeleteBuilder timestamp(Value value)
         {
             this.timestamp = new Timestamp(value);
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Operator.java 
b/test/unit/org/apache/cassandra/cql3/ast/Operator.java
index d0baa10ec0..35d745142d 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Operator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Operator.java
@@ -103,4 +103,10 @@ public class Operator implements Expression
         if (left == this.left && right == this.right) return this;
         return new Operator(kind, left, right);
     }
+
+    @Override
+    public String toString()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Select.java 
b/test/unit/org/apache/cassandra/cql3/ast/Select.java
index 28134dde8e..10d98dee63 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Select.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Select.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ImmutableUniqueList;
 
 public class Select implements Statement
 {
@@ -479,11 +480,15 @@ FROM [keyspace_name.] table_name
     public static class TableBasedBuilder extends 
BaseBuilder<TableBasedBuilder> implements 
Conditional.ConditionalBuilderPlus<TableBasedBuilder>
     {
         private final TableMetadata metadata;
+        private final ImmutableUniqueList<Symbol> columns;
 
         public TableBasedBuilder(TableMetadata metadata)
         {
             this.metadata = metadata;
             source = Optional.of(TableReference.from(metadata));
+            var builder = ImmutableUniqueList.<Symbol>builder();
+            metadata.allColumnsInSelectOrder().forEachRemaining(c -> 
builder.add(Symbol.from(c)));
+            columns = builder.buildAndClear();
         }
 
         @Override
@@ -491,5 +496,15 @@ FROM [keyspace_name.] table_name
         {
             return metadata;
         }
+
+        private Symbol find(String name)
+        {
+            return columns.stream().filter(s -> 
s.symbol.equals(name)).findAny().get();
+        }
+
+        public TableBasedBuilder columnSelection(String name)
+        {
+            return selection(find(name));
+        }
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/StandardVisitors.java 
b/test/unit/org/apache/cassandra/cql3/ast/StandardVisitors.java
index 4cbf3d989f..85c8a8bc70 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/StandardVisitors.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/StandardVisitors.java
@@ -30,6 +30,16 @@ public class StandardVisitors
             return new Literal(b.value(), b.type());
         }
     };
+    public static final Visitor LITERAL_TO_BIND = new Visitor()
+    {
+        @Override
+        public Value visit(Value v)
+        {
+            if (!(v instanceof Literal)) return v;
+            Literal b = (Literal) v;
+            return new Bind(b.value(), b.type());
+        }
+    };
 
     public static final Visitor UNWRAP_TYPE_HINT = new Visitor()
     {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Statement.java 
b/test/unit/org/apache/cassandra/cql3/ast/Statement.java
index ffcba03465..fd9a9ab2a9 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Statement.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Statement.java
@@ -51,12 +51,17 @@ public interface Statement extends Element
     {
         Object[] binds = binds();
         return "CQL:\n" + toCQL() + "\nBinds:\n" + IntStream.range(0, 
binds.length)
-                                                            .mapToObj(i -> i + 
" -> " + binds[i].getClass().getCanonicalName() + "(" + normalize(binds[i]) + 
")")
+                                                            .mapToObj(i -> i + 
" -> " + (binds[i] == null ? "null" : binds[i].getClass().getCanonicalName() + 
"(" + normalize(binds[i]) + ")")) // ternary parenthesized: '+' binds tighter than '==', so the unparenthesized null check never matched
                                                             
.collect(Collectors.joining("\n"));
     }
 
     Statement visit(Visitor v);
 
+    default String debugCQL()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
+
     static boolean hasByteBuffer(Object value)
     {
         if (value == null)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Value.java 
b/test/unit/org/apache/cassandra/cql3/ast/Value.java
index 92ef010183..ab467c3b83 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Value.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Value.java
@@ -37,4 +37,9 @@ public interface Value extends Expression
     {
         return v.visit(this);
     }
+
+    default String debugCQL()
+    {
+        return visit(StandardVisitors.DEBUG).toCQL();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java 
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index cdf533cb42..1279f60cb7 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.cql3.ast.Value;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
@@ -373,6 +374,10 @@ public class ASTGenerators
         private BiFunction<RandomnessSource, List<Symbol>, List<Symbol>> 
ifConditionFilter = (rnd, symbols) -> symbols;
         private Gen<DeleteKind> deleteKindGen = 
SourceDSL.arbitrary().enumValues(DeleteKind.class);
         private Map<Symbol, ExpressionBuilder> columnExpressions = new 
LinkedHashMap<>();
+        private boolean allowPartitionOnlyUpdate = true;
+        private boolean allowPartitionOnlyInsert = true;
+        private boolean allowUpdateMultipleClusteringKeys = true;
+        private EnumSet<KnownIssue> ignoreIssues = IGNORE_ISSUES;
 
         public MutationGenBuilder(TableMetadata metadata)
         {
@@ -391,6 +396,30 @@ public class ASTGenerators
                 columnExpressions.put(symbol, new 
ExpressionBuilder(symbol.type()));
         }
 
+        public MutationGenBuilder withIgnoreIssues(EnumSet<KnownIssue> 
ignoreIssues)
+        {
+            this.ignoreIssues = Objects.requireNonNull(ignoreIssues);
+            return this;
+        }
+
+        public MutationGenBuilder withAllowPartitionOnlyUpdate(boolean value)
+        {
+            this.allowPartitionOnlyUpdate = value;
+            return this;
+        }
+
+        public MutationGenBuilder withAllowPartitionOnlyInsert(boolean value)
+        {
+            this.allowPartitionOnlyInsert = value;
+            return this;
+        }
+
+        public MutationGenBuilder 
withAllowUpdateMultipleClusteringKeys(boolean allowUpdateMultipleClusteringKeys)
+        {
+            this.allowUpdateMultipleClusteringKeys = 
allowUpdateMultipleClusteringKeys;
+            return this;
+        }
+
         public MutationGenBuilder 
withColumnExpressions(Consumer<ExpressionBuilder> fn)
         {
             for (Symbol symbol : allColumns)
@@ -534,16 +563,45 @@ public class ASTGenerators
             }
             else
             {
-                //TODO (coverage): support IN rather than just EQ
                 for (Symbol s : columns)
                     builder.value(s, 
columnExpressions.get(s).build().generate(rnd));
             }
         }
 
+        private static void where(RandomnessSource rnd,
+                                  Map<Symbol, ExpressionBuilder> 
columnExpressions,
+                                  Conditional.ConditionalBuilder<?> builder,
+                                  LinkedHashSet<Symbol> columns,
+                                  @Nullable Gen<? extends Map<Symbol, Object>> 
gen)
+        {
+            if (gen != null)
+            {
+                Map<Symbol, Object> map = gen.generate(rnd);
+                for (Map.Entry<Symbol, ?> e : 
assertDeterministic(map).entrySet())
+                    builder.value(e.getKey(), valueGen(e.getValue(), 
e.getKey().type()).generate(rnd));
+                return;
+            }
+
+            for (Symbol s : columns)
+            {
+                if (SourceDSL.booleans().all().generate(rnd))
+                {
+                    builder.value(s, 
columnExpressions.get(s).build().generate(rnd));
+                    continue;
+                }
+                var valueGen = columnExpressions.get(s).build();
+                builder.in(s, SourceDSL.lists().of(valueGen).ofSizeBetween(1, 
3).generate(rnd));
+            }
+        }
+
         public Gen<Mutation> build()
         {
             Gen<Boolean> bool = SourceDSL.booleans().all();
             Map<? extends AbstractType<?>, List<Reference>> typeToReference = 
references.stream().collect(Collectors.groupingBy(Reference::type));
+            if (allowUpdateMultipleClusteringKeys
+                && 
ignoreIssues.contains(KnownIssue.STATIC_LIST_APPEND_WITH_CLUSTERING_IN)
+                && staticColumns.stream().anyMatch(s -> s.type().isMultiCell() 
&& s.type().getClass() == ListType.class))
+                allowUpdateMultipleClusteringKeys = false;
             return rnd -> {
                 Mutation.Kind kind = kindGen.generate(rnd);
                 // when there are not non-primary-columns then can't support 
UPDATE
@@ -572,6 +630,12 @@ public class ASTGenerators
                         if (timestamp.isPresent())
                             builder.timestamp(valueGen(timestamp.getAsLong(), 
LongType.instance).generate(rnd));
                         values(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
+                        if (!staticColumns.isEmpty() && 
allowPartitionOnlyInsert && bool.generate(rnd))
+                        {
+                            var columnsToGenerate = new 
LinkedHashSet<>(subset(rnd, staticColumns));
+                            generateRemaining(rnd, bool, Mutation.Kind.INSERT, 
isTransaction, typeToReference, builder, columnsToGenerate);
+                            return builder.build();
+                        }
                         values(rnd, columnExpressions, builder, 
clusteringColumns, clusteringValueGen);
                         LinkedHashSet<Symbol> columnsToGenerate;
                         if (regularAndStaticColumns.isEmpty())
@@ -601,6 +665,35 @@ public class ASTGenerators
                         var timestamp = timestampGen.generate(rnd);
                         if (timestamp.isPresent())
                             builder.timestamp(valueGen(timestamp.getAsLong(), 
LongType.instance).generate(rnd));
+                        if (allowUpdateMultipleClusteringKeys)
+                            where(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
+                        else
+                            values(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
+
+                        if (!staticColumns.isEmpty() && 
allowPartitionOnlyUpdate && bool.generate(rnd))
+                        {
+                            var columnsToGenerate = new 
LinkedHashSet<>(subset(rnd, staticColumns));
+                            Conditional.EqBuilder<Mutation.UpdateBuilder> 
setBuilder = builder::set;
+                            generateRemaining(rnd, bool, Mutation.Kind.UPDATE, 
isTransaction, typeToReference, setBuilder, columnsToGenerate);
+
+                            if (isCas)
+                            {
+                                if (useCasIf.generate(rnd))
+                                {
+                                    ifGen(new 
ArrayList<>(staticColumns)).generate(rnd).ifPresent(c -> 
builder.ifCondition(c));
+                                }
+                                else
+                                {
+                                    builder.ifExists();
+                                }
+                            }
+                            return builder.build();
+                        }
+                        if (allowUpdateMultipleClusteringKeys)
+                            where(rnd, columnExpressions, builder, 
clusteringColumns, clusteringValueGen);
+                        else
+                            values(rnd, columnExpressions, builder, 
clusteringColumns, clusteringValueGen);
+
                         if (isCas)
                         {
                             if (useCasIf.generate(rnd))
@@ -612,8 +705,6 @@ public class ASTGenerators
                                 builder.ifExists();
                             }
                         }
-                        values(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
-                        values(rnd, columnExpressions, builder, 
clusteringColumns, clusteringValueGen);
 
                         LinkedHashSet<Symbol> columnsToGenerate;
                         if (regularAndStaticColumns.size() == 1 || 
bool.generate(rnd))
diff --git a/test/unit/org/apache/cassandra/utils/ImmutableUniqueList.java 
b/test/unit/org/apache/cassandra/utils/ImmutableUniqueList.java
index 00fabea136..d4b7393dcd 100644
--- a/test/unit/org/apache/cassandra/utils/ImmutableUniqueList.java
+++ b/test/unit/org/apache/cassandra/utils/ImmutableUniqueList.java
@@ -26,8 +26,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.RandomAccess;
 
-import com.google.common.collect.Iterators;
-
 import org.agrona.collections.Object2IntHashMap;
 
 public class ImmutableUniqueList<T> extends AbstractList<T> implements 
RandomAccess
@@ -43,6 +41,12 @@ public class ImmutableUniqueList<T> extends AbstractList<T> 
implements RandomAcc
         indexLookup = new Object2IntHashMap<>(builder.indexLookup);
     }
 
+    public static <T> ImmutableUniqueList<T> copyOf(Collection<T> collection)
+    {
+        if (collection instanceof ImmutableUniqueList) return 
(ImmutableUniqueList<T>) collection;
+        return ImmutableUniqueList.<T>builder().addAll(collection).build();
+    }
+
     public static <T> Builder<T> builder()
     {
         return new Builder<>();
@@ -95,7 +99,7 @@ public class ImmutableUniqueList<T> extends AbstractList<T> 
implements RandomAcc
         return values.length;
     }
 
-    public static final class Builder<T> extends AbstractSet<T>
+    public static final class Builder<T>
     {
         private final List<T> values;
         private final Object2IntHashMap<T> indexLookup = new 
Object2IntHashMap<>(-1);
@@ -111,29 +115,21 @@ public class ImmutableUniqueList<T> extends 
AbstractList<T> implements RandomAcc
             this.values = new ArrayList<>(expectedSize);
         }
 
-        public Builder<T> mayAddAll(Collection<? extends T> values)
-        {
-            addAll(values);
-            return this;
-        }
-
-        @Override
-        public boolean add(T t)
+        public Builder<T> add(T t)
         {
-            if (indexLookup.containsKey(t)) return false;
+            if (indexLookup.containsKey(t)) return this;
             int idx = this.idx++;
             indexLookup.put(t, idx);
             values.add(t);
-            return true;
+            return this;
         }
 
-        @Override
-        public boolean remove(Object o)
+        public Builder<T> addAll(Collection<? extends T> c)
         {
-            throw new UnsupportedOperationException();
+            c.forEach(this::add);
+            return this;
         }
 
-        @Override
         public void clear()
         {
             values.clear();
@@ -141,30 +137,6 @@ public class ImmutableUniqueList<T> extends 
AbstractList<T> implements RandomAcc
             idx = 0;
         }
 
-        @Override
-        public boolean isEmpty()
-        {
-            return values.isEmpty();
-        }
-
-        @Override
-        public boolean contains(Object o)
-        {
-            return indexLookup.containsKey(o);
-        }
-
-        @Override
-        public Iterator<T> iterator()
-        {
-            return Iterators.unmodifiableIterator(values.iterator());
-        }
-
-        @Override
-        public int size()
-        {
-            return values.size();
-        }
-
         public ImmutableUniqueList<T> build()
         {
             return new ImmutableUniqueList<>(this);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to